Skip to content

Commit

Permalink
Merge pull request #4772 from onflow/petera/4739-index-tx-results-events
Browse files Browse the repository at this point in the history
[Access] Index tx results and events
  • Loading branch information
peterargue authored Sep 29, 2023
2 parents 603e204 + 743c770 commit f7a3a89
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 18 deletions.
55 changes: 47 additions & 8 deletions module/state_synchronization/indexer/indexer_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"fmt"
"time"

"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/utils/logging"
)

Expand All @@ -19,23 +21,29 @@ type IndexerCore struct {
registers storage.RegisterIndex
headers storage.Headers
events storage.Events
results storage.LightTransactionResults
log zerolog.Logger
batcher bstorage.BatchBuilder
}

// New creates an execution state indexer used to ingest block execution data and index it by height.
//
// The passed RegisterIndex storage must already be populated to include the first and last height,
// i.e. the storage must have been bootstrapped before the indexer is created.
// NOTE(review): this constructor itself performs no such check; it only wires the dependencies
// together and currently always returns a nil error.
//
// Parameters:
//   - log: parent logger; the indexer logs with the "component" field set to "execution_indexer".
//   - batcher: passed to bstorage.NewBatch when events and transaction results are indexed.
//   - registers: register index storage.
//   - headers: block header storage.
//   - events: event storage.
//   - results: light transaction result storage.
func New(
	log zerolog.Logger,
	batcher bstorage.BatchBuilder,
	registers storage.RegisterIndex,
	headers storage.Headers,
	events storage.Events,
	results storage.LightTransactionResults,
) (*IndexerCore, error) {
	// Each field is initialised exactly once; a duplicate key in a composite literal does not compile.
	return &IndexerCore{
		log:       log.With().Str("component", "execution_indexer").Logger(),
		batcher:   batcher,
		registers: registers,
		headers:   headers,
		events:    events,
		results:   results,
	}, nil
}

Expand Down Expand Up @@ -86,23 +94,54 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
lg.Warn().Msg("reindexing block data")
}

start := time.Now()

// concurrently process indexing of block data
g := errgroup.Group{}

// TODO: collections are currently indexed using the ingestion engine. In many cases, they are
// downloaded and indexed before the block is sealed. However, when a node is catching up, it
// may download the execution data first. In that case, we should index the collections here.

// Index the events and light transaction results of every chunk. Both are written into the
// same batch, which is flushed once at the end of this goroutine.
g.Go(func() error {
	start := time.Now()

	// Collect events and transaction results across all chunks of the block.
	events := make([]flow.Event, 0)
	results := make([]flow.LightTransactionResult, 0)
	for _, chunk := range data.ChunkExecutionDatas {
		events = append(events, chunk.Events...)
		results = append(results, chunk.TransactionResults...)
	}

	batch := bstorage.NewBatch(c.batcher)

	// Events are stored through the batch only; storing them a second time outside of the
	// batch would duplicate the write.
	err := c.events.BatchStore(data.BlockID, []flow.EventsList{events}, batch)
	if err != nil {
		return fmt.Errorf("could not index events at height %d: %w", block.Height, err)
	}

	err = c.results.BatchStore(data.BlockID, results, batch)
	if err != nil {
		return fmt.Errorf("could not index transaction results at height %d: %w", block.Height, err)
	}

	// Capture the error returned by Flush. Previously the return value was discarded and the
	// stale err from BatchStore (always nil at this point) was checked instead, so a failed
	// flush was silently reported as success.
	err = batch.Flush()
	if err != nil {
		return fmt.Errorf("batch flush error: %w", err)
	}

	lg.Debug().
		Int("event_count", len(events)).
		Int("result_count", len(results)).
		Dur("duration_ms", time.Since(start)).
		Msg("indexed badger data")

	return nil
})

g.Go(func() error {
start := time.Now()

// we iterate over all the registers and overwrite any existing register at the same path.
// this makes sure that if there are multiple changes to a register, only the last change gets persisted.
// if block has two chunks:
Expand Down Expand Up @@ -131,6 +170,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti

lg.Debug().
Int("register_count", len(payloads)).
Dur("duration_ms", time.Since(start)).
Msg("indexed registers")

return nil
Expand All @@ -140,13 +180,12 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
if err != nil {
return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err)
}
return nil
}

func (c *IndexerCore) indexEvents(blockID flow.Identifier, events flow.EventsList) error {
// Note: the last chunk in an execution data is the system chunk. All events in that ChunkExecutionData are service events.
err := c.events.Store(blockID, []flow.EventsList{events})
return err
lg.Debug().
Dur("duration_ms", time.Since(start)).
Msg("indexed block data")

return nil
}

func (c *IndexerCore) indexRegisters(registers map[ledger.Path]*ledger.Payload, height uint64) error {
Expand Down
Loading

0 comments on commit f7a3a89

Please sign in to comment.