Skip to content

Commit

Permalink
Merge pull request #4547 from UlyanaAndrukhiv/UlyanaAndrukhiv/4379-re…
Browse files Browse the repository at this point in the history
…st-event-streaming

[Access] Enable Event streaming on REST API
  • Loading branch information
durkmurder authored Sep 15, 2023
2 parents 45e4885 + 5cbbec6 commit 37c79a3
Show file tree
Hide file tree
Showing 36 changed files with 1,561 additions and 248 deletions.
40 changes: 29 additions & 11 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/ingestion"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
Expand Down Expand Up @@ -265,6 +266,8 @@ type FlowAccessNodeBuilder struct {
secureGrpcServer *grpcserver.GrpcServer
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer

stateStreamBackend *state_stream.StateStreamBackend
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -445,7 +448,7 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
return builder
}

func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessNodeBuilder {
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
Expand Down Expand Up @@ -625,20 +628,32 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
if err != nil {
return nil, fmt.Errorf("could not get highest consecutive height: %w", err)
}
broadcaster := engine.NewBroadcaster()

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamBackend, err = state_stream.New(node.Logger,
builder.stateStreamConf,
builder.ExecutionDataStore,
executionDataCache,
node.State,
node.Storage.Headers,
node.Storage.Seals,
node.Storage.Results,
node.RootChainID,
builder.ExecutionDataStore,
executionDataCache,
broadcaster,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
highestAvailableHeight)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
}

stateStreamEng, err := state_stream.NewEng(
node.Logger,
builder.stateStreamConf,
executionDataCache,
node.Storage.Headers,
node.RootChainID,
builder.stateStreamGrpcServer,
builder.stateStreamBackend,
broadcaster,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -898,6 +913,10 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
}

func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
if builder.executionDataSyncEnabled {
builder.BuildExecutionSyncComponents()
}

builder.
BuildConsensusFollower().
Module("collection node client", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -1106,6 +1125,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
nodeBackend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
builder.stateStreamBackend,
builder.stateStreamConf.EventFilterConfig,
builder.stateStreamConf.MaxGlobalStreams,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1191,10 +1213,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/common/follower"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/protocol"
Expand Down Expand Up @@ -973,6 +974,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
restHandler,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
nil, // state streaming is not supported
state_stream.DefaultEventFilterConfig,
0,
)
if err != nil {
return nil, err
Expand Down
24 changes: 19 additions & 5 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
bnd,
suite.secureGrpcServer,
suite.unsecureGrpcServer,
nil,
state_stream.DefaultEventFilterConfig,
0,
)
assert.NoError(suite.T(), err)
suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build()
Expand All @@ -229,20 +232,31 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
}

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
stateStreamBackend, err := state_stream.New(
suite.log,
conf,
nil,
suite.execDataCache,
suite.state,
suite.headers,
suite.seals,
suite.results,
suite.chainID,
nil,
suite.execDataCache,
nil,
rootBlock.Header.Height,
rootBlock.Header.Height,
)
assert.NoError(suite.T(), err)

// create state stream engine
suite.stateStreamEng, err = state_stream.NewEng(
suite.log,
conf,
suite.execDataCache,
suite.headers,
suite.chainID,
suite.unsecureGrpcServer,
stateStreamBackend,
nil,
)
assert.NoError(suite.T(), err)

Expand Down
14 changes: 14 additions & 0 deletions engine/access/rest/middleware/logging.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package middleware

import (
"bufio"
"fmt"
"net"
"net/http"
"time"

Expand Down Expand Up @@ -40,6 +43,9 @@ type responseWriter struct {
statusCode int
}

// http.Hijacker necessary for using middleware with gorilla websocket connections.
var _ http.Hijacker = (*responseWriter)(nil)

func newResponseWriter(w http.ResponseWriter) *responseWriter {
return &responseWriter{w, http.StatusOK}
}
Expand All @@ -48,3 +54,11 @@ func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}

func (rw *responseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
hijacker, ok := rw.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("hijacking not supported")
}
return hijacker.Hijack()
}
54 changes: 54 additions & 0 deletions engine/access/rest/request/event_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package request

import (
"fmt"
"regexp"
)

type EventType string

var basicEventRe = regexp.MustCompile(`[A-Z]\.[a-f0-9]{16}\.[\w+]*\.[\w+]*`)
var flowEventRe = regexp.MustCompile(`flow\.[\w]*`)

func (e *EventType) Parse(raw string) error {
if !basicEventRe.MatchString(raw) && !flowEventRe.MatchString(raw) {
return fmt.Errorf("invalid event type format")
}
*e = EventType(raw)
return nil
}

func (e EventType) Flow() string {
return string(e)
}

type EventTypes []EventType

