Skip to content

Commit

Permalink
Merge pull request #5160 from onflow/petera/4852-use-local-event-stre…
Browse files Browse the repository at this point in the history
…aming2

[Access] Use local event for event streaming API
  • Loading branch information
peterargue committed Jan 4, 2024
1 parent cff1179 commit 7be756d
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 88 deletions.
12 changes: 12 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,11 +815,22 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
broadcaster := engine.NewBroadcaster()

eventQueryMode, err := backend.ParseIndexQueryMode(builder.rpcConf.BackendConfig.EventQueryMode)
if err != nil {
return nil, fmt.Errorf("could not parse event query mode: %w", err)
}

// use the events index for events if enabled and the node is configured to use it for
// regular event queries
useIndex := builder.executionDataIndexingEnabled &&
eventQueryMode != backend.IndexQueryModeExecutionNodesOnly

builder.stateStreamBackend, err = statestreambackend.New(
node.Logger,
builder.stateStreamConf,
node.State,
node.Storage.Headers,
node.Storage.Events,
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
Expand All @@ -828,6 +839,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore,
useIndex,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream backend: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
conf,
suite.state,
suite.headers,
suite.events,
suite.seals,
suite.results,
nil,
Expand All @@ -251,6 +252,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
rootBlock.Header.Height,
rootBlock.Header.Height,
suite.registers,
false,
)
assert.NoError(suite.T(), err)

