Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full Node Streaming Order Filtering by Subaccount impl and tests #2704

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

UnbornAztecKing
Copy link

@UnbornAztecKing UnbornAztecKing commented Jan 25, 2025

Changelist

Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.

The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.

The user would like to apply the sub account ID filter to the order messages, in addition to position messages.

The change will add boolean flags to the Websocket and GRPC streaming API's:

  • filterOrdersBySubaccountId boolean field for WS request (if not provided, default to False)
  • filter_orders_by_subaccount_id boolean field for StreamOrderbookUpdatesRequest protobuf (if not provided, default to False)

For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.

If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, each slice of updates received from the subscription updatesChannel will be filtered like:

  • if the message is not a StreamUpdate_OrderbookUpdate, forward it
  • if the message is a StreamUpdate_OrderbookUpdate, forward iff one of the OffChainUpdateV1 messages inside are for a target subaccount
  • if an error occurs while checking ids, drop it

Test Plan

Unit test

load-tester / Python grpc-stream-client integration test

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added ability to filter orderbook updates by specific subaccount IDs
    • Enhanced streaming functionality to support more granular order update control
  • Improvements

    • Introduced new query parameter filterOrdersBySubaccountId for WebSocket and gRPC streaming
    • Improved streaming manager to handle subaccount-specific order filtering
  • Technical Updates

    • Updated protocol buffers and interfaces to support new filtering mechanism
    • Added utility functions for extracting and processing subaccount information

Copy link
Contributor

coderabbitai bot commented Jan 25, 2025

Walkthrough

This pull request introduces a new boolean parameter filterOrdersBySubaccountId across multiple components of the dYdX protocol's streaming and orderbook systems. The change enables more granular filtering of order updates by allowing clients to receive only updates related to specific subaccount IDs. The modification spans several files, including protobuf definitions, TypeScript generated code, Go streaming managers, and WebSocket server implementations, ensuring consistent support for this new filtering capability.

Changes

File Change Summary
proto/dydxprotocol/clob/query.proto Added filter_orders_by_subaccount_id field to StreamOrderbookUpdatesRequest
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts Added filterOrdersBySubaccountId property to TypeScript interfaces
protocol/streaming/... Updated multiple streaming-related files to support new filtering parameter

Sequence Diagram

sequenceDiagram
    participant Client
    participant WebSocketServer
    participant StreamingManager
    participant OrderbookService

    Client->>WebSocketServer: Connect with filterOrdersBySubaccountId
    WebSocketServer->>StreamingManager: Subscribe(subaccountIds, filterFlag)
    StreamingManager->>OrderbookService: Request Updates
    OrderbookService-->>StreamingManager: Filter Updates by Subaccount
    StreamingManager-->>WebSocketServer: Send Filtered Updates
    WebSocketServer-->>Client: Stream Filtered Updates
Loading

Possibly related PRs

Suggested reviewers

  • roy-dydx
  • teddyding

Poem

🐰 Orderbook streams, now refined and neat,
Subaccount filters make updates a treat!
With boolean magic, we filter with glee,
Precision unleashed, data flowing free!
A rabbit's delight in streaming's new art! 🎉

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@UnbornAztecKing
Copy link
Author

@coderabbitai review

Copy link
Contributor

coderabbitai bot commented Jan 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (4)
protocol/streaming/util/util_test.go (1)

21-123: Add test cases for error scenarios and edge cases.

The test suite thoroughly covers the happy path for different order types but lacks test cases for:

  • Invalid order IDs
  • Malformed orders
  • Empty/nil orders
  • Other error conditions that could occur in production

Add test cases for error scenarios:

 tests := map[string]struct {
     update ocutypes.OffChainUpdateV1
     id     satypes.SubaccountId
     err    error
 }{
+    "InvalidOrder": {
+        update: ocutypes.OffChainUpdateV1{
+            UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
+                OrderPlace: &ocutypes.OrderPlaceV1{
+                    Order:           nil,
+                    PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
+                    TimeStamp:       _ToPtr(orderPlaceTime),
+                },
+            },
+        },
+        id:  satypes.SubaccountId{},
+        err: errors.New("order is nil"),
+    },
+    "MalformedOrderId": {
+        update: ocutypes.OffChainUpdateV1{
+            UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
+                OrderPlace: &ocutypes.OrderPlaceV1{
+                    Order: &v1types.IndexerOrder{
+                        OrderId: v1types.IndexerOrderId{},
+                    },
+                    PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
+                    TimeStamp:       _ToPtr(orderPlaceTime),
+                },
+            },
+        },
+        id:  satypes.SubaccountId{},
+        err: errors.New("invalid order id"),
+    },
protocol/streaming/ws/websocket_server.go (1)

