Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Optimize header ID calculation in get events - v0.32 #5201

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 67 additions & 45 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ type backendEvents struct {
queryMode IndexQueryMode
}

// blockMetadata is used to capture information about requested blocks to avoid repeated blockID
// calculations and passing around full block headers.
type blockMetadata struct {
// ID is the block's identifier, stored here so it does not have to be recomputed from the header.
ID flow.Identifier
// Height is the block's height, copied from its header.
Height uint64
// Timestamp is the block's timestamp, copied from its header.
Timestamp time.Time
}

// GetEventsForHeightRange retrieves events for all sealed blocks between the start block height and
// the end block height (inclusive) that have the given type.
func (b *backendEvents) GetEventsForHeightRange(
Expand Down Expand Up @@ -87,15 +95,25 @@ func (b *backendEvents) GetEventsForHeightRange(
}

// find the block headers for all the blocks between min and max height (inclusive)
blockHeaders := make([]*flow.Header, 0)
blockHeaders := make([]blockMetadata, 0, endHeight-startHeight+1)

for i := startHeight; i <= endHeight; i++ {
header, err := b.headers.ByHeight(i)
// this looks inefficient, but is actually what's done under the covers by `headers.ByHeight`
// and avoids calculating header.ID() for each block.
blockID, err := b.headers.BlockIDByHeight(i)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err))
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get blockID for %d: %w", i, err))
}
header, err := b.headers.ByBlockID(blockID)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %d: %w", i, err))
}

blockHeaders = append(blockHeaders, header)
blockHeaders = append(blockHeaders, blockMetadata{
ID: blockID,
Height: header.Height,
Timestamp: header.Timestamp,
})
}

return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
Expand All @@ -114,14 +132,18 @@ func (b *backendEvents) GetEventsForBlockIDs(
}

// find the block headers for all the block IDs
blockHeaders := make([]*flow.Header, 0)
blockHeaders := make([]blockMetadata, 0, len(blockIDs))
for _, blockID := range blockIDs {
header, err := b.headers.ByBlockID(blockID)
if err != nil {
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get events: %w", err))
return nil, rpc.ConvertStorageError(fmt.Errorf("failed to get block header for %s: %w", blockID, err))
}

blockHeaders = append(blockHeaders, header)
blockHeaders = append(blockHeaders, blockMetadata{
ID: blockID,
Height: header.Height,
Timestamp: header.Timestamp,
})
}

