Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTE-823] Fix FNS onchain events staging + retrieval logic #2318

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,7 @@ func getFullNodeStreamingManagerFromOptions(
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
streamingManagerTransientStoreKey,
cdc,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Remove unused cdc parameter from NewFullNodeStreamingManager.

The cdc parameter is currently not utilized within the NewFullNodeStreamingManager function. If it's not required, please remove it to simplify the function signature and maintain code clarity.

🔗 Analysis chain

LGTM. Verify usage of new parameter in NewFullNodeStreamingManager.

The addition of the cdc parameter to NewFullNodeStreamingManager looks good. This change likely introduces codec functionality for serialization/deserialization in the streaming manager.

Please ensure that the NewFullNodeStreamingManager function properly utilizes the new cdc parameter. Run the following script to verify:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the NewFullNodeStreamingManager function signature and usage of cdc parameter

# Check the function signature
echo "Checking NewFullNodeStreamingManager function signature:"
rg --type go "func NewFullNodeStreamingManager\(.*codec\.Codec.*\)" ./protocol

# Check for usage of cdc within the function
echo "Checking usage of cdc parameter within NewFullNodeStreamingManager:"
rg --type go -A 10 "func NewFullNodeStreamingManager\(" ./protocol | rg "cdc"

Length of output: 502


Script:

#!/bin/bash
# Description: Verify all usages of the cdc parameter within NewFullNodeStreamingManager and the codebase

# Check for any usage of cdc within the NewFullNodeStreamingManager function
echo "Searching for cdc usage within NewFullNodeStreamingManager:"
rg --type go "func NewFullNodeStreamingManager\(" ./protocol | rg -A 20 "cdc"

# Search for any external usage of the cdc parameter in the codebase
echo "Searching for external usages of cdc parameter:"
rg --type go "\bcdc\b" ./protocol

Length of output: 91461

)

