Skip to content

Commit

Permalink
add log to follower state
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Apr 4, 2023
1 parent 52c46c8 commit 31510b6
Show file tree
Hide file tree
Showing 19 changed files with 243 additions and 53 deletions.
10 changes: 9 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,15 @@ func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilde
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err := badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
builder.FollowerState = followerState

return err
Expand Down
12 changes: 10 additions & 2 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,15 @@ func main() {
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}
followerState, err = badgerState.NewFollowerState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err = badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
return err
}).
Module("transactions mempool", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -229,7 +237,7 @@ func main() {
return nil
}).
Component("machine account config validator", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
//@TODO use fallback logic for flowClient similar to DKG/QC contract clients
// @TODO use fallback logic for flowClient similar to DKG/QC contract clients
flowClient, err := common.FlowClient(flowClientConfigs[0])
if err != nil {
return nil, fmt.Errorf("failed to get flow client connection option for access node (0): %s %w", flowClientConfigs[0].AccessAddress, err)
Expand Down
14 changes: 12 additions & 2 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,17 @@ func main() {
return err
}

mutableState, err = badgerState.NewFullConsensusState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blockTimer, receiptValidator, sealValidator)
mutableState, err = badgerState.NewFullConsensusState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blockTimer,
receiptValidator,
sealValidator,
)
return err
}).
Module("random beacon key", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -377,7 +387,7 @@ func main() {
return nil
}).
Component("machine account config validator", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
//@TODO use fallback logic for flowClient similar to DKG/QC contract clients
// @TODO use fallback logic for flowClient similar to DKG/QC contract clients
flowClient, err := common.FlowClient(flowClientConfigs[0])
if err != nil {
return nil, fmt.Errorf("failed to get flow client connection option for access node (0): %s %w", flowClientConfigs[0].AccessAddress, err)
Expand Down
10 changes: 9 additions & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,15 @@ func (exeNode *ExecutionNode) LoadMutableFollowerState(node *NodeConfig) error {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}
var err error
exeNode.followerState, err = badgerState.NewFollowerState(bState, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
exeNode.followerState, err = badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
bState,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
return err
}

Expand Down
10 changes: 9 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,15 @@ func (builder *ObserverServiceBuilder) buildFollowerState() *ObserverServiceBuil
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err := badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
builder.FollowerState = followerState

return err
Expand Down
10 changes: 9 additions & 1 deletion cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,15 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
if !ok {
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}
followerState, err = badgerState.NewFollowerState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err = badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
return err
}).
Module("verification metrics", func(node *NodeConfig) error {
Expand Down
28 changes: 19 additions & 9 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func createNode(
statusesDB := storage.NewEpochStatuses(metricsCollector, db)
consumer := events.NewDistributor()

localID := identity.ID()

// log with node index an ID
log := unittest.Logger().With().
Int("index", index).
Hex("node_id", localID[:]).
Logger()

state, err := bprotocol.Bootstrap(
metricsCollector,
db,
Expand All @@ -395,24 +403,26 @@ func createNode(
blockTimer, err := blocktimer.NewBlockTimer(1*time.Millisecond, 90*time.Second)
require.NoError(t, err)

fullState, err := bprotocol.NewFullConsensusState(state, indexDB, payloadsDB, tracer, consumer, blockTimer, util.MockReceiptValidator(), util.MockSealValidator(sealsDB))
fullState, err := bprotocol.NewFullConsensusState(
log,
tracer,
consumer,
state,
indexDB,
payloadsDB,
blockTimer,
util.MockReceiptValidator(),
util.MockSealValidator(sealsDB),
)
require.NoError(t, err)

localID := identity.ID()

node := &Node{
db: db,
dbDir: dbDir,
index: index,
id: identity,
}

// log with node index an ID
log := unittest.Logger().With().
Int("index", index).
Hex("node_id", localID[:]).
Logger()

stopper.AddNode(node)

counterConsumer := &CounterConsumer{
Expand Down
11 changes: 10 additions & 1 deletion engine/common/follower/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestFollowerHappyPath(t *testing.T) {
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
log := unittest.Logger()
consumer := events.NewNoop()
all := storageutil.StorageLayer(t, db)

Expand All @@ -57,7 +58,15 @@ func TestFollowerHappyPath(t *testing.T) {
mockTimer := util.MockBlockTimer()

// create follower state
followerState, err := pbadger.NewFollowerState(state, all.Index, all.Payloads, tracer, consumer, mockTimer)
followerState, err := pbadger.NewFollowerState(
log,
tracer,
consumer,
state,
all.Index,
all.Payloads,
mockTimer,
)
require.NoError(t, err)
finalizer := moduleconsensus.NewFinalizer(db, all.Headers, followerState, tracer)
rootHeader, err := rootSnapshot.Head()
Expand Down
27 changes: 23 additions & 4 deletions engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func GenericNodeFromParticipants(t testing.TB, hub *stub.Hub, identity *flow.Ide

// creates state fixture and bootstrap it.
rootSnapshot := unittest.RootSnapshotFixture(participants)
stateFixture := CompleteStateFixture(t, metrics, tracer, rootSnapshot)
stateFixture := CompleteStateFixture(t, log, metrics, tracer, rootSnapshot)

require.NoError(t, err)
for _, option := range options {
Expand All @@ -146,7 +146,7 @@ func GenericNode(
Logger()
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
stateFixture := CompleteStateFixture(t, metrics, tracer, root)
stateFixture := CompleteStateFixture(t, log, metrics, tracer, root)

head, err := root.Head()
require.NoError(t, err)
Expand Down Expand Up @@ -220,6 +220,7 @@ func LocalFixture(t testing.TB, identity *flow.Identity) module.Local {
// CompleteStateFixture is a test helper that creates, bootstraps, and returns a StateFixture for sake of unit testing.
func CompleteStateFixture(
t testing.TB,
log zerolog.Logger,
metric *metrics.NoopCollector,
tracer module.Tracer,
rootSnapshot protocol.Snapshot,
Expand Down Expand Up @@ -248,7 +249,17 @@ func CompleteStateFixture(
)
require.NoError(t, err)

mutableState, err := badgerstate.NewFullConsensusState(state, s.Index, s.Payloads, tracer, consumer, util.MockBlockTimer(), util.MockReceiptValidator(), util.MockSealValidator(s.Seals))
mutableState, err := badgerstate.NewFullConsensusState(
log,
tracer,
consumer,
state,
s.Index,
s.Payloads,
util.MockBlockTimer(),
util.MockReceiptValidator(),
util.MockSealValidator(s.Seals),
)
require.NoError(t, err)

return &testmock.StateFixture{
Expand Down Expand Up @@ -542,7 +553,15 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
protoState, ok := node.State.(*badgerstate.ParticipantState)
require.True(t, ok)

followerState, err := badgerstate.NewFollowerState(protoState.State, node.Index, node.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err := badgerstate.NewFollowerState(
node.Log,
node.Tracer,
node.ProtocolEvents,
protoState.State,
node.Index,
node.Payloads,
blocktimer.DefaultBlockTimer,
)
require.NoError(t, err)

dbDir := unittest.TempDir(t)
Expand Down
3 changes: 2 additions & 1 deletion engine/verification/assigner/blockconsumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func withConsumer(
processedHeight := bstorage.NewConsumerProgress(db, module.ConsumeProgressVerificationBlockHeight)
collector := &metrics.NoopCollector{}
tracer := trace.NewNoopTracer()
log := unittest.Logger()
participants := unittest.IdentityListFixture(5, unittest.WithAllRoles())
rootSnapshot := unittest.RootSnapshotFixture(participants)
s := testutil.CompleteStateFixture(t, collector, tracer, rootSnapshot)
s := testutil.CompleteStateFixture(t, log, collector, tracer, rootSnapshot)

engine := &mockBlockProcessor{
process: process,
Expand Down
19 changes: 14 additions & 5 deletions engine/verification/utils/unittest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,10 @@ func withConsumers(t *testing.T,
ops ...CompleteExecutionReceiptBuilderOpt) {

tracer := trace.NewNoopTracer()
log := zerolog.Nop()

// bootstraps system with one node of each role.
s, verID, participants := bootstrapSystem(t, tracer, authorized)
s, verID, participants := bootstrapSystem(t, log, tracer, authorized)
exeID := participants.Filter(filter.HasRole(flow.RoleExecution))[0]
conID := participants.Filter(filter.HasRole(flow.RoleConsensus))[0]
// generates a chain of blocks in the form of root <- R1 <- C1 <- R2 <- C2 <- ... where Rs are distinct reference
Expand Down Expand Up @@ -601,17 +602,25 @@ func withConsumers(t *testing.T,
// Otherwise, it bootstraps the verification node as unauthorized in current epoch.
//
// As the return values, it returns the state, local module, and list of identities in system.
func bootstrapSystem(t *testing.T, tracer module.Tracer, authorized bool) (*enginemock.StateFixture, *flow.Identity,
flow.IdentityList) {
func bootstrapSystem(
t *testing.T,
log zerolog.Logger,
tracer module.Tracer,
authorized bool,
) (
*enginemock.StateFixture,
*flow.Identity,
flow.IdentityList,
) {
// creates identities to bootstrap system with
verID := unittest.IdentityFixture(unittest.WithRole(flow.RoleVerification))
identities := unittest.CompleteIdentitySet(verID)
identities = append(identities, unittest.IdentityFixture(unittest.WithRole(flow.RoleExecution))) // adds extra execution node

// bootstraps the system
collector := &metrics.NoopCollector{}
rootSnapshot := unittest.RootSnapshotFixture(identities)
stateFixture := testutil.CompleteStateFixture(t, collector, tracer, rootSnapshot)
stateFixture := testutil.CompleteStateFixture(t, log, collector, tracer, rootSnapshot)
// bootstraps the system

if !authorized {
// creates a new verification node identity that is unauthorized for this epoch
Expand Down
10 changes: 9 additions & 1 deletion follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,15 @@ func (builder *FollowerServiceBuilder) buildFollowerState() *FollowerServiceBuil
return fmt.Errorf("only implementations of type badger.State are currently supported but read-only state has type %T", node.State)
}

followerState, err := badgerState.NewFollowerState(state, node.Storage.Index, node.Storage.Payloads, node.Tracer, node.ProtocolEvents, blocktimer.DefaultBlockTimer)
followerState, err := badgerState.NewFollowerState(
node.Logger,
node.Tracer,
node.ProtocolEvents,
state,
node.Storage.Index,
node.Storage.Payloads,
blocktimer.DefaultBlockTimer,
)
builder.FollowerState = followerState

return err
Expand Down
3 changes: 2 additions & 1 deletion insecure/wintermute/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wintermute
import (
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/engine/testutil"
Expand Down Expand Up @@ -179,7 +180,7 @@ func bootstrapWintermuteFlowSystem(t *testing.T) (*enginemock.StateFixture, flow

// bootstraps the system
rootSnapshot := unittest.RootSnapshotFixture(identities)
stateFixture := testutil.CompleteStateFixture(t, metrics.NewNoopCollector(), trace.NewNoopTracer(), rootSnapshot)
stateFixture := testutil.CompleteStateFixture(t, zerolog.Nop(), metrics.NewNoopCollector(), trace.NewNoopTracer(), rootSnapshot)

return stateFixture, identities, append(corruptedEnIds, corruptedVnIds...).NodeIDs()
}
Expand Down
12 changes: 11 additions & 1 deletion module/builder/collection/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -74,6 +75,7 @@ func (suite *BuilderSuite) SetupTest() {

metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
log := zerolog.Nop()
all := sutil.StorageLayer(suite.T(), suite.db)
consumer := events.NewNoop()
suite.headers = all.Headers
Expand Down Expand Up @@ -102,7 +104,15 @@ func (suite *BuilderSuite) SetupTest() {
state, err := pbadger.Bootstrap(metrics, suite.db, all.Headers, all.Seals, all.Results, all.Blocks, all.QuorumCertificates, all.Setups, all.EpochCommits, all.Statuses, rootSnapshot)
require.NoError(suite.T(), err)

suite.protoState, err = pbadger.NewFollowerState(state, all.Index, all.Payloads, tracer, consumer, util.MockBlockTimer())
suite.protoState, err = pbadger.NewFollowerState(
log,
tracer,
consumer,
state,
all.Index,
all.Payloads,
util.MockBlockTimer(),
)
require.NoError(suite.T(), err)

// add some transactions to transaction pool
Expand Down
3 changes: 2 additions & 1 deletion module/jobqueue/finalized_block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func withReader(

collector := &metrics.NoopCollector{}
tracer := trace.NewNoopTracer()
log := unittest.Logger()
participants := unittest.IdentityListFixture(5, unittest.WithAllRoles())
rootSnapshot := unittest.RootSnapshotFixture(participants)
s := testutil.CompleteStateFixture(t, collector, tracer, rootSnapshot)
s := testutil.CompleteStateFixture(t, log, collector, tracer, rootSnapshot)

reader := jobqueue.NewFinalizedBlockReader(s.State, s.Storage.Blocks)

Expand Down
Loading

0 comments on commit 31510b6

Please sign in to comment.