func (e *EventTypes) Parse(raw []string) error {
// make a map to have only unique values as keys
eventTypes := make(EventTypes, 0)
uniqueTypes := make(map[string]bool)
for i, r := range raw {
var eType EventType
err := eType.Parse(r)
if err != nil {
return fmt.Errorf("error at index %d: %w", i, err)
}

if !uniqueTypes[eType.Flow()] {
uniqueTypes[eType.Flow()] = true
eventTypes = append(eventTypes, eType)
}
}

*e = eventTypes
return nil
}

func (e EventTypes) Flow() []string {
eventTypes := make([]string, len(e))
for j, eType := range e {
eventTypes[j] = eType.Flow()
}
return eventTypes
}
17 changes: 6 additions & 11 deletions engine/access/rest/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package request

import (
"fmt"
"regexp"

"github.com/onflow/flow-go/model/flow"
)
Expand Down Expand Up @@ -57,19 +56,15 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
return fmt.Errorf("must provide either block IDs or start and end height range")
}

g.Type = rawType
if g.Type == "" {
if rawType == "" {
return fmt.Errorf("event type must be provided")
}

// match basic format A.address.contract.event (ignore err since regex will always compile)
basic, _ := regexp.MatchString(`[A-Z]\.[a-f0-9]{16}\.[\w+]*\.[\w+]*`, g.Type)
// match core events flow.event
core, _ := regexp.MatchString(`flow\.[\w]*`, g.Type)

if !core && !basic {
return fmt.Errorf("invalid event type format")
var eventType EventType
err = eventType.Parse(rawType)
if err != nil {
return err
}
g.Type = eventType.Flow()

// validate start end height option
if g.StartHeight != EmptyHeight && g.EndHeight != EmptyHeight {
Expand Down
6 changes: 6 additions & 0 deletions engine/access/rest/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (rd *Request) CreateTransactionRequest() (CreateTransaction, error) {
return req, err
}

func (rd *Request) SubscribeEventsRequest() (SubscribeEvents, error) {
var req SubscribeEvents
err := req.Build(rd)
return req, err
}

func (rd *Request) Expands(field string) bool {
return rd.ExpandFields[field]
}
Expand Down
69 changes: 69 additions & 0 deletions engine/access/rest/request/subscribe_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package request

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

const startBlockIdQuery = "start_block_id"
const eventTypesQuery = "event_types"
const addressesQuery = "addresses"
const contractsQuery = "contracts"

type SubscribeEvents struct {
StartBlockID flow.Identifier
StartHeight uint64

EventTypes []string
Addresses []string
Contracts []string
}

func (g *SubscribeEvents) Build(r *Request) error {
return g.Parse(
r.GetQueryParam(startBlockIdQuery),
r.GetQueryParam(startHeightQuery),
r.GetQueryParams(eventTypesQuery),
r.GetQueryParams(addressesQuery),
r.GetQueryParams(contractsQuery),
)
}

func (g *SubscribeEvents) Parse(rawStartBlockID string, rawStartHeight string, rawTypes []string, rawAddresses []string, rawContracts []string) error {
var startBlockID ID
err := startBlockID.Parse(rawStartBlockID)
if err != nil {
return err
}
g.StartBlockID = startBlockID.Flow()

var height Height
err = height.Parse(rawStartHeight)
if err != nil {
return fmt.Errorf("invalid start height: %w", err)
}
g.StartHeight = height.Flow()

// if both start_block_id and start_height are provided
if g.StartBlockID != flow.ZeroID && g.StartHeight != EmptyHeight {
return fmt.Errorf("can only provide either block ID or start height")
}

// default to root block
if g.StartHeight == EmptyHeight {
g.StartHeight = 0
}

var eventTypes EventTypes
err = eventTypes.Parse(rawTypes)
if err != nil {
return err
}

g.EventTypes = eventTypes.Flow()
g.Addresses = rawAddresses
g.Contracts = rawContracts

return nil
}
4 changes: 1 addition & 3 deletions engine/access/rest/routes/account_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ func TestGetAccountKeyByIndex(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req, _ := http.NewRequest("GET", test.url, nil)
rr, err := executeRequest(req, backend)
assert.NoError(t, err)

rr := executeRequest(req, backend)
assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.JSONEq(t, test.out, rr.Body.String())
})
Expand Down
4 changes: 1 addition & 3 deletions engine/access/rest/routes/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/onflow/flow-go/access/mock"
"github.com/onflow/flow-go/engine/access/rest/middleware"
"github.com/onflow/flow-go/model/flow"

"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -127,8 +126,7 @@ func TestAccessGetAccount(t *testing.T) {

for i, test := range tests {
req, _ := http.NewRequest("GET", test.url, nil)
rr, err := executeRequest(req, backend)
assert.NoError(t, err)
rr := executeRequest(req, backend)

assert.Equal(t, http.StatusBadRequest, rr.Code)
assert.JSONEq(t, test.out, rr.Body.String(), fmt.Sprintf("test #%d failed: %v", i, test))
Expand Down
Loading

0 comments on commit 37c79a3

Please sign in to comment.