100-108: Consider adding logging for subscription initialization.

The error handling for subscription initialization could be improved by adding debug logs to help with troubleshooting.

Add logging:

 if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
     ws.logger.Error("Error sending close message", "err", err)
+    ws.logger.Debug(
+        "Failed to initialize subscription",
+        "filterOrdersBySubaccountId", filterOrdersBySubaccountId,
+        "subaccountIds", subaccountIds,
+    )
 }
protocol/streaming/full_node_streaming_manager_test.go (1)

300-456: Improve test case descriptions and organization.

While the test cases are comprehensive, their names and organization could be improved for better readability and maintainability.

Consider reorganizing the test cases into logical groups with more descriptive names:

 tests := map[string]TestCase{
-    "snapshotNotInScope": {
+    "Snapshots/NotInScope": {
         updates:         []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
         subaccountIds:   []satypes.SubaccountId{neverInScopeSubaccountId},
         filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
     },
-    "snapshotNoScope": {
+    "Snapshots/NoScope": {
         updates:         []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
         subaccountIds:   []satypes.SubaccountId{},
         filteredUpdates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate},
     },
     // ... more test cases

Also, consider adding test cases for concurrent access and race conditions:

"ConcurrentAccess/MultipleUpdates": {
    updates: []clobtypes.StreamUpdate{
        baseStreamUpdate,
        otherStreamUpdate,
    },
    subaccountIds: []satypes.SubaccountId{
        subaccountId,
        otherSubaccountId,
    },
    filteredUpdates: []clobtypes.StreamUpdate{
        baseStreamUpdate,
        otherStreamUpdate,
    },
},
protocol/streaming/full_node_streaming_manager.go (1)

220-248: Add metrics for filtered updates and error tracking.

The filtering logic would benefit from metrics to track:

  1. Number of updates filtered
  2. Number of errors encountered
  3. Filtering latency

Add metrics:

 func FilterStreamUpdateBySubaccount(
     updates []clobtypes.StreamUpdate,
     subaccountIds []satypes.SubaccountId,
     logger log.Logger,
 ) []clobtypes.StreamUpdate {
+    start := time.Now()
+    defer metrics.ModuleMeasureSince(
+        metrics.FullNodeGrpc,
+        metrics.GrpcFilterUpdatesBySubaccountLatency,
+        start,
+    )
+
     filteredUpdates := []clobtypes.StreamUpdate{}
+    errorCount := 0
     for _, update := range updates {
         switch updateMessage := update.UpdateMessage.(type) {
         case *clobtypes.StreamUpdate_OrderbookUpdate:
             if updateMessage.OrderbookUpdate.Snapshot {
                 break
             }
             doFilter, err := doFilterStreamUpdateBySubaccount(updateMessage, subaccountIds)
             if err != nil {
                 logger.Error(err.Error())
+                errorCount++
+                metrics.IncrCounter(metrics.GrpcFilterUpdatesBySubaccountErrorCount, 1)
             }
             if !doFilter {
                 continue
             }
         }
         filteredUpdates = append(filteredUpdates, update)
     }
+    metrics.SetGauge(metrics.GrpcFilteredUpdatesCount, float32(len(filteredUpdates)))
+    metrics.SetGauge(metrics.GrpcFilterErrorsCount, float32(errorCount))
     return filteredUpdates
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between afa37ec and 3bb3a07.

⛔ Files ignored due to path filters (1)
  • protocol/x/clob/types/query.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (10)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (6 hunks)
  • proto/dydxprotocol/clob/query.proto (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (7 hunks)
  • protocol/streaming/full_node_streaming_manager_test.go (1 hunks)
  • protocol/streaming/noop_streaming_manager.go (1 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/streaming/util/util.go (2 hunks)
  • protocol/streaming/util/util_test.go (1 hunks)
  • protocol/streaming/ws/websocket_server.go (3 hunks)
  • protocol/x/clob/keeper/grpc_stream_orderbook.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (12)
  • GitHub Check: test-sim-after-import
  • GitHub Check: test-sim-nondeterminism
  • GitHub Check: test-sim-import-export
  • GitHub Check: test-sim-multi-seed-short
  • GitHub Check: test / run_command
  • GitHub Check: unit-end-to-end-and-integration
  • GitHub Check: test-coverage-upload
  • GitHub Check: container-tests
  • GitHub Check: benchmark
  • GitHub Check: test-race
  • GitHub Check: liveness-test
  • GitHub Check: Summary
🔇 Additional comments (9)
protocol/x/clob/keeper/grpc_stream_orderbook.go (1)

15-15: LGTM! Parameter addition aligns with filtering requirements.

The new parameter req.GetFilterOrdersBySubaccountId() is correctly passed to the Subscribe method, enabling order filtering by subaccount ID.

protocol/streaming/util/util.go (2)

8-10: LGTM! Required imports added.

The new imports support the subaccount ID extraction functionality.


27-46: LGTM! Well-structured subaccount ID extraction.

The function handles all order-related event types comprehensively with proper error handling. The comment on line 27 provides good context about potential future extensions.

Let's verify the test coverage for this new function:

✅ Verification successful

Test coverage is solid but could be more complete

The function has comprehensive test coverage for all order-related event types (Place, Remove, Update, Replace). Consider adding a test case for the default error scenario to achieve 100% coverage, although this is a minor enhancement since the error case is straightforward.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are corresponding test cases for the new function
rg -A 5 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/

Length of output: 526


Script:

#!/bin/bash
# Fetch the complete test implementation
rg -A 50 "TestGetOffChainUpdateV1SubaccountId" protocol/streaming/util/util_test.go

Length of output: 1616


Script:

#!/bin/bash
# Fetch the entire test file content
cat protocol/streaming/util/util_test.go

Length of output: 3578

protocol/streaming/types/interface.go (1)

19-19: LGTM! Interface updated consistently.

The new boolean parameter is properly added to the Subscribe method signature.

Let's verify that all implementations of the interface have been updated:

✅ Verification successful

Interface implementations updated correctly

Both implementations of FullNodeStreamingManager have been properly updated with the new filterOrdersBySubAccountId parameter:

  • FullNodeStreamingManagerImpl: Main implementation with proper parameter usage
  • NoopGrpcStreamingManager: No-op implementation maintaining expected behavior
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find all implementations of FullNodeStreamingManager interface
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) Subscribe(
  $$$
) (
  err error,
) {
  $$$
}'

Length of output: 107


Script:

#!/bin/bash
# First, find potential implementation files
echo "=== Files containing FullNodeStreamingManager ==="
rg "FullNodeStreamingManager" -l

echo -e "\n=== Struct implementations ==="
rg "type \w+StreamingManager struct"

echo -e "\n=== Subscribe method implementations ==="
rg "func.*Subscribe.*filterOrdersBySubaccountId" -A 2

Length of output: 1122


Script:

#!/bin/bash
echo "=== Full Node Implementation ==="
cat protocol/streaming/full_node_streaming_manager.go

echo -e "\n=== Noop Implementation ==="
cat protocol/streaming/noop_streaming_manager.go

Length of output: 46436

protocol/streaming/noop_streaming_manager.go (1)

27-27: LGTM! Noop implementation updated correctly.

The Subscribe method signature is updated to match the interface while maintaining its noop behavior.

proto/dydxprotocol/clob/query.proto (1)

189-193: Well documented boolean flag addition!

The new field filter_orders_by_subaccount_id is well documented and follows proto3 conventions. The documentation clearly explains that when true, only orders from provided subaccount IDs will be included in the updates.

indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)

281-287: LGTM - Interface field properly matches proto definition!

The TypeScript interface correctly reflects the proto definition with proper naming conventions and documentation.


303-309: LGTM - SDK type properly matches proto definition!

The SDK type interface correctly maintains the proto naming convention for external compatibility.


1315-1316: LGTM - Proper protobuf encoding/decoding implementation!

The implementation correctly:

  • Sets appropriate default value (false)
  • Handles encoding/decoding of the boolean field
  • Properly manages undefined/null cases in fromPartial

Also applies to: 1341-1345, 1388-1391, 1406-1406

Comment on lines +183 to +195
// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag
// from the query parameters.
func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) {
token := r.URL.Query().Get("filterOrdersBySubaccountId")
if token == "" {
return false, nil
}
value, err := strconv.ParseBool(token)
if err != nil {
return false, fmt.Errorf("invalid filterOrdersBySubaccountId: %s", token)
}
return value, nil
}
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation and sanitization for the query parameter.

The current implementation lacks validation for malicious query parameters. Consider adding checks for:

  • Maximum length of the query parameter
  • Allowed characters in the parameter value
  • Rate limiting for parameter parsing requests

Add input validation:

 func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) {
     token := r.URL.Query().Get("filterOrdersBySubaccountId")
     if token == "" {
         return false, nil
     }
+    // Validate parameter length
+    if len(token) > 5 {
+        return false, fmt.Errorf("filterOrdersBySubaccountId parameter too long: %s", token)
+    }
+    // Validate parameter characters
+    if !regexp.MustCompile(`^(true|false)$`).MatchString(token) {
+        return false, fmt.Errorf("invalid filterOrdersBySubaccountId parameter: %s", token)
+    }
     value, err := strconv.ParseBool(token)
     if err != nil {
         return false, fmt.Errorf("invalid filterOrdersBySubaccountId: %s", token)
     }
     return value, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag
// from the query parameters.
func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) {
token := r.URL.Query().Get("filterOrdersBySubaccountId")
if token == "" {
return false, nil
}
value, err := strconv.ParseBool(token)
if err != nil {
return false, fmt.Errorf("invalid filterOrdersBySubaccountId: %s", token)
}
return value, nil
}
// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag
// from the query parameters.
func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) {
token := r.URL.Query().Get("filterOrdersBySubaccountId")
if token == "" {
return false, nil
}
// Validate parameter length
if len(token) > 5 {
return false, fmt.Errorf("filterOrdersBySubaccountId parameter too long: %s", token)
}
// Validate parameter characters
if !regexp.MustCompile(`^(true|false)$`).MatchString(token) {
return false, fmt.Errorf("invalid filterOrdersBySubaccountId parameter: %s", token)
}
value, err := strconv.ParseBool(token)
if err != nil {
return false, fmt.Errorf("invalid filterOrdersBySubaccountId: %s", token)
}
return value, nil
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strconv.ParseBool handles this. Adding our own local implementation would be redundant and duplicative.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@UnbornAztecKing You're absolutely right. strconv.ParseBool already provides comprehensive validation, and adding our own validation would be redundant. I apologize for the unnecessary suggestion.


✏️ Learnings added
Learnt from: UnbornAztecKing
PR: dydxprotocol/v4-chain#2704
File: protocol/streaming/ws/websocket_server.go:183-195
Timestamp: 2025-01-25T19:56:54.305Z
Learning: Go's strconv.ParseBool provides comprehensive input validation for boolean query parameters, handling various string representations and invalid inputs. Additional validation on top of it is unnecessary and would be redundant.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +1146 to +1151
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for sync local updates.

The code should handle potential errors when syncing local updates to prevent silent failures.

Add error handling:

     // Cache updates to sync local ops queue
     syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
         streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
         lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
         ctx.ExecMode(),
     )
