From c9188b4e35af14c84ef45198f68620c84b7d3290 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Wed, 27 Sep 2023 14:40:40 -0700
Subject: [PATCH 1/4] [Access] Index tx results and events from exec data

---
 .../indexer/indexer_core.go                   |  50 ++++-
 .../indexer/indexer_core_test.go              | 196 +++++++++++++++++-
 .../indexer/indexer_test.go                   |   1 +
 utils/unittest/fixtures.go                    |  12 ++
 4 files changed, 248 insertions(+), 11 deletions(-)

diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go
index bb1f2a01167..483d294f552 100644
--- a/module/state_synchronization/indexer/indexer_core.go
+++ b/module/state_synchronization/indexer/indexer_core.go
@@ -2,7 +2,9 @@ package indexer
 
 import (
 	"fmt"
+	"time"
 
+	"github.com/dgraph-io/badger/v2"
 	"github.com/rs/zerolog"
 	"golang.org/x/sync/errgroup"
 
@@ -11,6 +13,7 @@ import (
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
 	"github.com/onflow/flow-go/storage"
+	bstorage "github.com/onflow/flow-go/storage/badger"
 	"github.com/onflow/flow-go/utils/logging"
 )
 
@@ -19,7 +22,9 @@ type IndexerCore struct {
 	registers storage.RegisterIndex
 	headers   storage.Headers
 	events    storage.Events
+	results   storage.LightTransactionResults
 	log       zerolog.Logger
+	db        *badger.DB
 }
 
 // New execution state indexer used to ingest block execution data and index it by height.
@@ -27,15 +32,19 @@ type IndexerCore struct {
 // won't be initialized to ensure we have bootstrapped the storage first.
 func New(
 	log zerolog.Logger,
+	db *badger.DB,
 	registers storage.RegisterIndex,
 	headers storage.Headers,
 	events storage.Events,
+	results storage.LightTransactionResults,
 ) (*IndexerCore, error) {
 	return &IndexerCore{
+		log:       log.With().Str("component", "execution_indexer").Logger(),
+		db:        db,
 		registers: registers,
 		headers:   headers,
 		events:    events,
-		log:       log.With().Str("component", "execution_indexer").Logger(),
+		results:   results,
 	}, nil
 }
 
@@ -86,23 +95,54 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
 		lg.Warn().Msg("reindexing block data")
 	}
 
+	start := time.Now()
+
 	// concurrently process indexing of block data
 	g := errgroup.Group{}
 
+	// TODO: collections are currently indexed using the ingestion engine. In many cases, they are
+	// downloaded and indexed before the block is sealed. However, when a node is catching up, it
+	// may download the execution data first. In that case, we should index the collections here.
+
 	g.Go(func() error {
+		start := time.Now()
+
 		events := make([]flow.Event, 0)
+		results := make([]flow.LightTransactionResult, 0)
 		for _, chunk := range data.ChunkExecutionDatas {
 			events = append(events, chunk.Events...)
+			results = append(results, chunk.TransactionResults...)
 		}
 
-		err := c.indexEvents(data.BlockID, events)
+		batch := bstorage.NewBatch(c.db)
+
+		err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, batch)
 		if err != nil {
 			return fmt.Errorf("could not index events at height %d: %w", block.Height, err)
 		}
+
+		err = c.results.BatchStore(data.BlockID, results, batch)
+		if err != nil {
+			return fmt.Errorf("could not index transaction results at height %d: %w", block.Height, err)
+		}
+
+		batch.Flush()
+		if err != nil {
+			return fmt.Errorf("batch flush error: %w", err)
+		}
+
+		lg.Debug().
+			Int("event_count", len(events)).
+			Int("result_count", len(results)).
+			Dur("duration_ms", time.Since(start)).
+			Msg("indexed badger data")
+
 		return nil
 	})
 
 	g.Go(func() error {
+		start := time.Now()
+
 		// we are iterating all the registers and overwrite any existing register at the same path
 		// this will make sure if we have multiple register changes only the last change will get persisted
 		// if block has two chucks:
@@ -131,6 +171,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
 
 		lg.Debug().
 			Int("register_count", len(payloads)).
+			Dur("duration_ms", time.Since(start)).
 			Msg("indexed registers")
 
 		return nil
@@ -140,6 +181,11 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
 	if err != nil {
 		return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err)
 	}
+
+	lg.Debug().
+		Dur("duration_ms", time.Since(start)).
+		Msg("indexed block data")
+
 	return nil
 }
 
