Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Enable Event streaming on REST API #4547

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
61be7dd
Added websocket handler, common http handler, refactored rest, added …
UlyanaAndrukhiv Jul 5, 2023
6fcc308
Updated last commit
UlyanaAndrukhiv Jul 7, 2023
3040fe0
Reverted back flag name
UlyanaAndrukhiv Jul 7, 2023
82b29ab
Added comments
UlyanaAndrukhiv Jul 7, 2023
39e8768
Refactored handlers, added filters for subscribe_events endpoint
UlyanaAndrukhiv Jul 13, 2023
334f752
Merged with master, updated tests, moved creating state stream backen…
UlyanaAndrukhiv Jul 24, 2023
6a68c01
Fixed flags
UlyanaAndrukhiv Jul 24, 2023
d6f51da
Updated tests
UlyanaAndrukhiv Jul 24, 2023
6f298e6
Added test, added Hijack impl for response writer in metrics
UlyanaAndrukhiv Jul 24, 2023
feb6dda
Updated subscribe_events rest route, remove subscribe_handler
UlyanaAndrukhiv Jul 28, 2023
88da0b1
Added unit tests
UlyanaAndrukhiv Aug 2, 2023
fbbc240
Added part of intagration tests
UlyanaAndrukhiv Aug 2, 2023
d07b5fa
Added integration test for rest event streaming, updated unit tests, …
UlyanaAndrukhiv Aug 3, 2023
40bac0a
Merged with master
UlyanaAndrukhiv Aug 3, 2023
b372ed9
Updated routeUrlMap init for rest
UlyanaAndrukhiv Aug 3, 2023
379ec43
Removed unnecessary log
UlyanaAndrukhiv Aug 4, 2023
8068f97
Added more comments
UlyanaAndrukhiv Aug 4, 2023
7c6442c
Moved part of state_stream impl back to access/state_stream package
UlyanaAndrukhiv Aug 9, 2023
296afff
Updated rest test according to comments, removed unnecessary empty li…
UlyanaAndrukhiv Aug 9, 2023
efcf292
Reverted back imports order
UlyanaAndrukhiv Aug 9, 2023
7f3297b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 9, 2023
cae72f0
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 10, 2023
9d73bfb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 15, 2023
31ba626
Refactored subscribeEvents function
UlyanaAndrukhiv Aug 16, 2023
1a9f924
Added more comments, linted
UlyanaAndrukhiv Aug 16, 2023
48eed8b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 16, 2023
4477f31
Updated unit tests, linted, added more comments
UlyanaAndrukhiv Aug 17, 2023
a9a4f62
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 17, 2023
6322ba9
Updated tests, linted
UlyanaAndrukhiv Aug 17, 2023
7908805
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 18, 2023
3cd7b32
Upgraded state streaming impl
UlyanaAndrukhiv Aug 18, 2023
83f9805
Remove unnecessary comment
UlyanaAndrukhiv Aug 18, 2023
f70ba98
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 21, 2023
75c757b
Removed unnecessary check for event types
UlyanaAndrukhiv Aug 22, 2023
f387b4f
Linted integration test
UlyanaAndrukhiv Aug 23, 2023
1053dc7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 23, 2023
86b66c9
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 25, 2023
12a19de
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 30, 2023
df96439
Refactored according to comments
UlyanaAndrukhiv Sep 4, 2023
212d98f
Added checking connection for closing from client side
UlyanaAndrukhiv Sep 5, 2023
a425791
Fixed unit test for subscribe events
UlyanaAndrukhiv Sep 5, 2023
ec3ec18
Merged with master
UlyanaAndrukhiv Sep 6, 2023
f4f172c
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 7, 2023
23cad55
Updated according to comments
UlyanaAndrukhiv Sep 12, 2023
20b10fb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 12, 2023
f6325b6
Updated error according to comment
UlyanaAndrukhiv Sep 12, 2023
3252ec4
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 13, 2023
c630e4a
Added RouterBuilder and updated rest unit tests. Added fixes accordin…
UlyanaAndrukhiv Sep 13, 2023
b9e4bef
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
47dc938
Updated according to commits
UlyanaAndrukhiv Sep 14, 2023
4d60ad7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
durkmurder Sep 14, 2023
b20cb94
Updated according to last comments
UlyanaAndrukhiv Sep 14, 2023
1022c09
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
303c02f
Added small fixes according to last comments
UlyanaAndrukhiv Sep 15, 2023
5cbbec6
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merged with master, updated tests, moved creating state stream backen…
… to separate module
  • Loading branch information
