diff --git a/access/api.go b/access/api.go index 3201796c6ed..4a5bcbc7de3 100644 --- a/access/api.go +++ b/access/api.go @@ -197,7 +197,7 @@ type API interface { // SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. - SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription + SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription } // TODO: Combine this with flow.TransactionResult? diff --git a/access/handler.go b/access/handler.go index a191f333662..559cb7b5096 100644 --- a/access/handler.go +++ b/access/handler.go @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/counters" "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" @@ -1112,11 +1113,23 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return err } - sub := h.api.SubscribeTransactionStatuses(ctx, &tx) - return subscription.HandleSubscription(sub, func(txSubInfo *convert.TransactionSubscribeInfo) error { - err = stream.Send(convert.TransactionSubscribeInfoToMessage(txSubInfo)) - if err != nil { - return rpc.ConvertError(err, "could not send response", codes.Internal) + sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion()) + + messageIndex := counters.NewMonotonousCounter(0) + return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error { + for i := range txResults { + value := messageIndex.Value() + if ok := messageIndex.Set(value + 1); !ok { + return status.Errorf(codes.Internal, "the message index has already been incremented to %d", messageIndex.Value()) + } + + err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{ + TransactionResults: TransactionResultToMessage(txResults[i]), + MessageIndex: value, + }) + if err != nil { + return rpc.ConvertError(err, "could not send response", codes.Internal) + } } return nil diff --git a/access/mock/api.go b/access/mock/api.go index b27e8a03580..8e6f8e53936 100644 --- a/access/mock/api.go +++ b/access/mock/api.go @@ -977,13 +977,13 @@ func (_m *API) SubscribeBlocksFromStartHeight(ctx context.Context, startHeight u return r0 } -// SubscribeTransactionStatuses provides a mock function with given fields: ctx, tx -func (_m *API) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription { - ret := _m.Called(ctx, tx) +// SubscribeTransactionStatuses provides a mock function with given fields: ctx, tx, requiredEventEncodingVersion +func (_m *API) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription { + ret := _m.Called(ctx, tx, requiredEventEncodingVersion) var r0 subscription.Subscription - if rf, ok := ret.Get(0).(func(context.Context, *flow.TransactionBody) subscription.Subscription); ok { - r0 = rf(ctx, tx) + if rf, ok := ret.Get(0).(func(context.Context, *flow.TransactionBody, entities.EventEncodingVersion) subscription.Subscription); ok { + r0 = rf(ctx, tx, requiredEventEncodingVersion) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(subscription.Subscription) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 5c27d1e9577..1b7ef03ddba 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -162,15 +162,6 @@ func New(params Params) (*Backend, error) { // initialize node version info nodeInfo := getNodeVersionInfo(params.State.Params()) - transactionsLocalDataProvider := &TransactionsLocalDataProvider{ - state: params.State, - collections: params.Collections, - blocks: params.Blocks, - eventsIndex: params.EventsIndex, - txResultsIndex: params.TxResultsIndex, - systemTxID: systemTxID, - } - b := &Backend{ state: params.State, BlockTracker: params.BlockTracker, @@ -187,25 +178,6 @@ func New(params Params) (*Backend, error) { scriptExecutor: params.ScriptExecutor, scriptExecMode: params.ScriptExecutionMode, }, - backendTransactions: backendTransactions{ - TransactionsLocalDataProvider: transactionsLocalDataProvider, - log: params.Log, - staticCollectionRPC: params.CollectionRPC, - chainID: params.ChainID, - transactions: params.Transactions, - executionReceipts: params.ExecutionReceipts, - transactionValidator: configureTransactionValidator(params.State, params.ChainID), - transactionMetrics: params.AccessMetrics, - retry: retry, - connFactory: params.ConnFactory, - previousAccessNodes: params.HistoricalAccessNodes, - nodeCommunicator: params.Communicator, - txResultCache: txResCache, - txErrorMessagesCache: txErrorMessagesCache, - txResultQueryMode: params.TxResultQueryMode, - systemTx: systemTx, - systemTxID: systemTxID, - }, backendEvents: backendEvents{ log: params.Log, chain: params.ChainID.Chain(), @@ -253,13 +225,7 @@ func New(params Params) (*Backend, error) { subscriptionHandler: params.SubscriptionHandler, blockTracker: params.BlockTracker, }, - backendSubscribeTransactions: backendSubscribeTransactions{ - txLocalDataProvider: transactionsLocalDataProvider, - log: params.Log, - executionResults: params.ExecutionResults, - subscriptionHandler: params.SubscriptionHandler, - blockTracker: params.BlockTracker, - }, + collections: params.Collections, executionReceipts: params.ExecutionReceipts, connFactory: params.ConnFactory, @@ -267,8 +233,47 @@ func New(params Params) (*Backend, error) { nodeInfo: nodeInfo, } + transactionsLocalDataProvider := &TransactionsLocalDataProvider{ + state: params.State, + collections: params.Collections, + blocks: params.Blocks, + eventsIndex: params.EventsIndex, + txResultsIndex: params.TxResultsIndex, + systemTxID: systemTxID, + } + + b.backendTransactions = backendTransactions{ + TransactionsLocalDataProvider: transactionsLocalDataProvider, + log: params.Log, + staticCollectionRPC: params.CollectionRPC, + chainID: params.ChainID, + transactions: params.Transactions, + executionReceipts: params.ExecutionReceipts, + transactionValidator: configureTransactionValidator(params.State, params.ChainID), + transactionMetrics: params.AccessMetrics, + retry: retry, + connFactory: params.ConnFactory, + previousAccessNodes: params.HistoricalAccessNodes, + nodeCommunicator: params.Communicator, + txResultCache: txResCache, + txErrorMessagesCache: txErrorMessagesCache, + txResultQueryMode: params.TxResultQueryMode, + systemTx: systemTx, + systemTxID: systemTxID, + } + + // TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky. b.backendTransactions.txErrorMessages = b + b.backendSubscribeTransactions = backendSubscribeTransactions{ + txLocalDataProvider: transactionsLocalDataProvider, + backendTransactions: &b.backendTransactions, + log: params.Log, + executionResults: params.ExecutionResults, + subscriptionHandler: params.SubscriptionHandler, + blockTracker: params.BlockTracker, + } + retry.SetBackend(b) preferredENIdentifiers, err = identifierList(params.PreferredExecutionNodeIDs) diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index c01dd2db3d8..b8908365f32 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -5,26 +5,26 @@ import ( "errors" "fmt" - "github.com/onflow/flow-go/module/irrecoverable" - "github.com/onflow/flow-go/state" - - "github.com/onflow/flow-go/engine/common/rpc/convert" - "github.com/onflow/flow-go/module/counters" - "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/rs/zerolog" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/subscription" "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state" "github.com/onflow/flow-go/storage" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) // backendSubscribeTransactions handles transaction subscriptions. type backendSubscribeTransactions struct { txLocalDataProvider *TransactionsLocalDataProvider + backendTransactions *backendTransactions executionResults storage.ExecutionResults log zerolog.Logger @@ -34,30 +34,34 @@ type backendSubscribeTransactions struct { // TransactionSubscriptionMetadata holds data representing the status state for each transaction subscription. type TransactionSubscriptionMetadata struct { - txID flow.Identifier - txReferenceBlockID flow.Identifier - messageIndex counters.StrictMonotonousCounter - blockWithTx *flow.Header - blockID flow.Identifier - txExecuted bool - lastTxStatus flow.TransactionStatus + *access.TransactionResult + txReferenceBlockID flow.Identifier + blockWithTx *flow.Header + txExecuted bool + eventEncodingVersion entities.EventEncodingVersion } // SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. // If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription. -func (b *backendSubscribeTransactions) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription { +func (b *backendSubscribeTransactions) SubscribeTransactionStatuses( + ctx context.Context, + tx *flow.TransactionBody, + requiredEventEncodingVersion entities.EventEncodingVersion, +) subscription.Subscription { nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(tx.ReferenceBlockID) if err != nil { return subscription.NewFailedSubscription(err, "could not get start height") } txInfo := TransactionSubscriptionMetadata{ - txID: tx.ID(), - txReferenceBlockID: tx.ReferenceBlockID, - messageIndex: counters.NewMonotonousCounter(0), - blockWithTx: nil, - blockID: flow.ZeroID, - lastTxStatus: flow.TransactionStatusUnknown, + TransactionResult: &access.TransactionResult{ + TransactionID: tx.ID(), + BlockID: flow.ZeroID, + Status: flow.TransactionStatusUnknown, + }, + txReferenceBlockID: tx.ReferenceBlockID, + blockWithTx: nil, + eventEncodingVersion: requiredEventEncodingVersion, } return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(&txInfo)) @@ -67,25 +71,25 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatuses(ctx context. // subscription responses based on new blocks. func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *TransactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) { return func(ctx context.Context, height uint64) (interface{}, error) { - highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized) + err := b.checkBlockReady(height) if err != nil { - return nil, fmt.Errorf("could not get highest height for block %d: %w", height, err) - } - - // Fail early if no block finalized notification has been received for the given height. - // Note: It's possible that the block is locally finalized before the notification is - // received. This ensures a consistent view is available to all streams. - if height > highestHeight { - return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady) + return nil, err } - if txInfo.lastTxStatus == flow.TransactionStatusSealed || txInfo.lastTxStatus == flow.TransactionStatusExpired { - return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.lastTxStatus.String(), subscription.ErrEndOfData) + // If the transaction status already reported the final status, return with no data available + if txInfo.Status == flow.TransactionStatusSealed || txInfo.Status == flow.TransactionStatusExpired { + return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData) } + // If on this step transaction block not available, search for it. if txInfo.blockWithTx == nil { - // Check if block contains transaction. - txInfo.blockWithTx, txInfo.blockID, err = b.searchForTransactionBlock(height, txInfo) + // Search for transaction`s block information. + txInfo.blockWithTx, + txInfo.BlockID, + txInfo.BlockHeight, + txInfo.CollectionID, + err = b.searchForTransactionBlockInfo(height, txInfo) + if err != nil { if errors.Is(err, storage.ErrNotFound) { return nil, fmt.Errorf("could not find block %d in storage: %w", height, subscription.ErrBlockNotReady) @@ -97,20 +101,32 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran } } - // Find the transaction status. - var txStatus flow.TransactionStatus - if txInfo.blockWithTx == nil { - txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) - } else { - if !txInfo.txExecuted { - // Check if transaction was executed. - txInfo.txExecuted, err = b.searchForExecutionResult(txInfo.blockID) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.blockID, err) - } + // Get old status here, as it could be replaced by status from founded tx result + prevTxStatus := txInfo.Status + + // Check, if transaction executed and transaction result already available + if txInfo.blockWithTx != nil && !txInfo.txExecuted { + txResult, err := b.searchForTransactionResult(ctx, txInfo) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.BlockID, err) } - txStatus, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockID, txInfo.blockWithTx.Height, txInfo.txExecuted) + // If transaction result was found, fully replace it in metadata. New transaction status already included in result. + if txResult != nil { + txInfo.TransactionResult = txResult + //Fill in execution status for future usages + txInfo.txExecuted = true + } + } + + // If block with transaction was not found, get transaction status to check if it different from last status + if txInfo.blockWithTx == nil { + txInfo.Status, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) + } else if txInfo.Status == prevTxStatus { + // When a block with the transaction is available, it is possible to receive a new transaction status while + // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction + // statuses are the same, the current transaction status should be retrieved. + txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.BlockID, txInfo.blockWithTx.Height, txInfo.txExecuted) } if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { @@ -119,64 +135,150 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran return nil, rpc.ConvertStorageError(err) } - // The same transaction status should not be reported, so return here with no response - if txInfo.lastTxStatus == txStatus { + // If the old and new transaction statuses are still the same, the status change should not be reported, so + // return here with no response. + if prevTxStatus == txInfo.Status { return nil, nil } - txInfo.lastTxStatus = txStatus - messageIndex := txInfo.messageIndex.Value() - if ok := txInfo.messageIndex.Set(messageIndex + 1); !ok { - return nil, status.Errorf(codes.Internal, "the message index has already been incremented to %d", txInfo.messageIndex.Value()) + return b.generateResultsWithMissingStatuses(txInfo, prevTxStatus) + } +} + +// generateResultsWithMissingStatuses checks if the current result differs from the previous result by more than one step. +// If yes, it generates results for the missing transaction statuses. This is done because the subscription should send +// responses for each of the statuses in the transaction lifecycle, and the message should be sent in the order of transaction statuses. +// Possible orders of transaction statuses: +// 1. pending(1) -> finalized(2) -> executed(3) -> sealed(4) +// 2. pending(1) -> expired(5) +// No errors expected during normal operations. +func (b *backendSubscribeTransactions) generateResultsWithMissingStatuses( + txInfo *TransactionSubscriptionMetadata, + prevTxStatus flow.TransactionStatus, +) ([]*access.TransactionResult, error) { + // If the previous status is pending and the new status is expired, which is the last status, return its result. + // If the previous status is anything other than pending, return an error, as this transition is unexpected. + if txInfo.Status == flow.TransactionStatusExpired { + if prevTxStatus == flow.TransactionStatusPending { + return []*access.TransactionResult{ + txInfo.TransactionResult, + }, nil + } else { + return nil, fmt.Errorf("unexpected transition from %s to %s transaction status", prevTxStatus.String(), txInfo.Status.String()) + } + } + + var results []*access.TransactionResult + + // If the difference between statuses' values is more than one step, fill in the missing results. + if (txInfo.Status - prevTxStatus) > 1 { + for missingStatus := prevTxStatus + 1; missingStatus < txInfo.Status; missingStatus++ { + switch missingStatus { + case flow.TransactionStatusPending: + results = append(results, &access.TransactionResult{ + Status: missingStatus, + TransactionID: txInfo.TransactionID, + }) + case flow.TransactionStatusFinalized: + results = append(results, &access.TransactionResult{ + Status: missingStatus, + TransactionID: txInfo.TransactionID, + BlockID: txInfo.BlockID, + BlockHeight: txInfo.BlockHeight, + CollectionID: txInfo.CollectionID, + }) + case flow.TransactionStatusExecuted: + missingTxResult := *txInfo.TransactionResult + missingTxResult.Status = missingStatus + results = append(results, &missingTxResult) + default: + return nil, fmt.Errorf("unexpected missing transaction status") + } } + } + + results = append(results, txInfo.TransactionResult) + return results, nil +} + +// checkBlockReady checks if the given block height is valid and available based on the expected block status. +// Expected errors during normal operation: +// - subscription.ErrBlockNotReady: block for the given block height is not available. +func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error { + // Get the highest available finalized block height + highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized) + if err != nil { + return fmt.Errorf("could not get highest height for block %d: %w", height, err) + } - return &convert.TransactionSubscribeInfo{ - ID: txInfo.txID, - Status: txInfo.lastTxStatus, - MessageIndex: messageIndex, - }, nil + // Fail early if no block finalized notification has been received for the given height. + // Note: It's possible that the block is locally finalized before the notification is + // received. This ensures a consistent view is available to all streams. + if height > highestHeight { + return fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady) } + + return nil } -// searchForTransactionBlock searches for the block containing the specified transaction. +// searchForTransactionBlockInfo searches for the block containing the specified transaction. // It retrieves the block at the given height and checks if the transaction is included in that block. // Expected errors: -// - subscription.ErrBlockNotReady when unable to retrieve the block or collection ID +// - ErrTransactionNotInBlock when unable to retrieve the collection // - codes.Internal when other errors occur during block or collection lookup -func (b *backendSubscribeTransactions) searchForTransactionBlock( +func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( height uint64, txInfo *TransactionSubscriptionMetadata, -) (*flow.Header, flow.Identifier, error) { +) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) { block, err := b.txLocalDataProvider.blocks.ByHeight(height) if err != nil { - return nil, flow.ZeroID, fmt.Errorf("error looking up block: %w", err) + return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up block: %w", err) } - collectionID, err := b.txLocalDataProvider.LookupCollectionIDInBlock(block, txInfo.txID) + collectionID, err := b.txLocalDataProvider.LookupCollectionIDInBlock(block, txInfo.TransactionID) if err != nil { - return nil, flow.ZeroID, fmt.Errorf("error looking up transaction in block: %w", err) + return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up transaction in block: %w", err) } if collectionID != flow.ZeroID { - return block.Header, block.ID(), nil + return block.Header, block.ID(), height, collectionID, nil } - return nil, flow.ZeroID, nil + return nil, flow.ZeroID, 0, flow.ZeroID, nil } -// searchForExecutionResult searches for the execution result of a block. It retrieves the execution result for the specified block ID. +// searchForTransactionResult searches for the transaction result of a block. It retrieves the execution result for the specified block ID. // Expected errors: // - codes.Internal if an internal error occurs while retrieving execution result. -func (b *backendSubscribeTransactions) searchForExecutionResult( - blockID flow.Identifier, -) (bool, error) { - _, err := b.executionResults.ByBlockID(blockID) +func (b *backendSubscribeTransactions) searchForTransactionResult( + ctx context.Context, + txInfo *TransactionSubscriptionMetadata, +) (*access.TransactionResult, error) { + _, err := b.executionResults.ByBlockID(txInfo.BlockID) if err != nil { if errors.Is(err, storage.ErrNotFound) { - return false, nil + return nil, nil + } + return nil, fmt.Errorf("failed to get execution result for block %s: %w", txInfo.BlockID, err) + } + + txResult, err := b.backendTransactions.GetTransactionResult( + ctx, + txInfo.TransactionID, + txInfo.BlockID, + txInfo.CollectionID, + txInfo.eventEncodingVersion, + ) + + if err != nil { + // if either the storage or execution node reported no results or there were not enough execution results + if status.Code(err) == codes.NotFound { + // No result yet, indicate that it has not been executed + return nil, nil } - return false, fmt.Errorf("failed to get execution result for block %s: %w", blockID, err) + // Other Error trying to retrieve the result, return with err + return nil, err } - return true, nil + return txResult, nil } diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index cf7438bf605..598105bf7bd 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -6,6 +6,10 @@ import ( "testing" "time" + "github.com/onflow/flow/protobuf/go/flow/entities" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + protocolint "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/engine/access/index" @@ -20,15 +24,14 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + accessapi "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine" access "github.com/onflow/flow-go/engine/access/mock" backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" "github.com/onflow/flow-go/engine/access/subscription" subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock" - "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/counters" "github.com/onflow/flow-go/module/metrics" protocol "github.com/onflow/flow-go/state/protocol/mock" storagemock "github.com/onflow/flow-go/storage/mock" @@ -135,7 +138,6 @@ func (s *TransactionStatusSuite) SetupTest() { s.reporter = syncmock.NewIndexReporter(s.T()) s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)) - s.state.On("Final").Return(s.finalSnapshot, nil) s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot { s.tempSnapshot.On("Head").Unset() @@ -167,7 +169,9 @@ func (s *TransactionStatusSuite) SetupTest() { }, nil) backendParams := s.backendParams() - err := backendParams.TxResultsIndex.Initialize(s.reporter) + err := backendParams.EventsIndex.Initialize(s.reporter) + require.NoError(s.T(), err) + err = backendParams.TxResultsIndex.Initialize(s.reporter) require.NoError(s.T(), err) s.backend, err = New(backendParams) @@ -201,8 +205,10 @@ func (s *TransactionStatusSuite) backendParams() Params { subscription.DefaultResponseLimit, subscription.DefaultSendBufferSize, ), - TxResultsIndex: index.NewTransactionResultsIndex(s.transactionResults), - EventsIndex: index.NewEventsIndex(s.events), + TxResultsIndex: index.NewTransactionResultsIndex(s.transactionResults), + EventQueryMode: IndexQueryModeLocalOnly, + TxResultQueryMode: IndexQueryModeLocalOnly, + EventsIndex: index.NewEventsIndex(s.events), } } @@ -225,6 +231,20 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + s.reporter.On("LowestIndexedHeight").Return(s.rootBlock.Header.Height, nil) + s.reporter.On("HighestIndexedHeight").Return(func() (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil) + s.blocks.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) (*flow.Block, error) { + for _, block := range s.blockMap { + if block.ID() == blockID { + return block, nil + } + } + + return nil, nil + }, nil) s.sealedSnapshot.On("Head").Return(func() *flow.Header { return s.sealedBlock.Header }, nil) @@ -234,12 +254,35 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { // Generate sent transaction with ref block of the current finalized block transaction := unittest.TransactionFixture() transaction.SetReferenceBlockID(s.finalizedBlock.ID()) + s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil) + col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction}) guarantee := col.Guarantee() light := col.Light() txId := transaction.ID() + txResult := flow.LightTransactionResult{ + TransactionID: txId, + Failed: false, + ComputationUsed: 0, + } + + eventsForTx := unittest.EventsFixture(1, flow.EventAccountCreated) + eventMessages := make([]*entities.Event, 1) + for j, event := range eventsForTx { + eventMessages[j] = convert.EventToMessage(event) + } - expectedMsgIndexCounter := counters.NewMonotonousCounter(0) + s.events.On( + "ByBlockIDTransactionID", + mock.AnythingOfType("flow.Identifier"), + mock.AnythingOfType("flow.Identifier"), + ).Return(eventsForTx, nil) + + s.transactionResults.On( + "ByBlockIDTransactionID", + mock.AnythingOfType("flow.Identifier"), + mock.AnythingOfType("flow.Identifier"), + ).Return(&txResult, nil) // Create a special common function to read subscription messages from the channel and check converting it to transaction info // and check results for correctness @@ -250,21 +293,18 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", txId, s.finalizedBlock.ID(), sub.Err()) - txInfo, ok := v.(*convert.TransactionSubscribeInfo) + txResults, ok := v.([]*accessapi.TransactionResult) require.True(s.T(), ok, "unexpected response type: %T", v) + require.Len(s.T(), txResults, 1) - assert.Equal(s.T(), txId, txInfo.ID) - assert.Equal(s.T(), expectedTxStatus, txInfo.Status) - - expectedMsgIndex := expectedMsgIndexCounter.Value() - assert.Equal(s.T(), expectedMsgIndex, txInfo.MessageIndex) - wasSet := expectedMsgIndexCounter.Set(expectedMsgIndex + 1) - require.True(s.T(), wasSet) + result := txResults[0] + assert.Equal(s.T(), txId, result.TransactionID) + assert.Equal(s.T(), expectedTxStatus, result.Status) }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) } // 1. Subscribe to transaction status and receive the first message with pending status - sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody) + sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) // 2. Make transaction reference block sealed, and add a new finalized block that includes the transaction @@ -278,7 +318,6 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { // 3. Add one more finalized block on top of the transaction block and add execution results to storage finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) s.resultsMap[s.finalizedBlock.ID()] = finalizedResult - s.addNewFinalizedBlock(s.finalizedBlock.Header, true) checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted) @@ -315,8 +354,6 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { transaction.SetReferenceBlockID(s.finalizedBlock.ID()) txId := transaction.ID() - expectedMsgIndexCounter := counters.NewMonotonousCounter(0) - // Create a special common function to read subscription messages from the channel and check converting it to transaction info // and check results for correctness checkNewSubscriptionMessage := func(sub subscription.Subscription, expectedTxStatus flow.TransactionStatus) { @@ -326,21 +363,18 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", txId, s.finalizedBlock.ID(), sub.Err()) - txInfo, ok := v.(*convert.TransactionSubscribeInfo) + txResults, ok := v.([]*accessapi.TransactionResult) require.True(s.T(), ok, "unexpected response type: %T", v) + require.Len(s.T(), txResults, 1) - assert.Equal(s.T(), txId, txInfo.ID) - assert.Equal(s.T(), expectedTxStatus, txInfo.Status) - - expectedMsgIndex := expectedMsgIndexCounter.Value() - assert.Equal(s.T(), expectedMsgIndex, txInfo.MessageIndex) - wasSet := expectedMsgIndexCounter.Set(expectedMsgIndex + 1) - require.True(s.T(), wasSet) + result := txResults[0] + assert.Equal(s.T(), txId, result.TransactionID) + assert.Equal(s.T(), expectedTxStatus, result.Status) }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) } // Subscribe to transaction status and receive the first message with pending status - sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody) + sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody, entities.EventEncodingVersion_CCF_V0) checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) // Generate 600 blocks without transaction included and check, that transaction still pending diff --git a/engine/common/rpc/convert/transactions.go b/engine/common/rpc/convert/transactions.go index 6b92f419fdd..221f41b0936 100644 --- a/engine/common/rpc/convert/transactions.go +++ b/engine/common/rpc/convert/transactions.go @@ -1,29 +1,11 @@ package convert import ( - "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" "github.com/onflow/flow-go/model/flow" ) -// TransactionSubscribeInfo represents information about a subscribed transaction. -// It contains the ID of the transaction, its status, and the index of the associated message. -type TransactionSubscribeInfo struct { - ID flow.Identifier - Status flow.TransactionStatus - MessageIndex uint64 -} - -// TransactionSubscribeInfoToMessage converts a TransactionSubscribeInfo struct to a protobuf message -func TransactionSubscribeInfoToMessage(data *TransactionSubscribeInfo) *access.SendAndSubscribeTransactionStatusesResponse { - return &access.SendAndSubscribeTransactionStatusesResponse{ - Id: data.ID[:], - Status: entities.TransactionStatus(data.Status), - MessageIndex: data.MessageIndex, - } -} - // TransactionToMessage converts a flow.TransactionBody to a protobuf message func TransactionToMessage(tb flow.TransactionBody) *entities.Transaction { proposalKeyMessage := &entities.Transaction_ProposalKey{ diff --git a/go.mod b/go.mod index 8ddab239286..636f9cda7d6 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 github.com/onflow/flow-go-sdk v0.44.0 - github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e + github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e38d2ea2a20..17f4ba300d5 100644 --- a/go.sum +++ b/go.sum @@ -1370,8 +1370,8 @@ github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8 github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e h1:r4+gVDDMOOc04Y1qjCZULAdgoaxSMsqSdE1EyviG76U= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 h1:I+aosSiJny88O4p3nPbCiUcp/UqN6AepvO6uj82bjH0= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.13.4 h1:iNO86fm8RbBbhZ87ZulblInqCdHnAQVY8okBrNsTevc= github.com/onflow/go-ethereum v1.13.4/go.mod h1:cE/gEUkAffhwbVmMJYz+t1dAfVNHNwZCgc3BWtZxBGY= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/insecure/go.mod b/insecure/go.mod index 3195eeaa334..5d2b0cba848 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -210,7 +210,7 @@ require ( github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13 // indirect github.com/onflow/flow-go-sdk v0.46.0 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.1.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 // indirect github.com/onflow/go-ethereum v1.13.4 // indirect github.com/onflow/sdks v0.5.0 // indirect github.com/onflow/wal v0.0.0-20240208022732-d756cd497d3b // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 29793ab10f4..7c356c47ac0 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1333,8 +1333,8 @@ github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8 github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e h1:r4+gVDDMOOc04Y1qjCZULAdgoaxSMsqSdE1EyviG76U= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 h1:I+aosSiJny88O4p3nPbCiUcp/UqN6AepvO6uj82bjH0= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.13.4 h1:iNO86fm8RbBbhZ87ZulblInqCdHnAQVY8okBrNsTevc= github.com/onflow/go-ethereum v1.13.4/go.mod h1:cE/gEUkAffhwbVmMJYz+t1dAfVNHNwZCgc3BWtZxBGY= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/integration/go.mod b/integration/go.mod index 215e20f7307..5e1a1997f5e 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -24,11 +24,11 @@ require ( github.com/onflow/crypto v0.25.1 github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 - github.com/onflow/flow-emulator v0.61.2-0.20240404201132-f53137a8e4cb - github.com/onflow/flow-go v0.33.2-0.20240404171354-0b0592cc5bba - github.com/onflow/flow-go-sdk v0.46.0 + github.com/onflow/flow-emulator v0.62.2-0.20240418140508-d969ff66d9cd + github.com/onflow/flow-go v0.33.2-0.20240412174857-015156b297b5 + github.com/onflow/flow-go-sdk v0.46.2 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e + github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 github.com/onflow/go-ethereum v1.13.4 github.com/plus3it/gorecurcopy v0.0.1 github.com/prometheus/client_golang v1.18.0 diff --git a/integration/go.sum b/integration/go.sum index 3cc3f03a335..f2f704a4c86 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1414,19 +1414,19 @@ github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 h1:xF5wHug6H8vKfz github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1/go.mod h1:WHp24VkUQfcfZi0XjI1uRVRt5alM5SHVkwOil1U2Tpc= github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 h1:EjWjbyVEA+bMxXbM44dE6MsYeqOu5a9q/EwSWa4ma2M= github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1/go.mod h1:c09d6sNyF/j5/pAynK7sNPb1XKqJqk1rxZPEqEL+dUo= -github.com/onflow/flow-emulator v0.61.2-0.20240404201132-f53137a8e4cb h1:A2R42Vvw+HdAi3DnH2U/AFK4ziOk/wNkVB1lrhEzai8= -github.com/onflow/flow-emulator v0.61.2-0.20240404201132-f53137a8e4cb/go.mod h1:DicO8yliaj+0AFldfwa5No2FfZRQja1R7/abxSHqqDE= +github.com/onflow/flow-emulator v0.62.2-0.20240418140508-d969ff66d9cd h1:bR5IxvTK4HApiJt+OP+mLNKkVkr75piaLu8wDT6uKDA= +github.com/onflow/flow-emulator v0.62.2-0.20240418140508-d969ff66d9cd/go.mod h1:ONxdb0U5kE7XK8B1ZAAo6JAzYRAtC6oh9I8WAfi9I+E= github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13 h1:B4ll7e3j+MqTJv2122Enq3RtDNzmIGRu9xjV7fo7un0= github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13/go.mod h1:kTMFIySzEJJeupk+7EmXs0EJ6CBWY/MV9fv9iYQk+RU= github.com/onflow/flow-go-sdk v0.24.0/go.mod h1:IoptMLPyFXWvyd9yYA6/4EmSeeozl6nJoIv4FaEMg74= -github.com/onflow/flow-go-sdk v0.46.0 h1:mrIQziCDe6Oi5HH/aPFvYluh1XUwO6lYpoXLWrBZc2s= -github.com/onflow/flow-go-sdk v0.46.0/go.mod h1:azVWF0yHI8wT1erF0vuYGqQZybl6Frbc+0Zu3rIPeHc= +github.com/onflow/flow-go-sdk v0.46.2 h1:ypVGBeH9m5XpBOTU/CYVC0y/+z42e8mhUlq5aLiD24A= +github.com/onflow/flow-go-sdk v0.46.2/go.mod h1:tfLjB9FZmwqtT5gaacjvpIhz7KCd67YPm6v+iqYAjEA= github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8OSD97bJc60cLuQ= github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e h1:r4+gVDDMOOc04Y1qjCZULAdgoaxSMsqSdE1EyviG76U= -github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240404170900-c321c1475f1e/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030 h1:I+aosSiJny88O4p3nPbCiUcp/UqN6AepvO6uj82bjH0= +github.com/onflow/flow/protobuf/go/flow v0.4.1-0.20240412170550-911321113030/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.13.4 h1:iNO86fm8RbBbhZ87ZulblInqCdHnAQVY8okBrNsTevc= github.com/onflow/go-ethereum v1.13.4/go.mod h1:cE/gEUkAffhwbVmMJYz+t1dAfVNHNwZCgc3BWtZxBGY= github.com/onflow/nft-storefront/lib/go/contracts v0.0.0-20221222181731-14b90207cead h1:2j1Unqs76Z1b95Gu4C3Y28hzNUHBix7wL490e61SMSw= diff --git a/integration/tests/access/cohort1/access_api_test.go b/integration/tests/access/cohort1/access_api_test.go index 1cbf5b191c4..e3ad3369c43 100644 --- a/integration/tests/access/cohort1/access_api_test.go +++ b/integration/tests/access/cohort1/access_api_test.go @@ -278,12 +278,13 @@ func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() { // Send and subscribe to the transaction status using the access API subClient, err := accessClient.SendAndSubscribeTransactionStatuses(s.ctx, &accessproto.SendAndSubscribeTransactionStatusesRequest{ - Transaction: transactionMsg, + Transaction: transactionMsg, + EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, }) s.Require().NoError(err) expectedCounter := uint64(0) - var finalTxStatus entities.TransactionStatus + lastReportedTxStatus := entities.TransactionStatus_UNKNOWN var txID sdk.Identifier for { @@ -297,17 +298,22 @@ func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() { } if txID == sdk.EmptyID { - txID = sdk.Identifier(resp.GetId()) + txID = sdk.Identifier(resp.TransactionResults.TransactionId) } s.Assert().Equal(expectedCounter, resp.GetMessageIndex()) - s.Assert().Equal(txID, sdk.Identifier(resp.GetId())) + s.Assert().Equal(txID, sdk.Identifier(resp.TransactionResults.TransactionId)) + // Check if all statuses received one by one. The subscription should send responses for each of the statuses, + // and the message should be sent in the order of transaction statuses. + // Expected order: pending(1) -> finalized(2) -> executed(3) -> sealed(4) + s.Assert().Equal(lastReportedTxStatus, resp.TransactionResults.Status-1) expectedCounter++ - finalTxStatus = resp.Status + lastReportedTxStatus = resp.TransactionResults.Status } - s.Assert().Equal(entities.TransactionStatus_SEALED, finalTxStatus) + // Check, if the final transaction status is sealed. + s.Assert().Equal(entities.TransactionStatus_SEALED, lastReportedTxStatus) } func (s *AccessAPISuite) testGetAccount(client *client.Client) {