diff --git a/node/node.go b/node/node.go index 6fba4224d..f4ffa27eb 100644 --- a/node/node.go +++ b/node/node.go @@ -517,10 +517,7 @@ func createVotePoolReactor(config *cfg.Config, } votePoolLogger := logger.With("module", "votepool") - votePool, err := votepool.NewVotePool(logger, vals, eventBus) - if err != nil { - return nil, nil, err - } + votePool := votepool.NewVotePool(logger, vals, eventBus) votePoolReactor := votepool.NewReactor(votePool, eventBus) votePoolReactor.SetLogger(votePoolLogger) return votePoolReactor, votePool, nil diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index c7aa6745b..bea59aeb5 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -532,10 +532,7 @@ func createVotePoolReactor(config *cfg.Config, } votePoolLogger := logger.With("module", "votepool") - votePool, err := votepool.NewVotePool(logger, vals, eventBus) - if err != nil { - return nil, nil, err - } + votePool := votepool.NewVotePool(logger, vals, eventBus) votePoolReactor := votepool.NewReactor(votePool, eventBus) votePoolReactor.SetLogger(votePoolLogger) return votePoolReactor, votePool, nil diff --git a/votepool/event_type.go b/votepool/event_type.go index 54f0b8bd9..83c260dfb 100644 --- a/votepool/event_type.go +++ b/votepool/event_type.go @@ -10,6 +10,6 @@ const ( // FromBscCrossChainEvent defines the type of cross chain events from BSC to the current chain. FromBscCrossChainEvent EventType = 2 - // DataChallengeEvent defines the type of events for data availability challenges. + // DataAvailabilityChallengeEvent defines the type of events for data availability challenges. DataAvailabilityChallengeEvent EventType = 3 ) diff --git a/votepool/interface.go b/votepool/interface.go index 84d6a2fd6..3bdffd5c8 100644 --- a/votepool/interface.go +++ b/votepool/interface.go @@ -9,13 +9,13 @@ import ( type VotePool interface { service.Service - // AddVote will add a vote to the Pool. Different validations can be conducted before adding. + // AddVote will add a vote to the Pool. Different types of validations can be conducted before adding. AddVote(vote *Vote) error - // GetVotesByEventTypeAndHash will filter votes by event hash and event type. + // GetVotesByEventTypeAndHash will query votes by event hash and event type. GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte) ([]*Vote, error) - // GetVotesByEventType will filter votes by event type. + // GetVotesByEventType will query votes by event type. GetVotesByEventType(eventType EventType) ([]*Vote, error) // FlushVotes will clear all votes in the Pool, no matter what types of events. diff --git a/votepool/pool.go b/votepool/pool.go index b4198b06d..448253c13 100644 --- a/votepool/pool.go +++ b/votepool/pool.go @@ -18,7 +18,7 @@ const ( // The number of cached votes (i.e., keys) to quickly filter out when adding votes. cacheVoteSize = 1024 - // Vote will assign the expired at time when adding to the Pool. + // Vote will be assigned the expired at time when adding to the Pool. voteKeepAliveAfter = time.Second * 30 // Votes in the Pool will be pruned periodically to remove useless ones. @@ -49,8 +49,9 @@ func newVoteStore() *voteStore { return s } +// addVote will add a vote to the store. // Be noted: no validation is conducted in this layer. -func (s *voteStore) addVote(vote *Vote) error { +func (s *voteStore) addVote(vote *Vote) { eventHashStr := string(vote.EventHash[:]) pubKeyStr := string(vote.PubKey[:]) s.mtx.Lock() @@ -61,15 +62,12 @@ func (s *voteStore) addVote(vote *Vote) error { subM = make(map[string]*Vote) s.voteMap[eventHashStr] = subM } - if _, ok := subM[pubKeyStr]; !ok { - subM[pubKeyStr] = vote - s.queue.Insert(vote) - } - - return nil + subM[pubKeyStr] = vote + s.queue.Insert(vote) } -func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) { +// getVotesByEventHash will query events by event hash. +func (s *voteStore) getVotesByEventHash(eventHash []byte) []*Vote { s.mtx.RLock() defer s.mtx.RUnlock() @@ -79,10 +77,11 @@ func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) { votes = append(votes, v) } } - return votes, nil + return votes } -func (s *voteStore) getAllVotes() ([]*Vote, error) { +// getAllVotes will return all votes in the store. +func (s *voteStore) getAllVotes() []*Vote { s.mtx.RLock() defer s.mtx.RUnlock() @@ -92,9 +91,10 @@ func (s *voteStore) getAllVotes() ([]*Vote, error) { votes = append(votes, v) } } - return votes, nil + return votes } +// flushVotes will clear all votes in the store. func (s *voteStore) flushVotes() { s.mtx.Lock() defer s.mtx.Unlock() @@ -103,6 +103,7 @@ func (s *voteStore) flushVotes() { s.queue = NewVoteQueue() } +// pruneVotes will prune votes which are expired and return the pruned votes' keys. func (s *voteStore) pruneVotes() []string { keys := make([]string, 0) current := &Vote{expireAt: time.Now()} @@ -134,8 +135,8 @@ type Pool struct { eventBus *types.EventBus // to subscribe validator update events and publish new added vote events } -// NewVotePool creates a Pool, the init validators should be supplied. -func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) (*Pool, error) { +// NewVotePool creates a Pool. The initial validators should be supplied. +func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) *Pool { eventTypes := []EventType{ToBscCrossChainEvent, FromBscCrossChainEvent, DataAvailabilityChallengeEvent} ticker := time.NewTicker(pruneVoteInterval) @@ -145,10 +146,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ stores[et] = store } - cache, err := lru.New(cacheVoteSize) - if err != nil { - panic(err) - } + cache, _ := lru.New(cacheVoteSize) // positive parameter will never return error // set the initial validators validatorVerifier := NewFromValidatorVerifier() @@ -163,7 +161,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ } votePool.BaseService = *service.NewBaseService(logger, "VotePool", votePool) - return votePool, nil + return votePool } // OnStart implements Service. @@ -193,24 +191,21 @@ func (p *Pool) AddVote(vote *Vote) error { return errors.New("unsupported event type") } - if ok := p.cache.Contains(vote.Key()); ok { + if ok = p.cache.Contains(vote.Key()); ok { return nil } - if err := p.validatorVerifier.Validate(vote); err != nil { + if err = p.validatorVerifier.Validate(vote); err != nil { return err } - if err := p.blsVerifier.Validate(vote); err != nil { + if err = p.blsVerifier.Validate(vote); err != nil { return err } vote.expireAt = time.Now().Add(voteKeepAliveAfter) - err = store.addVote(vote) - if err != nil { - return err - } - err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote) - if err != nil { + store.addVote(vote) + + if err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote); err != nil { p.Logger.Error("Cannot publish vote pool event", "err", err.Error()) } p.cache.Add(vote.Key(), struct{}{}) @@ -223,7 +218,7 @@ func (p *Pool) GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte) if !ok { return nil, errors.New("unsupported event type") } - return store.getVotesByEventHash(eventHash) + return store.getVotesByEventHash(eventHash), nil } // GetVotesByEventType implements VotePool. @@ -232,7 +227,7 @@ func (p *Pool) GetVotesByEventType(eventType EventType) ([]*Vote, error) { if !ok { return nil, errors.New("unsupported event type") } - return store.getAllVotes() + return store.getAllVotes(), nil } // FlushVotes implements VotePool. @@ -250,7 +245,8 @@ func (p *Pool) validatorUpdateRoutine() { } sub, err := p.eventBus.Subscribe(context.Background(), "VotePoolService", types.EventQueryValidatorSetUpdates, eventBusSubscribeCap) if err != nil { - panic(err) + p.Logger.Error("Cannot subscribe to validator set update event", "err", err.Error()) + return } for { select { diff --git a/votepool/pool_test.go b/votepool/pool_test.go index b06049344..327a2f3f3 100644 --- a/votepool/pool_test.go +++ b/votepool/pool_test.go @@ -39,10 +39,7 @@ func makeVotePool() (blsCommon.SecretKey, *types.Validator, blsCommon.SecretKey, panic(err) } - votePool, err := NewVotePool(logger, vals, eventBus) - if err != nil { - panic(err) - } + votePool := NewVotePool(logger, vals, eventBus) err = votePool.Start() if err != nil { panic(err) diff --git a/votepool/queue.go b/votepool/queue.go index f0166e3d6..b3d57a5c8 100644 --- a/votepool/queue.go +++ b/votepool/queue.go @@ -5,7 +5,7 @@ import ( "errors" ) -// VoteQueue represents a priority queue for Votes. The expiredAt filed of a Vote will be used as priority. +// VoteQueue represents a priority queue for Votes. The expiredAt field of a Vote will be used as priority. type VoteQueue struct { itemHeap *itemHeap } diff --git a/votepool/reactor.go b/votepool/reactor.go index eee8ca927..6d33a7527 100644 --- a/votepool/reactor.go +++ b/votepool/reactor.go @@ -149,11 +149,13 @@ func (voteR *Reactor) broadcastVotes(peer p2p.Peer) { } sub, err := voteR.eventBus.Subscribe(context.Background(), string(peer.ID()), eventVotePoolAdded, eventBusSubscribeCap) if err != nil { - panic(err) + voteR.Logger.Error("Cannot subscribe to vote update event", "err", err.Error()) + return } cache, ok := peer.Get(peerVoteCacheKey).(*lru.Cache) if !ok { // this should not happen - panic(fmt.Sprintf("Peer %v has no cache state", peer)) + voteR.Logger.Error(fmt.Sprintf("Peer %v has no cache state", peer)) + return } for { select { diff --git a/votepool/reactor_test.go b/votepool/reactor_test.go index 65862c053..be8f6eccf 100644 --- a/votepool/reactor_test.go +++ b/votepool/reactor_test.go @@ -65,10 +65,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) ([]blsCommon.SecretKey, [ panic(err) } - votePool, err := NewVotePool(logger, vals, eventBus) - if err != nil { - panic(err) - } + votePool := NewVotePool(logger, vals, eventBus) eventBuses[i] = eventBus votePools[i] = votePool