Skip to content

Commit

Permalink
feat: thread safe state and additional fixes (#9)
Browse files Browse the repository at this point in the history
* save

* save

* save

* save

* save

* save

* save
  • Loading branch information
petar-dambovaliev authored Mar 10, 2024
1 parent 81b2fc3 commit d8b58f4
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 52 deletions.
17 changes: 7 additions & 10 deletions core/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (

// buildProposalMessage builds a proposal message using the given proposal
func (t *Tendermint) buildProposalMessage(proposal []byte) *types.ProposalMessage {
// TODO make thread safe
var (
height = t.state.view.Height
round = t.state.view.Round
validRound = t.state.validRound
height = t.state.LoadHeight()
round = t.state.LoadRound()
validRound = t.state.LoadValidRound()
)

// Build the proposal message (assumes the node will sign it)
Expand All @@ -32,10 +31,9 @@ func (t *Tendermint) buildProposalMessage(proposal []byte) *types.ProposalMessag

// buildPrevoteMessage builds a prevote message using the given proposal identifier
func (t *Tendermint) buildPrevoteMessage(id []byte) *types.PrevoteMessage {
// TODO make thread safe
var (
height = t.state.view.Height
round = t.state.view.Round
height = t.state.LoadHeight()
round = t.state.LoadRound()

processID = t.node.ID()
)
Expand All @@ -59,10 +57,9 @@ func (t *Tendermint) buildPrevoteMessage(id []byte) *types.PrevoteMessage {
//
//nolint:unused // Temporarily unused
func (t *Tendermint) buildPrecommitMessage(id []byte) *types.PrecommitMessage {
// TODO make thread safe
var (
height = t.state.view.Height
round = t.state.view.Round
height = t.state.LoadHeight()
round = t.state.LoadRound()

processID = t.node.ID()
)
Expand Down
4 changes: 2 additions & 2 deletions core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func (t *Tendermint) verifyMessage(message message) error {
var (
view = message.GetView()

currentHeight = t.state.view.GetHeight() // TODO make thread safe
currentRound = t.state.view.GetRound() // TODO make thread safe
currentHeight = t.state.LoadHeight()
currentRound = t.state.LoadRound()
)

// Make sure the height is valid.
Expand Down
5 changes: 5 additions & 0 deletions core/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ type mockVerifier struct {
isValidatorFn isValidator
}

func (m *mockVerifier) Quorum(msgs []Message) bool {
// TODO implement me
panic("implement me")
}

func (m *mockVerifier) IsProposer(id []byte, height, round uint64) bool {
if m.isProposerFn != nil {
return m.isProposerFn(id, height, round)
Expand Down
52 changes: 47 additions & 5 deletions core/state.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package core

import "github.com/gnolang/go-tendermint/messages/types"
import (
"sync/atomic"

"github.com/gnolang/go-tendermint/messages/types"
)

// step is the current state step
type step uint8
type step uint32

const (
propose step = iota
Expand All @@ -24,6 +28,16 @@ func (n step) String() string {
return ""
}

func (n *step) Set(newStep step) {
atomic.SwapUint32((*uint32)(n), uint32(newStep))
}

func (n *step) Load() step {
s := atomic.LoadUint32((*uint32)(n))

return step(s)
}

// state holds information about the current consensus state
// TODO make thread safe
type state struct {
Expand All @@ -33,17 +47,45 @@ type state struct {
acceptedProposalID []byte

lockedValue []byte
validValue []byte

// no concurrent writes/reads
// no need sync primitive
// used in startRound()
validValue []byte

lockedRound int64
validRound int64

step step
}

func (s *state) LoadHeight() uint64 {
return atomic.LoadUint64(&s.view.Height)
}

func (s *state) LoadRound() uint64 {
return atomic.LoadUint64(&s.view.Round)
}

func (s *state) LoadValidRound() int64 {
return atomic.LoadInt64(&s.validRound)
}

func (s *state) LoadLockedRound() int64 {
return atomic.LoadInt64(&s.lockedRound)
}

func (s *state) IncRound() {
atomic.AddUint64(&s.view.Round, 1)
}

func (s *state) SetRound(r uint64) {
atomic.SwapUint64(&s.view.Round, r)
}

// newState creates a fresh state using the given view
func newState(view *types.View) *state {
return &state{
func newState(view *types.View) state {
return state{
view: view,
step: propose,
acceptedProposal: nil,
Expand Down
4 changes: 2 additions & 2 deletions core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ type store struct {
}

// newStore creates a new message store
func newStore() *store {
return &store{
func newStore() store {
return store{
proposeMessages: messages.NewCollector[types.ProposalMessage](),
prevoteMessages: messages.NewCollector[types.PrevoteMessage](),
precommitMessages: messages.NewCollector[types.PrecommitMessage](),
Expand Down
89 changes: 61 additions & 28 deletions core/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package core

import (
"context"
"log/slog"
"sync"

"github.com/gnolang/go-tendermint/messages"

"github.com/gnolang/go-tendermint/messages/types"
)

Expand All @@ -16,10 +19,10 @@ type Tendermint struct {
signer Signer

// state is the current Tendermint consensus state
state *state
state state

// store is the message store
store *store
store store

// roundExpired is the channel for signalizing
// round change events (to the next round, from the current one)
Expand All @@ -31,12 +34,16 @@ type Tendermint struct {
// wg is the barrier for keeping all
// parallel consensus processes synced
wg sync.WaitGroup

log slog.Logger
}

// RunSequence runs the Tendermint consensus sequence for a given height,
// returning only when a proposal has been finalized (consensus reached), or
// the context has been cancelled
func (t *Tendermint) RunSequence(ctx context.Context, h uint64) []byte {
t.log.Debug("RunSequence", slog.Any("height", h), slog.Any("node", t.node.ID()))

// Initialize the state before starting the sequence
t.state = newState(&types.View{
Height: h,
Expand All @@ -57,26 +64,29 @@ func (t *Tendermint) RunSequence(ctx context.Context, h uint64) []byte {

// Check if the proposal has been finalized
if proposal == nil {
t.log.Info("RunSequence received empty proposal", slog.Any("height", h), slog.Any("round", t.state.LoadRound()))
// 65: Function OnTimeoutPrecommit(height, round) :
// 66: if height = hP ∧ round = roundP then
// 67: StartRound(roundP + 1)
// TODO start NEXT round (view.Round + 1)
t.state.IncRound()
continue
}

t.log.Info("RunSequence: received\n", slog.Any("height", h), slog.Any("proposal", proposal))

return proposal
case _ = <-t.watchForRoundJumps(ctxRound): //nolint:gosimple // Temporarily unassigned
case recvRound := <-t.watchForRoundJumps(ctxRound): //nolint:gosimple // Temporarily unassigned
t.log.Info("RunSequence", slog.Any("height", h), slog.Any("from_round", t.state.LoadRound()), slog.Any("to_round", recvRound))
teardown()

// TODO start NEW round (that was received)
t.state.SetRound(recvRound)
case <-t.roundExpired:
t.log.Info("RunSequence round expired: %v\n", slog.Any("height", h), slog.Any("round", t.state.LoadRound()))
teardown()

// TODO start NEXT round (view.Round + 1)
t.state.IncRound()
case <-ctx.Done():
teardown()

// TODO log
t.log.Info("RunSequence done", slog.Any("height", h))
return nil
}
}
Expand Down Expand Up @@ -112,23 +122,43 @@ func (t *Tendermint) watchForRoundJumps(ctx context.Context) <-chan uint64 {
}

for {
var majority bool

select {
case <-ctx.Done():
return
case getProposeFn := <-proposeCh:
// TODO count messages
_ = getProposeFn()
prpMsgs := getProposeFn()
msgs := make([]Message, 0)

messages.ConvertToInterface(prpMsgs, func(m *types.ProposalMessage) {
msgs = append(msgs, m)
})

majority = t.verifier.Quorum(msgs)
case getPrevoteFn := <-prevoteCh:
// TODO count messages
_ = getPrevoteFn()
prvMsgs := getPrevoteFn()
msgs := make([]Message, 0)

messages.ConvertToInterface(prvMsgs, func(m *types.PrevoteMessage) {
msgs = append(msgs, m)
})

majority = t.verifier.Quorum(msgs)
case getPrecommitFn := <-precommitCh:
// TODO count messages
_ = getPrecommitFn()
prcMsgs := getPrecommitFn()
msgs := make([]Message, 0)

messages.ConvertToInterface(prcMsgs, func(m *types.PrecommitMessage) {
msgs = append(msgs, m)
})

majority = t.verifier.Quorum(msgs)
}

// TODO check if the condition (F+1) is met
// check if the condition (F+1) is met
// and signal the round jump
if false {
if majority {
signalRoundJump(0)
}
}
Expand Down Expand Up @@ -166,19 +196,23 @@ func (t *Tendermint) finalizeProposal(ctx context.Context) <-chan []byte {
// the state machine is in full swing and
// the runs the same flow for everyone (proposer / non-proposers)
func (t *Tendermint) startRound(ctx context.Context) {
// TODO make thread safe
height := t.state.LoadHeight()
round := t.state.LoadRound()

// Check if the current process is the proposer for this view
if !t.verifier.IsProposer(t.node.ID(), t.state.view.Height, t.state.view.Round) {
if !t.verifier.IsProposer(t.node.ID(), height, round) {
// The current process is NOT the proposer, only schedule a timeout
//
// 21: schedule OnTimeoutPropose(hP , roundP) to be executed after timeoutPropose(roundP)
var (
callback = func() {
t.onTimeoutPropose(t.state.view.Round)
t.onTimeoutPropose(round)
}
timeoutPropose = t.timeouts[propose].calculateTimeout(t.state.view.Round)
timeoutPropose = t.timeouts[propose].calculateTimeout(round)
)

t.log.Info("startRound scheduling a timeout", slog.Any("height", height), slog.Any("round", round), slog.Any("timeout", timeoutPropose.Milliseconds()))

t.scheduleTimeout(ctx, timeoutPropose, callback)

return
Expand All @@ -196,12 +230,13 @@ func (t *Tendermint) startRound(ctx context.Context) {

// Check if a new proposal needs to be built
if proposal == nil {
t.log.Info("RunSequence: Last valid proposal is nil. Building a proposal.", slog.Any("height", height), slog.Any("round", round))
// No previous valid value present,
// build a new proposal.
//
// 17: else
// 18: proposal ← getValue()
proposal = t.node.BuildProposal(t.state.view.Height)
proposal = t.node.BuildProposal(height)
}

// Build the propose message
Expand All @@ -215,7 +250,6 @@ func (t *Tendermint) startRound(ctx context.Context) {
// 19: broadcast <PROPOSAL, hp, roundP, proposal, validRoundP>
t.broadcast.BroadcastProposal(proposeMessage)

// TODO make thread safe
// Save the accepted proposal in the state.
// NOTE: This is different from validValue / lockedValue,
// since they require a 2F+1 quorum of specific messages
Expand All @@ -231,17 +265,16 @@ func (t *Tendermint) startRound(ctx context.Context) {

// Since the current process is the proposer,
// it can directly move to the prevote state
// TODO make threads safe
//
// 27/33: stepP ← prevote
t.state.step = prevote
t.state.step.Set(prevote)
}

// runStates runs the consensus states, depending on the current step
func (t *Tendermint) runStates(ctx context.Context) []byte {
for {
// TODO make thread safe
switch t.state.step {
currentStep := t.state.step.Load()

switch currentStep {
case propose:
t.runPropose(ctx)
case prevote:
Expand Down
5 changes: 2 additions & 3 deletions core/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,15 @@ func (t *Tendermint) scheduleTimeout(
// 59: broadcast <PREVOTE, hP, roundP, nil>
// 60: stepP ← prevote
func (t *Tendermint) onTimeoutPropose(round uint64) {
// TODO make thread safe
var (
// TODO Evaluate if the round information is even required.
// We cancel the top-level timeout context upon every round change,
// so this condition that the round != currentRound will always be false.
// Essentially, I believe the only param we do need to check is
// the current state in the SM, since this method can be executed async when
// the SM is in a different state
currentRound = t.state.view.Round
currentStep = t.state.step
currentRound = t.state.LoadRound()
currentStep = t.state.step.Load()
)

// Make sure the timeout context is still valid
Expand Down
Loading

0 comments on commit d8b58f4

Please sign in to comment.