Skip to content

Commit

Permalink
[Access] Optimize header ID calculation in get events
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed Jan 4, 2024
1 parent 4f05dcc commit f8469b7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 54 deletions.
112 changes: 67 additions & 45 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type backendEvents struct {
queryMode IndexQueryMode
}

// blockMetadata captures the information about a requested block that is needed to look up
// and build event responses. Carrying the block ID alongside the height and timestamp avoids
// repeated blockID calculations and passing around full block headers.
type blockMetadata struct {
// ID is the block's identifier, either looked up by height or supplied by the caller.
ID flow.Identifier
// Height is the block's height, copied from its header.
Height uint64
// Timestamp is the block's timestamp, copied from its header.
Timestamp time.Time
}

// GetEventsForHeightRange retrieves events for all sealed blocks between the start block height and
// the end block height (inclusive) that have the given type.
func (b *backendEvents) GetEventsForHeightRange(
Expand Down Expand Up @@ -87,15 +95,25 @@ func (b *backendEvents) GetEventsForHeightRange(
}

// find the block headers for all the blocks between min and max height (inclusive)
blockHeaders := make([]*flow.Header, 0)
blockHeaders := make([]blockMetadata, 0, endHeight-startHeight+1)

for i := startHeight; i <= endHeight; i++ {
header, err := b.headers.ByHeight(i)
// this looks inefficient, but is actually what's done under the covers by `headers.ByHeight`
// and avoids calculating header.ID() for each block.
blockID, err := b.headers.BlockIDByHeight(i)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err))
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get blockID for %d: %w", i, err))
}
header, err := b.headers.ByBlockID(blockID)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %d: %w", i, err))
}

blockHeaders = append(blockHeaders, header)
blockHeaders = append(blockHeaders, blockMetadata{
ID: blockID,
Height: header.Height,
Timestamp: header.Timestamp,
})
}

return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
Expand All @@ -114,14 +132,18 @@ func (b *backendEvents) GetEventsForBlockIDs(
}

// find the block headers for all the block IDs
blockHeaders := make([]*flow.Header, 0)
blockHeaders := make([]blockMetadata, 0, len(blockIDs))
for _, blockID := range blockIDs {
header, err := b.headers.ByBlockID(blockID)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err))
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %s: %w", blockID, err))
}

blockHeaders = append(blockHeaders, header)
blockHeaders = append(blockHeaders, blockMetadata{
ID: blockID,
Height: header.Height,
Timestamp: header.Timestamp,
})
}

return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
Expand All @@ -131,7 +153,7 @@ func (b *backendEvents) GetEventsForBlockIDs(
// It gets all events available in storage, and requests the rest from an execution node.
func (b *backendEvents) getBlockEvents(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {
Expand All @@ -143,36 +165,36 @@ func (b *backendEvents) getBlockEvents(

switch b.queryMode {
case IndexQueryModeExecutionNodesOnly:
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
return b.getBlockEventsFromExecutionNode(ctx, blockInfos, eventType, requiredEventEncodingVersion)

case IndexQueryModeLocalOnly:
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion)
localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion)
if err != nil {
return nil, err
}
// all blocks should be available.
if len(missingHeaders) > 0 {
return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingHeaders))
if len(missingBlocks) > 0 {
return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingBlocks))
}
return localResponse, nil

case IndexQueryModeFailover:
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion)
localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion)
if err != nil {
// if there was an error, request all blocks from execution nodes
missingHeaders = blockHeaders
missingBlocks = blockInfos
b.log.Debug().Err(err).Msg("failed to get events from local storage")
}