return b.getBlockEvents(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
Expand All @@ -131,7 +153,7 @@ func (b *backendEvents) GetEventsForBlockIDs(
// It gets all events available in storage, and requests the rest from an execution node.
func (b *backendEvents) getBlockEvents(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {
Expand All @@ -143,36 +165,36 @@ func (b *backendEvents) getBlockEvents(

switch b.queryMode {
case IndexQueryModeExecutionNodesOnly:
return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType, requiredEventEncodingVersion)
return b.getBlockEventsFromExecutionNode(ctx, blockInfos, eventType, requiredEventEncodingVersion)

case IndexQueryModeLocalOnly:
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion)
localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion)
if err != nil {
return nil, err
}
// all blocks should be available.
if len(missingHeaders) > 0 {
return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingHeaders))
if len(missingBlocks) > 0 {
return nil, status.Errorf(codes.NotFound, "events not found in local storage for %d blocks", len(missingBlocks))
}
return localResponse, nil

case IndexQueryModeFailover:
localResponse, missingHeaders, err := b.getBlockEventsFromStorage(ctx, blockHeaders, target, requiredEventEncodingVersion)
localResponse, missingBlocks, err := b.getBlockEventsFromStorage(ctx, blockInfos, target, requiredEventEncodingVersion)
if err != nil {
// if there was an error, request all blocks from execution nodes
missingHeaders = blockHeaders
missingBlocks = blockInfos
b.log.Debug().Err(err).Msg("failed to get events from local storage")
}

if len(missingHeaders) == 0 {
if len(missingBlocks) == 0 {
return localResponse, nil
}

b.log.Debug().
Int("missing_blocks", len(missingHeaders)).
Int("missing_blocks", len(missingBlocks)).
Msg("querying execution nodes for events from missing blocks")

enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingHeaders, eventType, requiredEventEncodingVersion)
enResponse, err := b.getBlockEventsFromExecutionNode(ctx, missingBlocks, eventType, requiredEventEncodingVersion)
if err != nil {
return nil, err
}
Expand All @@ -198,25 +220,25 @@ func (b *backendEvents) getBlockEvents(
// from the local storage
func (b *backendEvents) getBlockEventsFromStorage(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType flow.EventType,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, []*flow.Header, error) {
missing := make([]*flow.Header, 0)
) ([]flow.BlockEvents, []blockMetadata, error) {
missing := make([]blockMetadata, 0)
resp := make([]flow.BlockEvents, 0)
for _, header := range blockHeaders {
for _, blockInfo := range blockInfos {
if ctx.Err() != nil {
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.events.ByBlockID(header.ID())
events, err := b.events.ByBlockID(blockInfo.ID)
if err != nil {
// Note: if there are no events for a block, an empty slice is returned
if errors.Is(err, storage.ErrNotFound) {
missing = append(missing, header)
missing = append(missing, blockInfo)
continue
}
err = fmt.Errorf("failed to get events for block %s: %w", header.ID(), err)
err = fmt.Errorf("failed to get events for block %s: %w", blockInfo.ID, err)
return nil, nil, rpc.ConvertError(err, "failed to get events from storage", codes.Internal)
}

Expand All @@ -230,7 +252,7 @@ func (b *backendEvents) getBlockEventsFromStorage(
if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
payload, err := convert.CcfPayloadToJsonPayload(e.Payload)
if err != nil {
err = fmt.Errorf("failed to convert event payload for block %s: %w", header.ID(), err)
err = fmt.Errorf("failed to convert event payload for block %s: %w", blockInfo.ID, err)
return nil, nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
}
e.Payload = payload
Expand All @@ -240,9 +262,9 @@ func (b *backendEvents) getBlockEventsFromStorage(
}

resp = append(resp, flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
BlockID: blockInfo.ID,
BlockHeight: blockInfo.Height,
BlockTimestamp: blockInfo.Timestamp,
Events: filteredEvents,
})
}
Expand All @@ -254,15 +276,15 @@ func (b *backendEvents) getBlockEventsFromStorage(
// from an execution node
func (b *backendEvents) getBlockEventsFromExecutionNode(
ctx context.Context,
blockHeaders []*flow.Header,
blockInfos []blockMetadata,
eventType string,
requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {

// create an execution API request for events at block ID
blockIDs := make([]flow.Identifier, len(blockHeaders))
for i := range blockIDs {
blockIDs[i] = blockHeaders[i].ID()
blockIDs := make([]flow.Identifier, len(blockInfos))
for i := range blockInfos {
blockIDs[i] = blockInfos[i].ID
}

if len(blockIDs) == 0 {
Expand Down Expand Up @@ -296,7 +318,7 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
// convert execution node api result to access node api result
results, err := verifyAndConvertToAccessEvents(
resp.GetResults(),
blockHeaders,
blockInfos,
resp.GetEventEncodingVersion(),
requiredEventEncodingVersion,
)
Expand All @@ -307,31 +329,31 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
return results, nil
}

// verifyAndConvertToAccessEvents converts execution node api result to access node api result, and verifies that the results contains
// results from each block that was requested
// verifyAndConvertToAccessEvents converts execution node api result to access node api result,
// and verifies that the results contain a result for each block that was requested
func verifyAndConvertToAccessEvents(
execEvents []*execproto.GetEventsForBlockIDsResponse_Result,
requestedBlockHeaders []*flow.Header,
requestedBlockInfos []blockMetadata,
from entities.EventEncodingVersion,
to entities.EventEncodingVersion,
) ([]flow.BlockEvents, error) {
if len(execEvents) != len(requestedBlockHeaders) {
if len(execEvents) != len(requestedBlockInfos) {
return nil, errors.New("number of results does not match number of blocks requested")
}

requestedBlockHeaderSet := map[string]*flow.Header{}
for _, header := range requestedBlockHeaders {
requestedBlockHeaderSet[header.ID().String()] = header
requestedBlockInfoSet := map[string]blockMetadata{}
for _, header := range requestedBlockInfos {
requestedBlockInfoSet[header.ID.String()] = header
}

results := make([]flow.BlockEvents, len(execEvents))

for i, result := range execEvents {
header, expected := requestedBlockHeaderSet[hex.EncodeToString(result.GetBlockId())]
blockInfo, expected := requestedBlockInfoSet[hex.EncodeToString(result.GetBlockId())]
if !expected {
return nil, fmt.Errorf("unexpected blockID from exe node %x", result.GetBlockId())
}
if result.GetBlockHeight() != header.Height {
if result.GetBlockHeight() != blockInfo.Height {
return nil, fmt.Errorf("unexpected block height %d for block %x from exe node",
result.GetBlockHeight(),
result.GetBlockId())
Expand All @@ -344,9 +366,9 @@ func verifyAndConvertToAccessEvents(
}

results[i] = flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
BlockID: blockInfo.ID,
BlockHeight: blockInfo.Height,
BlockTimestamp: blockInfo.Timestamp,
Events: events,
}
}
Expand Down
6 changes: 3 additions & 3 deletions engine/access/rpc/backend/backend_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ func (s *BackendEventsSuite) SetupTest() {
return nil, storage.ErrNotFound
}).Maybe()

s.headers.On("ByHeight", mock.Anything).Return(func(height uint64) (*flow.Header, error) {
s.headers.On("BlockIDByHeight", mock.Anything).Return(func(height uint64) (flow.Identifier, error) {
for _, block := range s.blocks {
if height == block.Header.Height {
return block.Header, nil
return block.ID(), nil
}
}
return nil, storage.ErrNotFound
return flow.ZeroID, storage.ErrNotFound
}).Maybe()

s.headers.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) (*flow.Header, error) {
Expand Down
14 changes: 8 additions & 6 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T
}

// otherwise choose all collection nodes to try
collNodes, err := b.chooseCollectionNodes(tx)
collNodes, err := b.chooseCollectionNodes(tx.ID())
if err != nil {
return fmt.Errorf("failed to determine collection node for tx %x: %w", tx, err)
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (b *backendTransactions) trySendTransaction(ctx context.Context, tx *flow.T

// chooseCollectionNodes finds a random subset of size sampleSize of collection node addresses from the
// collection node cluster responsible for the given tx
func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (flow.IdentityList, error) {
func (b *backendTransactions) chooseCollectionNodes(txID flow.Identifier) (flow.IdentityList, error) {

// retrieve the set of collector clusters
clusters, err := b.state.Final().Epochs().Current().Clustering()
Expand All @@ -131,18 +131,20 @@ func (b *backendTransactions) chooseCollectionNodes(tx *flow.TransactionBody) (f
}

// get the cluster responsible for the transaction
targetNodes, ok := clusters.ByTxID(tx.ID())
targetNodes, ok := clusters.ByTxID(txID)
if !ok {
return nil, fmt.Errorf("could not get local cluster by txID: %x", tx.ID())
return nil, fmt.Errorf("could not get local cluster by txID: %x", txID)
}

return targetNodes, nil
}

// sendTransactionToCollector sends the transaction to the given collection node via grpc
func (b *backendTransactions) sendTransactionToCollector(ctx context.Context,
func (b *backendTransactions) sendTransactionToCollector(
ctx context.Context,
tx *flow.TransactionBody,
collectionNodeAddr string) error {
collectionNodeAddr string,
) error {

collectionRPC, closer, err := b.connFactory.GetAccessAPIClient(collectionNodeAddr)
if err != nil {
Expand Down