diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 71aeb5c40cf..e35442ca966 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -38,6 +38,14 @@ type backendEvents struct { queryMode IndexQueryMode } +// blockMetadata is used to capture information about requested blocks to avoid repeated blockID +// calculations and passing around full block headers. +type blockMetadata struct { + ID flow.Identifier + Height uint64 + Timestamp time.Time +} + // GetEventsForHeightRange retrieves events for all sealed blocks between the start block height and // the end block height (inclusive) that have the given type. func (b *backendEvents) GetEventsForHeightRange( @@ -87,15 +95,25 @@ func (b *backendEvents) GetEventsForHeightRange( } // find the block headers for all the blocks between min and max height (inclusive) - blockHeaders := make([]*flow.Header, 0) + blockHeaders := make([]blockMetadata, 0, endHeight-startHeight+1) for i := startHeight; i <= endHeight; i++ { - header, err := b.headers.ByHeight(i) + // this looks inefficient, but is actually what's done under the covers by `headers.ByHeight` + // and avoids calculating header.ID() for each block. + blockID, err := b.headers.BlockIDByHeight(i) if err != nil { - return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err)) + return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get blockID for %d: %w", i, err)) + } + header, err := b.headers.ByBlockID(blockID) + if err != nil { + return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %d: %w", i, err)) } - blockHeaders = append(blockHeaders, header) + blockHeaders = append(blockHeaders, blockMetadata{ + ID: blockID, + Height: header.Height, + Timestamp: header.Timestamp, + }) } return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion) @@ -114,14 +132,18 @@ func (b *backendEvents) GetEventsForBlockIDs( } // find the block headers for all the block IDs - blockHeaders := make([]*flow.Header, 0) + blockHeaders := make([]blockMetadata, 0, len(blockIDs)) for _, blockID := range blockIDs { header, err := b.headers.ByBlockID(blockID) if err != nil { - return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err)) + return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %s: %w", blockID, err)) } - blockHeaders = append(blockHeaders, header) + blockHeaders = append(blockHeaders, blockMetadata{ + ID: blockID, + Height: header.Height, + Timestamp: header.Timestamp, + }) } return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion) @@ -131,7 +153,7 @@ func (b *backendEvents) GetEventsForBlockIDs( // It gets all events available in storage, and requests the rest from an execution node. func (b *backendEvents) getBlockEvents( ctx context.Context, - blockHeaders []*flow.Header, + blockInfos []blockMetadata, eventType string, requiredEventEncodingVersion entities.EventEncodingVersion, ) ([]flow.BlockEvents, error) { @@ -143,36 +165,36 @@ func (b *backendEvents) getBlockEvents( switch b.queryMode { case IndexQueryModeExecutionNodesOnly: - return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion) + return b.getBlockEventsFromExecutionNode(ctx, blockInfos, eventType, requiredEventEncodingVersion) case IndexQueryModeLocalOnly: - localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion) + localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion) if err != nil { return nil, err } // all blocks should be available. - if len(missingHeaders) > 0 { - return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingHeaders)) + if len(missingBlocks) > 0 { + return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingBlocks)) } return localResponse, nil case IndexQueryModeFailover: - localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion) + localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion) if err != nil { // if there was an error, request all blocks from execution nodes - missingHeaders = blockHeaders + missingBlocks = blockInfos b.log.Debug().Err(err).Msg("failed to get events from local storage") } - if len(missingHeaders) == 0 { + if len(missingBlocks) == 0 { return localResponse, nil } b.log.Debug(). - Int("missing_blocks", len(missingHeaders)). + Int("missing_blocks", len(missingBlocks)). Msg("querying execution nodes for events from missing blocks") - enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingHeaders, eventType, requiredEventEncodingVersion) + enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingBlocks, eventType, requiredEventEncodingVersion) if err != nil { return nil, err } @@ -198,25 +220,25 @@ func (b *backendEvents) getBlockEvents( // from the local storage func (b *backendEvents) getBlockEventsFromStorage( ctx context.Context, - blockHeaders []*flow.Header, + blockInfos []blockMetadata, eventType flow.EventType, requiredEventEncodingVersion entities.EventEncodingVersion, -) ([]flow.BlockEvents, []*flow.Header, error) { - missing := make([]*flow.Header, 0) +) ([]flow.BlockEvents, []blockMetadata, error) { + missing := make([]blockMetadata, 0) resp := make([]flow.BlockEvents, 0) - for _, header := range blockHeaders { + for _, blockInfo := range blockInfos { if ctx.Err() != nil { return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled) } - events, err := b.events.ByBlockID(header.ID()) + events, err := b.events.ByBlockID(blockInfo.ID) if err != nil { // Note: if there are no events for a block, an empty slice is returned if errors.Is(err, storage.ErrNotFound) { - missing = append(missing, header) + missing = append(missing, blockInfo) continue } - err = fmt.Errorf("failed to get events for block %s: %w", header.ID(), err) + err = fmt.Errorf("failed to get events for block %s: %w", blockInfo.ID, err) return nil, nil, rpc.ConvertError(err, "failed to get events from storage", codes.Internal) } @@ -230,7 +252,7 @@ func (b *backendEvents) getBlockEventsFromStorage( if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 { payload, err := convert.CcfPayloadToJsonPayload(e.Payload) if err != nil { - err = fmt.Errorf("failed to convert event payload for block %s: %w", header.ID(), err) + err = fmt.Errorf("failed to convert event payload for block %s: %w", blockInfo.ID, err) return nil, nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal) } e.Payload = payload @@ -240,9 +262,9 @@ func (b *backendEvents) getBlockEventsFromStorage( } resp = append(resp, flow.BlockEvents{ - BlockID: header.ID(), - BlockHeight: header.Height, - BlockTimestamp: header.Timestamp, + BlockID: blockInfo.ID, + BlockHeight: blockInfo.Height, + BlockTimestamp: blockInfo.Timestamp, Events: filteredEvents, }) } @@ -254,15 +276,15 @@ func (b *backendEvents) getBlockEventsFromStorage( // from an execution node func (b *backendEvents) getBlockEventsFromExecutionNode( ctx context.Context, - blockHeaders []*flow.Header, + blockInfos []blockMetadata, eventType string, requiredEventEncodingVersion entities.EventEncodingVersion, ) ([]flow.BlockEvents, error) { // create an execution API request for events at block ID - blockIDs := make([]flow.Identifier, len(blockHeaders)) - for i := range blockIDs { - blockIDs[i] = blockHeaders[i].ID() + blockIDs := make([]flow.Identifier, len(blockInfos)) + for i := range blockInfos { + blockIDs[i] = blockInfos[i].ID } if len(blockIDs) == 0 { @@ -296,7 +318,7 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( // convert execution node api result to access node api result results, err := verifyAndConvertToAccessEvents( resp.GetResults(), - blockHeaders, + blockInfos, resp.GetEventEncodingVersion(), requiredEventEncodingVersion, ) @@ -307,31 +329,31 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( return results, nil } -// verifyAndConvertToAccessEvents converts execution node api result to access node api result, and verifies that the results contains -// results from each block that was requested +// verifyAndConvertToAccessEvents converts execution node api result to access node api result, +// and verifies that the results contains results from each block that was requested func verifyAndConvertToAccessEvents( execEvents []*execproto.GetEventsForBlockIDsResponse_Result, - requestedBlockHeaders []*flow.Header, + requestedBlockInfos []blockMetadata, from entities.EventEncodingVersion, to entities.EventEncodingVersion, ) ([]flow.BlockEvents, error) { - if len(execEvents) != len(requestedBlockHeaders) { + if len(execEvents) != len(requestedBlockInfos) { return nil, errors.New("number of results does not match number of blocks requested") } - requestedBlockHeaderSet := map[string]*flow.Header{} - for _, header := range requestedBlockHeaders { - requestedBlockHeaderSet[header.ID().String()] = header + requestedBlockInfoSet := map[string]blockMetadata{} + for _, header := range requestedBlockInfos { + requestedBlockInfoSet[header.ID.String()] = header } results := make([]flow.BlockEvents, len(execEvents)) for i, result := range execEvents { - header, expected := requestedBlockHeaderSet[hex.EncodeToString(result.GetBlockId())] + blockInfo, expected := requestedBlockInfoSet[hex.EncodeToString(result.GetBlockId())] if !expected { return nil, fmt.Errorf("unexpected blockID from exe node %x", result.GetBlockId()) } - if result.GetBlockHeight() != header.Height { + if result.GetBlockHeight() != blockInfo.Height { return nil, fmt.Errorf("unexpected block height %d for block %x from exe node", result.GetBlockHeight(), result.GetBlockId()) @@ -344,9 +366,9 @@ func verifyAndConvertToAccessEvents( } results[i] = flow.BlockEvents{ - BlockID: header.ID(), - BlockHeight: header.Height, - BlockTimestamp: header.Timestamp, + BlockID: blockInfo.ID, + BlockHeight: blockInfo.Height, + BlockTimestamp: blockInfo.Timestamp, Events: events, } } diff --git a/engine/access/rpc/backend/backend_events_test.go b/engine/access/rpc/backend/backend_events_test.go index 68263272af0..10306a304aa 100644 --- a/engine/access/rpc/backend/backend_events_test.go +++ b/engine/access/rpc/backend/backend_events_test.go @@ -121,13 +121,13 @@ func (s *BackendEventsSuite) SetupTest() { return nil, storage.ErrNotFound }).Maybe() - s.headers.On("ByHeight", mock.Anything).Return(func(height uint64) (*flow.Header, error) { + s.headers.On("BlockIDByHeight", mock.Anything).Return(func(height uint64) (flow.Identifier, error) { for _, block := range s.blocks { if height == block.Header.Height { - return block.Header, nil + return block.ID(), nil } } - return nil, storage.ErrNotFound + return flow.ZeroID, storage.ErrNotFound }).Maybe() s.headers.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) (*flow.Header, error) { diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 2d981c6e20c..17083312858 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -91,7 +91,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T } // otherwise choose all collection nodes to try - collNodes, err := b.chooseCollectionNodes(tx) + collNodes, err := b.chooseCollectionNodes(tx.ID()) if err != nil { return fmt.Errorf("failed to determine collection node for tx %x: %w", tx, err) } @@ -122,7 +122,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T // chooseCollectionNodes finds a random subset of size sampleSize of collection node addresses from the // collection node cluster responsible for the given tx -func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (flow.IdentityList, error) { +func (b *backendTransactions) chooseCollectionNodes(txID flow.Identifier) (flow.IdentityList, error) { // retrieve the set of collector clusters clusters, err := b.state.Final().Epochs().Current().Clustering() @@ -131,18 +131,20 @@ func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (f } // get the cluster responsible for the transaction - targetNodes, ok := clusters.ByTxID(tx.ID()) + targetNodes, ok := clusters.ByTxID(txID) if !ok { - return nil, fmt.Errorf("could not get local cluster by txID: %x", tx.ID()) + return nil, fmt.Errorf("could not get local cluster by txID: %x", txID) } return targetNodes, nil } // sendTransactionToCollection sends the transaction to the given collection node via grpc -func (b *backendTransactions) sendTransactionToCollector(ctx context.Context, +func (b *backendTransactions) sendTransactionToCollector( + ctx context.Context, tx *flow.TransactionBody, - collectionNodeAddr string) error { + collectionNodeAddr string, +) error { collectionRPC, closer, err := b.connFactory.GetAccessAPIClient(collectionNodeAddr) if err != nil {