Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Enable Event streaming on REST API #4547

Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
61be7dd
Added websocket handler, common http handler, refactored rest, added …
UlyanaAndrukhiv Jul 5, 2023
6fcc308
Updated last commit
UlyanaAndrukhiv Jul 7, 2023
3040fe0
Reverted back flag name
UlyanaAndrukhiv Jul 7, 2023
82b29ab
Added comments
UlyanaAndrukhiv Jul 7, 2023
39e8768
Refactored handlers, added filters for subscribe_events endpoint
UlyanaAndrukhiv Jul 13, 2023
334f752
Merged with master, updated tests, moved creating state stream backen…
UlyanaAndrukhiv Jul 24, 2023
6a68c01
Fixed flags
UlyanaAndrukhiv Jul 24, 2023
d6f51da
Updated tests
UlyanaAndrukhiv Jul 24, 2023
6f298e6
Added test, added Hijack impl for response writer in metrics
UlyanaAndrukhiv Jul 24, 2023
feb6dda
Updated subscribe_events rest route, remove subscribe_handler
UlyanaAndrukhiv Jul 28, 2023
88da0b1
Added unit tests
UlyanaAndrukhiv Aug 2, 2023
fbbc240
Added part of intagration tests
UlyanaAndrukhiv Aug 2, 2023
d07b5fa
Added integration test for rest event streaming, updated unit tests, …
UlyanaAndrukhiv Aug 3, 2023
40bac0a
Merged with master
UlyanaAndrukhiv Aug 3, 2023
b372ed9
Updated routeUrlMap init for rest
UlyanaAndrukhiv Aug 3, 2023
379ec43
Removed unnecessary log
UlyanaAndrukhiv Aug 4, 2023
8068f97
Added more comments
UlyanaAndrukhiv Aug 4, 2023
7c6442c
Moved part of state_stream impl back to access/state_stream package
UlyanaAndrukhiv Aug 9, 2023
296afff
Updated rest test according to comments, removed unnecessary empty li…
UlyanaAndrukhiv Aug 9, 2023
efcf292
Reverted back imports order
UlyanaAndrukhiv Aug 9, 2023
7f3297b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 9, 2023
cae72f0
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 10, 2023
9d73bfb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 15, 2023
31ba626
Refactored subscribeEvents function
UlyanaAndrukhiv Aug 16, 2023
1a9f924
Added more comments, linted
UlyanaAndrukhiv Aug 16, 2023
48eed8b
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 16, 2023
4477f31
Updated unit tests, linted, added more comments
UlyanaAndrukhiv Aug 17, 2023
a9a4f62
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 17, 2023
6322ba9
Updated tests, linted
UlyanaAndrukhiv Aug 17, 2023
7908805
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 18, 2023
3cd7b32
Upgraded state streaming impl
UlyanaAndrukhiv Aug 18, 2023
83f9805
Remove unnecessary comment
UlyanaAndrukhiv Aug 18, 2023
f70ba98
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 21, 2023
75c757b
Removed unnecessary check for event types
UlyanaAndrukhiv Aug 22, 2023
f387b4f
Linted integration test
UlyanaAndrukhiv Aug 23, 2023
1053dc7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 23, 2023
86b66c9
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 25, 2023
12a19de
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Aug 30, 2023
df96439
Refactored according to comments
UlyanaAndrukhiv Sep 4, 2023
212d98f
Added checking connection for closing from client side
UlyanaAndrukhiv Sep 5, 2023
a425791
Fixed unit test for subscribe events
UlyanaAndrukhiv Sep 5, 2023
ec3ec18
Merged with master
UlyanaAndrukhiv Sep 6, 2023
f4f172c
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 7, 2023
23cad55
Updated according to comments
UlyanaAndrukhiv Sep 12, 2023
20b10fb
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 12, 2023
f6325b6
Updated error according to comment
UlyanaAndrukhiv Sep 12, 2023
3252ec4
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 13, 2023
c630e4a
Added RouterBuilder and updated rest unit tests. Added fixes accordin…
UlyanaAndrukhiv Sep 13, 2023
b9e4bef
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
47dc938
Updated according to commits
UlyanaAndrukhiv Sep 14, 2023
4d60ad7
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
durkmurder Sep 14, 2023
b20cb94
Updated according to last comments
UlyanaAndrukhiv Sep 14, 2023
1022c09
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 14, 2023
303c02f
Added small fixes according to last comments
UlyanaAndrukhiv Sep 15, 2023
5cbbec6
Merge branch 'master' into UlyanaAndrukhiv/4379-rest-event-streaming
UlyanaAndrukhiv Sep 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/ingestion"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
Expand Down Expand Up @@ -265,6 +266,8 @@ type FlowAccessNodeBuilder struct {
secureGrpcServer *grpcserver.GrpcServer
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *state_stream.StateStreamBackend
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -445,7 +448,7 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
return builder
}