+    if len(syncLocalUpdates) != len(syncLocalClobPairIds) {
+        sm.logger.Error(
+            "Mismatch between updates and CLOB pair IDs",
+            "updates", len(syncLocalUpdates),
+            "clobPairIds", len(syncLocalClobPairIds),
+        )
+        return
+    }
     sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
if len(syncLocalUpdates) != len(syncLocalClobPairIds) {
sm.logger.Error(
"Mismatch between updates and CLOB pair IDs",
"updates", len(syncLocalUpdates),
"clobPairIds", len(syncLocalClobPairIds),
)
return
}
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)

Comment on lines +205 to +218
func doFilterStreamUpdateBySubaccount(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIds []satypes.SubaccountId,
) (bool, error) {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate)
if err != nil {
return false, err
} else if slices.Contains(subaccountIds, orderBookUpdateSubaccountId) {
return true, nil
}
}
return false, nil
}
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize subaccount filtering performance.

The current implementation could be optimized by:

  1. Avoiding repeated subaccount ID lookups
  2. Using a map for O(1) lookups instead of slice contains
  3. Adding result caching for frequently accessed subaccounts

Consider this optimization:

+// subaccountSet is a map-based set for O(1) lookups
+type subaccountSet map[satypes.SubaccountId]struct{}
+
+// newSubaccountSet creates a set from a slice of subaccount IDs
+func newSubaccountSet(ids []satypes.SubaccountId) subaccountSet {
+    set := make(subaccountSet, len(ids))
+    for _, id := range ids {
+        set[id] = struct{}{}
+    }
+    return set
+}
+
 func doFilterStreamUpdateBySubaccount(
     orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
     subaccountIds []satypes.SubaccountId,
 ) (bool, error) {
+    // Create a set for O(1) lookups
+    set := newSubaccountSet(subaccountIds)
     for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
         orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate)
         if err != nil {
             return false, err
-        } else if slices.Contains(subaccountIds, orderBookUpdateSubaccountId) {
+        } else if _, exists := set[orderBookUpdateSubaccountId]; exists {
             return true, nil
         }
     }
     return false, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func doFilterStreamUpdateBySubaccount(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIds []satypes.SubaccountId,
) (bool, error) {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate)
if err != nil {
return false, err
} else if slices.Contains(subaccountIds, orderBookUpdateSubaccountId) {
return true, nil
}
}
return false, nil
}
// subaccountSet is a map-based set for O(1) lookups
type subaccountSet map[satypes.SubaccountId]struct{}
// newSubaccountSet creates a set from a slice of subaccount IDs
func newSubaccountSet(ids []satypes.SubaccountId) subaccountSet {
set := make(subaccountSet, len(ids))
for _, id := range ids {
set[id] = struct{}{}
}
return set
}
func doFilterStreamUpdateBySubaccount(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIds []satypes.SubaccountId,
) (bool, error) {
// Create a set for O(1) lookups
set := newSubaccountSet(subaccountIds)
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate)
if err != nil {
return false, err
} else if _, exists := set[orderBookUpdateSubaccountId]; exists {
return true, nil
}
}
return false, nil
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is an optimization for small sets of target subaccountId's.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@UnbornAztecKing Understood, thanks for explaining the rationale behind using slices.Contains for small sets. I'll withdraw my suggestion.