// Start websocket server.
Expand Down
4 changes: 0 additions & 4 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (f *Flags) Validate() error {

// Grpc streaming
if f.GrpcStreamingEnabled {
if f.OptimisticExecutionEnabled {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously added to prevent both flags from being accidentally set true

// TODO(OTE-456): Finish gRPC streaming x OE integration.
return fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution")
}
if !f.GrpcEnable {
return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server")
}
Expand Down
15 changes: 9 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,17 @@ func TestValidate(t *testing.T) {
OptimisticExecutionEnabled: true,
},
},
"failure - optimistic execution cannot be enabled with gRPC streaming": {
"success - optimistic execution canbe enabled with gRPC streaming": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

flags: flags.Flags{
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
NonValidatingFullNode: false,
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxChannelBufferSize: 2000,
WebsocketStreamingPort: 8989,
},
expectedErr: fmt.Errorf("grpc streaming cannot be enabled together with optimistic execution"),
},
"failure - gRPC disabled": {
flags: flags.Flags{
Expand Down
23 changes: 18 additions & 5 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand All @@ -27,6 +28,7 @@ var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
type FullNodeStreamingManagerImpl struct {
sync.Mutex

cdc codec.BinaryCodec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Update all calls to NewFullNodeStreamingManager to include the cdc parameter.

  • File: protocol/app/app.go
    • Line: (exact line number not provided)

Ensure that the cdc parameter is passed when instantiating NewFullNodeStreamingManager in all instances.

🔗 Analysis chain

Update all usages to accommodate the new 'cdc' parameter.

The cdc field of type codec.BinaryCodec has been added to the FullNodeStreamingManagerImpl struct (line 31). The constructor NewFullNodeStreamingManager now requires a cdc parameter (line 100), which is assigned during initialization (line 119). Ensure that:

  • All calls to NewFullNodeStreamingManager include the new cdc parameter.
  • Any instances where FullNodeStreamingManagerImpl is directly instantiated are updated to initialize the cdc field appropriately.

Run the following script to verify that all calls to NewFullNodeStreamingManager have been updated with the cdc parameter:

Also applies to: 100-100, 119-119

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to 'NewFullNodeStreamingManager' include the 'cdc' parameter.

# Search for calls to 'NewFullNodeStreamingManager' lacking the 'cdc' parameter
rg --type go 'NewFullNodeStreamingManager\(' -A 5 | grep -v 'cdc'

# Alternatively, list all calls to inspect them manually
rg --type go -A 5 'NewFullNodeStreamingManager\('

Length of output: 1246

logger log.Logger

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
Expand Down Expand Up @@ -95,6 +97,7 @@ func NewFullNodeStreamingManager(
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
streamingManagerTransientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
) *FullNodeStreamingManagerImpl {
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
Expand All @@ -113,6 +116,7 @@ func NewFullNodeStreamingManager(
snapshotBlockInterval: snapshotBlockInterval,

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -391,14 +395,15 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
sm.cdc.MustMarshal(&stagedEvent),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace 'MustMarshal' with error-handling 'Marshal' to prevent panics.

In StageFinalizeBlockSubaccountUpdate (line 406) and StageFinalizeBlockFill (line 428), sm.cdc.MustMarshal(&stagedEvent) is used. MustMarshal will panic if an error occurs during marshaling. It's safer to use Marshal with proper error handling to improve robustness.

Apply this change to handle marshaling errors gracefully:

- sm.cdc.MustMarshal(&stagedEvent)
+ marshaledEvent, err := sm.cdc.Marshal(&stagedEvent)
+ if err != nil {
+     sm.logger.Error("Failed to marshal staged event", "error", err)
+     return // Handle the error appropriately
+ }
+ sm.stageFinalizeBlockEvent(ctx, marshaledEvent)

Also applies to: 428-428

)
}

Expand All @@ -411,25 +416,30 @@ func (sm *FullNodeStreamingManagerImpl) StageFinalizeBlockFill(
ctx sdk.Context,
fill clobtypes.StreamOrderbookFill,
) {
lib.AssertDeliverTxMode(ctx)
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_OrderFill{
OrderFill: &fill,
},
}

sm.stageFinalizeBlockEvent(
ctx,
clobtypes.Amino.MustMarshal(stagedEvent),
sm.cdc.MustMarshal(&stagedEvent),
)
}

func getStagedFinalizeBlockEvents(store storetypes.KVStore) []clobtypes.StagedFinalizeBlockEvent {
func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
clobtypes.Amino.MustUnmarshal(bytes, &event)
cdc.MustUnmarshal(bytes, &event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle errors from 'Unmarshal' instead of using 'MustUnmarshal'.

At line 442, cdc.MustUnmarshal(bytes, &event) is called. MustUnmarshal panics on error, which can cause the entire application to crash. It's better to use Unmarshal with error checking to make the code more resilient.

Modify the code to handle unmarshaling errors:

- cdc.MustUnmarshal(bytes, &event)
+ if err := cdc.Unmarshal(bytes, &event); err != nil {
+     sm.logger.Error("Failed to unmarshal staged event", "error", err)
+     continue // or handle the error as appropriate
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cdc.MustUnmarshal(bytes, &event)
if err := cdc.Unmarshal(bytes, &event); err != nil {
sm.logger.Error("Failed to unmarshal staged event", "error", err)
continue // or handle the error as appropriate
}

events[i] = event
}
return events
Expand All @@ -441,7 +451,7 @@ func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEvents(store)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
Expand Down Expand Up @@ -889,6 +899,9 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
// Prevent gas metering from state read.
ctx = ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
Expand Down
10 changes: 10 additions & 0 deletions protocol/x/clob/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ func (am AppModule) PreBlock(ctx context.Context) (appmodule.ResponsePreBlock, e
}, nil
}

// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
func (am AppModule) Precommit(ctx context.Context) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callback was not previously implemented, so ClobModule.Precommit was not being run

defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyPrecommiter)
Precommit(
lib.UnwrapSDKContext(ctx, types.ModuleName),
*am.keeper,
)
return nil
}

// BeginBlock executes all ABCI BeginBlock logic respective to the clob module.
func (am AppModule) BeginBlock(ctx context.Context) error {
defer telemetry.ModuleMeasureSince(am.Name(), time.Now(), telemetry.MetricKeyBeginBlocker)
Expand Down
Loading