Skip to content

Commit

Permalink
Merge pull request #5163 from onflow/petera/4829-index-collection-exe…
Browse files Browse the repository at this point in the history
…cdata

[Access] Index collections from execution data
  • Loading branch information
peterargue committed Jan 4, 2024
1 parent 7be756d commit 4f05dcc
Show file tree
Hide file tree
Showing 11 changed files with 591 additions and 326 deletions.
18 changes: 12 additions & 6 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ type FlowAccessNodeBuilder struct {
ExecutionIndexerCore *indexer.IndexerCore
ScriptExecutor *backend.ScriptExecutor
RegistersAsyncStore *execution.RegistersAsyncStore
IndexerDependencies *cmd.DependencyList

// The sync engine participants provider is the libp2p peer store for the access node
// which is not available until after the network has started.
Expand Down Expand Up @@ -495,8 +496,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

// setup dependency chain to ensure indexer starts after the requester
requesterDependable := module.NewProxiedReadyDoneAware()
indexerDependencies := cmd.NewDependencyList()
indexerDependencies.Add(requesterDependable)
builder.IndexerDependencies.Add(requesterDependable)

builder.
AdminCommand("read-execution-data", func(config *cmd.NodeConfig) commands.AdminCommand {
Expand Down Expand Up @@ -719,13 +719,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

checkpointHeight := builder.SealedRootBlock.Header.Height

buutstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, builder.Logger)
bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, builder.Logger)
if err != nil {
return nil, fmt.Errorf("could not create registers bootstrapper: %w", err)
return nil, fmt.Errorf("could not create registers bootstrap: %w", err)
}

// TODO: find a way to hook a context up to this to allow a graceful shutdown
err = buutstrap.IndexCheckpointFile(context.Background())
err = bootstrap.IndexCheckpointFile(context.Background())
if err != nil {
return nil, fmt.Errorf("could not load checkpoint file: %w", err)
}
Expand All @@ -746,6 +746,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.Storage.Headers,
builder.Storage.Events,
builder.Storage.LightTransactionResults,
builder.IngestEng.OnCollection,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -792,7 +793,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.ScriptExecutor.InitReporter(builder.ExecutionIndexer, scripts)

return builder.ExecutionIndexer, nil
}, indexerDependencies)
}, builder.IndexerDependencies)
}

if builder.stateStreamConf.ListenAddr != "" {
Expand Down Expand Up @@ -876,6 +877,7 @@ func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
AccessNodeConfig: DefaultAccessNodeConfig(),
FlowNodeBuilder: nodeBuilder,
FollowerDistributor: dist,
IndexerDependencies: cmd.NewDependencyList(),
}
}

Expand Down Expand Up @@ -1144,6 +1146,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.BuildExecutionSyncComponents()
}

ingestionDependable := module.NewProxiedReadyDoneAware()
builder.IndexerDependencies.Add(ingestionDependable)

builder.
BuildConsensusFollower().
Module("collection node client", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1436,6 +1441,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if err != nil {
return nil, err
}
ingestionDependable.Init(builder.IngestEng)
builder.RequestEng.WithHandle(builder.IngestEng.OnCollection)
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.IngestEng.OnFinalizedBlock)

Expand Down
Loading

0 comments on commit 4f05dcc

Please sign in to comment.