Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Fixed public network execution data service component #5375

13 changes: 12 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
})

if builder.publicNetworkExecutionDataEnabled {
var publicBsDependable *module.ProxiedReadyDoneAware

builder.Module("public blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
publicBsDependable = module.NewProxiedReadyDoneAware()
builder.PeerManagerDependencies.Add(publicBsDependable)
return nil
})
builder.Component("public network execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
opts := []network.BlobServiceOption{
blob.WithBitswapOptions(
Expand All @@ -685,7 +692,11 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, fmt.Errorf("could not register blob service: %w", err)
}

return builder.PublicBlobService, nil
// add blobservice into ReadyDoneAware dependency passed to peer manager
// this starts the blob service and configures peer manager to wait for the blobservice
// to be ready before starting
publicBsDependable.Init(builder.PublicBlobService)
return &module.NoopReadyDoneAware{}, nil
peterargue marked this conversation as resolved.
Show resolved Hide resolved
})
}

Expand Down
3 changes: 3 additions & 0 deletions integration/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ const (
// PrimaryAN is the container name for the primary access node to use for API requests
PrimaryAN = "access_1"

// PrimaryON is the container name for the primary observer node to use for API requests
PrimaryON = "observer_1"

DefaultViewsInStakingAuction uint64 = 5
DefaultViewsInDKGPhase uint64 = 50
DefaultViewsInEpoch uint64 = 200
Expand Down
74 changes: 55 additions & 19 deletions integration/tests/access/cohort3/execution_state_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type ExecutionStateSyncSuite struct {

log zerolog.Logger

bridgeID flow.Identifier
ghostID flow.Identifier
bridgeID flow.Identifier
ghostID flow.Identifier
observerName string

// root context for the current test
ctx context.Context
Expand Down Expand Up @@ -75,11 +76,12 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() {
bridgeANConfig := testnet.NewNodeConfig(
flow.RoleAccess,
testnet.WithID(s.bridgeID),
testnet.WithLogLevel(zerolog.DebugLevel),
testnet.WithLogLevel(zerolog.InfoLevel),
testnet.WithAdditionalFlag("--supports-observer=true"),
testnet.WithAdditionalFlag("--execution-data-sync-enabled=true"),
testnet.WithAdditionalFlag(fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir)),
testnet.WithAdditionalFlag("--execution-data-retry-delay=1s"),
testnet.WithAdditionalFlagf("--public-network-execution-data-sync-enabled=true"),
)

// add the ghost (access) node config
Expand Down Expand Up @@ -108,18 +110,28 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() {
testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel)),
bridgeANConfig,
ghostNode,
// TODO: add observer
}

conf := testnet.NewNetworkConfig("execution state sync test", net)
// add the observer node config
s.observerName = testnet.PrimaryON
observers := []testnet.ObserverConfig{{
ContainerName: s.observerName,
LogLevel: zerolog.DebugLevel,
AdditionalFlags: []string{
fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir),
"--execution-data-sync-enabled=true",
},
}}

conf := testnet.NewNetworkConfig("execution state sync test", net, testnet.WithObservers(observers...))
s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet)
}

// TestHappyPath tests that Execution Nodes generate execution data, and Access Nodes are able to
// successfully sync the data
func (s *ExecutionStateSyncSuite) TestHappyPath() {
// Let the network run for this many blocks
runBlocks := uint64(20)
runBlocks := uint64(60)

// We will check that execution data was downloaded for this many blocks
// It has to be less than runBlocks since it's not possible to see which height the AN stopped
Expand All @@ -135,31 +147,55 @@ func (s *ExecutionStateSyncSuite) TestHappyPath() {
s.BlockState.WaitForSealed(s.T(), blockA.Header.Height+runBlocks)
s.net.StopContainers()

metrics := metrics.NewNoopCollector()

// start an execution data service using the Access Node's execution data db
an := s.net.ContainerByID(s.bridgeID)
eds := s.nodeExecutionDataStore(an)
anEds := s.nodeExecutionDataStore(an)

// setup storage objects needed to get the execution data id
db, err := an.DB()
anDB, err := an.DB()
require.NoError(s.T(), err, "could not open db")

metrics := metrics.NewNoopCollector()
headers := storage.NewHeaders(metrics, db)
results := storage.NewExecutionResults(metrics, db)
anHeaders := storage.NewHeaders(metrics, anDB)
anResults := storage.NewExecutionResults(metrics, anDB)

// start an execution data service using the Observer Node's execution data db
on := s.net.ContainerByName(s.observerName)
onEds := s.nodeExecutionDataStore(on)

// setup storage objects needed to get the execution data id
onDB, err := on.DB()
require.NoError(s.T(), err, "could not open db")

onHeaders := storage.NewHeaders(metrics, onDB)
onResults := storage.NewExecutionResults(metrics, onDB)

// Loop through checkBlocks and verify the execution data was downloaded correctly
for i := blockA.Header.Height; i <= blockA.Header.Height+checkBlocks; i++ {
header, err := headers.ByHeight(i)
require.NoError(s.T(), err, "could not get header")
// access node
header, err := anHeaders.ByHeight(i)
require.NoError(s.T(), err, "%s: could not get header", testnet.PrimaryAN)

result, err := anResults.ByBlockID(header.ID())
require.NoError(s.T(), err, "%s: could not get sealed result", testnet.PrimaryAN)

ed, err := anEds.Get(s.ctx, result.ExecutionDataID)
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryAN, i) {
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryAN, i)
assert.Equal(s.T(), header.ID(), ed.BlockID)
}

result, err := results.ByBlockID(header.ID())
require.NoError(s.T(), err, "could not get sealed result")
// observer node
header, err = onHeaders.ByHeight(i)
require.NoError(s.T(), err, "%s: could not get header", testnet.PrimaryON)

s.T().Logf("getting execution data for height %d, block %s, execution_data %s", header.Height, header.ID(), result.ExecutionDataID)
result, err = onResults.ByID(result.ID())
require.NoError(s.T(), err, "%s: could not get sealed result from ON`s storage", testnet.PrimaryON)

ed, err := eds.Get(s.ctx, result.ExecutionDataID)
if assert.NoError(s.T(), err, "could not get execution data for height %v", i) {
s.T().Logf("got execution data for height %d", i)
ed, err = onEds.Get(s.ctx, result.ExecutionDataID)
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryON, i) {
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryON, i)
assert.Equal(s.T(), header.ID(), ed.BlockID)
}
}
Expand Down
1 change: 1 addition & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EngineRegistry interface {

// RegisterBlobService registers a BlobService on the given channel, using the given datastore to retrieve values.
// The returned BlobService can be used to request blocks from the network.
// RegisterBlobService starts the BlobService component using the network's context.
// TODO: We should return a function that can be called to unregister / close the BlobService
RegisterBlobService(channel channels.Channel, store datastore.Batching, opts ...BlobServiceOption) (BlobService, error)

Expand Down
Loading