Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

refactor: refine codes of votepool #20

Merged
merged 1 commit into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,7 @@ func createVotePoolReactor(config *cfg.Config,
}

votePoolLogger := logger.With("module", "votepool")
votePool, err := votepool.NewVotePool(logger, vals, eventBus)
if err != nil {
return nil, nil, err
}
votePool := votepool.NewVotePool(logger, vals, eventBus)
votePoolReactor := votepool.NewReactor(votePool, eventBus)
votePoolReactor.SetLogger(votePoolLogger)
return votePoolReactor, votePool, nil
Expand Down
5 changes: 1 addition & 4 deletions test/maverick/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,7 @@ func createVotePoolReactor(config *cfg.Config,
}

votePoolLogger := logger.With("module", "votepool")
votePool, err := votepool.NewVotePool(logger, vals, eventBus)
if err != nil {
return nil, nil, err
}
votePool := votepool.NewVotePool(logger, vals, eventBus)
votePoolReactor := votepool.NewReactor(votePool, eventBus)
votePoolReactor.SetLogger(votePoolLogger)
return votePoolReactor, votePool, nil
Expand Down
2 changes: 1 addition & 1 deletion votepool/event_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ const (
// FromBscCrossChainEvent defines the type of cross chain events from BSC to the current chain.
FromBscCrossChainEvent EventType = 2

// DataChallengeEvent defines the type of events for data availability challenges.
// DataAvailabilityChallengeEvent defines the type of events for data availability challenges.
DataAvailabilityChallengeEvent EventType = 3
)
6 changes: 3 additions & 3 deletions votepool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
type VotePool interface {
service.Service

// AddVote will add a vote to the Pool. Different validations can be conducted before adding.
// AddVote will add a vote to the Pool. Different types of validations can be conducted before adding.
AddVote(vote *Vote) error

// GetVotesByEventTypeAndHash will filter votes by event hash and event type.
// GetVotesByEventTypeAndHash will query votes by event hash and event type.
GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte) ([]*Vote, error)

// GetVotesByEventType will filter votes by event type.
// GetVotesByEventType will query votes by event type.
GetVotesByEventType(eventType EventType) ([]*Vote, error)

// FlushVotes will clear all votes in the Pool, no matter what types of events.
Expand Down
58 changes: 27 additions & 31 deletions votepool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// The number of cached votes (i.e., keys) to quickly filter out when adding votes.
cacheVoteSize = 1024

// Vote will assign the expired at time when adding to the Pool.
// Vote will be assigned the expired at time when adding to the Pool.
voteKeepAliveAfter = time.Second * 30

// Votes in the Pool will be pruned periodically to remove useless ones.
Expand Down Expand Up @@ -49,8 +49,9 @@ func newVoteStore() *voteStore {
return s
}

// addVote will add a vote to the store.
// Be noted: no validation is conducted in this layer.
func (s *voteStore) addVote(vote *Vote) error {
func (s *voteStore) addVote(vote *Vote) {
eventHashStr := string(vote.EventHash[:])
pubKeyStr := string(vote.PubKey[:])
s.mtx.Lock()
Expand All @@ -61,15 +62,12 @@ func (s *voteStore) addVote(vote *Vote) error {
subM = make(map[string]*Vote)
s.voteMap[eventHashStr] = subM
}
if _, ok := subM[pubKeyStr]; !ok {
subM[pubKeyStr] = vote
s.queue.Insert(vote)
}

return nil
subM[pubKeyStr] = vote
s.queue.Insert(vote)
}

func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) {
// getVotesByEventHash will query events by event hash.
func (s *voteStore) getVotesByEventHash(eventHash []byte) []*Vote {
s.mtx.RLock()
defer s.mtx.RUnlock()

Expand All @@ -79,10 +77,11 @@ func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) {
votes = append(votes, v)
}
}
return votes, nil
return votes
}

func (s *voteStore) getAllVotes() ([]*Vote, error) {
// getAllVotes will return all votes in the store.
func (s *voteStore) getAllVotes() []*Vote {
s.mtx.RLock()
defer s.mtx.RUnlock()

Expand All @@ -92,9 +91,10 @@ func (s *voteStore) getAllVotes() ([]*Vote, error) {
votes = append(votes, v)
}
}
return votes, nil
return votes
}

