Skip to content

Commit

Permalink
Merge pull request #6818 from The-K-R-O-K/AndriiDiachuk/6586-tx-statu…
Browse files Browse the repository at this point in the history
…ses-data-providers-impl

[Access] Add implementation for transaction statuses data providers
  • Loading branch information
peterargue authored Jan 2, 2025
2 parents d71546c + 66a79b5 commit 72adf9e
Show file tree
Hide file tree
Showing 18 changed files with 1,295 additions and 74 deletions.
48 changes: 44 additions & 4 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,50 @@ type API interface {
//
// If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription.
SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The identifier of the transaction to monitor.
// - startBlockID: The block ID from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - startHeight: The block height from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
//
// Parameters:
// - ctx: The context to manage the subscription's lifecycle, including cancellation.
// - txID: The unique identifier of the transaction to monitor.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromLatest(ctx context.Context, txID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
// SendAndSubscribeTransactionStatuses sends a transaction to the network and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
//
// Parameters:
// - ctx: The context to manage the transaction sending and subscription lifecycle, including cancellation.
// - tx: The transaction body to be sent and monitored.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
//
// If the transaction cannot be sent, the subscription will fail and return a failed subscription.
SendAndSubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
7 changes: 1 addition & 6 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,12 +1425,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
return status.Error(codes.InvalidArgument, err.Error())
}

err = h.api.SendTransaction(ctx, &tx)
if err != nil {
return err
}

sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())
sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
Expand Down
72 changes: 66 additions & 6 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,32 @@ func (*api) SubscribeBlockDigestsFromLatest(
return nil
}

func (*api) SubscribeTransactionStatuses(
func (a *api) SubscribeTransactionStatusesFromStartBlockID(
_ context.Context,
_ flow.Identifier,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}

func (a *api) SubscribeTransactionStatusesFromStartHeight(
_ context.Context,
_ flow.Identifier,
_ uint64,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}

func (a *api) SubscribeTransactionStatusesFromLatest(
_ context.Context,
_ flow.Identifier,
_ entities.EventEncodingVersion,
) subscription.Subscription {
return nil
}
func (a *api) SendAndSubscribeTransactionStatuses(
_ context.Context,
_ *flow.TransactionBody,
_ entities.EventEncodingVersion,
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func NewServer(serverAPI access.API,
serverAPI,
chain,
stateStreamConfig.EventFilterConfig,
stateStreamConfig.HeartbeatInterval)
stateStreamConfig.HeartbeatInterval,
)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
s.Require().NotNil(provider)
s.Require().NoError(err)

// Ensure the provider is properly closed after the test
defer provider.Close()

// Run the provider in a separate goroutine to simulate subscription processing
go func() {
err = provider.Run()
Expand Down Expand Up @@ -263,7 +266,4 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
currentIndex := responses[i].MessageIndex
s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1")
}

// Ensure the provider is properly closed after the test
provider.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (s *EventsProviderSuite) TestEventsDataProvider_HappyPath() {
Events: expectedEvents,
BlockTimestamp: s.rootBlock.Header.Timestamp,
})

}

testHappyPath(
Expand Down Expand Up @@ -252,9 +251,13 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
s.chain,
state_stream.DefaultEventFilterConfig,
subscription.DefaultHeartbeatInterval)

s.Require().NotNil(provider)
s.Require().NoError(err)

// Ensure the provider is properly closed after the test
defer provider.Close()

// Run the provider in a separate goroutine to simulate subscription processing
go func() {
err = provider.Run()
Expand Down Expand Up @@ -290,7 +293,4 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
currentIndex := responses[i].MessageIndex
s.Require().Equal(prevIndex+1, currentIndex, "Expected MessageIndex to increment by 1")
}

// Ensure the provider is properly closed after the test
provider.Close()
}
18 changes: 10 additions & 8 deletions engine/access/rest/websockets/data_providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
// Constants defining various topic names used to specify different types of
// data providers.
const (
EventsTopic = "events"
AccountStatusesTopic = "account_statuses"
BlocksTopic = "blocks"
BlockHeadersTopic = "block_headers"
BlockDigestsTopic = "block_digests"
TransactionStatusesTopic = "transaction_statuses"
EventsTopic = "events"
AccountStatusesTopic = "account_statuses"
BlocksTopic = "blocks"
BlockHeadersTopic = "block_headers"
BlockDigestsTopic = "block_digests"
TransactionStatusesTopic = "transaction_statuses"
SendAndGetTransactionStatusesTopic = "send_and_get_transaction_statuses"
)

// DataProviderFactory defines an interface for creating data providers
Expand Down Expand Up @@ -103,8 +104,9 @@ func (s *DataProviderFactoryImpl) NewDataProvider(
case AccountStatusesTopic:
return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
case TransactionStatusesTopic:
// TODO: Implemented handlers for each topic should be added in respective case
return nil, fmt.Errorf(`topic "%s" not implemented yet`, topic)
return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
case SendAndGetTransactionStatusesTopic:
return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
default:
return nil, fmt.Errorf("unsupported topic \"%s\"", topic)
}
Expand Down
22 changes: 22 additions & 0 deletions engine/access/rest/websockets/data_providers/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,28 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
{
name: "transaction statuses topic",
topic: TransactionStatusesTopic,
arguments: models.Arguments{},
setupSubscription: func() {
s.setupSubscription(s.accessApi.On("SubscribeTransactionStatusesFromLatest", mock.Anything, mock.Anything, mock.Anything))
},
assertExpectations: func() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
{
name: "send transaction statuses topic",
topic: SendAndGetTransactionStatusesTopic,
arguments: models.Arguments{},
setupSubscription: func() {
s.setupSubscription(s.accessApi.On("SendAndSubscribeTransactionStatuses", mock.Anything, mock.Anything, mock.Anything))
},
assertExpectations: func() {
s.stateStreamApi.AssertExpectations(s.T())
},
},
}

for _, test := range testCases {
Expand Down
Loading

0 comments on commit 72adf9e

Please sign in to comment.