Skip to content

Commit

Permalink
[OTE-823] Fix FNS onchain events staging + retrieval logic (backport #…
Browse files Browse the repository at this point in the history
…2318) (#2363)

Co-authored-by: Teddy Ding <teddy@dydx.exchange>
  • Loading branch information
mergify[bot] and teddyding authored Sep 26, 2024
1 parent c4237d1 commit b8f5474
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
cdc,
)

// Start websocket server.
Expand Down
4 changes: 0 additions & 4 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (f *Flags) Validate() error {

// Grpc streaming
if f.GrpcStreamingEnabled {
if f.OptimisticExecutionEnabled {
// TODO(OTE-456): Finish gRPC streaming x OE integration.
return fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution")
}
if !f.GrpcEnable {
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
}
Expand Down
15 changes: 9 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,17 @@ func TestValidate(t *testing.T) {
OptimisticExecutionEnabled: true,
},
},
"failure - optimistic execution cannot be enabled with gRPC streaming": {
"success - optimistic execution canbe enabled with gRPC streaming": {
flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxChannelBufferSize: 2000,
WebsocketStreamingPort: 8989,
},
expectedErr: fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution"),
},
"failure - gRPC disabled": {
flags: flags.Flags{
Expand Down
23 changes: 18 additions & 5 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand All @@ -27,6 +28,7 @@ var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
type FullNodeStreamingManagerImpl struct {
sync.Mutex

cdc codec.BinaryCodec
logger log.Logger

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
Expand Down Expand Up @@ -95,6 +97,7 @@ func NewFullNodeStreamingManager(
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
streamingManagerTransientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
) *FullNodeStreamingManagerImpl {
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
Expand All @@ -113,6 +116,7 @@ func NewFullNodeStreamingManager(
snapshotBlockInterval: snapshotBlockInterval,

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -391,14 +395,15 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
sm.cdc.MustMarshal(&stagedEvent),
)
}

Expand All @@ -411,25 +416,30 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
},
}

sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
sm.cdc.MustMarshal(&stagedEvent),
)
}

func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
clobtypes.Amino.MustUnmarshal(bytes, &event)
cdc.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
Expand All @@ -441,7 +451,7 @@ func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEvents(store)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
Expand Down Expand Up @@ -889,6 +899,9 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Prevent gas metering from state read.
ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
Expand Down
10 changes: 10 additions & 0 deletions protocol/x/clob/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ func (am AppModule) PreBlock(ctx context.Context) (appmodule.ResponsePreBlock, e
}, nil
}

// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
func (am AppModule) Precommit(ctx context.Context) error {
defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyPrecommiter)
Precommit(
lib.UnwrapSDKContext(ctx, types.ModuleName),
*am.keeper,
)
return nil
}

// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
func (am AppModule) BeginBlock(ctx context.Context) error {
defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyBeginBlocker)
Expand Down

0 comments on commit b8f5474

Please sign in to comment.