func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessNodeBuilder {
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
Expand Down Expand Up @@ -625,20 +628,32 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}
broadcaster := engine.NewBroadcaster()

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamBackend, err = state_stream.New(node.Logger,
builder.stateStreamConf,
builder.ExecutionDataStore,
executionDataCache,
node.State,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Results,
node.RootChainID,
builder.ExecutionDataStore,
executionDataCache,
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
highestAvailableHeight)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamConf,
executionDataCache,
node.Storage.Headers,
node.RootChainID,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
broadcaster,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -898,6 +913,10 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}

builder.
BuildConsensusFollower().
Module("collection node client", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1103,6 +1122,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
nodeBackend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
builder.stateStreamBackend,
builder.stateStreamConf.EventFilterConfig,
builder.stateStreamConf.MaxGlobalStreams,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1188,10 +1210,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/common/follower"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/protocol"
Expand Down Expand Up @@ -970,6 +971,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
restHandler,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
nil, // state streaming is not supported
state_stream.DefaultEventFilterConfig,
0,
)
if err != nil {
return nil, err
Expand Down
24 changes: 19 additions & 5 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
bnd,
suite.secureGrpcServer,
suite.unsecureGrpcServer,
nil,
state_stream.DefaultEventFilterConfig,
0,
)
assert.NoError(suite.T(), err)
suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build()
Expand All @@ -220,20 +223,31 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
}

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
stateStreamBackend, err := state_stream.New(
suite.log,
conf,
nil,
suite.execDataCache,
suite.state,
suite.headers,
suite.seals,
suite.results,
suite.chainID,
nil,
suite.execDataCache,
nil,
rootBlock.Header.Height,
rootBlock.Header.Height,
)
assert.NoError(suite.T(), err)

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
suite.log,
conf,
suite.execDataCache,
suite.headers,
suite.chainID,
suite.unsecureGrpcServer,
stateStreamBackend,
nil,
)
assert.NoError(suite.T(), err)

Expand Down
14 changes: 14 additions & 0 deletions engine/access/rest/middleware/logging.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package middleware

import (
"bufio"
"fmt"
"net"
"net/http"
"time"

Expand Down Expand Up @@ -40,6 +43,9 @@ type responseWriter struct {
statusCode int
}

// http.Hijacker necessary for using middleware with gorilla websocket connections.
var _ http.Hijacker = (*responseWriter)(nil)

func newResponseWriter(w http.ResponseWriter) *responseWriter {
return &responseWriter{w, http.StatusOK}
}
Expand All @@ -48,3 +54,11 @@ func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}

func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
hijacker, ok := rw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("hijacking not supported")
}
return hijacker.Hijack()
}
54 changes: 54 additions & 0 deletions engine/access/rest/request/event_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package request

import (
"fmt"
"regexp"
)

type EventType string

var basicEventRe = regexp.MustCompile(`[A-Z]\.[a-f0-9]{16}\.[\w+]*\.[\w+]*`)
var flowEventRe = regexp.MustCompile(`flow\.[\w]*`)

func (e *EventType) Parse(raw string) error {
if !basicEventRe.MatchString(raw) && !flowEventRe.MatchString(raw) {
return fmt.Errorf("invalid event type format")
}
*e = EventType(raw)
return nil
}

func (e EventType) Flow() string {
return string(e)
}

type EventTypes []EventType

func (e *EventTypes) Parse(raw []string) error {
// make a map to have only unique values as keys
eventTypes := make(EventTypes, 0)
uniqueTypes := make(map[string]bool)
for i, r := range raw {
var eType EventType
err := eType.Parse(r)
if err != nil {
return fmt.Errorf("error at index %d: %w", i, err)
}

if !uniqueTypes[eType.Flow()] {
uniqueTypes[eType.Flow()] = true
eventTypes = append(eventTypes, eType)
}
}

*e = eventTypes
return nil
}

