Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
refator: refine codes of votepool (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing authored Mar 17, 2023
1 parent 78ff8e4 commit 492cf85
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 54 deletions.
5 changes: 1 addition & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,7 @@ func createVotePoolReactor(config *cfg.Config,
}

votePoolLogger := logger.With("module", "votepool")
votePool, err := votepool.NewVotePool(logger, vals, eventBus)
if err != nil {
return nil, nil, err
}
votePool := votepool.NewVotePool(logger, vals, eventBus)
votePoolReactor := votepool.NewReactor(votePool, eventBus)
votePoolReactor.SetLogger(votePoolLogger)
return votePoolReactor, votePool, nil
Expand Down
5 changes: 1 addition & 4 deletions test/maverick/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,7 @@ func createVotePoolReactor(config *cfg.Config,
}

votePoolLogger := logger.With("module", "votepool")
votePool, err := votepool.NewVotePool(logger, vals, eventBus)
if err != nil {
return nil, nil, err
}
votePool := votepool.NewVotePool(logger, vals, eventBus)
votePoolReactor := votepool.NewReactor(votePool, eventBus)
votePoolReactor.SetLogger(votePoolLogger)
return votePoolReactor, votePool, nil
Expand Down
2 changes: 1 addition & 1 deletion votepool/event_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ const (
// FromBscCrossChainEvent defines the type of cross chain events from BSC to the current chain.
FromBscCrossChainEvent EventType = 2

// DataChallengeEvent defines the type of events for data availability challenges.
// DataAvailabilityChallengeEvent defines the type of events for data availability challenges.
DataAvailabilityChallengeEvent EventType = 3
)
6 changes: 3 additions & 3 deletions votepool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
type VotePool interface {
service.Service

// AddVote will add a vote to the Pool. Different validations can be conducted before adding.
// AddVote will add a vote to the Pool. Different types of validations can be conducted before adding.
AddVote(vote *Vote) error

// GetVotesByEventTypeAndHash will filter votes by event hash and event type.
// GetVotesByEventTypeAndHash will query votes by event hash and event type.
GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte) ([]*Vote, error)

// GetVotesByEventType will filter votes by event type.
// GetVotesByEventType will query votes by event type.
GetVotesByEventType(eventType EventType) ([]*Vote, error)

// FlushVotes will clear all votes in the Pool, no matter what types of events.
Expand Down
58 changes: 27 additions & 31 deletions votepool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// The number of cached votes (i.e., keys) to quickly filter out when adding votes.
cacheVoteSize = 1024

// Vote will assign the expired at time when adding to the Pool.
// Vote will be assigned the expired at time when adding to the Pool.
voteKeepAliveAfter = time.Second * 30

// Votes in the Pool will be pruned periodically to remove useless ones.
Expand Down Expand Up @@ -49,8 +49,9 @@ func newVoteStore() *voteStore {
return s
}

// addVote will add a vote to the store.
// Be noted: no validation is conducted in this layer.
func (s *voteStore) addVote(vote *Vote) error {
func (s *voteStore) addVote(vote *Vote) {
eventHashStr := string(vote.EventHash[:])
pubKeyStr := string(vote.PubKey[:])
s.mtx.Lock()
Expand All @@ -61,15 +62,12 @@ func (s *voteStore) addVote(vote *Vote) error {
subM = make(map[string]*Vote)
s.voteMap[eventHashStr] = subM
}
if _, ok := subM[pubKeyStr]; !ok {
subM[pubKeyStr] = vote
s.queue.Insert(vote)
}

return nil
subM[pubKeyStr] = vote
s.queue.Insert(vote)
}

func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) {
// getVotesByEventHash will query events by event hash.
func (s *voteStore) getVotesByEventHash(eventHash []byte) []*Vote {
s.mtx.RLock()
defer s.mtx.RUnlock()

Expand All @@ -79,10 +77,11 @@ func (s *voteStore) getVotesByEventHash(eventHash []byte) ([]*Vote, error) {
votes = append(votes, v)
}
}
return votes, nil
return votes
}

func (s *voteStore) getAllVotes() ([]*Vote, error) {
// getAllVotes will return all votes in the store.
func (s *voteStore) getAllVotes() []*Vote {
s.mtx.RLock()
defer s.mtx.RUnlock()

Expand All @@ -92,9 +91,10 @@ func (s *voteStore) getAllVotes() ([]*Vote, error) {
votes = append(votes, v)
}
}
return votes, nil
return votes
}

// flushVotes will clear all votes in the store.
func (s *voteStore) flushVotes() {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -103,6 +103,7 @@ func (s *voteStore) flushVotes() {
s.queue = NewVoteQueue()
}

// pruneVotes will prune votes which are expired and return the pruned votes' keys.
func (s *voteStore) pruneVotes() []string {
keys := make([]string, 0)
current := &Vote{expireAt: time.Now()}
Expand Down Expand Up @@ -134,8 +135,8 @@ type Pool struct {
eventBus *types.EventBus // to subscribe validator update events and publish new added vote events
}

// NewVotePool creates a Pool, the init validators should be supplied.
func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) (*Pool, error) {
// NewVotePool creates a Pool. The initial validators should be supplied.
func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *types.EventBus) *Pool {
eventTypes := []EventType{ToBscCrossChainEvent, FromBscCrossChainEvent, DataAvailabilityChallengeEvent}

ticker := time.NewTicker(pruneVoteInterval)
Expand All @@ -145,10 +146,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ
stores[et] = store
}

cache, err := lru.New(cacheVoteSize)
if err != nil {
panic(err)
}
cache, _ := lru.New(cacheVoteSize) // positive parameter will never return error

// set the initial validators
validatorVerifier := NewFromValidatorVerifier()
Expand All @@ -163,7 +161,7 @@ func NewVotePool(logger log.Logger, validators []*types.Validator, eventBus *typ
}
votePool.BaseService = *service.NewBaseService(logger, "VotePool", votePool)

return votePool, nil
return votePool
}

// OnStart implements Service.
Expand Down Expand Up @@ -193,24 +191,21 @@ func (p *Pool) AddVote(vote *Vote) error {
return errors.New("unsupported event type")
}

if ok := p.cache.Contains(vote.Key()); ok {
if ok = p.cache.Contains(vote.Key()); ok {
return nil
}

if err := p.validatorVerifier.Validate(vote); err != nil {
if err = p.validatorVerifier.Validate(vote); err != nil {
return err
}
if err := p.blsVerifier.Validate(vote); err != nil {
if err = p.blsVerifier.Validate(vote); err != nil {
return err
}

vote.expireAt = time.Now().Add(voteKeepAliveAfter)
err = store.addVote(vote)
if err != nil {
return err
}
err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote)
if err != nil {
store.addVote(vote)

if err = p.eventBus.Publish(eventBusVotePoolUpdates, *vote); err != nil {
p.Logger.Error("Cannot publish vote pool event", "err", err.Error())
}
p.cache.Add(vote.Key(), struct{}{})
Expand All @@ -223,7 +218,7 @@ func (p *Pool) GetVotesByEventTypeAndHash(eventType EventType, eventHash []byte)
if !ok {
return nil, errors.New("unsupported event type")
}
return store.getVotesByEventHash(eventHash)
return store.getVotesByEventHash(eventHash), nil
}

// GetVotesByEventType implements VotePool.
Expand All @@ -232,7 +227,7 @@ func (p *Pool) GetVotesByEventType(eventType EventType) ([]*Vote, error) {
if !ok {
return nil, errors.New("unsupported event type")
}
return store.getAllVotes()
return store.getAllVotes(), nil
}

// FlushVotes implements VotePool.
Expand All @@ -250,7 +245,8 @@ func (p *Pool) validatorUpdateRoutine() {
}
sub, err := p.eventBus.Subscribe(context.Background(), "VotePoolService", types.EventQueryValidatorSetUpdates, eventBusSubscribeCap)
if err != nil {
panic(err)
p.Logger.Error("Cannot subscribe to validator set update event", "err", err.Error())
return
}
for {
select {
Expand Down
5 changes: 1 addition & 4 deletions votepool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ func makeVotePool() (blsCommon.SecretKey, *types.Validator, blsCommon.SecretKey,
panic(err)
}

votePool, err := NewVotePool(logger, vals, eventBus)
if err != nil {
panic(err)
}
votePool := NewVotePool(logger, vals, eventBus)
err = votePool.Start()
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion votepool/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
)

// VoteQueue represents a priority queue for Votes. The expiredAt filed of a Vote will be used as priority.
// VoteQueue represents a priority queue for Votes. The expiredAt field of a Vote will be used as priority.
type VoteQueue struct {
itemHeap *itemHeap
}
Expand Down
6 changes: 4 additions & 2 deletions votepool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,13 @@ func (voteR *Reactor) broadcastVotes(peer p2p.Peer) {
}
sub, err := voteR.eventBus.Subscribe(context.Background(), string(peer.ID()), eventVotePoolAdded, eventBusSubscribeCap)
if err != nil {
panic(err)
voteR.Logger.Error("Cannot subscribe to vote update event", "err", err.Error())
return
}
cache, ok := peer.Get(peerVoteCacheKey).(*lru.Cache)
if !ok { // this should not happen
panic(fmt.Sprintf("Peer %v has no cache state", peer))
voteR.Logger.Error(fmt.Sprintf("Peer %v has no cache state", peer))
return
}
for {
select {
Expand Down
5 changes: 1 addition & 4 deletions votepool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) ([]blsCommon.SecretKey, [
panic(err)
}

votePool, err := NewVotePool(logger, vals, eventBus)
if err != nil {
panic(err)
}
votePool := NewVotePool(logger, vals, eventBus)

eventBuses[i] = eventBus
votePools[i] = votePool
Expand Down

0 comments on commit 492cf85

Please sign in to comment.