diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index e33c6e64326..b5525de5003 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -35,6 +35,7 @@ import ( hotstuffvalidator "github.com/onflow/flow-go/consensus/hotstuff/validator" "github.com/onflow/flow-go/consensus/hotstuff/verification" recovery "github.com/onflow/flow-go/consensus/recovery/protocol" + "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/access/apiproxy" "github.com/onflow/flow-go/engine/access/rest" restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy" @@ -57,6 +58,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/blobs" "github.com/onflow/flow-go/module/chainsync" + "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" @@ -132,6 +134,8 @@ type ObserverServiceConfig struct { registersDBPath string checkpointFile string apiTimeout time.Duration + stateStreamConf statestreambackend.Config + stateStreamFilterConf map[string]int upstreamNodeAddresses []string upstreamNodePublicKeys []string upstreamIdentities flow.IdentitySkeletonList // the identity list of upstream peers the node uses to forward API requests to @@ -141,7 +145,6 @@ type ObserverServiceConfig struct { executionDataDir string executionDataStartHeight uint64 executionDataConfig edrequester.ExecutionDataConfig - executionDataCacheSize uint32 // TODO: remove it when state stream is added } // DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig @@ -171,6 +174,18 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { MaxMsgSize: grpcutils.DefaultMaxMsgSize, CompressorName: grpcutils.NoCompressor, }, + stateStreamConf: statestreambackend.Config{ + MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize, + ExecutionDataCacheSize: state_stream.DefaultCacheSize, + ClientSendTimeout: state_stream.DefaultSendTimeout, + ClientSendBufferSize: state_stream.DefaultSendBufferSize, + MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams, + EventFilterConfig: state_stream.DefaultEventFilterConfig, + ResponseLimit: state_stream.DefaultResponseLimit, + HeartbeatInterval: state_stream.DefaultHeartbeatInterval, + RegisterIDsRequestLimit: state_stream.DefaultRegisterIDsRequestLimit, + }, + stateStreamFilterConf: nil, rpcMetricsEnabled: false, apiRatelimits: nil, apiBurstlimits: nil, @@ -195,7 +210,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { RetryDelay: edrequester.DefaultRetryDelay, MaxRetryDelay: edrequester.DefaultMaxRetryDelay, }, - executionDataCacheSize: state_stream.DefaultCacheSize, } } @@ -206,39 +220,48 @@ type ObserverServiceBuilder struct { *ObserverServiceConfig // components - LibP2PNode p2p.LibP2PNode - FollowerState stateprotocol.FollowerState - SyncCore *chainsync.Core - RpcEng *rpc.Engine - FollowerDistributor *pubsub.FollowerDistributor - Committee hotstuff.DynamicCommittee - Finalized *flow.Header - Pending []*flow.Header - FollowerCore module.HotStuffFollower - ExecutionDataRequester state_synchronization.ExecutionDataRequester - ExecutionIndexer *indexer.Indexer - ExecutionIndexerCore *indexer.IndexerCore - IndexerDependencies *cmd.DependencyList + + LibP2PNode p2p.LibP2PNode + FollowerState stateprotocol.FollowerState + SyncCore *chainsync.Core + RpcEng *rpc.Engine + FollowerDistributor *pubsub.FollowerDistributor + Committee hotstuff.DynamicCommittee + Finalized *flow.Header + Pending []*flow.Header + FollowerCore module.HotStuffFollower + ExecutionIndexer *indexer.Indexer + ExecutionIndexerCore *indexer.IndexerCore + IndexerDependencies *cmd.DependencyList + + ExecutionDataDownloader execution_data.Downloader + ExecutionDataRequester state_synchronization.ExecutionDataRequester + ExecutionDataStore execution_data.ExecutionDataStore + + RegistersAsyncStore *execution.RegistersAsyncStore + EventsIndex *backend.EventsIndex // available until after the network has started. Hence, a factory function that needs to be called just before // creating the sync engine SyncEngineParticipantsProviderFactory func() module.IdentifierProvider // engines - FollowerEng *follower.ComplianceEngine - SyncEng *synceng.Engine + FollowerEng *follower.ComplianceEngine + SyncEng *synceng.Engine + StateStreamEng *statestreambackend.Engine // Public network peerID peer.ID RestMetrics *metrics.RestCollector AccessMetrics module.AccessMetrics + // grpc servers - secureGrpcServer *grpcserver.GrpcServer - unsecureGrpcServer *grpcserver.GrpcServer + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer + stateStreamGrpcServer *grpcserver.GrpcServer - ExecutionDataDownloader execution_data.Downloader - ExecutionDataStore execution_data.ExecutionDataStore + stateStreamBackend *statestreambackend.StateStreamBackend } // deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters. @@ -630,10 +653,53 @@ func (builder *ObserverServiceBuilder) extraFlags() { "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m") - flags.Uint32Var(&builder.executionDataCacheSize, + + // Streaming API + flags.StringVar(&builder.stateStreamConf.ListenAddr, + "state-stream-addr", + defaultConfig.stateStreamConf.ListenAddr, + "the address the state stream server listens on (if empty the server will not be started)") + flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", - defaultConfig.executionDataCacheSize, + defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size") + flags.Uint32Var(&builder.stateStreamConf.MaxGlobalStreams, + "state-stream-global-max-streams", defaultConfig.stateStreamConf.MaxGlobalStreams, + "global maximum number of concurrent streams") + flags.UintVar(&builder.stateStreamConf.MaxExecutionDataMsgSize, + "state-stream-max-message-size", + defaultConfig.stateStreamConf.MaxExecutionDataMsgSize, + "maximum size for a gRPC message containing block execution data") + flags.StringToIntVar(&builder.stateStreamFilterConf, + "state-stream-event-filter-limits", + defaultConfig.stateStreamFilterConf, + "event filter limits for ExecutionData SubscribeEvents API e.g. EventTypes=100,Addresses=100,Contracts=100 etc.") + flags.DurationVar(&builder.stateStreamConf.ClientSendTimeout, + "state-stream-send-timeout", + defaultConfig.stateStreamConf.ClientSendTimeout, + "maximum wait before timing out while sending a response to a streaming client e.g. 30s") + flags.UintVar(&builder.stateStreamConf.ClientSendBufferSize, + "state-stream-send-buffer-size", + defaultConfig.stateStreamConf.ClientSendBufferSize, + "maximum number of responses to buffer within a stream") + flags.Float64Var(&builder.stateStreamConf.ResponseLimit, + "state-stream-response-limit", + defaultConfig.stateStreamConf.ResponseLimit, + "max number of responses per second to send over streaming endpoints. this helps manage resources consumed by each client querying data not in the cache e.g. 3 or 0.5. 0 means no limit") + flags.Uint64Var(&builder.stateStreamConf.HeartbeatInterval, + "state-stream-heartbeat-interval", + defaultConfig.stateStreamConf.HeartbeatInterval, + "default interval in blocks at which heartbeat messages should be sent. applied when client did not specify a value.") + flags.Uint32Var(&builder.stateStreamConf.RegisterIDsRequestLimit, + "state-stream-max-register-values", + defaultConfig.stateStreamConf.RegisterIDsRequestLimit, + "maximum number of register ids to include in a single request to the GetRegisters endpoint") + + flags.StringVar(&builder.rpcConf.BackendConfig.EventQueryMode, + "event-query-mode", + defaultConfig.rpcConf.BackendConfig.EventQueryMode, + "mode to use when querying events. one of [local-only, execution-nodes-only(default), failover]") + }).ValidateFlags(func() error { if builder.executionDataSyncEnabled { if builder.executionDataConfig.FetchTimeout <= 0 { @@ -652,6 +718,33 @@ func (builder *ObserverServiceBuilder) extraFlags() { return errors.New("execution-data-max-search-ahead must be greater than 0") } } + if builder.stateStreamConf.ListenAddr != "" { + if builder.stateStreamConf.ExecutionDataCacheSize == 0 { + return errors.New("execution-data-cache-size must be greater than 0") + } + if builder.stateStreamConf.ClientSendBufferSize == 0 { + return errors.New("state-stream-send-buffer-size must be greater than 0") + } + if len(builder.stateStreamFilterConf) > 3 { + return errors.New("state-stream-event-filter-limits must have at most 3 keys (EventTypes, Addresses, Contracts)") + } + for key, value := range builder.stateStreamFilterConf { + switch key { + case "EventTypes", "Addresses", "Contracts": + if value <= 0 { + return fmt.Errorf("state-stream-event-filter-limits %s must be greater than 0", key) + } + default: + return errors.New("state-stream-event-filter-limits may only contain the keys EventTypes, Addresses, Contracts") + } + } + if builder.stateStreamConf.ResponseLimit < 0 { + return errors.New("state-stream-response-limit must be greater than or equal to 0") + } + if builder.stateStreamConf.RegisterIDsRequestLimit <= 0 { + return errors.New("state-stream-max-register-values must be greater than 0") + } + } return nil }) @@ -976,7 +1069,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS heroCacheCollector = metrics.AccessNodeExecutionDataCacheMetrics(builder.MetricsRegisterer) } - execDataCacheBackend = herocache.NewBlockExecutionData(builder.executionDataCacheSize, builder.Logger, heroCacheCollector) + execDataCacheBackend = herocache.NewBlockExecutionData(builder.stateStreamConf.ExecutionDataCacheSize, builder.Logger, heroCacheCollector) // Execution Data cache that uses a blobstore as the backend (instead of a downloader) // This ensures that it simply returns a not found error if the blob doesn't exist @@ -1195,10 +1288,90 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // setup requester to notify indexer when new execution data is received execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.ExecutionIndexer.OnExecutionData) + err = builder.EventsIndex.Initialize(builder.ExecutionIndexer) + if err != nil { + return nil, err + } + + err = builder.RegistersAsyncStore.Initialize(registers) + if err != nil { + return nil, err + } + return builder.ExecutionIndexer, nil }, builder.IndexerDependencies) } + if builder.stateStreamConf.ListenAddr != "" { + builder.Component("exec state stream engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + for key, value := range builder.stateStreamFilterConf { + switch key { + case "EventTypes": + builder.stateStreamConf.MaxEventTypes = value + case "Addresses": + builder.stateStreamConf.MaxAddresses = value + case "Contracts": + builder.stateStreamConf.MaxContracts = value + } + } + builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled + + highestAvailableHeight, err := builder.ExecutionDataRequester.HighestConsecutiveHeight() + if err != nil { + return nil, fmt.Errorf("could not get highest consecutive height: %w", err) + } + broadcaster := engine.NewBroadcaster() + + eventQueryMode, err := backend.ParseIndexQueryMode(builder.rpcConf.BackendConfig.EventQueryMode) + if err != nil { + return nil, fmt.Errorf("could not parse event query mode: %w", err) + } + + // use the events index for events if enabled and the node is configured to use it for + // regular event queries + useIndex := builder.executionDataIndexingEnabled && + eventQueryMode != backend.IndexQueryModeExecutionNodesOnly + + builder.stateStreamBackend, err = statestreambackend.New( + node.Logger, + builder.stateStreamConf, + node.State, + node.Storage.Headers, + node.Storage.Seals, + node.Storage.Results, + builder.ExecutionDataStore, + executionDataStoreCache, + broadcaster, + builder.executionDataConfig.InitialBlockHeight, + highestAvailableHeight, + builder.RegistersAsyncStore, + builder.EventsIndex, + useIndex, + ) + if err != nil { + return nil, fmt.Errorf("could not create state stream backend: %w", err) + } + + stateStreamEng, err := statestreambackend.NewEng( + node.Logger, + builder.stateStreamConf, + executionDataStoreCache, + node.Storage.Headers, + node.RootChainID, + builder.stateStreamGrpcServer, + builder.stateStreamBackend, + broadcaster, + ) + if err != nil { + return nil, fmt.Errorf("could not create state stream engine: %w", err) + } + builder.StateStreamEng = stateStreamEng + + execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.StateStreamEng.OnExecutionData) + + return builder.StateStreamEng, nil + }) + } return builder } @@ -1315,20 +1488,40 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.apiBurstlimits, grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build() - builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger, - builder.rpcConf.UnsecureGRPCListenAddr, - builder.rpcConf.MaxMsgSize, + builder.stateStreamGrpcServer = grpcserver.NewGrpcServerBuilder( + node.Logger, + builder.stateStreamConf.ListenAddr, + builder.stateStreamConf.MaxExecutionDataMsgSize, builder.rpcMetricsEnabled, builder.apiRatelimits, - builder.apiBurstlimits).Build() + builder.apiBurstlimits, + grpcserver.WithStreamInterceptor()).Build() + + if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr { + builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger, + builder.rpcConf.UnsecureGRPCListenAddr, + builder.rpcConf.MaxMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits).Build() + } else { + builder.unsecureGrpcServer = builder.stateStreamGrpcServer + } return nil }) - + builder.Module("async register store", func(node *cmd.NodeConfig) error { + builder.RegistersAsyncStore = execution.NewRegistersAsyncStore() + return nil + }) builder.Module("events storage", func(node *cmd.NodeConfig) error { builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB) return nil }) + builder.Module("events index", func(node *cmd.NodeConfig) error { + builder.EventsIndex = backend.NewEventsIndex(builder.Storage.Events) + return nil + }) builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { accessMetrics := builder.AccessMetrics config := builder.rpcConf @@ -1396,7 +1589,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return nil, err } - stateStreamConfig := statestreambackend.Config{} engineBuilder, err := rpc.NewBuilder( node.Logger, node.State, @@ -1409,8 +1601,8 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { restHandler, builder.secureGrpcServer, builder.unsecureGrpcServer, - nil, // state streaming is not supported - stateStreamConfig, + builder.stateStreamBackend, + builder.stateStreamConf, ) if err != nil { return nil, err @@ -1456,10 +1648,15 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return builder.secureGrpcServer, nil }) - // build unsecure grpc server - builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { - return builder.unsecureGrpcServer, nil + builder.Component("state stream unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.stateStreamGrpcServer, nil }) + + if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr { + builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.unsecureGrpcServer, nil + }) + } } func loadNetworkingKey(path string) (crypto.PrivateKey, error) { diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 5724b11f363..1a6b7a2162b 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -469,10 +469,12 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv fmt.Sprintf("--secure-rpc-addr=%s:%s", observerName, testnet.GRPCSecurePort), fmt.Sprintf("--http-addr=%s:%s", observerName, testnet.GRPCWebPort), fmt.Sprintf("--rest-addr=%s:%s", observerName, testnet.RESTPort), + fmt.Sprintf("--state-stream-addr=%s:%s", observerName, testnet.ExecutionStatePort), "--execution-data-dir=/data/execution-data", "--execution-data-sync-enabled=true", "--execution-data-indexing-enabled=true", "--execution-state-dir=/data/execution-state", + "--event-query-mode=execution-nodes-only", ) service.AddExposedPorts( @@ -480,6 +482,7 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv testnet.GRPCSecurePort, testnet.GRPCWebPort, testnet.RESTPort, + testnet.ExecutionStatePort, ) // observer services rely on the access gateway diff --git a/integration/testnet/network.go b/integration/testnet/network.go index 52ec09614d2..5a4484c39be 100644 --- a/integration/testnet/network.go +++ b/integration/testnet/network.go @@ -783,6 +783,9 @@ func (net *FlowNetwork) addObserver(t *testing.T, conf ObserverConfig) { nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t)) nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort)) + nodeContainer.exposePort(ExecutionStatePort, testingdock.RandomPort(t)) + nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(ExecutionStatePort)) + nodeContainer.opts.HealthCheck = testingdock.HealthCheckCustom(nodeContainer.HealthcheckCallback()) suiteContainer := net.suite.Container(containerOpts) diff --git a/integration/tests/access/cohort3/execution_state_sync_test.go b/integration/tests/access/cohort3/execution_state_sync_test.go index 7d6118ef4b8..38a156549db 100644 --- a/integration/tests/access/cohort3/execution_state_sync_test.go +++ b/integration/tests/access/cohort3/execution_state_sync_test.go @@ -120,6 +120,7 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() { AdditionalFlags: []string{ fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), "--execution-data-sync-enabled=true", + "--event-query-mode=execution-nodes-only", }, }} diff --git a/integration/tests/access/cohort3/grpc_state_stream_test.go b/integration/tests/access/cohort3/grpc_state_stream_test.go index 3bb8614b70d..0614b78f2e2 100644 --- a/integration/tests/access/cohort3/grpc_state_stream_test.go +++ b/integration/tests/access/cohort3/grpc_state_stream_test.go @@ -72,6 +72,8 @@ func (s *GrpcStateStreamSuite) SetupTest() { testnet.WithAdditionalFlag("--execution-data-indexing-enabled=true"), testnet.WithAdditionalFlagf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), testnet.WithAdditionalFlag("--event-query-mode=local-only"), + testnet.WithAdditionalFlag("--supports-observer=true"), + testnet.WithAdditionalFlagf("--public-network-execution-data-sync-enabled=true"), ) controlANConfig := testnet.NewNodeConfig( flow.RoleAccess, @@ -104,7 +106,20 @@ func (s *GrpcStateStreamSuite) SetupTest() { controlANConfig, // access_2 } - conf := testnet.NewNetworkConfig("access_event_streaming_test", nodeConfigs) + // add the observer node config + observers := []testnet.ObserverConfig{{ + ContainerName: testnet.PrimaryON, + LogLevel: zerolog.DebugLevel, + AdditionalFlags: []string{ + fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), + fmt.Sprintf("--execution-state-dir=%s", testnet.DefaultExecutionStateDir), + "--execution-data-sync-enabled=true", + "--event-query-mode=execution-nodes-only", + "--execution-data-indexing-enabled=true", + }, + }} + + conf := testnet.NewNetworkConfig("access_event_streaming_test", nodeConfigs, testnet.WithObservers(observers...)) s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet) // start the network @@ -124,12 +139,19 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { sdkClientControlAN, err := getClient(controlANURL) s.Require().NoError(err) + testONURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryON).Port(testnet.ExecutionStatePort)) + sdkClientTestON, err := getClient(testONURL) + s.Require().NoError(err) + time.Sleep(20 * time.Second) - testEvents, testErrs, err := SubscribeEventsByBlockHeight(s.ctx, sdkClientTestAN, 0, &executiondata.EventFilter{}) + testANEvents, testANErrs, err := SubscribeEventsByBlockHeight(s.ctx, sdkClientTestAN, 0, &executiondata.EventFilter{}) + s.Require().NoError(err) + + controlANEvents, controlANErrs, err := SubscribeEventsByBlockHeight(s.ctx, sdkClientControlAN, 0, &executiondata.EventFilter{}) s.Require().NoError(err) - controlEvents, controlErrs, err := SubscribeEventsByBlockHeight(s.ctx, sdkClientControlAN, 0, &executiondata.EventFilter{}) + testONEvents, testONErrs, err := SubscribeEventsByBlockHeight(s.ctx, sdkClientTestON, 0, &executiondata.EventFilter{}) s.Require().NoError(err) txCount := 10 @@ -161,28 +183,37 @@ func (s *GrpcStateStreamSuite) TestHappyPath() { targetEvent := flow.EventType("flow.AccountCreated") - foundTxCount := 0 + foundANTxCount := 0 + foundONTxCount := 0 r := newResponseTracker() for { select { - case err := <-testErrs: + case err := <-testANErrs: s.Require().NoErrorf(err, "unexpected test AN error") - case err := <-controlErrs: + case err := <-controlANErrs: s.Require().NoErrorf(err, "unexpected control AN error") - case event := <-testEvents: + case err := <-testONErrs: + s.Require().NoErrorf(err, "unexpected test ON error") + case event := <-testANEvents: if has(event.Events, targetEvent) { - s.T().Logf("adding test events: %d %d %v", event.BlockHeight, len(event.Events), event.Events) - r.Add(s.T(), event.BlockHeight, "test", &event) - foundTxCount++ + s.T().Logf("adding access test events: %d %d %v", event.BlockHeight, len(event.Events), event.Events) + r.Add(s.T(), event.BlockHeight, "access_test", &event) + foundANTxCount++ } - case event := <-controlEvents: + case event := <-controlANEvents: if has(event.Events, targetEvent) { s.T().Logf("adding control events: %d %d %v", event.BlockHeight, len(event.Events), event.Events) - r.Add(s.T(), event.BlockHeight, "control", &event) + r.Add(s.T(), event.BlockHeight, "access_control", &event) + } + case event := <-testONEvents: + if has(event.Events, targetEvent) { + s.T().Logf("adding observer test events: %d %d %v", event.BlockHeight, len(event.Events), event.Events) + r.Add(s.T(), event.BlockHeight, "observer_test", &event) + foundONTxCount++ } } - if foundTxCount >= txCount { + if foundANTxCount >= txCount && foundONTxCount >= txCount { break } } @@ -208,21 +239,24 @@ func (r *ResponseTracker) Add(t *testing.T, blockHeight uint64, name string, eve } r.r[blockHeight][name] = *events - if len(r.r[blockHeight]) != 2 { + if len(r.r[blockHeight]) != 3 { return } - err := r.compare(t, r.r[blockHeight]) + err := r.compare(t, r.r[blockHeight]["access_control"], r.r[blockHeight]["access_test"]) if err != nil { - log.Fatalf("failure comparing %d: %v", blockHeight, err) + log.Fatalf("failure comparing access and access data %d: %v", blockHeight, err) } + + err = r.compare(t, r.r[blockHeight]["access_control"], r.r[blockHeight]["observer_test"]) + if err != nil { + log.Fatalf("failure comparing access and observer data %d: %v", blockHeight, err) + } + delete(r.r, blockHeight) } -func (r *ResponseTracker) compare(t *testing.T, data map[string]flow.BlockEvents) error { - controlData := data["control"] - testData := data["test"] - +func (r *ResponseTracker) compare(t *testing.T, controlData flow.BlockEvents, testData flow.BlockEvents) error { require.Equal(t, controlData.BlockID, testData.BlockID) require.Equal(t, controlData.BlockHeight, testData.BlockHeight) require.Equal(t, len(controlData.Events), len(testData.Events)) @@ -261,7 +295,6 @@ func SubscribeEventsByBlockHeight( Filter: filter, HeartbeatInterval: 1, } - stream, err := client.SubscribeEvents(ctx, req) if err != nil { return nil, nil, err