func (e EventTypes) Flow() []string {
eventTypes := make([]string, len(e))
for j, eType := range e {
eventTypes[j] = eType.Flow()
}
return eventTypes
}
17 changes: 6 additions & 11 deletions engine/access/rest/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package request

import (
"fmt"
"regexp"

"github.com/onflow/flow-go/model/flow"
)
Expand Down Expand Up @@ -57,19 +56,15 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
return fmt.Errorf("must provide either block IDs or start and end height range")
}

g.Type = rawType
if g.Type == "" {
if rawType == "" {
return fmt.Errorf("event type must be provided")
}

// match basic format A.address.contract.event (ignore err since regex will always compile)
basic, _ := regexp.MatchString(`[A-Z]\.[a-f0-9]{16}\.[\w+]*\.[\w+]*`, g.Type)
// match core events flow.event
core, _ := regexp.MatchString(`flow\.[\w]*`, g.Type)

if !core && !basic {
return fmt.Errorf("invalid event type format")
var eventType EventType
err = eventType.Parse(rawType)
if err != nil {
return err
}
g.Type = eventType.Flow()

// validate start end height option
if g.StartHeight != EmptyHeight && g.EndHeight != EmptyHeight {
Expand Down
6 changes: 6 additions & 0 deletions engine/access/rest/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (rd *Request) CreateTransactionRequest() (CreateTransaction, error) {
return req, err
}

func (rd *Request) SubscribeEventsRequest() (SubscribeEvents, error) {
var req SubscribeEvents
err := req.Build(rd)
return req, err
}

func (rd *Request) Expands(field string) bool {
return rd.ExpandFields[field]
}
Expand Down
69 changes: 69 additions & 0 deletions engine/access/rest/request/subscribe_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package request

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

const startBlockIdQuery = "start_block_id"
const eventTypesQuery = "event_types"
const addressesQuery = "addresses"
const contractsQuery = "contracts"

type SubscribeEvents struct {
StartBlockID flow.Identifier
StartHeight uint64

EventTypes []string
Addresses []string
Contracts []string
}

func (g *SubscribeEvents) Build(r *Request) error {
return g.Parse(
r.GetQueryParam(startBlockIdQuery),
r.GetQueryParam(startHeightQuery),
r.GetQueryParams(eventTypesQuery),
r.GetQueryParams(addressesQuery),
r.GetQueryParams(contractsQuery),
)
}

func (g *SubscribeEvents) Parse(rawStartBlockID string, rawStartHeight string, rawTypes []string, rawAddresses []string, rawContracts []string) error {
var startBlockID ID
err := startBlockID.Parse(rawStartBlockID)
if err != nil {
return err
}
g.StartBlockID = startBlockID.Flow()

var height Height
err = height.Parse(rawStartHeight)
if err != nil {
return fmt.Errorf("invalid start height: %w", err)
}
g.StartHeight = height.Flow()

// if both start_block_id and start_height are provided
if g.StartBlockID != flow.ZeroID && g.StartHeight != EmptyHeight {
return fmt.Errorf("can only provide either block ID or start height")
}

// default to root block
if g.StartHeight == EmptyHeight {
g.StartHeight = 0
}

var eventTypes EventTypes
err = eventTypes.Parse(rawTypes)
if err != nil {
return err
}

g.EventTypes = eventTypes.Flow()
g.Addresses = rawAddresses
g.Contracts = rawContracts

return nil
}
4 changes: 1 addition & 3 deletions engine/access/rest/routes/account_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ func TestGetAccountKeyByIndex(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req, _ := http.NewRequest("GET", test.url, nil)
rr, err := executeRequest(req, backend)
assert.NoError(t, err)

rr := executeRequest(req, backend)
assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.JSONEq(t, test.out, rr.Body.String())
})
Expand Down
4 changes: 1 addition & 3 deletions engine/access/rest/routes/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/onflow/flow-go/access/mock"
"github.com/onflow/flow-go/engine/access/rest/middleware"
"github.com/onflow/flow-go/model/flow"

"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -127,8 +126,7 @@ func TestAccessGetAccount(t *testing.T) {

for i, test := range tests {
req, _ := http.NewRequest("GET", test.url, nil)
rr, err := executeRequest(req, backend)
assert.NoError(t, err)
rr := executeRequest(req, backend)

assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.JSONEq(t, test.out, rr.Body.String(), fmt.Sprintf("test #%d failed: %v", i, test))
Expand Down
Loading