Expand Down
5 changes: 5 additions & 0 deletions engine/access/state_stream/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func New(
config Config,
state protocol.State,
headers storage.Headers,
events storage.Events,
seals storage.Seals,
results storage.ExecutionResults,
execDataStore execution_data.ExecutionDataStore,
Expand All @@ -98,6 +99,7 @@ func New(
rootHeight uint64,
highestAvailableHeight uint64,
registers *execution.RegistersAsyncStore,
useEventsIndex bool,
) (*StateStreamBackend, error) {
logger := log.With().Str("module", "state_stream_api").Logger()

Expand Down Expand Up @@ -136,12 +138,15 @@ func New(

b.EventsBackend = EventsBackend{
log: logger,
events: events,
headers: headers,
broadcaster: broadcaster,
sendTimeout: config.ClientSendTimeout,
responseLimit: config.ResponseLimit,
sendBufferSize: int(config.ClientSendBufferSize),
getExecutionData: b.getExecutionData,
getStartHeight: b.getStartHeight,
useIndex: useEventsIndex,
}

return b, nil
Expand Down
71 changes: 55 additions & 16 deletions engine/access/state_stream/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)

Expand All @@ -21,13 +22,17 @@ type EventsResponse struct {

type EventsBackend struct {
log zerolog.Logger
events storage.Events
headers storage.Headers
broadcaster *engine.Broadcaster
sendTimeout time.Duration
responseLimit float64
sendBufferSize int

getExecutionData GetExecutionDataFunc
getStartHeight GetStartHeightFunc

useIndex bool
}

func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) state_stream.Subscription {
Expand All @@ -43,27 +48,61 @@ func (b EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Id
return sub
}

// getResponseFactory returns a function function that returns the event response for a given height.
func (b EventsBackend) getResponseFactory(filter state_stream.EventFilter) GetDataByHeightFunc {
return func(ctx context.Context, height uint64) (interface{}, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
return func(ctx context.Context, height uint64) (response interface{}, err error) {
if b.useIndex {
response, err = b.getEventsFromStorage(height, filter)
} else {
response, err = b.getEventsFromExecutionData(ctx, height, filter)
}

events := []flow.Event{}
for _, chunkExecutionData := range executionData.ChunkExecutionDatas {
events = append(events, filter.Filter(chunkExecutionData.Events)...)
if err == nil && b.log.GetLevel() == zerolog.TraceLevel {
eventsResponse := response.(*EventsResponse)
b.log.Trace().
Hex("block_id", logging.ID(eventsResponse.BlockID)).
Uint64("height", height).
Int("events", len(eventsResponse.Events)).
Msg("sending events")
}
return
}
}

// getEventsFromExecutionData returns the events for a given height extractd from the execution data.
func (b EventsBackend) getEventsFromExecutionData(ctx context.Context, height uint64, filter state_stream.EventFilter) (*EventsResponse, error) {
executionData, err := b.getExecutionData(ctx, height)
if err != nil {
return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
}

var events flow.EventsList
for _, chunkExecutionData := range executionData.ChunkExecutionDatas {
events = append(events, filter.Filter(chunkExecutionData.Events)...)
}

b.log.Trace().
Hex("block_id", logging.ID(executionData.BlockID)).
Uint64("height", height).
Msgf("sending %d events", len(events))
return &EventsResponse{
BlockID: executionData.BlockID,
Height: height,
Events: events,
}, nil
}

return &EventsResponse{
BlockID: executionData.BlockID,
Height: height,
Events: events,
}, nil
// getEventsFromStorage returns the events for a given height from the index storage.
func (b EventsBackend) getEventsFromStorage(height uint64, filter state_stream.EventFilter) (*EventsResponse, error) {
blockID, err := b.headers.BlockIDByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not get header for height %d: %w", height, err)
}

events, err := b.events.ByBlockID(blockID)
if err != nil {
return nil, fmt.Errorf("could not get events for block %d: %w", height, err)
}

return &EventsResponse{
BlockID: blockID,
Height: height,
Events: filter.Filter(events),
}, nil
}
31 changes: 23 additions & 8 deletions engine/access/state_stream/backend/backend_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
)

type BackendEventsSuite struct {
Expand All @@ -29,8 +31,24 @@ func (s *BackendEventsSuite) SetupTest() {
s.BackendExecutionDataSuite.SetupTest()
}

// TestSubscribeEvents tests the SubscribeEvents method happy path
func (s *BackendEventsSuite) TestSubscribeEvents() {
// TestSubscribeEventsFromExecutionData tests the SubscribeEvents method happy path for events
// extracted from ExecutionData
func (s *BackendEventsSuite) TestSubscribeEventsFromExecutionData() {
s.runTestSubscribeEvents()
}

// TestSubscribeEventsFromLocalStorage tests the SubscribeEvents method happy path for events
// extracted from local storage
func (s *BackendEventsSuite) TestSubscribeEventsFromLocalStorage() {
s.backend.useIndex = true
s.events.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(
mocks.StorageMapGetter(s.blockEvents),
)

s.runTestSubscribeEvents()
}

func (s *BackendEventsSuite) runTestSubscribeEvents() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -77,9 +95,6 @@ func (s *BackendEventsSuite) TestSubscribeEvents() {
},
}

// supports simple address comparisions for testing
chain := flow.MonotonicEmulator.Chain()

// create variations for each of the base test
tests := make([]testType, 0, len(baseTests)*3)
for _, test := range baseTests {
Expand All @@ -90,13 +105,13 @@ func (s *BackendEventsSuite) TestSubscribeEvents() {

t2 := test
t2.name = fmt.Sprintf("%s - some events", test.name)
t2.filters, err = state_stream.NewEventFilter(state_stream.DefaultEventFilterConfig, chain, []string{string(testEventTypes[0])}, nil, nil)
t2.filters, err = state_stream.NewEventFilter(state_stream.DefaultEventFilterConfig, chainID.Chain(), []string{string(testEventTypes[0])}, nil, nil)
require.NoError(s.T(), err)
tests = append(tests, t2)

t3 := test
t3.name = fmt.Sprintf("%s - no events", test.name)
t3.filters, err = state_stream.NewEventFilter(state_stream.DefaultEventFilterConfig, chain, []string{"A.0x1.NonExistent.Event"}, nil, nil)
t3.filters, err = state_stream.NewEventFilter(state_stream.DefaultEventFilterConfig, chainID.Chain(), []string{"A.0x1.NonExistent.Event"}, nil, nil)
require.NoError(s.T(), err)
tests = append(tests, t3)
}
Expand Down Expand Up @@ -126,7 +141,7 @@ func (s *BackendEventsSuite) TestSubscribeEvents() {
s.broadcaster.Publish()
}

expectedEvents := flow.EventsList{}
var expectedEvents flow.EventsList
for _, event := range s.blockEvents[b.ID()] {
if test.filters.Match(event) {
expectedEvents = append(expectedEvents, event)
Expand Down
Loading

0 comments on commit 7be756d

Please sign in to comment.