if len(missingHeaders) == 0 {
if len(missingBlocks) == 0 {
return localResponse, nil
}

b.log.Debug().
Int("missing_blocks", len(missingHeaders)).
Int("missing_blocks", len(missingBlocks)).
Msg("querying execution nodes for events from missing blocks")

enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingHeaders, eventType, requiredEventEncodingVersion)
enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingBlocks, eventType, requiredEventEncodingVersion)
if err != nil {
return nil, err
}
Expand All @@ -198,25 +220,25 @@ func (b *backendEvents) getBlockEvents(
// from the local storage
func (b *backendEvents) getBlockEventsFromStorage(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType flow.EventType,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, []*flow.Header, error) {
missing := make([]*flow.Header, 0)
) ([]flow.BlockEvents, []blockMetadata, error) {
missing := make([]blockMetadata, 0)
resp := make([]flow.BlockEvents, 0)
for _, header := range blockHeaders {
for _, blockInfo := range blockInfos {
if ctx.Err() != nil {
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.events.ByBlockID(header.ID())
events, err := b.events.ByBlockID(blockInfo.ID)
if err != nil {
// Note: if there are no events for a block, an empty slice is returned
if errors.Is(err, storage.ErrNotFound) {
missing = append(missing, header)
missing = append(missing, blockInfo)
continue
}
err = fmt.Errorf("failed to get events for block %s: %w", header.ID(), err)
err = fmt.Errorf("failed to get events for block %s: %w", blockInfo.ID, err)
return nil, nil, rpc.ConvertError(err, "failed to get events from storage", codes.Internal)
}

Expand All @@ -230,7 +252,7 @@ func (b *backendEvents) getBlockEventsFromStorage(
if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
payload, err := convert.CcfPayloadToJsonPayload(e.Payload)
if err != nil {
err = fmt.Errorf("failed to convert event payload for block %s: %w", header.ID(), err)
err = fmt.Errorf("failed to convert event payload for block %s: %w", blockInfo.ID, err)
return nil, nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
}
e.Payload = payload
Expand All @@ -240,9 +262,9 @@ func (b *backendEvents) getBlockEventsFromStorage(
}

resp = append(resp, flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
BlockID: blockInfo.ID,
BlockHeight: blockInfo.Height,
BlockTimestamp: blockInfo.Timestamp,
Events: filteredEvents,
})
}
Expand All @@ -254,15 +276,15 @@ func (b *backendEvents) getBlockEventsFromStorage(
// from an execution node
func (b *backendEvents) getBlockEventsFromExecutionNode(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {

// create an execution API request for events at block ID
blockIDs := make([]flow.Identifier, len(blockHeaders))
for i := range blockIDs {
blockIDs[i] = blockHeaders[i].ID()
blockIDs := make([]flow.Identifier, len(blockInfos))
for i := range blockInfos {
blockIDs[i] = blockInfos[i].ID
}

if len(blockIDs) == 0 {
Expand Down Expand Up @@ -296,7 +318,7 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
// convert execution node api result to access node api result
results, err := verifyAndConvertToAccessEvents(
resp.GetResults(),
blockHeaders,
blockInfos,
resp.GetEventEncodingVersion(),
requiredEventEncodingVersion,
)
Expand All @@ -307,31 +329,31 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
return results, nil
}

// verifyAndConvertToAccessEvents converts execution node api result to access node api result, and verifies that the results contains
// results from each block that was requested
// verifyAndConvertToAccessEvents converts execution node api result to access node api result,
// and verifies that the results contain results from each block that was requested
func verifyAndConvertToAccessEvents(
execEvents []*execproto.GetEventsForBlockIDsResponse_Result,
requestedBlockHeaders []*flow.Header,
requestedBlockInfos []blockMetadata,
from entities.EventEncodingVersion,
to entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {
if len(execEvents) != len(requestedBlockHeaders) {
if len(execEvents) != len(requestedBlockInfos) {
return nil, errors.New("number of results does not match number of blocks requested")
}

requestedBlockHeaderSet := map[string]*flow.Header{}
for _, header := range requestedBlockHeaders {
requestedBlockHeaderSet[header.ID().String()] = header
requestedBlockInfoSet := map[string]blockMetadata{}
for _, header := range requestedBlockInfos {
requestedBlockInfoSet[header.ID.String()] = header
}

results := make([]flow.BlockEvents, len(execEvents))

for i, result := range execEvents {
header, expected := requestedBlockHeaderSet[hex.EncodeToString(result.GetBlockId())]
blockInfo, expected := requestedBlockInfoSet[hex.EncodeToString(result.GetBlockId())]
if !expected {
return nil, fmt.Errorf("unexpected blockID from exe node %x", result.GetBlockId())
}
if result.GetBlockHeight() != header.Height {
if result.GetBlockHeight() != blockInfo.Height {
return nil, fmt.Errorf("unexpected block height %d for block %x from exe node",
result.GetBlockHeight(),
result.GetBlockId())
Expand All @@ -344,9 +366,9 @@ func verifyAndConvertToAccessEvents(
}

results[i] = flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
BlockID: blockInfo.ID,
BlockHeight: blockInfo.Height,
BlockTimestamp: blockInfo.Timestamp,
Events: events,
}
}
Expand Down
6 changes: 3 additions & 3 deletions engine/access/rpc/backend/backend_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ func (s *BackendEventsSuite) SetupTest() {
return nil, storage.ErrNotFound
}).Maybe()

s.headers.On("ByHeight", mock.Anything).Return(func(height uint64) (*flow.Header, error) {
s.headers.On("BlockIDByHeight", mock.Anything).Return(func(height uint64) (flow.Identifier, error) {
for _, block := range s.blocks {
if height == block.Header.Height {
return block.Header, nil
return block.ID(), nil
}
}
return nil, storage.ErrNotFound
return flow.ZeroID, storage.ErrNotFound
}).Maybe()

s.headers.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) (*flow.Header, error) {
Expand Down
14 changes: 8 additions & 6 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T
}

// otherwise choose all collection nodes to try
collNodes, err := b.chooseCollectionNodes(tx)
collNodes, err := b.chooseCollectionNodes(tx.ID())
if err != nil {
return fmt.Errorf("failed to determine collection node for tx %x: %w", tx, err)
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T

// chooseCollectionNodes finds a random subset of size sampleSize of collection node addresses from the
// collection node cluster responsible for the given tx
func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (flow.IdentityList, error) {
func (b *backendTransactions) chooseCollectionNodes(txID flow.Identifier) (flow.IdentityList, error) {

// retrieve the set of collector clusters
clusters, err := b.state.Final().Epochs().Current().Clustering()
Expand All @@ -131,18 +131,20 @@ func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (f
}

// get the cluster responsible for the transaction
targetNodes, ok := clusters.ByTxID(tx.ID())
targetNodes, ok := clusters.ByTxID(txID)
if !ok {
return nil, fmt.Errorf("could not get local cluster by txID: %x", tx.ID())
return nil, fmt.Errorf("could not get local cluster by txID: %x", txID)
}

return targetNodes, nil
}

// sendTransactionToCollector sends the transaction to the given collection node via grpc
func (b *backendTransactions) sendTransactionToCollector(ctx context.Context,
func (b *backendTransactions) sendTransactionToCollector(
ctx context.Context,
tx *flow.TransactionBody,
collectionNodeAddr string) error {
collectionNodeAddr string,
) error {

collectionRPC, closer, err := b.connFactory.GetAccessAPIClient(collectionNodeAddr)
if err != nil {
Expand Down

0 comments on commit f8469b7

Please sign in to comment.