Skip to content

Commit

Permalink
storage layer test refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Mar 31, 2023
1 parent fc5b515 commit cf86037
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 146 deletions.
84 changes: 41 additions & 43 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/mocknetwork"
protocol "github.com/onflow/flow-go/state/protocol/mock"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/unittest"
Expand Down Expand Up @@ -109,23 +110,20 @@ func (suite *Suite) SetupTest() {
}

func (suite *Suite) RunTest(
f func(handler *access.Handler, db *badger.DB, blocks *storage.Blocks, headers *storage.Headers, results *storage.ExecutionResults),
f func(handler *access.Handler, db *badger.DB, all *storage.All),
) {
unittest.RunWithBadgerDB(suite.T(), func(db *badger.DB) {
headers, _, _, _, _, blocks, _, _, _, _, results := util.StorageLayer(suite.T(), db)
transactions := storage.NewTransactions(suite.metrics, db)
collections := storage.NewCollections(db, transactions)
receipts := storage.NewExecutionReceipts(suite.metrics, db, results, storage.DefaultCacheSize)
all := util.StorageLayer(suite.T(), db)

suite.backend = backend.New(suite.state,
suite.collClient,
nil,
blocks,
headers,
collections,
transactions,
receipts,
results,
all.Blocks,
all.Headers,
all.Collections,
all.Transactions,
all.Receipts,
all.Results,
suite.chainID,
suite.metrics,
nil,
Expand All @@ -138,12 +136,12 @@ func (suite *Suite) RunTest(
)

handler := access.NewHandler(suite.backend, suite.chainID.Chain(), access.WithBlockSignerDecoder(suite.signerIndicesDecoder))
f(handler, db, blocks, headers, results)
f(handler, db, all)
})
}

func (suite *Suite) TestSendAndGetTransaction() {
suite.RunTest(func(handler *access.Handler, _ *badger.DB, _ *storage.Blocks, _ *storage.Headers, _ *storage.ExecutionResults) {
suite.RunTest(func(handler *access.Handler, _ *badger.DB, _ *storage.All) {
referenceBlock := unittest.BlockHeaderFixture()
transaction := unittest.TransactionFixture()
transaction.SetReferenceBlockID(referenceBlock.ID())
Expand Down Expand Up @@ -196,7 +194,7 @@ func (suite *Suite) TestSendAndGetTransaction() {
}

func (suite *Suite) TestSendExpiredTransaction() {
suite.RunTest(func(handler *access.Handler, _ *badger.DB, _ *storage.Blocks, _ *storage.Headers, _ *storage.ExecutionResults) {
suite.RunTest(func(handler *access.Handler, _ *badger.DB, _ *storage.All) {
referenceBlock := unittest.BlockHeaderFixture()

// create latest block that is past the expiry window
Expand Down Expand Up @@ -251,8 +249,8 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {

// create storage
metrics := metrics.NewNoopCollector()
transactions := storage.NewTransactions(metrics, db)
collections := storage.NewCollections(db, transactions)
transactions := bstorage.NewTransactions(metrics, db)
collections := bstorage.NewCollections(db, transactions)

// create collection node cluster
count := 2
Expand Down Expand Up @@ -349,16 +347,16 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() {
}

func (suite *Suite) TestGetBlockByIDAndHeight() {
suite.RunTest(func(handler *access.Handler, db *badger.DB, blocks *storage.Blocks, _ *storage.Headers, _ *storage.ExecutionResults) {
suite.RunTest(func(handler *access.Handler, db *badger.DB, all *storage.All) {

// test block1 get by ID
block1 := unittest.BlockFixture()
// test block2 get by height
block2 := unittest.BlockFixture()
block2.Header.Height = 2

require.NoError(suite.T(), blocks.Store(&block1))
require.NoError(suite.T(), blocks.Store(&block2))
require.NoError(suite.T(), all.Blocks.Store(&block1))
require.NoError(suite.T(), all.Blocks.Store(&block2))

// the follower logic should update height index on the block storage when a block is finalized
err := db.Update(operation.IndexBlockHeight(block2.Header.Height, block2.ID()))
Expand Down Expand Up @@ -473,7 +471,7 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {
}

func (suite *Suite) TestGetExecutionResultByBlockID() {
suite.RunTest(func(handler *access.Handler, db *badger.DB, blocks *storage.Blocks, _ *storage.Headers, executionResults *storage.ExecutionResults) {
suite.RunTest(func(handler *access.Handler, db *badger.DB, all *storage.All) {

// test block1 get by ID
nonexistingID := unittest.IdentifierFixture()
Expand All @@ -483,8 +481,8 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
unittest.WithExecutionResultBlockID(blockID),
unittest.WithServiceEvents(2))

require.NoError(suite.T(), executionResults.Store(er))
require.NoError(suite.T(), executionResults.Index(blockID, er.ID()))
require.NoError(suite.T(), all.Results.Store(er))
require.NoError(suite.T(), all.Results.Index(blockID, er.ID()))

assertResp := func(resp *accessproto.ExecutionResultForBlockIDResponse, err error, executionResult *flow.ExecutionResult) {
require.NoError(suite.T(), err)
Expand Down Expand Up @@ -555,9 +553,9 @@ func (suite *Suite) TestGetExecutionResultByBlockID() {
// is reported as sealed
func (suite *Suite) TestGetSealedTransaction() {
unittest.RunWithBadgerDB(suite.T(), func(db *badger.DB) {
headers, _, _, _, _, blocks, _, _, _, _, _ := util.StorageLayer(suite.T(), db)
results := storage.NewExecutionResults(suite.metrics, db)
receipts := storage.NewExecutionReceipts(suite.metrics, db, results, storage.DefaultCacheSize)
all := util.StorageLayer(suite.T(), db)
results := bstorage.NewExecutionResults(suite.metrics, db)
receipts := bstorage.NewExecutionReceipts(suite.metrics, db, results, bstorage.DefaultCacheSize)
enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
enNodeIDs := flow.IdentifierList(enIdentities.NodeIDs())

Expand Down Expand Up @@ -594,8 +592,8 @@ func (suite *Suite) TestGetSealedTransaction() {

// initialize storage
metrics := metrics.NewNoopCollector()
transactions := storage.NewTransactions(metrics, db)
collections := storage.NewCollections(db, transactions)
transactions := bstorage.NewTransactions(metrics, db)
collections := bstorage.NewCollections(db, transactions)
collectionsToMarkFinalized, err := stdmap.NewTimes(100)
require.NoError(suite.T(), err)
collectionsToMarkExecuted, err := stdmap.NewTimes(100)
Expand All @@ -606,8 +604,8 @@ func (suite *Suite) TestGetSealedTransaction() {
backend := backend.New(suite.state,
suite.collClient,
nil,
blocks,
headers,
all.Blocks,
all.Headers,
collections,
transactions,
receipts,
Expand All @@ -625,19 +623,19 @@ func (suite *Suite) TestGetSealedTransaction() {

handler := access.NewHandler(backend, suite.chainID.Chain())

rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, rpc.Config{}, nil, nil, blocks, headers, collections, transactions, receipts,
rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, rpc.Config{}, nil, nil, all.Blocks, all.Headers, collections, transactions, receipts,
results, suite.chainID, metrics, metrics, 0, 0, false, false, nil, nil)
require.NoError(suite.T(), err)
rpcEng, err := rpcEngBuilder.WithLegacy().Build()
require.NoError(suite.T(), err)

// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, blocks, headers, collections,
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, metrics, collectionsToMarkFinalized, collectionsToMarkExecuted, blocksToMarkExecuted, rpcEng)
require.NoError(suite.T(), err)

// 1. Assume that follower engine updated the block storage and the protocol state. The block is reported as sealed
err = blocks.Store(&block)
err = all.Blocks.Store(&block)
require.NoError(suite.T(), err)
suite.snapshot.On("Head").Return(block.Header, nil).Twice()

Expand Down Expand Up @@ -683,11 +681,11 @@ func (suite *Suite) TestGetSealedTransaction() {
// the correct block id
func (suite *Suite) TestExecuteScript() {
unittest.RunWithBadgerDB(suite.T(), func(db *badger.DB) {
headers, _, _, _, _, blocks, _, _, _, _, _ := util.StorageLayer(suite.T(), db)
transactions := storage.NewTransactions(suite.metrics, db)
collections := storage.NewCollections(db, transactions)
results := storage.NewExecutionResults(suite.metrics, db)
receipts := storage.NewExecutionReceipts(suite.metrics, db, results, storage.DefaultCacheSize)
all := util.StorageLayer(suite.T(), db)
transactions := bstorage.NewTransactions(suite.metrics, db)
collections := bstorage.NewCollections(db, transactions)
results := bstorage.NewExecutionResults(suite.metrics, db)
receipts := bstorage.NewExecutionReceipts(suite.metrics, db, results, bstorage.DefaultCacheSize)

identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
suite.snapshot.On("Identities", mock.Anything).Return(identities, nil)
Expand All @@ -699,8 +697,8 @@ func (suite *Suite) TestExecuteScript() {
suite.backend = backend.New(suite.state,
suite.collClient,
nil,
blocks,
headers,
all.Blocks,
all.Headers,
collections,
transactions,
receipts,
Expand Down Expand Up @@ -731,14 +729,14 @@ func (suite *Suite) TestExecuteScript() {
suite.net.On("Register", channels.ReceiveReceipts, mock.Anything).Return(conduit, nil).
Once()
// create the ingest engine
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, blocks, headers, collections,
ingestEng, err := ingestion.New(suite.log, suite.net, suite.state, suite.me, suite.request, all.Blocks, all.Headers, collections,
transactions, results, receipts, metrics, collectionsToMarkFinalized, collectionsToMarkExecuted, blocksToMarkExecuted, nil)
require.NoError(suite.T(), err)

// create a block and a seal pointing to that block
lastBlock := unittest.BlockFixture()
lastBlock.Header.Height = 2
err = blocks.Store(&lastBlock)
err = all.Blocks.Store(&lastBlock)
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID()))
require.NoError(suite.T(), err)
Expand All @@ -755,7 +753,7 @@ func (suite *Suite) TestExecuteScript() {
// create another block as a predecessor of the block created earlier
prevBlock := unittest.BlockFixture()
prevBlock.Header.Height = lastBlock.Header.Height - 1
err = blocks.Store(&prevBlock)
err = all.Blocks.Store(&prevBlock)
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(prevBlock.Header.Height, prevBlock.ID()))
require.NoError(suite.T(), err)
Expand Down
29 changes: 15 additions & 14 deletions module/builder/collection/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/state/protocol/util"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/procedure"
sutil "github.com/onflow/flow-go/storage/util"
Expand All @@ -43,9 +44,9 @@ type BuilderSuite struct {
genesis *model.Block
chainID flow.ChainID

headers *storage.Headers
payloads *storage.ClusterPayloads
blocks *storage.Blocks
headers storage.Headers
payloads storage.ClusterPayloads
blocks storage.Blocks

state cluster.MutableState

Expand Down Expand Up @@ -73,11 +74,11 @@ func (suite *BuilderSuite) SetupTest() {

metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
headers, _, seals, index, conPayloads, blocks, qcs, setups, commits, statuses, results := sutil.StorageLayer(suite.T(), suite.db)
all := sutil.StorageLayer(suite.T(), suite.db)
consumer := events.NewNoop()
suite.headers = headers
suite.blocks = blocks
suite.payloads = storage.NewClusterPayloads(metrics, suite.db)
suite.headers = all.Headers
suite.blocks = all.Blocks
suite.payloads = bstorage.NewClusterPayloads(metrics, suite.db)

clusterQC := unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(suite.genesis.ID()))
clusterStateRoot, err := clusterkv.NewStateRoot(suite.genesis, clusterQC)
Expand All @@ -98,10 +99,10 @@ func (suite *BuilderSuite) SetupTest() {
rootSnapshot, err := inmem.SnapshotFromBootstrapState(root, result, seal, unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(root.ID())))
require.NoError(suite.T(), err)

state, err := pbadger.Bootstrap(metrics, suite.db, headers, seals, results, blocks, qcs, setups, commits, statuses, rootSnapshot)
state, err := pbadger.Bootstrap(metrics, suite.db, all.Headers, all.Seals, all.Results, all.Blocks, all.QuorumCertificates, all.Setups, all.EpochCommits, all.Statuses, rootSnapshot)
require.NoError(suite.T(), err)

suite.protoState, err = pbadger.NewFollowerState(state, index, conPayloads, tracer, consumer, util.MockBlockTimer())
suite.protoState, err = pbadger.NewFollowerState(state, all.Index, all.Payloads, tracer, consumer, util.MockBlockTimer())
require.NoError(suite.T(), err)

// add some transactions to transaction pool
Expand Down Expand Up @@ -979,10 +980,10 @@ func benchmarkBuildOn(b *testing.B, size int) {

metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
headers, _, _, _, _, blocks, _, _, _, _, _ := sutil.StorageLayer(suite.T(), suite.db)
suite.headers = headers
suite.blocks = blocks
suite.payloads = storage.NewClusterPayloads(metrics, suite.db)
all := sutil.StorageLayer(suite.T(), suite.db)
suite.headers = all.Headers
suite.blocks = all.Blocks
suite.payloads = bstorage.NewClusterPayloads(metrics, suite.db)

qc := unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(suite.genesis.ID()))
stateRoot, err := clusterkv.NewStateRoot(suite.genesis, qc)
Expand Down
8 changes: 4 additions & 4 deletions state/cluster/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ func (suite *MutatorSuite) SetupTest() {

metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()
headers, _, seals, index, conPayloads, blocks, qcs, setups, commits, statuses, results := util.StorageLayer(suite.T(), suite.db)
all := util.StorageLayer(suite.T(), suite.db)
colPayloads := storage.NewClusterPayloads(metrics, suite.db)

clusterStateRoot, err := NewStateRoot(suite.genesis, unittest.QuorumCertificateFixture())
suite.NoError(err)
clusterState, err := Bootstrap(suite.db, clusterStateRoot)
suite.Assert().Nil(err)
suite.state, err = NewMutableState(clusterState, tracer, headers, colPayloads)
suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads)
suite.Assert().Nil(err)
consumer := events.NewNoop()

Expand All @@ -86,10 +86,10 @@ func (suite *MutatorSuite) SetupTest() {

suite.protoGenesis = genesis.Header

state, err := pbadger.Bootstrap(metrics, suite.db, headers, seals, results, blocks, qcs, setups, commits, statuses, rootSnapshot)
state, err := pbadger.Bootstrap(metrics, suite.db, all.Headers, all.Seals, all.Results, all.Blocks, all.QuorumCertificates, all.Setups, all.EpochCommits, all.Statuses, rootSnapshot)
require.NoError(suite.T(), err)

suite.protoState, err = pbadger.NewFollowerState(state, index, conPayloads, tracer, consumer, protocolutil.MockBlockTimer())
suite.protoState, err = pbadger.NewFollowerState(state, all.Index, all.Payloads, tracer, consumer, protocolutil.MockBlockTimer())
require.NoError(suite.T(), err)
}

Expand Down
6 changes: 3 additions & 3 deletions state/cluster/badger/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ func (suite *SnapshotSuite) SetupTest() {
metrics := metrics.NewNoopCollector()
tracer := trace.NewNoopTracer()

headers, _, seals, _, _, blocks, qcs, setups, commits, statuses, results := util.StorageLayer(suite.T(), suite.db)
all := util.StorageLayer(suite.T(), suite.db)
colPayloads := storage.NewClusterPayloads(metrics, suite.db)

clusterStateRoot, err := NewStateRoot(suite.genesis, unittest.QuorumCertificateFixture())
suite.Assert().Nil(err)
clusterState, err := Bootstrap(suite.db, clusterStateRoot)
suite.Assert().Nil(err)
suite.state, err = NewMutableState(clusterState, tracer, headers, colPayloads)
suite.state, err = NewMutableState(clusterState, tracer, all.Headers, colPayloads)
suite.Assert().Nil(err)

participants := unittest.IdentityListFixture(5, unittest.WithAllRoles())
root := unittest.RootSnapshotFixture(participants)

suite.protoState, err = pbadger.Bootstrap(metrics, suite.db, headers, seals, results, blocks, qcs, setups, commits, statuses, root)
suite.protoState, err = pbadger.Bootstrap(metrics, suite.db, all.Headers, all.Seals, all.Results, all.Blocks, all.QuorumCertificates, all.Setups, all.EpochCommits, all.Statuses, root)
require.NoError(suite.T(), err)

suite.Require().Nil(err)
Expand Down
Loading

0 comments on commit cf86037

Please sign in to comment.