// flushVotes will clear all votes in the store.
func (s *voteStore) flushVotes() {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -103,6 +103,7 @@ func (s *voteStore) flushVotes() {
s.queue = NewVoteQueue()
}

// pruneVotes will prune votes which are expired and return the pruned votes' keys.
func (s *voteStore) pruneVotes() []string {
keys := make([]string, 0)
current := &Vote{expireAt: time.Now()}
Expand Down Expand Up @@ -134,8 +135,8 @@ type Pool struct {
eventBus *types.EventBus // to subscribe validator update events and publish new added vote events
}

// NewVotePool creates a Pool, the init validators should be supplied.
func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) (*Pool, error) {
// NewVotePool creates a Pool. The initial validators should be supplied.
func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) *Pool {
eventTypes := []EventType{ToBscCrossChainEvent, FromBscCrossChainEvent, DataAvailabilityChallengeEvent}

ticker := time.NewTicker(pruneVoteInterval)
Expand All @@ -145,10 +146,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ
stores[et] = store
}

cache, err := lru.New(cacheVoteSize)
if err != nil {
panic(err)
}
cache, _ := lru.New(cacheVoteSize) // positive parameter will never return error

// set the initial validators
validatorVerifier := NewFromValidatorVerifier()
Expand All @@ -163,7 +161,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ
}
votePool.BaseService = *service.NewBaseService(logger, "VotePool", votePool)

return votePool, nil
return votePool
}

// OnStart implements Service.
Expand Down Expand Up @@ -193,24 +191,21 @@ func (p *Pool) AddVote(vote *Vote) error {
return errors.New("unsupported event type")
}

if ok := p.cache.Contains(vote.Key()); ok {
if ok = p.cache.Contains(vote.Key()); ok {
return nil
}

if err := p.validatorVerifier.Validate(vote); err != nil {
if err = p.validatorVerifier.Validate(vote); err != nil {
return err
}
if err := p.blsVerifier.Validate(vote); err != nil {
if err = p.blsVerifier.Validate(vote); err != nil {
return err
}

vote.expireAt = time.Now().Add(voteKeepAliveAfter)
err = store.addVote(vote)
if err != nil {
return err
}
err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote)
if err != nil {
store.addVote(vote)

if err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote); err != nil {
p.Logger.Error("Cannot publish vote pool event", "err", err.Error())
}
p.cache.Add(vote.Key(), struct{}{})
Expand All @@ -223,7 +218,7 @@ func (p *Pool) GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte)
if !ok {
return nil, errors.New("unsupported event type")
}
return store.getVotesByEventHash(eventHash)
return store.getVotesByEventHash(eventHash), nil
}

// GetVotesByEventType implements VotePool.
Expand All @@ -232,7 +227,7 @@ func (p *Pool) GetVotesByEventType(eventType EventType) ([]*Vote, error) {
if !ok {
return nil, errors.New("unsupported event type")
}
return store.getAllVotes()
return store.getAllVotes(), nil
}

// FlushVotes implements VotePool.
Expand All @@ -250,7 +245,8 @@ func (p *Pool) validatorUpdateRoutine() {
}
sub, err := p.eventBus.Subscribe(context.Background(), "VotePoolService", types.EventQueryValidatorSetUpdates, eventBusSubscribeCap)
if err != nil {
panic(err)
p.Logger.Error("Cannot subscribe to validator set update event", "err", err.Error())
return
}
for {
select {
Expand Down
5 changes: 1 addition & 4 deletions votepool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ func makeVotePool() (blsCommon.SecretKey, *types.Validator, blsCommon.SecretKey,
panic(err)
}

votePool, err := NewVotePool(logger, vals, eventBus)
if err != nil {
panic(err)
}
votePool := NewVotePool(logger, vals, eventBus)
err = votePool.Start()
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion votepool/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
)

// VoteQueue represents a priority queue for Votes. The expiredAt filed of a Vote will be used as priority.
// VoteQueue represents a priority queue for Votes. The expiredAt field of a Vote will be used as priority.
type VoteQueue struct {
itemHeap *itemHeap
}
Expand Down
6 changes: 4 additions & 2 deletions votepool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,13 @@ func (voteR *Reactor) broadcastVotes(peer p2p.Peer) {
}
sub, err := voteR.eventBus.Subscribe(context.Background(), string(peer.ID()), eventVotePoolAdded, eventBusSubscribeCap)
if err != nil {
panic(err)
voteR.Logger.Error("Cannot subscribe to vote update event", "err", err.Error())
return
}
cache, ok := peer.Get(peerVoteCacheKey).(*lru.Cache)
if !ok { // this should not happen
panic(fmt.Sprintf("Peer %v has no cache state", peer))
voteR.Logger.Error(fmt.Sprintf("Peer %v has no cache state", peer))
return
}
for {
select {
Expand Down
5 changes: 1 addition & 4 deletions votepool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) ([]blsCommon.SecretKey, [
panic(err)
}

votePool, err := NewVotePool(logger, vals, eventBus)
if err != nil {
panic(err)
}
votePool := NewVotePool(logger, vals, eventBus)

eventBuses[i] = eventBus
votePools[i] = votePool
Expand Down