diff --git a/module/state_synchronization/indexer/indexer_core_test.go b/module/state_synchronization/indexer/indexer_core_test.go
index 5d7793ec745..d83774266b5 100644
--- a/module/state_synchronization/indexer/indexer_core_test.go
+++ b/module/state_synchronization/indexer/indexer_core_test.go
@@ -34,6 +34,7 @@ type indexCoreTest struct {
 	indexer          *IndexerCore
 	registers        *storagemock.RegisterIndex
 	events           *storagemock.Events
+	results          *storagemock.LightTransactionResults
 	headers          *storagemock.Headers
 	ctx              context.Context
 	blocks           []*flow.Block
@@ -54,6 +55,7 @@ func newIndexCoreTest(
 		t:         t,
 		registers: storagemock.NewRegisterIndex(t),
 		events:    storagemock.NewEvents(t),
+		results:   storagemock.NewLightTransactionResults(t),
 		blocks:    blocks,
 		ctx:       context.Background(),
 		data:      exeData,
@@ -103,11 +105,22 @@ func (i *indexCoreTest) setStoreRegisters(f func(t *testing.T, entries flow.Regi
 	return i
 }
 
-func (i *indexCoreTest) setStoreEvents(f func(t *testing.T, ID flow.Identifier, events []flow.EventsList) error) *indexCoreTest {
+func (i *indexCoreTest) setStoreEvents(f func(*testing.T, flow.Identifier, []flow.EventsList) error) *indexCoreTest {
 	i.events.
-		On("Store", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.EventsList")).
-		Return(func(ID flow.Identifier, events []flow.EventsList) error {
-			return f(i.t, ID, events)
+		On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.EventsList"), mock.Anything).
+		Return(func(blockID flow.Identifier, events []flow.EventsList, batch storage.BatchStorage) error {
+			require.NotNil(i.t, batch)
+			return f(i.t, blockID, events)
+		})
+	return i
+}
+
+func (i *indexCoreTest) setStoreTransactionResults(f func(*testing.T, flow.Identifier, []flow.LightTransactionResult) error) *indexCoreTest {
+	i.results.
+		On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.LightTransactionResult"), mock.Anything).
+		Return(func(blockID flow.Identifier, results []flow.LightTransactionResult, batch storage.BatchStorage) error {
+			require.NotNil(i.t, batch)
+			return f(i.t, blockID, results)
 		})
 	return i
 }
@@ -123,13 +136,25 @@ func (i *indexCoreTest) setGetRegisters(f func(t *testing.T, ID flow.RegisterID,
 
 func (i *indexCoreTest) useDefaultEvents() *indexCoreTest {
 	i.events.
-		On("Store", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.EventsList")).
+		On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.EventsList"), mock.Anything).
+		Return(nil)
+	return i
+}
+
+func (i *indexCoreTest) useDefaultTransactionResults() *indexCoreTest {
+	i.results.
+		On("BatchStore", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("[]flow.LightTransactionResult"), mock.Anything).
 		Return(nil)
 	return i
 }
 
 func (i *indexCoreTest) initIndexer() *indexCoreTest {
-	indexer, err := New(zerolog.New(os.Stdout), i.registers, i.headers, i.events)
+	db, dbDir := unittest.TempBadgerDB(i.t)
+	i.t.Cleanup(func() {
+		require.NoError(i.t, os.RemoveAll(dbDir))
+	})
+
+	indexer, err := New(zerolog.New(os.Stdout), db, i.registers, i.headers, i.events, i.results)
 	require.NoError(i.t, err)
 	i.indexer = indexer
 	return i
@@ -165,6 +190,7 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
 			initIndexer().
 			useDefaultLastHeight().
 			useDefaultEvents().
+			useDefaultTransactionResults().
 			// make sure update registers match in length and are same as block data ledger payloads
 			setStoreRegisters(func(t *testing.T, entries flow.RegisterEntries, height uint64) error {
 				assert.Equal(t, height, block.Header.Height)
@@ -208,6 +234,7 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
 			initIndexer().
 			useDefaultEvents().
 			useDefaultLastHeight().
+			useDefaultTransactionResults().
 			// make sure update registers match in length and are same as block data ledger payloads
 			setStoreRegisters(func(t *testing.T, entries flow.RegisterEntries, height uint64) error {
 				for _, entry := range entries {
@@ -227,6 +254,152 @@ func TestExecutionState_IndexBlockData(t *testing.T) {
 		assert.True(t, testRegisterFound)
 	})
 
+	t.Run("Index Events", func(t *testing.T) {
+		expectedEvents := unittest.EventsFixture(20)
+		ed := &execution_data.BlockExecutionData{
+			BlockID: block.ID(),
+			ChunkExecutionDatas: []*execution_data.ChunkExecutionData{
+				// split events into 2 chunks
+				{Events: expectedEvents[:10]},
+				{Events: expectedEvents[10:]},
+			},
+		}
+		execData := execution_data.NewBlockExecutionDataEntity(block.ID(), ed)
+
+		err := newIndexCoreTest(t, blocks, execData).
+			initIndexer().
+			useDefaultLastHeight().
+			// make sure all events are stored at once in order
+			setStoreEvents(func(t *testing.T, actualBlockID flow.Identifier, actualEvents []flow.EventsList) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualEvents, 1)
+				require.Len(t, actualEvents[0], len(expectedEvents))
+				for i, expected := range expectedEvents {
+					assert.Equal(t, expected, actualEvents[0][i])
+				}
+				return nil
+			}).
+			// make sure an empty set of transaction results were stored
+			setStoreTransactionResults(func(t *testing.T, actualBlockID flow.Identifier, actualResults []flow.LightTransactionResult) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualResults, 0)
+				return nil
+			}).
+			// make sure an empty set of register entries was stored
+			setStoreRegisters(func(t *testing.T, entries flow.RegisterEntries, height uint64) error {
+				assert.Equal(t, height, block.Header.Height)
+				assert.Equal(t, 0, entries.Len())
+				return nil
+			}).
+			runIndexBlockData()
+
+		assert.NoError(t, err)
+	})
+
+	t.Run("Index Tx Results", func(t *testing.T) {
+		expectedResults := unittest.LightTransactionResultsFixture(20)
+		ed := &execution_data.BlockExecutionData{
+			BlockID: block.ID(),
+			ChunkExecutionDatas: []*execution_data.ChunkExecutionData{
+				// split events into 2 chunks
+				{TransactionResults: expectedResults[:10]},
+				{TransactionResults: expectedResults[10:]},
+			},
+		}
+		execData := execution_data.NewBlockExecutionDataEntity(block.ID(), ed)
+
+		err := newIndexCoreTest(t, blocks, execData).
+			initIndexer().
+			useDefaultLastHeight().
+			// make sure an empty set of events were stored
+			setStoreEvents(func(t *testing.T, actualBlockID flow.Identifier, actualEvents []flow.EventsList) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualEvents, 1)
+				require.Len(t, actualEvents[0], 0)
+				return nil
+			}).
+			// make sure all results are stored at once in order
+			setStoreTransactionResults(func(t *testing.T, actualBlockID flow.Identifier, actualResults []flow.LightTransactionResult) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualResults, len(expectedResults))
+				for i, expected := range expectedResults {
+					assert.Equal(t, expected, actualResults[i])
+				}
+				return nil
+			}).
+			// make sure an empty set of register entries was stored
+			setStoreRegisters(func(t *testing.T, entries flow.RegisterEntries, height uint64) error {
+				assert.Equal(t, height, block.Header.Height)
+				assert.Equal(t, 0, entries.Len())
+				return nil
+			}).
+			runIndexBlockData()
+
+		assert.NoError(t, err)
+	})
+
+	t.Run("Index AllTheThings", func(t *testing.T) {
+		expectedEvents := unittest.EventsFixture(20)
+		expectedResults := unittest.LightTransactionResultsFixture(20)
+		expectedTries := []*ledger.TrieUpdate{trieUpdateFixture(t), trieUpdateFixture(t)}
+		expectedPayloads := make([]*ledger.Payload, 0)
+		for _, trie := range expectedTries {
+			expectedPayloads = append(expectedPayloads, trie.Payloads...)
+		}
+
+		ed := &execution_data.BlockExecutionData{
+			BlockID: block.ID(),
+			ChunkExecutionDatas: []*execution_data.ChunkExecutionData{
+				{
+					Events:             expectedEvents[:10],
+					TransactionResults: expectedResults[:10],
+					TrieUpdate:         expectedTries[0],
+				},
+				{
+					TransactionResults: expectedResults[10:],
+					Events:             expectedEvents[10:],
+					TrieUpdate:         expectedTries[1],
+				},
+			},
+		}
+		execData := execution_data.NewBlockExecutionDataEntity(block.ID(), ed)
+
+		err := newIndexCoreTest(t, blocks, execData).
+			initIndexer().
+			useDefaultLastHeight().
+			// make sure all events are stored at once in order
+			setStoreEvents(func(t *testing.T, actualBlockID flow.Identifier, actualEvents []flow.EventsList) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualEvents, 1)
+				require.Len(t, actualEvents[0], len(expectedEvents))
+				for i, expected := range expectedEvents {
+					assert.Equal(t, expected, actualEvents[0][i])
+				}
+				return nil
+			}).
+			// make sure all results are stored at once in order
+			setStoreTransactionResults(func(t *testing.T, actualBlockID flow.Identifier, actualResults []flow.LightTransactionResult) error {
+				assert.Equal(t, block.ID(), actualBlockID)
+				require.Len(t, actualResults, len(expectedResults))
+				for i, expected := range expectedResults {
+					assert.Equal(t, expected, actualResults[i])
+				}
+				return nil
+			}).
+			// make sure update registers match in length and are same as block data ledger payloads
+			setStoreRegisters(func(t *testing.T, entries flow.RegisterEntries, actualHeight uint64) error {
+				assert.Equal(t, actualHeight, block.Header.Height)
+				assert.Equal(t, entries.Len(), len(expectedPayloads))
+
+				// make sure all the registers from the execution data have been stored as well the value matches
+				trieRegistersPayloadComparer(t, expectedPayloads, entries)
+				return nil
+			}).
+			runIndexBlockData()
+
+		assert.NoError(t, err)
+	})
+
 	// this test makes sure we get correct error when we try to index block that is not
 	// within the range of indexed heights.
 	t.Run("Invalid Heights", func(t *testing.T) {
@@ -422,10 +595,15 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) {
 	regKey := "code"
 	registerID := flow.NewRegisterID(regOwner, regKey)
 
+	db, dbDir := unittest.TempBadgerDB(t)
+	t.Cleanup(func() {
+		require.NoError(t, os.RemoveAll(dbDir))
+	})
+
 	// this test makes sure index values for a single register are correctly updated and always last value is returned
 	t.Run("Single Index Value Changes", func(t *testing.T) {
 		pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) {
-			index, err := New(zerolog.Nop(), registers, nil, nil)
+			index, err := New(zerolog.Nop(), db, registers, nil, nil, nil)
 			require.NoError(t, err)
 
 			values := [][]byte{[]byte("1"), []byte("1"), []byte("2"), []byte("3") /*nil,*/, []byte("4")}
@@ -452,7 +630,7 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) {
 	// e.g. we index A{h(1) -> X}, A{h(2) -> Y}, when we request h(4) we get value Y
 	t.Run("Single Index Value At Later Heights", func(t *testing.T) {
 		pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) {
-			index, err := New(zerolog.Nop(), registers, nil, nil)
+			index, err := New(zerolog.Nop(), db, registers, nil, nil, nil)
 			require.NoError(t, err)
 
 			storeValues := [][]byte{[]byte("1"), []byte("2")}
@@ -483,7 +661,7 @@ func TestIndexerIntegration_StoreAndGet(t *testing.T) {
 	// this test makes sure we correctly handle weird payloads
 	t.Run("Empty and Nil Payloads", func(t *testing.T) {
 		pebbleStorage.RunWithRegistersStorageAtInitialHeights(t, 0, 0, func(registers *pebbleStorage.Registers) {
-			index, err := New(zerolog.Nop(), registers, nil, nil)
+			index, err := New(zerolog.Nop(), db, registers, nil, nil, nil)
 			require.NoError(t, err)
 
 			require.NoError(t, index.indexRegisters(map[ledger.Path]*ledger.Payload{}, 1))
diff --git a/module/state_synchronization/indexer/indexer_test.go b/module/state_synchronization/indexer/indexer_test.go
index 0e9423de31f..47827c009e6 100644
--- a/module/state_synchronization/indexer/indexer_test.go
+++ b/module/state_synchronization/indexer/indexer_test.go
@@ -44,6 +44,7 @@ func newIndexerTest(t *testing.T, availableBlocks int, lastIndexedIndex int) *in
 		}).
 		useDefaultBlockByHeight().
 		useDefaultEvents().
+		useDefaultTransactionResults().
 		initIndexer()
 
 	executionData := mempool.NewExecutionData(t)
diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go
index fe91f8229a3..53e06648339 100644
--- a/utils/unittest/fixtures.go
+++ b/utils/unittest/fixtures.go
@@ -2368,6 +2368,18 @@ func TransactionResultsFixture(n int) []flow.TransactionResult {
 	return results
 }
 
+func LightTransactionResultsFixture(n int) []flow.LightTransactionResult {
+	results := make([]flow.LightTransactionResult, 0, n)
+	for i := 0; i < n; i++ {
+		results = append(results, flow.LightTransactionResult{
+			TransactionID:   IdentifierFixture(),
+			Failed:          i%2 == 0,
+			ComputationUsed: Uint64InRange(1, 10_000),
+		})
+	}
+	return results
+}
+
 func AllowAllPeerFilter() func(peer.ID) error {
 	return func(_ peer.ID) error {
 		return nil

From eb2e24206466ce771c67ec52fc01a322faccf643 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Thu, 28 Sep 2023 14:38:51 -0700
Subject: [PATCH 2/4] close badger db on test shutdown

---
 module/state_synchronization/indexer/indexer_core_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/module/state_synchronization/indexer/indexer_core_test.go b/module/state_synchronization/indexer/indexer_core_test.go
index d83774266b5..9254ba32184 100644
--- a/module/state_synchronization/indexer/indexer_core_test.go
+++ b/module/state_synchronization/indexer/indexer_core_test.go
@@ -151,6 +151,7 @@ func (i *indexCoreTest) useDefaultTransactionResults() *indexCoreTest {
 func (i *indexCoreTest) initIndexer() *indexCoreTest {
 	db, dbDir := unittest.TempBadgerDB(i.t)
 	i.t.Cleanup(func() {
+		require.NoError(i.t, db.Close())
 		require.NoError(i.t, os.RemoveAll(dbDir))
 	})
 

From ca8d353e52f43ec89a9f316b3a4b9ce4c9c74a67 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Fri, 29 Sep 2023 05:36:49 -0700
Subject: [PATCH 3/4] remove unused function

---
 module/state_synchronization/indexer/indexer_core.go | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go
index 483d294f552..17ed7cfa614 100644
--- a/module/state_synchronization/indexer/indexer_core.go
+++ b/module/state_synchronization/indexer/indexer_core.go
@@ -189,12 +189,6 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
 	return nil
 }
 
-func (c *IndexerCore) indexEvents(blockID flow.Identifier, events flow.EventsList) error {
-	// Note: the last chunk in an execution data is the system chunk. All events in that ChunkExecutionData are service events.
-	err := c.events.Store(blockID, []flow.EventsList{events})
-	return err
-}
-
 func (c *IndexerCore) indexRegisters(registers map[ledger.Path]*ledger.Payload, height uint64) error {
 	regEntries := make(flow.RegisterEntries, 0, len(registers))
 

From f8c0c34974c78f3f0032795ce7f7ec2b6a116309 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Fri, 29 Sep 2023 11:10:41 -0700
Subject: [PATCH 4/4] use interface instead of passing full badger db

---
 module/state_synchronization/indexer/indexer_core.go | 9 ++++-----
 storage/badger/batch.go                              | 6 +++++-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/module/state_synchronization/indexer/indexer_core.go b/module/state_synchronization/indexer/indexer_core.go
index 17ed7cfa614..9027fd222ef 100644
--- a/module/state_synchronization/indexer/indexer_core.go
+++ b/module/state_synchronization/indexer/indexer_core.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/dgraph-io/badger/v2"
 	"github.com/rs/zerolog"
 	"golang.org/x/sync/errgroup"
 
@@ -24,7 +23,7 @@ type IndexerCore struct {
 	events    storage.Events
 	results   storage.LightTransactionResults
 	log       zerolog.Logger
-	db        *badger.DB
+	batcher   bstorage.BatchBuilder
 }
 
 // New execution state indexer used to ingest block execution data and index it by height.
@@ -32,7 +31,7 @@ type IndexerCore struct {
 // won't be initialized to ensure we have bootstrapped the storage first.
 func New(
 	log zerolog.Logger,
-	db *badger.DB,
+	batcher bstorage.BatchBuilder,
 	registers storage.RegisterIndex,
 	headers storage.Headers,
 	events storage.Events,
@@ -40,7 +39,7 @@ func New(
 ) (*IndexerCore, error) {
 	return &IndexerCore{
 		log:       log.With().Str("component", "execution_indexer").Logger(),
-		db:        db,
+		batcher:   batcher,
 		registers: registers,
 		headers:   headers,
 		events:    events,
@@ -114,7 +113,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
 			results = append(results, chunk.TransactionResults...)
 		}
 
-		batch := bstorage.NewBatch(c.db)
+		batch := bstorage.NewBatch(c.batcher)
 
 		err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, batch)
 		if err != nil {
diff --git a/storage/badger/batch.go b/storage/badger/batch.go
index a3977544f4a..0ea68c82fcb 100644
--- a/storage/badger/batch.go
+++ b/storage/badger/batch.go
@@ -6,6 +6,10 @@ import (
 	"github.com/dgraph-io/badger/v2"
 )
 
+type BatchBuilder interface {
+	NewWriteBatch() *badger.WriteBatch
+}
+
 type Batch struct {
 	writer *badger.WriteBatch
 
@@ -13,7 +17,7 @@ type Batch struct {
 	callbacks []func()
 }
 
-func NewBatch(db *badger.DB) *Batch {
+func NewBatch(db BatchBuilder) *Batch {
 	batch := db.NewWriteBatch()
 	return &Batch{
 		writer:    batch,