UlyanaAndrukhiv committed Jul 24, 2023
commit 334f75235223b3ad1fc37ac261e801a42f6f64d7
190 changes: 106 additions & 84 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
common_state_stream "github.com/onflow/flow-go/engine/common/state_stream"
cstate_stream "github.com/onflow/flow-go/engine/common/state_stream"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
Expand Down Expand Up @@ -118,8 +118,10 @@ type AccessNodeConfig struct {
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf common_state_stream.Config
stateStreamBackend common_state_stream.API
stateStreamBackend *cstate_stream.StateStreamBackend
stateStreamConf cstate_stream.Config
execDataBroadcaster *engine.Broadcaster
executionDataCache *execdatacache.ExecutionDataCache
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
Expand Down Expand Up @@ -167,17 +169,18 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
stateStreamConf: common_state_stream.Config{
stateStreamConf: cstate_stream.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
ExecutionDataCacheSize: common_state_stream.DefaultCacheSize,
ClientSendTimeout: common_state_stream.DefaultSendTimeout,
ClientSendBufferSize: common_state_stream.DefaultSendBufferSize,
MaxGlobalStreams: common_state_stream.DefaultMaxGlobalStreams,
EventFilterConfig: common_state_stream.DefaultEventFilterConfig,
ResponseLimit: common_state_stream.DefaultResponseLimit,
ExecutionDataCacheSize: cstate_stream.DefaultCacheSize,
ClientSendTimeout: cstate_stream.DefaultSendTimeout,
ClientSendBufferSize: cstate_stream.DefaultSendBufferSize,
MaxGlobalStreams: cstate_stream.DefaultMaxGlobalStreams,
EventFilterConfig: cstate_stream.DefaultEventFilterConfig,
ResponseLimit: cstate_stream.DefaultResponseLimit,
},
stateStreamBackend: nil,
stateStreamFilterConf: nil,
execDataBroadcaster: nil,
ExecutionNodeAddress: "localhost:9000",
logTxTimeToFinalized: false,
logTxTimeToExecuted: false,
Expand Down Expand Up @@ -439,7 +442,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
var processedNotifications storage.ConsumerProgress
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData

builder.
AdminCommand("read-execution-data", func(config *cmd.NodeConfig) commands.AdminCommand {
Expand Down Expand Up @@ -548,27 +550,11 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN

execDataDistributor = edrequester.NewExecutionDataDistributor()

var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
if builder.HeroCacheMetricsEnable {
heroCacheCollector = metrics.AccessNodeExecutionDataCacheMetrics(builder.MetricsRegisterer)
}

execDataCacheBackend = herocache.NewBlockExecutionData(builder.stateStreamConf.ExecutionDataCacheSize, builder.Logger, heroCacheCollector)
// Execution Data cache with a downloader as the backend. This is used by the requester
// to download and cache execution data for each block.
executionDataCache := execdatacache.NewExecutionDataCache(
builder.ExecutionDataDownloader,
builder.Storage.Headers,
builder.Storage.Seals,
builder.Storage.Results,
execDataCacheBackend,
)

builder.ExecutionDataRequester = edrequester.New(
builder.Logger,
metrics.NewExecutionDataRequesterCollector(),
builder.ExecutionDataDownloader,
executionDataCache,
builder.executionDataCache,
processedBlockHeight,
processedNotifications,
builder.State,
Expand All @@ -584,64 +570,16 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN

if builder.stateStreamConf.ListenAddr != "" {
builder.Component("exec state stream engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
for key, value := range builder.stateStreamFilterConf {
switch key {
case "EventTypes":
builder.stateStreamConf.MaxEventTypes = value
case "Addresses":
builder.stateStreamConf.MaxAddresses = value
case "Contracts":
builder.stateStreamConf.MaxContracts = value
}
}
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled

// Execution Data cache that uses a blobstore as the backend (instead of a downloader)
// This ensures that it simply returns a not found error if the blob doesn't exist
// instead of attempting to download it from the network. It shares a cache backend instance
// with the requester's implementation.
executionDataCache := execdatacache.NewExecutionDataCache(
builder.ExecutionDataStore,
builder.Storage.Headers,
builder.Storage.Seals,
builder.Storage.Results,
execDataCacheBackend,
)

highestAvailableHeight, err := builder.ExecutionDataRequester.HighestConsecutiveHeight()
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}

broadcaster := engine.NewBroadcaster()

backend, err := common_state_stream.New(
node.Logger,
builder.stateStreamConf,
node.State,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
executionDataCache,
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamConf,
executionDataCache,
builder.executionDataCache,
node.Storage.Headers,
node.RootChainID,
builder.apiRatelimits,
builder.apiBurstlimits,
backend,
broadcaster,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
builder.execDataBroadcaster,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -920,10 +858,6 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.
BuildConsensusFollower().
Module("collection node client", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1059,6 +993,72 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

return nil
}).
Module("state stream backend", func(node *cmd.NodeConfig) error {
for key, value := range builder.stateStreamFilterConf {
switch key {
case "EventTypes":
builder.stateStreamConf.MaxEventTypes = value
case "Addresses":
builder.stateStreamConf.MaxAddresses = value
case "Contracts":
builder.stateStreamConf.MaxContracts = value
}
}
builder.stateStreamConf.RpcMetricsEnabled = builder.rpcMetricsEnabled

var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
if builder.HeroCacheMetricsEnable {
heroCacheCollector = metrics.AccessNodeExecutionDataCacheMetrics(builder.MetricsRegisterer)
}

execDataCacheBackend := herocache.NewBlockExecutionData(builder.stateStreamConf.ExecutionDataCacheSize, builder.Logger, heroCacheCollector)
// Execution Data cache with a downloader as the backend. This is used by the requester
// to download and cache execution data for each block.
builder.executionDataCache = execdatacache.NewExecutionDataCache(
builder.ExecutionDataDownloader,
builder.Storage.Headers,
builder.Storage.Seals,
builder.Storage.Results,
execDataCacheBackend,
)

// Execution Data cache that uses a blobstore as the backend (instead of a downloader)
// This ensures that it simply returns a not found error if the blob doesn't exist
// instead of attempting to download it from the network. It shares a cache backend instance
// with the requester's implementation.
builder.executionDataCache = execdatacache.NewExecutionDataCache(
builder.ExecutionDataStore,
builder.Storage.Headers,
builder.Storage.Seals,
builder.Storage.Results,
execDataCacheBackend,
)

highestAvailableHeight, err := builder.ExecutionDataRequester.HighestConsecutiveHeight()
if err != nil {
return fmt.Errorf("could not get highest consecutive height: %w", err)
}

builder.execDataBroadcaster = engine.NewBroadcaster()

builder.stateStreamBackend, err = cstate_stream.New(
node.Logger,
builder.stateStreamConf,
node.State,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
builder.executionDataCache,
builder.execDataBroadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
)
if err != nil {
return fmt.Errorf("could not create state stream backend: %w", err)
}
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1112,6 +1112,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.AccessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
backend,
backend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
builder.stateStreamBackend,
builder.stateStreamConf.EventFilterConfig,
builder.stateStreamConf.MaxGlobalStreams,
Expand Down Expand Up @@ -1200,6 +1204,24 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})

builder.Component("state stream unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.stateStreamGrpcServer, nil
})

if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr {
builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.unsecureGrpcServer, nil
})
}

builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
Expand Down
4 changes: 4 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,10 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
accessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
accessBackend,
restHandler,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
nil,
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
state_stream.DefaultEventFilterConfig,
0,
Expand Down
34 changes: 24 additions & 10 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package access

import (
"context"

"io"
"os"
"testing"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
cstatestream "github.com/onflow/flow-go/engine/common/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
Expand Down Expand Up @@ -114,7 +114,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {

suite.broadcaster = engine.NewBroadcaster()

suite.execDataHeroCache = herocache.NewBlockExecutionData(state_stream.DefaultCacheSize, suite.log, metrics.NewNoopCollector())
suite.execDataHeroCache = herocache.NewBlockExecutionData(cstatestream.DefaultCacheSize, suite.log, metrics.NewNoopCollector())
suite.execDataCache = cache.NewExecutionDataCache(suite.eds, suite.headers, suite.seals, suite.results, suite.execDataHeroCache)

accessIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleAccess))
Expand Down Expand Up @@ -203,6 +203,9 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
backend,
suite.secureGrpcServer,
suite.unsecureGrpcServer,
nil,
cstatestream.DefaultEventFilterConfig,
0,
)
assert.NoError(suite.T(), err)
suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build()
Expand All @@ -224,25 +227,36 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
},
).Maybe()

conf := state_stream.Config{
ClientSendTimeout: state_stream.DefaultSendTimeout,
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
conf := cstatestream.Config{
ClientSendTimeout: cstatestream.DefaultSendTimeout,
ClientSendBufferSize: cstatestream.DefaultSendBufferSize,
}

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
stateStreamBackend, err := cstatestream.New(
suite.log,
conf,
nil,
suite.execDataCache,
suite.state,
suite.headers,
suite.seals,
suite.results,
suite.chainID,
nil,
suite.execDataCache,
nil,
rootBlock.Header.Height,
rootBlock.Header.Height,
)
assert.NoError(suite.T(), err)

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
suite.log,
conf,
suite.execDataCache,
suite.headers,
suite.chainID,
suite.unsecureGrpcServer,
stateStreamBackend,
nil,
)
assert.NoError(suite.T(), err)

Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.