Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Added transaction result in SendAndSubscribeTransactionStatuses response #5620

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e8c89f6
added tx result to subscription
Guitarheroua Apr 2, 2024
324d46d
handle missing results
Guitarheroua Apr 3, 2024
53199a4
Added checks to integration test
Guitarheroua Apr 3, 2024
5291a1d
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiSl…
Guitarheroua Apr 3, 2024
8b25c8a
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 3, 2024
fba973d
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 4, 2024
afd1ca5
Fixed insecure lint
Guitarheroua Apr 4, 2024
f319aa2
updated protobuf
Guitarheroua Apr 4, 2024
a5c29ac
Fixed issues with integration test
Guitarheroua Apr 4, 2024
a2cbd6b
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiSl…
Guitarheroua Apr 4, 2024
b27800f
Added event encoding version
Guitarheroua Apr 4, 2024
ba274c1
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiSl…
Guitarheroua Apr 5, 2024
f521f8f
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 10, 2024
e25a0c8
Fixed remarks
Guitarheroua Apr 10, 2024
a501687
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 10, 2024
bc8083c
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 17, 2024
2a6c870
fixed remarks
Guitarheroua Apr 17, 2024
470b426
Changed emulator version
Guitarheroua Apr 18, 2024
995f7e5
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 19, 2024
0fca805
removed replace
Guitarheroua Apr 19, 2024
6a06688
Merge branch 'master' into AndriiSlisarchuk/5566-tx-result-in-sub-tx-…
Guitarheroua Apr 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ type API interface {
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
23 changes: 18 additions & 5 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
Expand Down Expand Up @@ -1112,11 +1113,23 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
return err
}

sub := h.api.SubscribeTransactionStatuses(ctx, &tx)
return subscription.HandleSubscription(sub, func(txSubInfo *convert.TransactionSubscribeInfo) error {
err = stream.Send(convert.TransactionSubscribeInfoToMessage(txSubInfo))
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
value := messageIndex.Value()
if ok := messageIndex.Set(value + 1); !ok {
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
return status.Errorf(codes.Internal, "the message index has already been incremented to %d", messageIndex.Value())
}

err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: TransactionResultToMessage(txResults[i]),
MessageIndex: value,
})
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 39 additions & 35 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func New(params Params) (*Backend, error) {
// initialize node version info
nodeInfo := getNodeVersionInfo(params.State.Params())

transactionsLocalDataProvider := &TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
}

b := &Backend{
state: params.State,
BlockTracker: params.BlockTracker,
Expand All @@ -187,25 +178,6 @@ func New(params Params) (*Backend, error) {
scriptExecutor: params.ScriptExecutor,
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
TransactionsLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
},
backendEvents: backendEvents{
log: params.Log,
chain: params.ChainID.Chain(),
Expand Down Expand Up @@ -253,22 +225,54 @@ func New(params Params) (*Backend, error) {
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
},
backendSubscribeTransactions: backendSubscribeTransactions{
txLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
executionResults: params.ExecutionResults,
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
},

collections: params.Collections,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
chainID: params.ChainID,
nodeInfo: nodeInfo,
}

transactionsLocalDataProvider := &TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
}

b.backendTransactions = backendTransactions{
TransactionsLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
}

b.backendTransactions.txErrorMessages = b
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved

b.backendSubscribeTransactions = backendSubscribeTransactions{
txLocalDataProvider: transactionsLocalDataProvider,
backendTransactions: &b.backendTransactions,
log: params.Log,
executionResults: params.ExecutionResults,
subscriptionHandler: params.SubscriptionHandler,
blockTracker: params.BlockTracker,
}

retry.SetBackend(b)

preferredENIdentifiers, err = identifierList(params.PreferredExecutionNodeIDs)
Expand Down
Loading
Loading