From d8b58f4ca0db69480fabc2f3593fb5b9fa9d10cd Mon Sep 17 00:00:00 2001 From: Petar Dambovaliev Date: Sun, 10 Mar 2024 20:40:47 +0200 Subject: [PATCH] feat: thread safe state and additional fixes (#9) * save * save * save * save * save * save * save --- core/broadcast.go | 17 ++++----- core/messages.go | 4 +- core/mocks_test.go | 5 +++ core/state.go | 52 ++++++++++++++++++++++--- core/store.go | 4 +- core/tendermint.go | 89 +++++++++++++++++++++++++++++-------------- core/timeout.go | 5 +-- core/types.go | 13 ++++++- messages/collector.go | 8 ++++ 9 files changed, 145 insertions(+), 52 deletions(-) diff --git a/core/broadcast.go b/core/broadcast.go index be1b899b3cf..971b44b056f 100644 --- a/core/broadcast.go +++ b/core/broadcast.go @@ -6,11 +6,10 @@ import ( // buildProposalMessage builds a proposal message using the given proposal func (t *Tendermint) buildProposalMessage(proposal []byte) *types.ProposalMessage { - // TODO make thread safe var ( - height = t.state.view.Height - round = t.state.view.Round - validRound = t.state.validRound + height = t.state.LoadHeight() + round = t.state.LoadRound() + validRound = t.state.LoadValidRound() ) // Build the proposal message (assumes the node will sign it) @@ -32,10 +31,9 @@ func (t *Tendermint) buildProposalMessage(proposal []byte) *types.ProposalMessag // buildPrevoteMessage builds a prevote message using the given proposal identifier func (t *Tendermint) buildPrevoteMessage(id []byte) *types.PrevoteMessage { - // TODO make thread safe var ( - height = t.state.view.Height - round = t.state.view.Round + height = t.state.LoadHeight() + round = t.state.LoadRound() processID = t.node.ID() ) @@ -59,10 +57,9 @@ func (t *Tendermint) buildPrevoteMessage(id []byte) *types.PrevoteMessage { // //nolint:unused // Temporarily unused func (t *Tendermint) buildPrecommitMessage(id []byte) *types.PrecommitMessage { - // TODO make thread safe var ( - height = t.state.view.Height - round = t.state.view.Round + height = t.state.LoadHeight() + round = t.state.LoadRound() processID = t.node.ID() ) diff --git a/core/messages.go b/core/messages.go index 549891d21d6..7cc8afce882 100644 --- a/core/messages.go +++ b/core/messages.go @@ -85,8 +85,8 @@ func (t *Tendermint) verifyMessage(message message) error { var ( view = message.GetView() - currentHeight = t.state.view.GetHeight() // TODO make thread safe - currentRound = t.state.view.GetRound() // TODO make thread safe + currentHeight = t.state.LoadHeight() + currentRound = t.state.LoadRound() ) // Make sure the height is valid. diff --git a/core/mocks_test.go b/core/mocks_test.go index 500247e622a..7bf84e4a8e8 100644 --- a/core/mocks_test.go +++ b/core/mocks_test.go @@ -104,6 +104,11 @@ type mockVerifier struct { isValidatorFn isValidator } +func (m *mockVerifier) Quorum(msgs []Message) bool { + // TODO implement me + panic("implement me") +} + func (m *mockVerifier) IsProposer(id []byte, height, round uint64) bool { if m.isProposerFn != nil { return m.isProposerFn(id, height, round) diff --git a/core/state.go b/core/state.go index 91146661a55..a202fddca87 100644 --- a/core/state.go +++ b/core/state.go @@ -1,9 +1,13 @@ package core -import "github.com/gnolang/go-tendermint/messages/types" +import ( + "sync/atomic" + + "github.com/gnolang/go-tendermint/messages/types" +) // step is the current state step -type step uint8 +type step uint32 const ( propose step = iota @@ -24,6 +28,16 @@ func (n step) String() string { return "" } +func (n *step) Set(newStep step) { + atomic.SwapUint32((*uint32)(n), uint32(newStep)) +} + +func (n *step) Load() step { + s := atomic.LoadUint32((*uint32)(n)) + + return step(s) +} + // state holds information about the current consensus state // TODO make thread safe type state struct { @@ -33,7 +47,11 @@ type state struct { acceptedProposalID []byte lockedValue []byte - validValue []byte + + // no concurrent writes/reads + // no need sync primitive + // used in startRound() + validValue []byte lockedRound int64 validRound int64 @@ -41,9 +59,33 @@ type state struct { step step } +func (s *state) LoadHeight() uint64 { + return atomic.LoadUint64(&s.view.Height) +} + +func (s *state) LoadRound() uint64 { + return atomic.LoadUint64(&s.view.Round) +} + +func (s *state) LoadValidRound() int64 { + return atomic.LoadInt64(&s.validRound) +} + +func (s *state) LoadLockedRound() int64 { + return atomic.LoadInt64(&s.lockedRound) +} + +func (s *state) IncRound() { + atomic.AddUint64(&s.view.Round, 1) +} + +func (s *state) SetRound(r uint64) { + atomic.SwapUint64(&s.view.Round, r) +} + // newState creates a fresh state using the given view -func newState(view *types.View) *state { - return &state{ +func newState(view *types.View) state { + return state{ view: view, step: propose, acceptedProposal: nil, diff --git a/core/store.go b/core/store.go index 621f829c32b..c63daf2fac3 100644 --- a/core/store.go +++ b/core/store.go @@ -13,8 +13,8 @@ type store struct { } // newStore creates a new message store -func newStore() *store { - return &store{ +func newStore() store { + return store{ proposeMessages: messages.NewCollector[types.ProposalMessage](), prevoteMessages: messages.NewCollector[types.PrevoteMessage](), precommitMessages: messages.NewCollector[types.PrecommitMessage](), diff --git a/core/tendermint.go b/core/tendermint.go index 9d39188da9c..68c0c3ab863 100644 --- a/core/tendermint.go +++ b/core/tendermint.go @@ -2,8 +2,11 @@ package core import ( "context" + "log/slog" "sync" + "github.com/gnolang/go-tendermint/messages" + "github.com/gnolang/go-tendermint/messages/types" ) @@ -16,10 +19,10 @@ type Tendermint struct { signer Signer // state is the current Tendermint consensus state - state *state + state state // store is the message store - store *store + store store // roundExpired is the channel for signalizing // round change events (to the next round, from the current one) @@ -31,12 +34,16 @@ type Tendermint struct { // wg is the barrier for keeping all // parallel consensus processes synced wg sync.WaitGroup + + log slog.Logger } // RunSequence runs the Tendermint consensus sequence for a given height, // returning only when a proposal has been finalized (consensus reached), or // the context has been cancelled func (t *Tendermint) RunSequence(ctx context.Context, h uint64) []byte { + t.log.Debug("RunSequence", slog.Any("height", h), slog.Any("node", t.node.ID())) + // Initialize the state before starting the sequence t.state = newState(&types.View{ Height: h, @@ -57,26 +64,29 @@ func (t *Tendermint) RunSequence(ctx context.Context, h uint64) []byte { // Check if the proposal has been finalized if proposal == nil { + t.log.Info("RunSequence received empty proposal", slog.Any("height", h), slog.Any("round", t.state.LoadRound())) // 65: Function OnTimeoutPrecommit(height, round) : // 66: if height = hP ∧ round = roundP then // 67: StartRound(roundP + 1) - // TODO start NEXT round (view.Round + 1) + t.state.IncRound() continue } + t.log.Info("RunSequence: received\n", slog.Any("height", h), slog.Any("proposal", proposal)) + return proposal - case _ = <-t.watchForRoundJumps(ctxRound): //nolint:gosimple // Temporarily unassigned + case recvRound := <-t.watchForRoundJumps(ctxRound): //nolint:gosimple // Temporarily unassigned + t.log.Info("RunSequence", slog.Any("height", h), slog.Any("from_round", t.state.LoadRound()), slog.Any("to_round", recvRound)) teardown() - - // TODO start NEW round (that was received) + t.state.SetRound(recvRound) case <-t.roundExpired: + t.log.Info("RunSequence round expired: %v\n", slog.Any("height", h), slog.Any("round", t.state.LoadRound())) teardown() - - // TODO start NEXT round (view.Round + 1) + t.state.IncRound() case <-ctx.Done(): teardown() - // TODO log + t.log.Info("RunSequence done", slog.Any("height", h)) return nil } } @@ -112,23 +122,43 @@ func (t *Tendermint) watchForRoundJumps(ctx context.Context) <-chan uint64 { } for { + var majority bool + select { case <-ctx.Done(): return case getProposeFn := <-proposeCh: - // TODO count messages - _ = getProposeFn() + prpMsgs := getProposeFn() + msgs := make([]Message, 0) + + messages.ConvertToInterface(prpMsgs, func(m *types.ProposalMessage) { + msgs = append(msgs, m) + }) + + majority = t.verifier.Quorum(msgs) case getPrevoteFn := <-prevoteCh: - // TODO count messages - _ = getPrevoteFn() + prvMsgs := getPrevoteFn() + msgs := make([]Message, 0) + + messages.ConvertToInterface(prvMsgs, func(m *types.PrevoteMessage) { + msgs = append(msgs, m) + }) + + majority = t.verifier.Quorum(msgs) case getPrecommitFn := <-precommitCh: - // TODO count messages - _ = getPrecommitFn() + prcMsgs := getPrecommitFn() + msgs := make([]Message, 0) + + messages.ConvertToInterface(prcMsgs, func(m *types.PrecommitMessage) { + msgs = append(msgs, m) + }) + + majority = t.verifier.Quorum(msgs) } - // TODO check if the condition (F+1) is met + // check if the condition (F+1) is met // and signal the round jump - if false { + if majority { signalRoundJump(0) } } @@ -166,19 +196,23 @@ func (t *Tendermint) finalizeProposal(ctx context.Context) <-chan []byte { // the state machine is in full swing and // the runs the same flow for everyone (proposer / non-proposers) func (t *Tendermint) startRound(ctx context.Context) { - // TODO make thread safe + height := t.state.LoadHeight() + round := t.state.LoadRound() + // Check if the current process is the proposer for this view - if !t.verifier.IsProposer(t.node.ID(), t.state.view.Height, t.state.view.Round) { + if !t.verifier.IsProposer(t.node.ID(), height, round) { // The current process is NOT the proposer, only schedule a timeout // // 21: schedule OnTimeoutPropose(hP , roundP) to be executed after timeoutPropose(roundP) var ( callback = func() { - t.onTimeoutPropose(t.state.view.Round) + t.onTimeoutPropose(round) } - timeoutPropose = t.timeouts[propose].calculateTimeout(t.state.view.Round) + timeoutPropose = t.timeouts[propose].calculateTimeout(round) ) + t.log.Info("startRound scheduling a timeout", slog.Any("height", height), slog.Any("round", round), slog.Any("timeout", timeoutPropose.Milliseconds())) + t.scheduleTimeout(ctx, timeoutPropose, callback) return @@ -196,12 +230,13 @@ func (t *Tendermint) startRound(ctx context.Context) { // Check if a new proposal needs to be built if proposal == nil { + t.log.Info("RunSequence: Last valid proposal is nil. Building a proposal.", slog.Any("height", height), slog.Any("round", round)) // No previous valid value present, // build a new proposal. // // 17: else // 18: proposal ← getValue() - proposal = t.node.BuildProposal(t.state.view.Height) + proposal = t.node.BuildProposal(height) } // Build the propose message @@ -215,7 +250,6 @@ func (t *Tendermint) startRound(ctx context.Context) { // 19: broadcast t.broadcast.BroadcastProposal(proposeMessage) - // TODO make thread safe // Save the accepted proposal in the state. // NOTE: This is different from validValue / lockedValue, // since they require a 2F+1 quorum of specific messages @@ -231,17 +265,16 @@ func (t *Tendermint) startRound(ctx context.Context) { // Since the current process is the proposer, // it can directly move to the prevote state - // TODO make threads safe - // // 27/33: stepP ← prevote - t.state.step = prevote + t.state.step.Set(prevote) } // runStates runs the consensus states, depending on the current step func (t *Tendermint) runStates(ctx context.Context) []byte { for { - // TODO make thread safe - switch t.state.step { + currentStep := t.state.step.Load() + + switch currentStep { case propose: t.runPropose(ctx) case prevote: diff --git a/core/timeout.go b/core/timeout.go index c72f86e9e57..21081b97e5d 100644 --- a/core/timeout.go +++ b/core/timeout.go @@ -46,7 +46,6 @@ func (t *Tendermint) scheduleTimeout( // 59: broadcast // 60: stepP ← prevote func (t *Tendermint) onTimeoutPropose(round uint64) { - // TODO make thread safe var ( // TODO Evaluate if the round information is even required. // We cancel the top-level timeout context upon every round change, @@ -54,8 +53,8 @@ func (t *Tendermint) onTimeoutPropose(round uint64) { // Essentially, I believe the only param we do need to check is // the current state in the SM, since this method can be executed async when // the SM is in a different state - currentRound = t.state.view.Round - currentStep = t.state.step + currentRound = t.state.LoadRound() + currentStep = t.state.step.Load() ) // Make sure the timeout context is still valid diff --git a/core/types.go b/core/types.go index 2fd962f9ca2..1aa5e2f731e 100644 --- a/core/types.go +++ b/core/types.go @@ -12,13 +12,17 @@ type Signer interface { type Verifier interface { IsProposer(id []byte, height uint64, round uint64) bool IsValidator(from []byte) bool + Quorum(msgs []Message) bool } +// Node interface is an abstraction over a single entity that runs +// the consensus algorithm +// +// Hash must not modify the slice proposal, even temporarily. +// Implementations must not retain proposal. type Node interface { ID() []byte - Hash(proposal []byte) []byte - BuildProposal(height uint64) []byte } @@ -27,3 +31,8 @@ type Broadcast interface { BroadcastPrevote(message *types.PrevoteMessage) BroadcastPrecommit(message *types.PrecommitMessage) } + +type Message interface { + Verify() error + GetSender() []byte +} diff --git a/messages/collector.go b/messages/collector.go index f16dd3843f4..4ce7c82f5ab 100644 --- a/messages/collector.go +++ b/messages/collector.go @@ -13,6 +13,14 @@ type msgType interface { types.ProposalMessage | types.PrevoteMessage | types.PrecommitMessage } +// this is because Go doesn't support covariance on slices +// []*T -> []I does not work +func ConvertToInterface[T msgType](msgs []*T, convertFunc func(m *T)) { + for _, msg := range msgs { + convertFunc(msg) + } +} + type ( // collection are the actual received messages. // Maps a unique identifier -> their message (of a specific type) to avoid duplicates.