Skip to content

Commit

Permalink
Merge pull request #5620 from The-K-R-O-K/AndriiSlisarchuk/5566-tx-re…
Browse files Browse the repository at this point in the history
…sult-in-sub-tx-statuses

[Access] Added transaction result in SendAndSubscribeTransactionStatuses response
  • Loading branch information
Guitarheroua authored Apr 20, 2024
2 parents 41926ad + 6a06688 commit 0983dbb
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 187 deletions.
2 changes: 1 addition & 1 deletion access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ type API interface {
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
23 changes: 18 additions & 5 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
Expand Down Expand Up @@ -1112,11 +1113,23 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
return err
}

sub := h.api.SubscribeTransactionStatuses(ctx, &tx)
return subscription.HandleSubscription(sub, func(txSubInfo *convert.TransactionSubscribeInfo) error {
err = stream.Send(convert.TransactionSubscribeInfoToMessage(txSubInfo))
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
value := messageIndex.Value()
if ok := messageIndex.Set(value + 1); !ok {
return status.Errorf(codes.Internal, "the message index has already been incremented to %d", messageIndex.Value())
}

err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: TransactionResultToMessage(txResults[i]),
MessageIndex: value,
})
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 40 additions & 35 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func New(params Params) (*Backend, error) {
// initialize node version info
nodeInfo := getNodeVersionInfo(params.State.Params())

transactionsLocalDataProvider := &TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
}

b := &Backend{
state: params.State,
BlockTracker: params.BlockTracker,
Expand All @@ -187,25 +178,6 @@ func New(params Params) (*Backend, error) {
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
TransactionsLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
},
backendEvents: backendEvents{
log: params.Log,
chain: params.ChainID.Chain(),
Expand Down Expand Up @@ -253,22 +225,55 @@ func New(params Params) (*Backend, error) {
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
},
backendSubscribeTransactions: backendSubscribeTransactions{
txLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
executionResults: params.ExecutionResults,
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
},

collections: params.Collections,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
chainID: params.ChainID,
nodeInfo: nodeInfo,
}

transactionsLocalDataProvider := &TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
}

b.backendTransactions = backendTransactions{
TransactionsLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
}

// TODO: The TransactionErrorMessage interface should be reorganized in future, as it is implemented in backendTransactions but used in TransactionsLocalDataProvider, and its initialization is somewhat quirky.
b.backendTransactions.txErrorMessages = b

b.backendSubscribeTransactions = backendSubscribeTransactions{
txLocalDataProvider: transactionsLocalDataProvider,
backendTransactions: &b.backendTransactions,
log: params.Log,
executionResults: params.ExecutionResults,
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
}

retry.SetBackend(b)

preferredENIdentifiers, err = identifierList(params.PreferredExecutionNodeIDs)
Expand Down
Loading

0 comments on commit 0983dbb

Please sign in to comment.