(`・ω・´)ゞ


✏️ Learnings added
Learnt from: UnbornAztecKing
PR: dydxprotocol/v4-chain#2704
File: protocol/streaming/full_node_streaming_manager.go:205-218
Timestamp: 2025-01-25T19:55:32.622Z
Learning: In the streaming manager's subaccount filtering logic, `slices.Contains` is used instead of a map-based set as it's an optimization for small sets of target subaccountIds, where the overhead of creating and maintaining a hash map would outweigh the benefits.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +281 to +287
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filterOrdersBySubaccountId: boolean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Implementation needs to be updated to handle the new filter flag

The proto compilation is correct, but the implementation in protocol/x/clob/keeper/grpc_stream_orderbook.go doesn't handle the new FilterOrdersBySubaccountId field. The streaming implementation should be updated to respect this flag when filtering orderbook updates.

  • protocol/x/clob/keeper/grpc_stream_orderbook.go: Update StreamOrderbookUpdates to use GetFilterOrdersBySubaccountId() when deciding whether to filter by subaccount IDs.
🔗 Analysis chain

Verify proto compilation and existing usage.

The implementation looks correct, but please ensure:

  1. The proto changes have been compiled and generated files are up-to-date
  2. All existing clients using StreamOrderbookUpdatesRequest are aware of this new filtering capability

Also applies to: 189-193

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for existing StreamOrderbookUpdatesRequest usages that might need updates

# Search for StreamOrderbookUpdatesRequest usage
echo "Searching for existing StreamOrderbookUpdatesRequest usage..."
rg -l "StreamOrderbookUpdatesRequest" --type ts --type go

# Search for potential streaming client implementations
echo "Searching for potential streaming clients..."
rg -l "StreamOrderbookUpdates" --type ts --type go

Length of output: 986


Script:

#!/bin/bash
# Check if filterOrdersBySubaccountId exists in all generated files and implementation
echo "Checking TypeScript generated files..."
rg "filterOrdersBySubaccountId" indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts

echo -e "\nChecking Go generated files..."
rg "FilterOrdersBySubaccountId" protocol/x/clob/types/query.pb.go

echo -e "\nChecking implementation..."
rg -A 5 "StreamOrderbookUpdates" protocol/x/clob/keeper/grpc_stream_orderbook.go

Length of output: 1638

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

1 participant