diff --git a/consensus/hotstuff/votecollector/staking_vote_processor_test.go b/consensus/hotstuff/votecollector/staking_vote_processor_test.go index d82059b3405..082aee074e0 100644 --- a/consensus/hotstuff/votecollector/staking_vote_processor_test.go +++ b/consensus/hotstuff/votecollector/staking_vote_processor_test.go @@ -268,9 +268,7 @@ func TestStakingVoteProcessorV2_BuildVerifyQC(t *testing.T) { }).Sort(order.Canonical) leader := stakingSigners[0] - - block := helper.MakeBlock(helper.WithBlockView(view), - helper.WithBlockProposer(leader.NodeID)) + block := helper.MakeBlock(helper.WithBlockView(view), helper.WithBlockProposer(leader.NodeID)) committee := &mockhotstuff.DynamicCommittee{} committee.On("IdentitiesByEpoch", block.View).Return(stakingSigners.ToSkeleton(), nil) diff --git a/model/flow/epoch.go b/model/flow/epoch.go index 6d075ff4826..e2373b1b4af 100644 --- a/model/flow/epoch.go +++ b/model/flow/epoch.go @@ -434,6 +434,11 @@ type EventIDs struct { CommitID Identifier } +// ID returns hash of the event IDs. +func (e *EventIDs) ID() Identifier { + return MakeID(e) +} + func NewEpochStatus(previousSetup, previousCommit, currentSetup, currentCommit, nextSetup, nextCommit Identifier) (*EpochStatus, error) { status := &EpochStatus{ PreviousEpoch: EventIDs{ diff --git a/model/flow/identity.go b/model/flow/identity.go index 185de5026c0..eeedbb4aa18 100644 --- a/model/flow/identity.go +++ b/model/flow/identity.go @@ -570,7 +570,8 @@ func (il IdentityList) SamplePct(pct float64) (IdentityList, error) { // Union returns a new identity list containing every identity that occurs in // either `il`, or `other`, or both. There are no duplicates in the output, // where duplicates are identities with the same node ID. -// The returned IdentityList is sorted +// Receiver `il` and/or method input `other` can be nil or empty. +// The returned IdentityList is sorted in canonical order. func (il IdentityList) Union(other IdentityList) IdentityList { maxLen := len(il) + len(other) diff --git a/model/flow/protocol_state.go b/model/flow/protocol_state.go new file mode 100644 index 00000000000..d3eb0316f84 --- /dev/null +++ b/model/flow/protocol_state.go @@ -0,0 +1,230 @@ +package flow + +import "fmt" + +// DynamicIdentityEntry encapsulates nodeID and dynamic portion of identity. +type DynamicIdentityEntry struct { + NodeID Identifier + Dynamic DynamicIdentity +} + +type DynamicIdentityEntryList []*DynamicIdentityEntry + +// ProtocolStateEntry represents a snapshot of the identity table (i.e. the set of all notes authorized to +// be part of the network) at some point in time. It allows to reconstruct the state of identity table using +// epoch setup events and dynamic identities. It tracks attempts of invalid state transitions. +// It also holds information about the next epoch, if it has been already committed. +// This structure is used to persist protocol state in the database. +// +// Note that the current implementation does not store the identity table directly. Instead, we store +// the original events that constituted the _initial_ identity table at the beginning of the epoch +// plus some modifiers. We intend to restructure this code soon. +// TODO: https://github.com/onflow/flow-go/issues/4649 +type ProtocolStateEntry struct { + // setup and commit event IDs for current epoch. + CurrentEpochEventIDs EventIDs + // setup and commit event IDs for previous epoch. + PreviousEpochEventIDs EventIDs + // Part of identity table that can be changed during the epoch. + // Always sorted in canonical order. + Identities DynamicIdentityEntryList + // InvalidStateTransitionAttempted encodes whether an invalid state transition + // has been detected in this fork. When this happens, epoch fallback is triggered + // AFTER the fork is finalized. + InvalidStateTransitionAttempted bool + // NextEpochProtocolState describes protocol state of the next epoch + NextEpochProtocolState *ProtocolStateEntry +} + +// RichProtocolStateEntry is a ProtocolStateEntry which has additional fields that are cached +// from storage layer for convenience. +// Using this structure instead of ProtocolStateEntry allows us to avoid querying +// the database for epoch setups and commits and full identity table. +// It holds several invariants, such as: +// - CurrentEpochSetup and CurrentEpochCommit are for the same epoch. Never nil. +// - PreviousEpochSetup and PreviousEpochCommit are for the same epoch. Can be nil. +// - Identities is a full identity table for the current epoch. +// Identities are sorted in canonical order. Without duplicates. Never nil. +// - NextEpochProtocolState is a protocol state for the next epoch. Can be nil. +type RichProtocolStateEntry struct { + *ProtocolStateEntry + + CurrentEpochSetup *EpochSetup + CurrentEpochCommit *EpochCommit + PreviousEpochSetup *EpochSetup + PreviousEpochCommit *EpochCommit + Identities IdentityList + + NextEpochProtocolState *RichProtocolStateEntry +} + +// NewRichProtocolStateEntry constructs a rich protocol state entry from a protocol state entry and additional data. +// No errors are expected during normal operation. +func NewRichProtocolStateEntry( + protocolState *ProtocolStateEntry, + previousEpochSetup *EpochSetup, + previousEpochCommit *EpochCommit, + currentEpochSetup *EpochSetup, + currentEpochCommit *EpochCommit, + nextEpochSetup *EpochSetup, + nextEpochCommit *EpochCommit, +) (*RichProtocolStateEntry, error) { + result := &RichProtocolStateEntry{ + ProtocolStateEntry: protocolState, + CurrentEpochSetup: currentEpochSetup, + CurrentEpochCommit: currentEpochCommit, + PreviousEpochSetup: previousEpochSetup, + PreviousEpochCommit: previousEpochCommit, + Identities: nil, + NextEpochProtocolState: nil, + } + + var err error + // if next epoch has been already committed, fill in data for it as well. + if protocolState.NextEpochProtocolState != nil { + // sanity check consistency of input data + if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID != nextEpochSetup.ID() { + return nil, fmt.Errorf("inconsistent EpochSetup for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v", + protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID, nextEpochSetup.ID()) + } + if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != ZeroID { + if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != nextEpochCommit.ID() { + return nil, fmt.Errorf("inconsistent EpochCommit for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v", + protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID, nextEpochCommit.ID()) + } + } + + // if next epoch is available, it means that we have observed epoch setup event and we are not anymore in staking phase, + // so we need to build the identity table using current and next epoch setup events. + // so we need to build the identity table using current and next epoch setup events. + result.Identities, err = buildIdentityTable( + protocolState.Identities, + currentEpochSetup.Participants, + nextEpochSetup.Participants, + ) + if err != nil { + return nil, fmt.Errorf("could not build identity table for setup/commit phase: %w", err) + } + + nextEpochProtocolState := protocolState.NextEpochProtocolState + nextEpochIdentityTable, err := buildIdentityTable( + nextEpochProtocolState.Identities, + nextEpochSetup.Participants, + currentEpochSetup.Participants, + ) + if err != nil { + return nil, fmt.Errorf("could not build next epoch identity table: %w", err) + } + + // fill identities for next epoch + result.NextEpochProtocolState = &RichProtocolStateEntry{ + ProtocolStateEntry: nextEpochProtocolState, + CurrentEpochSetup: nextEpochSetup, + CurrentEpochCommit: nextEpochCommit, + PreviousEpochSetup: result.CurrentEpochSetup, // previous epoch setup is current epoch setup + PreviousEpochCommit: result.CurrentEpochCommit, // previous epoch setup is current epoch setup + Identities: nextEpochIdentityTable, + NextEpochProtocolState: nil, // always nil + } + } else { + // if next epoch is not yet created, it means that we are in staking phase, + // so we need to build the identity table using previous and current epoch setup events. + var otherIdentities IdentityList + if previousEpochSetup != nil { + otherIdentities = previousEpochSetup.Participants + } + result.Identities, err = buildIdentityTable( + protocolState.Identities, + currentEpochSetup.Participants, + otherIdentities, + ) + if err != nil { + return nil, fmt.Errorf("could not build identity table for staking phase: %w", err) + } + } + + return result, nil +} + +// ID returns hash of entry by hashing all fields. +func (e *ProtocolStateEntry) ID() Identifier { + if e == nil { + return ZeroID + } + body := struct { + CurrentEpochEventIDs Identifier + PreviousEpochEventIDs Identifier + Identities DynamicIdentityEntryList + InvalidStateTransitionAttempted bool + NextEpochProtocolStateID Identifier + }{ + CurrentEpochEventIDs: e.CurrentEpochEventIDs.ID(), + PreviousEpochEventIDs: e.PreviousEpochEventIDs.ID(), + Identities: e.Identities, + InvalidStateTransitionAttempted: e.InvalidStateTransitionAttempted, + NextEpochProtocolStateID: e.NextEpochProtocolState.ID(), + } + return MakeID(body) +} + +// Sorted returns whether the list is sorted by the input ordering. +func (ll DynamicIdentityEntryList) Sorted(less IdentifierOrder) bool { + for i := 0; i < len(ll)-1; i++ { + a := ll[i] + b := ll[i+1] + if !less(a.NodeID, b.NodeID) { + return false + } + } + return true +} + +// buildIdentityTable constructs the full identity table for the target epoch by combining data from: +// 1. The target epoch's Dynamic Identities. +// 2. The target epoch's IdentitySkeletons +// (recorded in EpochSetup event and immutable throughout the epoch). +// 3. [optional] An adjacent epoch's IdentitySkeletons (can be empty or nil), as recorded in the +// adjacent epoch's setup event. For a target epoch N, the epochs N-1 and N+1 are defined to be +// adjacent. Adjacent epochs do not _necessarily_ exist (e.g. consider a spork comprising only +// a single epoch), in which case this input is nil or empty. +// +// It also performs sanity checks to make sure that the data is consistent. +// No errors are expected during normal operation. +func buildIdentityTable( + targetEpochDynamicIdentities DynamicIdentityEntryList, + targetEpochIdentitySkeletons IdentityList, // TODO: change to `IdentitySkeletonList` + adjacentEpochIdentitySkeletons IdentityList, // TODO: change to `IdentitySkeletonList` +) (IdentityList, error) { + // produce a unique set for current and previous epoch participants + allEpochParticipants := targetEpochIdentitySkeletons.Union(adjacentEpochIdentitySkeletons) + // sanity check: size of identities should be equal to previous and current epoch participants combined + if len(allEpochParticipants) != len(targetEpochDynamicIdentities) { + return nil, fmt.Errorf("invalid number of identities in protocol state: expected %d, got %d", len(allEpochParticipants), len(targetEpochDynamicIdentities)) + } + + // build full identity table for current epoch + var result IdentityList + for i, identity := range targetEpochDynamicIdentities { + // sanity check: identities should be sorted in canonical order + if identity.NodeID != allEpochParticipants[i].NodeID { + return nil, fmt.Errorf("identites in protocol state are not in canonical order: expected %s, got %s", allEpochParticipants[i].NodeID, identity.NodeID) + } + result = append(result, &Identity{ + IdentitySkeleton: allEpochParticipants[i].IdentitySkeleton, + DynamicIdentity: identity.Dynamic, + }) + } + return result, nil +} + +// DynamicIdentityEntryListFromIdentities converts IdentityList to DynamicIdentityEntryList. +func DynamicIdentityEntryListFromIdentities(identities IdentityList) DynamicIdentityEntryList { + dynamicIdentities := make(DynamicIdentityEntryList, 0, len(identities)) + for _, identity := range identities { + dynamicIdentities = append(dynamicIdentities, &DynamicIdentityEntry{ + NodeID: identity.NodeID, + Dynamic: identity.DynamicIdentity, + }) + } + return dynamicIdentities +} diff --git a/model/flow/protocol_state_test.go b/model/flow/protocol_state_test.go new file mode 100644 index 00000000000..8fbce1e6fb2 --- /dev/null +++ b/model/flow/protocol_state_test.go @@ -0,0 +1,119 @@ +package flow_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestNewRichProtocolStateEntry checks that NewRichProtocolStateEntry creates valid identity tables depending on the state +// of epoch which is derived from the protocol state entry. +func TestNewRichProtocolStateEntry(t *testing.T) { + // Conditions right after a spork: + // * no previous epoch exists from the perspective of the freshly-sporked protocol state + // * network is currently in the staking phase for the next epoch, hence no service events for the next epoch exist + t.Run("staking-root-protocol-state", func(t *testing.T) { + currentEpochSetup := unittest.EpochSetupFixture() + currentEpochCommit := unittest.EpochCommitFixture() + stateEntry := &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: currentEpochSetup.ID(), + CommitID: currentEpochCommit.ID(), + }, + PreviousEpochEventIDs: flow.EventIDs{}, + Identities: flow.DynamicIdentityEntryListFromIdentities(currentEpochSetup.Participants), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + } + entry, err := flow.NewRichProtocolStateEntry( + stateEntry, + nil, + nil, + currentEpochSetup, + currentEpochCommit, + nil, + nil, + ) + assert.NoError(t, err) + assert.Equal(t, currentEpochSetup.Participants, entry.Identities, "should be equal to current epoch setup participants") + }) + + // Common situation during the staking phase for epoch N+1 + // * we are currently in Epoch N + // * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events) + // * network is currently in the staking phase for the next epoch, hence no service events for the next epoch exist + t.Run("staking-phase", func(t *testing.T) { + stateEntry := unittest.ProtocolStateFixture() + richEntry, err := flow.NewRichProtocolStateEntry( + stateEntry.ProtocolStateEntry, + stateEntry.PreviousEpochSetup, + stateEntry.PreviousEpochCommit, + stateEntry.CurrentEpochSetup, + stateEntry.CurrentEpochCommit, + nil, + nil, + ) + assert.NoError(t, err) + expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.PreviousEpochSetup.Participants) + assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + previous epoch setup participants") + assert.Nil(t, richEntry.NextEpochProtocolState) + }) + + // Common situation during the epoch setup phase for epoch N+1 + // * we are currently in Epoch N + // * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events) + // * network is currently in the setup phase for the next epoch, i.e. EpochSetup event (starting setup phase) has already been observed + t.Run("setup-phase", func(t *testing.T) { + stateEntry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState(), func(entry *flow.RichProtocolStateEntry) { + entry.NextEpochProtocolState.CurrentEpochCommit = nil + entry.NextEpochProtocolState.CurrentEpochEventIDs.CommitID = flow.ZeroID + }) + + richEntry, err := flow.NewRichProtocolStateEntry( + stateEntry.ProtocolStateEntry, + stateEntry.PreviousEpochSetup, + stateEntry.PreviousEpochCommit, + stateEntry.CurrentEpochSetup, + stateEntry.CurrentEpochCommit, + stateEntry.NextEpochProtocolState.CurrentEpochSetup, + nil, + ) + assert.NoError(t, err) + expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants) + assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + next epoch setup participants") + assert.Nil(t, richEntry.NextEpochProtocolState.CurrentEpochCommit) + expectedIdentities = stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants.Union(stateEntry.CurrentEpochSetup.Participants) + assert.Equal(t, expectedIdentities, richEntry.NextEpochProtocolState.Identities, "should be equal to next epoch setup participants + current epoch setup participants") + }) + + // TODO: include test for epoch setup phase where no prior epoch exist (i.e. first epoch setup phase after spork) + + // Common situation during the epoch commit phase for epoch N+1 + // * we are currently in Epoch N + // * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events) + // * The network has completed the epoch setup phase, i.e. published the EpochSetup and EpochCommit events for epoch N+1. + t.Run("commit-phase", func(t *testing.T) { + stateEntry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState()) + + richEntry, err := flow.NewRichProtocolStateEntry( + stateEntry.ProtocolStateEntry, + stateEntry.PreviousEpochSetup, + stateEntry.PreviousEpochCommit, + stateEntry.CurrentEpochSetup, + stateEntry.CurrentEpochCommit, + stateEntry.NextEpochProtocolState.CurrentEpochSetup, + stateEntry.NextEpochProtocolState.CurrentEpochCommit, + ) + assert.NoError(t, err) + expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants) + assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + next epoch setup participants") + expectedIdentities = stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants.Union(stateEntry.CurrentEpochSetup.Participants) + assert.Equal(t, expectedIdentities, richEntry.NextEpochProtocolState.Identities, "should be equal to next epoch setup participants + current epoch setup participants") + }) + + // TODO: include test for epoch commit phase where no prior epoch exist (i.e. first epoch commit phase after spork) + +} diff --git a/module/metrics/labels.go b/module/metrics/labels.go index c67b9283f38..93b7d21dcba 100644 --- a/module/metrics/labels.go +++ b/module/metrics/labels.go @@ -60,6 +60,7 @@ const ( ResourceQC = "qc" ResourceMyReceipt = "my_receipt" ResourceCollection = "collection" + ResourceProtocolState = "protocol_state" ResourceApproval = "approval" ResourceSeal = "seal" ResourcePendingIncorporatedSeal = "pending_incorporated_seal" diff --git a/module/signature/signer_indices_test.go b/module/signature/signer_indices_test.go index c2210cbdecc..3357d092cca 100644 --- a/module/signature/signer_indices_test.go +++ b/module/signature/signer_indices_test.go @@ -355,11 +355,11 @@ func Test_DecodeSignerIndicesToIdentities(t *testing.T) { decodedSigners, err := signature.DecodeSignerIndicesToIdentities(identities.ToSkeleton(), signerIndices) require.NoError(t, err) - // Note that sampling from `identities` generates an _unordered_ list `signers`. Though, - // this is fine, as `EncodeSignersToIndices` as no ordering requirement on its input `signers`. + // Note that sampling from `identities` generates an _unordered_ list `signers`. + // This is fine, as `EncodeSignersToIndices` has no ordering requirement on its input `signers`. // Nevertheless, note that the output of `DecodeSignerIndicesToIdentities` is _always_ canonically // ordered. Therefore, we need to order the input `signers` (so far unordered) before comparing it - // to the decoded output (canonically ordered) + // to the decoded output (canonically ordered). slices.SortFunc(signers, func(lhs, rhs *flow.IdentitySkeleton) bool { return order.IdentifierCanonical(lhs.NodeID, rhs.NodeID) }) diff --git a/storage/badger/cluster_payloads.go b/storage/badger/cluster_payloads.go index 0fc3ba3ee28..6a7efae75b1 100644 --- a/storage/badger/cluster_payloads.go +++ b/storage/badger/cluster_payloads.go @@ -47,6 +47,7 @@ func NewClusterPayloads(cacheMetrics module.CacheMetrics, db *badger.DB) *Cluste func (cp *ClusterPayloads) storeTx(blockID flow.Identifier, payload *cluster.Payload) func(*transaction.Tx) error { return cp.cache.PutTx(blockID, payload) } + func (cp *ClusterPayloads) retrieveTx(blockID flow.Identifier) func(*badger.Txn) (*cluster.Payload, error) { return func(tx *badger.Txn) (*cluster.Payload, error) { val, err := cp.cache.Get(blockID)(tx) diff --git a/storage/badger/operation/prefix.go b/storage/badger/operation/prefix.go index e75497257ca..d63fed18c41 100644 --- a/storage/badger/operation/prefix.go +++ b/storage/badger/operation/prefix.go @@ -46,6 +46,7 @@ const ( codeExecutionReceiptMeta = 36 codeResultApproval = 37 codeChunk = 38 + codeProtocolState = 39 // codes for indexing single identifier by identifier/integeter codeHeightToBlock = 40 // index mapping height to block ID @@ -54,6 +55,7 @@ const ( codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID + codeProtocolStateByBlockID = 46 // index of protocol state entry by block ID // codes for indexing multiple identifiers by identifier // NOTE: 51 was used for identity indexes before epochs diff --git a/storage/badger/operation/protocol_state.go b/storage/badger/operation/protocol_state.go new file mode 100644 index 00000000000..3534a5b4679 --- /dev/null +++ b/storage/badger/operation/protocol_state.go @@ -0,0 +1,39 @@ +package operation + +import ( + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/model/flow" +) + +// InsertProtocolState inserts a protocol state by ID. +// Error returns: +// - storage.ErrAlreadyExists if the key already exists in the database. +// - generic error in case of unexpected failure from the database layer or encoding failure. +func InsertProtocolState(protocolStateID flow.Identifier, protocolState *flow.ProtocolStateEntry) func(*badger.Txn) error { + return insert(makePrefix(codeProtocolState, protocolStateID), protocolState) +} + +// RetrieveProtocolState retrieves a protocol state by ID. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer +func RetrieveProtocolState(protocolStateID flow.Identifier, protocolState *flow.ProtocolStateEntry) func(*badger.Txn) error { + return retrieve(makePrefix(codeProtocolState, protocolStateID), protocolState) +} + +// IndexProtocolState indexes a protocol state by block ID. +// Error returns: +// - storage.ErrAlreadyExists if the key already exists in the database. +// - generic error in case of unexpected failure from the database layer or encoding failure. +func IndexProtocolState(blockID flow.Identifier, protocolStateID flow.Identifier) func(*badger.Txn) error { + return insert(makePrefix(codeProtocolStateByBlockID, blockID), protocolStateID) +} + +// LookupProtocolState finds protocol state ID by block ID. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer +func LookupProtocolState(blockID flow.Identifier, protocolStateID *flow.Identifier) func(*badger.Txn) error { + return retrieve(makePrefix(codeProtocolStateByBlockID, blockID), protocolStateID) +} diff --git a/storage/badger/operation/protocol_state_test.go b/storage/badger/operation/protocol_state_test.go new file mode 100644 index 00000000000..1f29e1b7b49 --- /dev/null +++ b/storage/badger/operation/protocol_state_test.go @@ -0,0 +1,39 @@ +package operation + +import ( + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestInsertProtocolState tests if basic badger operations on ProtocolState work as expected. +func TestInsertProtocolState(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + expected := unittest.ProtocolStateFixture().ProtocolStateEntry + + protocolStateID := expected.ID() + err := db.Update(InsertProtocolState(protocolStateID, expected)) + require.Nil(t, err) + + var actual flow.ProtocolStateEntry + err = db.View(RetrieveProtocolState(protocolStateID, &actual)) + require.Nil(t, err) + + assert.Equal(t, expected, &actual) + + blockID := unittest.IdentifierFixture() + err = db.Update(IndexProtocolState(blockID, protocolStateID)) + require.Nil(t, err) + + var actualProtocolStateID flow.Identifier + err = db.View(LookupProtocolState(blockID, &actualProtocolStateID)) + require.Nil(t, err) + + assert.Equal(t, protocolStateID, actualProtocolStateID) + }) +} diff --git a/storage/badger/protocol_state.go b/storage/badger/protocol_state.go new file mode 100644 index 00000000000..c25514a46f3 --- /dev/null +++ b/storage/badger/protocol_state.go @@ -0,0 +1,191 @@ +package badger + +import ( + "fmt" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/flow/order" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/badger/operation" + "github.com/onflow/flow-go/storage/badger/transaction" +) + +// ProtocolState implements persistent storage for storing identity table instances. +// Protocol state uses an embedded cache without storing capabilities(store happens on first retrieval) to avoid unnecessary +// operations and to speed up access to frequently used identity tables. +// TODO: update naming to IdentityTable +type ProtocolState struct { + db *badger.DB + cache *Cache[flow.Identifier, *flow.RichProtocolStateEntry] +} + +var _ storage.ProtocolState = (*ProtocolState)(nil) + +// NewProtocolState creates a ProtocolState instance, which is a database of identity table instances. +// It supports storing, caching and retrieving by ID or the additionally indexed block ID. +func NewProtocolState(collector module.CacheMetrics, + epochSetups storage.EpochSetups, + epochCommits storage.EpochCommits, + db *badger.DB, + cacheSize uint, +) *ProtocolState { + retrieve := func(protocolStateID flow.Identifier) func(tx *badger.Txn) (*flow.RichProtocolStateEntry, error) { + var protocolStateEntry flow.ProtocolStateEntry + return func(tx *badger.Txn) (*flow.RichProtocolStateEntry, error) { + err := operation.RetrieveProtocolState(protocolStateID, &protocolStateEntry)(tx) + if err != nil { + return nil, err + } + result, err := newRichProtocolStateEntry(&protocolStateEntry, epochSetups, epochCommits) + if err != nil { + return nil, fmt.Errorf("could not create rich identity table entry: %w", err) + } + return result, nil + } + } + + return &ProtocolState{ + db: db, + cache: newCache[flow.Identifier, *flow.RichProtocolStateEntry](collector, metrics.ResourceProtocolState, + withLimit[flow.Identifier, *flow.RichProtocolStateEntry](cacheSize), + withStore(noopStore[flow.Identifier, *flow.RichProtocolStateEntry]), + withRetrieve(retrieve)), + } +} + +// StoreTx allows us to store an identity table as part of a DB tx, while still going through the caching layer. +// Per convention, the given Identity Table must be in canonical order, otherwise an exception is returned. +// Expected error returns during normal operations: +// - storage.ErrAlreadyExists if an Identity Table with the given id is already stored +func (s *ProtocolState) StoreTx(id flow.Identifier, protocolState *flow.ProtocolStateEntry) func(*transaction.Tx) error { + return func(tx *transaction.Tx) error { + if !protocolState.Identities.Sorted(order.IdentifierCanonical) { + return fmt.Errorf("sanity check failed: identities are not sorted") + } + if protocolState.NextEpochProtocolState != nil { + if !protocolState.NextEpochProtocolState.Identities.Sorted(order.IdentifierCanonical) { + return fmt.Errorf("sanity check failed: next epoch identities are not sorted") + } + } + return transaction.WithTx(operation.InsertProtocolState(id, protocolState))(tx) + } +} + +// Index indexes the identity table by block ID. +// Error returns: +// - storage.ErrAlreadyExists if an identity table for the given blockID has already been indexed +func (s *ProtocolState) Index(blockID flow.Identifier, protocolStateID flow.Identifier) func(*transaction.Tx) error { + return func(tx *transaction.Tx) error { + err := transaction.WithTx(operation.IndexProtocolState(blockID, protocolStateID))(tx) + if err != nil { + return fmt.Errorf("could not index identity table for block (%x): %w", blockID[:], err) + } + return nil + } +} + +// ByID retrieves the identity table by its ID. +// Error returns: +// - storage.ErrNotFound if no identity table with the given ID exists +func (s *ProtocolState) ByID(id flow.Identifier) (*flow.RichProtocolStateEntry, error) { + tx := s.db.NewTransaction(false) + defer tx.Discard() + return s.byID(id)(tx) +} + +// ByBlockID retrieves the identity table by the respective block ID. +// TODO: clarify whether the blockID is the block that defines this identity table or the _child_ block where the identity table is applied. CAUTION: surface for bugs! +// Error returns: +// - storage.ErrNotFound if no identity table for the given blockID exists +func (s *ProtocolState) ByBlockID(blockID flow.Identifier) (*flow.RichProtocolStateEntry, error) { + tx := s.db.NewTransaction(false) + defer tx.Discard() + return s.byBlockID(blockID)(tx) +} + +// byID retrieves the identity table by its ID. Error returns: +// - storage.ErrNotFound if no identity table with the given ID exists +func (s *ProtocolState) byID(protocolStateID flow.Identifier) func(*badger.Txn) (*flow.RichProtocolStateEntry, error) { + return s.cache.Get(protocolStateID) +} + +// byBlockID retrieves the identity table by the respective block ID. +// TODO: clarify whether the blockID is the block that defines this identity table or the _child_ block where the identity table is applied. CAUTION: surface for bugs! +// Error returns: +// - storage.ErrNotFound if no identity table for the given blockID exists +func (s *ProtocolState) byBlockID(blockID flow.Identifier) func(*badger.Txn) (*flow.RichProtocolStateEntry, error) { + return func(tx *badger.Txn) (*flow.RichProtocolStateEntry, error) { + var protocolStateID flow.Identifier + err := operation.LookupProtocolState(blockID, &protocolStateID)(tx) + if err != nil { + return nil, fmt.Errorf("could not lookup identity table ID for block (%x): %w", blockID[:], err) + } + return s.byID(protocolStateID)(tx) + } +} + +// newRichProtocolStateEntry constructs a rich protocol state entry from a protocol state entry. +// It queries and fills in epoch setups and commits for previous and current epochs and possibly next epoch. +// No errors are expected during normal operation. +func newRichProtocolStateEntry( + protocolState *flow.ProtocolStateEntry, + setups storage.EpochSetups, + commits storage.EpochCommits, +) (*flow.RichProtocolStateEntry, error) { + var ( + previousEpochSetup *flow.EpochSetup + previousEpochCommit *flow.EpochCommit + nextEpochSetup *flow.EpochSetup + nextEpochCommit *flow.EpochCommit + err error + ) + // query and fill in epoch setups and commits for previous and current epochs + if protocolState.PreviousEpochEventIDs.SetupID != flow.ZeroID { + previousEpochSetup, err = setups.ByID(protocolState.PreviousEpochEventIDs.SetupID) + if err != nil { + return nil, fmt.Errorf("could not retrieve previous epoch setup: %w", err) + } + previousEpochCommit, err = commits.ByID(protocolState.PreviousEpochEventIDs.CommitID) + if err != nil { + return nil, fmt.Errorf("could not retrieve previous epoch commit: %w", err) + } + } + + currentEpochSetup, err := setups.ByID(protocolState.CurrentEpochEventIDs.SetupID) + if err != nil { + return nil, fmt.Errorf("could not retrieve current epoch setup: %w", err) + } + currentEpochCommit, err := commits.ByID(protocolState.CurrentEpochEventIDs.CommitID) + if err != nil { + return nil, fmt.Errorf("could not retrieve current epoch commit: %w", err) + } + + // if next epoch has been already committed, fill in data for it as well. + if protocolState.NextEpochProtocolState != nil { + nextEpochProtocolState := *protocolState.NextEpochProtocolState + nextEpochSetup, err = setups.ByID(nextEpochProtocolState.CurrentEpochEventIDs.SetupID) + if err != nil { + return nil, fmt.Errorf("could not retrieve next epoch setup: %w", err) + } + if nextEpochProtocolState.CurrentEpochEventIDs.CommitID != flow.ZeroID { + nextEpochCommit, err = commits.ByID(nextEpochProtocolState.CurrentEpochEventIDs.CommitID) + if err != nil { + return nil, fmt.Errorf("could not retrieve next epoch commit: %w", err) + } + } + } + + return flow.NewRichProtocolStateEntry( + protocolState, + previousEpochSetup, + previousEpochCommit, + currentEpochSetup, + currentEpochCommit, + nextEpochSetup, + nextEpochCommit, + ) +} diff --git a/storage/badger/protocol_state_test.go b/storage/badger/protocol_state_test.go new file mode 100644 index 00000000000..18bcb6d71f1 --- /dev/null +++ b/storage/badger/protocol_state_test.go @@ -0,0 +1,228 @@ +package badger + +import ( + "testing" + + "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage/badger/transaction" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestProtocolStateStorage tests if the protocol state is stored, retrieved and indexed correctly +func TestProtocolStateStorage(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + metrics := metrics.NewNoopCollector() + + setups := NewEpochSetups(metrics, db) + commits := NewEpochCommits(metrics, db) + store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize) + + expected := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState()) + protocolStateID := expected.ID() + blockID := unittest.IdentifierFixture() + + // store protocol state and auxiliary info + err := transaction.Update(db, func(tx *transaction.Tx) error { + // store epoch events to be able to retrieve them later + err := setups.StoreTx(expected.PreviousEpochSetup)(tx) + require.NoError(t, err) + err = setups.StoreTx(expected.CurrentEpochSetup)(tx) + require.NoError(t, err) + err = setups.StoreTx(expected.NextEpochProtocolState.CurrentEpochSetup)(tx) + require.NoError(t, err) + err = commits.StoreTx(expected.PreviousEpochCommit)(tx) + require.NoError(t, err) + err = commits.StoreTx(expected.CurrentEpochCommit)(tx) + require.NoError(t, err) + err = commits.StoreTx(expected.NextEpochProtocolState.CurrentEpochCommit)(tx) + require.NoError(t, err) + + err = store.StoreTx(protocolStateID, expected.ProtocolStateEntry)(tx) + require.NoError(t, err) + return store.Index(blockID, protocolStateID)(tx) + }) + require.NoError(t, err) + + // fetch protocol state + actual, err := store.ByID(protocolStateID) + require.NoError(t, err) + require.Equal(t, expected, actual) + + assertRichProtocolStateValidity(t, actual) + + // fetch protocol state by block ID + actualByBlockID, err := store.ByBlockID(blockID) + require.NoError(t, err) + require.Equal(t, expected, actualByBlockID) + + assertRichProtocolStateValidity(t, actualByBlockID) + }) +} + +// TestProtocolStateStoreInvalidProtocolState tests that storing protocol state which has unsorted identities fails for +// current and next epoch protocol states. +func TestProtocolStateStoreInvalidProtocolState(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + metrics := metrics.NewNoopCollector() + setups := NewEpochSetups(metrics, db) + commits := NewEpochCommits(metrics, db) + store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize) + invalid := unittest.ProtocolStateFixture().ProtocolStateEntry + // swap first and second elements to break canonical order + invalid.Identities[0], invalid.Identities[1] = invalid.Identities[1], invalid.Identities[0] + + err := transaction.Update(db, store.StoreTx(invalid.ID(), invalid)) + require.Error(t, err) + + invalid = unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState()).ProtocolStateEntry + // swap first and second elements to break canonical order + invalid.NextEpochProtocolState.Identities[0], invalid.NextEpochProtocolState.Identities[1] = invalid.NextEpochProtocolState.Identities[1], invalid.NextEpochProtocolState.Identities[0] + + err = transaction.Update(db, store.StoreTx(invalid.ID(), invalid)) + require.Error(t, err) + }) +} + +// TestProtocolStateMergeParticipants tests that merging participants between epochs works correctly. We always take participants +// from current epoch and additionally add participants from previous epoch if they are not present in current epoch. +// If the same participant is in the previous and current epochs, we should see it only once in the merged list and the dynamic portion has to be from current epoch. +func TestProtocolStateMergeParticipants(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + metrics := metrics.NewNoopCollector() + + setups := NewEpochSetups(metrics, db) + commits := NewEpochCommits(metrics, db) + store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize) + + stateEntry := unittest.ProtocolStateFixture() + require.Equal(t, stateEntry.CurrentEpochSetup.Participants[1], stateEntry.PreviousEpochSetup.Participants[1]) + // change address of participant in current epoch, so we can distinguish it from the one in previous epoch + // when performing assertion. + newAddress := "123" + nodeID := stateEntry.CurrentEpochSetup.Participants[1].NodeID + stateEntry.CurrentEpochSetup.Participants[1].Address = newAddress + stateEntry.CurrentEpochEventIDs.SetupID = stateEntry.CurrentEpochSetup.ID() + protocolStateID := stateEntry.ID() + + // store protocol state and auxiliary info + err := transaction.Update(db, func(tx *transaction.Tx) error { + // store epoch events to be able to retrieve them later + err := setups.StoreTx(stateEntry.PreviousEpochSetup)(tx) + require.NoError(t, err) + err = setups.StoreTx(stateEntry.CurrentEpochSetup)(tx) + require.NoError(t, err) + err = commits.StoreTx(stateEntry.PreviousEpochCommit)(tx) + require.NoError(t, err) + err = commits.StoreTx(stateEntry.CurrentEpochCommit)(tx) + require.NoError(t, err) + + return store.StoreTx(protocolStateID, stateEntry.ProtocolStateEntry)(tx) + }) + require.NoError(t, err) + + // fetch protocol state + actual, err := store.ByID(protocolStateID) + require.NoError(t, err) + require.Equal(t, stateEntry, actual) + + assertRichProtocolStateValidity(t, actual) + identity, ok := actual.Identities.ByNodeID(nodeID) + require.True(t, ok) + require.Equal(t, newAddress, identity.Address) + }) +} + +// TestProtocolStateRootSnapshot tests that storing and retrieving root protocol state (in case of bootstrap) works as expected. +// Specifically, this means that no prior epoch exists (situation after a spork) from the perspective of the freshly-sporked network. +func TestProtocolStateRootSnapshot(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + metrics := metrics.NewNoopCollector() + + setups := NewEpochSetups(metrics, db) + commits := NewEpochCommits(metrics, db) + store := NewProtocolState(metrics, setups, commits, db, DefaultCacheSize) + expected := unittest.RootProtocolStateFixture() + + protocolStateID := expected.ID() + blockID := unittest.IdentifierFixture() + + // store protocol state and auxiliary info + err := transaction.Update(db, func(tx *transaction.Tx) error { + // store epoch events to be able to retrieve them later + err := setups.StoreTx(expected.CurrentEpochSetup)(tx) + require.NoError(t, err) + err = commits.StoreTx(expected.CurrentEpochCommit)(tx) + require.NoError(t, err) + + err = store.StoreTx(protocolStateID, expected.ProtocolStateEntry)(tx) + require.NoError(t, err) + return store.Index(blockID, protocolStateID)(tx) + }) + require.NoError(t, err) + + // fetch protocol state + actual, err := store.ByID(protocolStateID) + require.NoError(t, err) + require.Equal(t, expected, actual) + + assertRichProtocolStateValidity(t, actual) + + // fetch protocol state by block ID + actualByBlockID, err := store.ByBlockID(blockID) + require.NoError(t, err) + require.Equal(t, expected, actualByBlockID) + + assertRichProtocolStateValidity(t, actualByBlockID) + }) +} + +// assertRichProtocolStateValidity checks if RichProtocolState holds its invariant and is correctly populated by storage layer. +func assertRichProtocolStateValidity(t *testing.T, state *flow.RichProtocolStateEntry) { + // invariant: CurrentEpochSetup and CurrentEpochCommit are for the same epoch. Never nil. + assert.Equal(t, state.CurrentEpochSetup.Counter, state.CurrentEpochCommit.Counter, "current epoch setup and commit should be for the same epoch") + + // invariant: CurrentEpochSetup and CurrentEpochCommit IDs are the equal to the ID of the protocol state entry. Never nil. + assert.Equal(t, state.CurrentEpochSetup.ID(), state.ProtocolStateEntry.CurrentEpochEventIDs.SetupID, "epoch setup should be for correct event ID") + assert.Equal(t, state.CurrentEpochCommit.ID(), state.ProtocolStateEntry.CurrentEpochEventIDs.CommitID, "epoch commit should be for correct event ID") + + var previousEpochParticipants flow.IdentityList + // invariant: PreviousEpochSetup and PreviousEpochCommit should be present if respective ID is not zero. + if state.PreviousEpochEventIDs.SetupID != flow.ZeroID { + // invariant: PreviousEpochSetup and PreviousEpochCommit are for the same epoch. Never nil. + assert.Equal(t, state.CurrentEpochSetup.Counter, state.PreviousEpochSetup.Counter+1, "current epoch setup should be next after previous epoch") + assert.Equal(t, state.PreviousEpochSetup.Counter, state.PreviousEpochCommit.Counter, "previous epoch setup and commit should be for the same epoch") + + // invariant: PreviousEpochSetup and PreviousEpochCommit IDs are the equal to the ID of the protocol state entry. Never nil. + assert.Equal(t, state.PreviousEpochSetup.ID(), state.ProtocolStateEntry.PreviousEpochEventIDs.SetupID, "epoch setup should be for correct event ID") + assert.Equal(t, state.PreviousEpochCommit.ID(), state.ProtocolStateEntry.PreviousEpochEventIDs.CommitID, "epoch commit should be for correct event ID") + + previousEpochParticipants = state.PreviousEpochSetup.Participants + } + + // invariant: Identities is a full identity table for the current epoch. Identities are sorted in canonical order. Without duplicates. Never nil. + var allIdentities flow.IdentityList + if state.NextEpochProtocolState != nil { + allIdentities = state.CurrentEpochSetup.Participants.Union(state.NextEpochProtocolState.CurrentEpochSetup.Participants) + } else { + allIdentities = state.CurrentEpochSetup.Participants.Union(previousEpochParticipants) + } + + assert.Equal(t, allIdentities, state.Identities, "identities should be a full identity table for the current epoch, without duplicates") + + for i, identity := range state.ProtocolStateEntry.Identities { + assert.Equal(t, identity.NodeID, allIdentities[i].NodeID, "identity node ID should match") + } + + nextEpochState := state.NextEpochProtocolState + if nextEpochState == nil { + return + } + + // invariant: NextEpochProtocolState is a protocol state for the next epoch. Can be nil. + assertRichProtocolStateValidity(t, nextEpochState) +} diff --git a/storage/protocol_state.go b/storage/protocol_state.go new file mode 100644 index 00000000000..6f35ccbe41d --- /dev/null +++ b/storage/protocol_state.go @@ -0,0 +1,32 @@ +package storage + +import ( + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage/badger/transaction" +) + +// ProtocolState represents persistent storage for protocol state entries. +// TODO: update naming to IdentityTable +type ProtocolState interface { + // StoreTx allows us to store an identity table as part of a DB tx, while still going through the caching layer. + // Per convention, the given Identity Table must be in canonical order, otherwise an exception is returned. + // Expected error returns during normal operations: + // - storage.ErrAlreadyExists if an Identity Table with the given id is already stored + StoreTx(id flow.Identifier, protocolState *flow.ProtocolStateEntry) func(*transaction.Tx) error + + // Index indexes the identity table by block ID. + // Error returns: + // - storage.ErrAlreadyExists if an identity table for the given blockID has already been indexed + Index(blockID flow.Identifier, protocolStateID flow.Identifier) func(*transaction.Tx) error + + // ByID retrieves the identity table by its ID. + // Error returns: + // - storage.ErrNotFound if no identity table with the given ID exists + ByID(id flow.Identifier) (*flow.RichProtocolStateEntry, error) + + // ByBlockID retrieves the identity table by the respective block ID. + // TODO: clarify whether the blockID is the block that defines this identity table or the _child_ block where the identity table is applied. CAUTION: surface for bugs! + // Error returns: + // - storage.ErrNotFound if no identity table for the given blockID exists + ByBlockID(blockID flow.Identifier) (*flow.RichProtocolStateEntry, error) +} diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 42ab8ec6ef3..2f213633de1 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -10,12 +10,10 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" - "github.com/onflow/cadence" + "github.com/stretchr/testify/require" sdk "github.com/onflow/flow-go-sdk" - "github.com/onflow/flow-go/network/message" hotstuff "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/crypto" @@ -42,6 +40,7 @@ import ( "github.com/onflow/flow-go/module/signature" "github.com/onflow/flow-go/module/updatable_configs" "github.com/onflow/flow-go/network/channels" + "github.com/onflow/flow-go/network/message" "github.com/onflow/flow-go/network/p2p/keyutils" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/state/protocol/inmem" @@ -2527,6 +2526,142 @@ func ChunkExecutionDataFixture(t *testing.T, minSize int, opts ...func(*executio } } +// RootProtocolStateFixture creates a fixture with correctly structured data for root protocol state. +// This can be useful for testing bootstrap when there is no previous epoch. +func RootProtocolStateFixture() *flow.RichProtocolStateEntry { + currentEpochSetup := EpochSetupFixture(func(setup *flow.EpochSetup) { + setup.Counter = 1 + }) + currentEpochCommit := EpochCommitFixture(func(commit *flow.EpochCommit) { + commit.Counter = currentEpochSetup.Counter + }) + + allIdentities := currentEpochSetup.Participants + + return &flow.RichProtocolStateEntry{ + ProtocolStateEntry: &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: currentEpochSetup.ID(), + CommitID: currentEpochCommit.ID(), + }, + PreviousEpochEventIDs: flow.EventIDs{ + SetupID: flow.ZeroID, + CommitID: flow.ZeroID, + }, + Identities: flow.DynamicIdentityEntryListFromIdentities(allIdentities), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + }, + CurrentEpochSetup: currentEpochSetup, + CurrentEpochCommit: currentEpochCommit, + PreviousEpochSetup: nil, + PreviousEpochCommit: nil, + Identities: allIdentities, + NextEpochProtocolState: nil, + } +} + +// ProtocolStateFixture creates a fixture with correctly structured data. The returned Identity Table +// represents the common situation during the staking phase of Epoch N+1: +// - we are currently in Epoch N +// - previous epoch N-1 is known (specifically EpochSetup and EpochCommit events) +// - network is currently in the staking phase to setup the next epoch, hence no service +// events for the next epoch exist +// +// In particular, the following consistency requirements hold: +// - Epoch setup and commit counters are set to match. +// - Identities are constructed from setup events. +// - Identities are sorted in canonical order. +func ProtocolStateFixture(options ...func(*flow.RichProtocolStateEntry)) *flow.RichProtocolStateEntry { + prevEpochSetup := EpochSetupFixture() + prevEpochCommit := EpochCommitFixture(func(commit *flow.EpochCommit) { + commit.Counter = prevEpochSetup.Counter + }) + currentEpochSetup := EpochSetupFixture(func(setup *flow.EpochSetup) { + setup.Counter = prevEpochSetup.Counter + 1 + // reuse same participant for current epoch + sameParticipant := *prevEpochSetup.Participants[1] + setup.Participants[1] = &sameParticipant + }) + currentEpochCommit := EpochCommitFixture(func(commit *flow.EpochCommit) { + commit.Counter = currentEpochSetup.Counter + }) + + allIdentities := currentEpochSetup.Participants.Union(prevEpochSetup.Participants) + + entry := &flow.RichProtocolStateEntry{ + ProtocolStateEntry: &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: currentEpochSetup.ID(), + CommitID: currentEpochCommit.ID(), + }, + PreviousEpochEventIDs: flow.EventIDs{ + SetupID: prevEpochSetup.ID(), + CommitID: prevEpochCommit.ID(), + }, + Identities: flow.DynamicIdentityEntryListFromIdentities(allIdentities), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + }, + CurrentEpochSetup: currentEpochSetup, + CurrentEpochCommit: currentEpochCommit, + PreviousEpochSetup: prevEpochSetup, + PreviousEpochCommit: prevEpochCommit, + Identities: allIdentities, + NextEpochProtocolState: nil, + } + + for _, option := range options { + option(entry) + } + + return entry +} + +// WithNextEpochProtocolState creates a fixture with correctly structured data for next epoch. +// The resulting Identity Table represents the common situation during the epoch commit phase for Epoch N+1: +// - We are currently in Epoch N. +// - The previous epoch N-1 is known (specifically EpochSetup and EpochCommit events). +// - The network has completed the epoch setup phase, i.e. published the EpochSetup and EpochCommit events for epoch N+1. +func WithNextEpochProtocolState() func(entry *flow.RichProtocolStateEntry) { + return func(entry *flow.RichProtocolStateEntry) { + nextEpochSetup := EpochSetupFixture(func(setup *flow.EpochSetup) { + setup.Counter = entry.CurrentEpochSetup.Counter + 1 + // reuse same participant for current epoch + sameParticipant := *entry.CurrentEpochSetup.Participants[1] + setup.Participants[1] = &sameParticipant + }) + nextEpochCommit := EpochCommitFixture(func(commit *flow.EpochCommit) { + commit.Counter = nextEpochSetup.Counter + }) + allIdentities := nextEpochSetup.Participants.Union(entry.CurrentEpochSetup.Participants) + + entry.Identities = entry.CurrentEpochSetup.Participants.Union(nextEpochSetup.Participants) + entry.ProtocolStateEntry.Identities = flow.DynamicIdentityEntryListFromIdentities(entry.Identities) + + entry.ProtocolStateEntry.NextEpochProtocolState = &flow.ProtocolStateEntry{ + CurrentEpochEventIDs: flow.EventIDs{ + SetupID: nextEpochSetup.ID(), + CommitID: nextEpochCommit.ID(), + }, + PreviousEpochEventIDs: entry.CurrentEpochEventIDs, + Identities: flow.DynamicIdentityEntryListFromIdentities(allIdentities), + InvalidStateTransitionAttempted: false, + NextEpochProtocolState: nil, + } + + entry.NextEpochProtocolState = &flow.RichProtocolStateEntry{ + ProtocolStateEntry: entry.ProtocolStateEntry.NextEpochProtocolState, + CurrentEpochSetup: nextEpochSetup, + CurrentEpochCommit: nextEpochCommit, + PreviousEpochSetup: entry.CurrentEpochSetup, + PreviousEpochCommit: entry.CurrentEpochCommit, + Identities: allIdentities, + NextEpochProtocolState: nil, + } + } +} + func CreateSendTxHttpPayload(tx flow.TransactionBody) map[string]interface{} { tx.Arguments = [][]uint8{} // fix how fixture creates nil values auth := make([]string, len(tx.Authorizers))