Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1321] subscribe to market prices streaming services #2592

Merged
merged 1 commit into from
Nov 22, 2024
Merged

Conversation

jayy04
Copy link
Contributor

@jayy04 jayy04 commented Nov 21, 2024

Changelist

[Describe or list the changes made in this PR]

Test Plan

[Describe how this PR was tested (if applicable)]

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced subscription management to include market-specific subscriptions.
    • Added support for handling price snapshots in streaming updates.
    • WebSocket server now processes additional query parameters for improved subscription handling.
  • Bug Fixes

    • Improved error handling in query parameter parsing for WebSocket requests.
  • Documentation

    • Updated interfaces to reflect new methods for price snapshot retrieval.

These updates improve the streaming service's functionality, allowing for more precise and efficient data handling.

@jayy04 jayy04 requested a review from a team as a code owner November 21, 2024 00:16
Copy link

linear bot commented Nov 21, 2024

Copy link
Contributor

coderabbitai bot commented Nov 21, 2024

Walkthrough

The changes in this pull request enhance the subscription management capabilities of the FullNodeStreamingManagerImpl and related components. A new mapping for tracking subscriptions by market ID is introduced, along with updates to subscription methods to include market-specific identifiers. Additional methods for handling price snapshots and updates are added, and the WebSocket server is modified to support new query parameters. These modifications improve the overall functionality of the streaming service by integrating market and price data.

Changes

File Path Change Summary
protocol/streaming/full_node_streaming_manager.go - Added mapping marketIdToSubscriptionIdMapping in FullNodeStreamingManagerImpl.
- Updated OrderbookSubscription to include marketIds.
- Modified Subscribe to accept marketIds.
- Added methods: GetPriceSnapshotsForInitStreams, toPriceStreamUpdates.
- Updated SendCombinedSnapshot to include price updates.
protocol/streaming/noop_streaming_manager.go - Added import for pricestypes.
- Updated Subscribe to include marketIds.
- Added method GetPriceSnapshotsForInitStreams.
- Updated InitializeNewStreams to include priceSnapshots.
protocol/streaming/types/interface.go - Updated Subscribe to include marketIds.
- Updated InitializeNewStreams to accept priceSnapshots.
- Added method GetPriceSnapshotsForInitStreams.
protocol/streaming/ws/websocket_server.go - Added constants CLOB_PAIR_IDS_QUERY_PARAM, MARKET_IDS_QUERY_PARAM.
- Removed parseClobPairIds and added parseUint32 for parsing query parameters.
protocol/x/clob/keeper/grpc_stream_orderbook.go - Updated StreamOrderbookUpdates to include req.GetMarketIds() in the subscription call.
protocol/x/clob/keeper/keeper.go - Added method GetPriceSnapshotsForInitStreams.
- Updated InitializeNewStreams to include priceSnapshots.
protocol/x/clob/types/expected_keepers.go - Added method GetStreamPriceUpdate to PricesKeeper interface.
protocol/x/prices/keeper/market_price.go - Added method GetStreamPriceUpdate to Keeper struct.

Possibly related PRs

Suggested labels

indexer, proto

Suggested reviewers

  • dydxwill
  • jonfung-dydx

🐰 In the realm of streams so bright,
Market IDs dance in the light.
Subscriptions grow, price updates flow,
With every change, our service will glow!
Hopping along, we enhance our way,
Cheers to the code, hip-hip-hooray! 🎉


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (11)
protocol/x/clob/keeper/grpc_stream_orderbook.go (1)

13-16: Consider adding documentation about the new parameter.

The function would benefit from documentation explaining the purpose of the marketIds parameter and its relationship to the streaming functionality.

Add a comment block above the function:

+// StreamOrderbookUpdates subscribes to orderbook updates for the specified CLOB pair,
+// subaccount IDs, and market IDs. It streams updates until the client closes the connection.
+// The market IDs parameter allows filtering updates to specific markets for price streaming.
 func (k Keeper) StreamOrderbookUpdates(
protocol/streaming/types/interface.go (2)

28-28: Consider caching price snapshots for performance.

The addition of price snapshots is well-structured and consistent with the existing snapshot patterns. However, since price data might be frequently accessed, consider implementing a caching mechanism to optimize performance.


35-37: Add documentation for the new method.

The method signature is well-designed and follows existing patterns. Consider adding documentation to describe:

  • The expected behavior of the getPriceSnapshot function
  • The meaning of nil returns from getPriceSnapshot
  • Any performance considerations
+// GetPriceSnapshotsForInitStreams retrieves price snapshots for all subscribed markets
+// during stream initialization.
+// Parameters:
+//   - getPriceSnapshot: A function that returns the current price update for a given market ID.
+//     May return nil if no price data is available.
+// Returns:
+//   - A map of market IDs to their corresponding price updates.
 GetPriceSnapshotsForInitStreams(
   getPriceSnapshot func(marketId uint32) *pricestypes.StreamPriceUpdate,
 ) map[uint32]*pricestypes.StreamPriceUpdate
protocol/streaming/noop_streaming_manager.go (1)

63-67: Consider improving parameter naming

The implementation correctly follows the no-op pattern, but consider making the function parameter name more descriptive:

 func (sm *NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams(
-    _ func(_ uint32) *pricestypes.StreamPriceUpdate,
+    _ func(marketId uint32) *pricestypes.StreamPriceUpdate,
 ) map[uint32]*pricestypes.StreamPriceUpdate {
     return nil
 }
protocol/x/clob/types/expected_keepers.go (1)

144-144: Add documentation and improve parameter naming.

The method signature is consistent with other streaming methods in the codebase. However, consider these improvements:

  1. Add documentation to explain the parameters and return value:
+// GetStreamPriceUpdate returns a streaming price update for the given market ID.
+// Parameters:
+// - ctx: The SDK context
+// - id: The market identifier
+// - snapshot: When true, returns a full snapshot instead of an incremental update
+// Returns: A StreamPriceUpdate containing the price data
 GetStreamPriceUpdate(ctx sdk.Context, id uint32, snapshot bool) (val pricestypes.StreamPriceUpdate)
  1. Consider renaming the boolean parameter to be more descriptive:
-GetStreamPriceUpdate(ctx sdk.Context, id uint32, snapshot bool)
+GetStreamPriceUpdate(ctx sdk.Context, id uint32, isFullSnapshot bool)
protocol/streaming/ws/websocket_server.go (1)

79-88: Consider enhancing error messages with more context.

While the error handling is thorough, the error messages could be more specific to help with debugging. Consider including the actual values received in the error messages.

-			"Error parsing marketIds",
+			"Error parsing marketIds from query parameters",
 			"err", err,
+			"received_value", r.URL.Query().Get(MARKET_IDS_QUERY_PARAM),

Also applies to: 113-113

protocol/x/prices/keeper/market_price.go (2)

197-197: Add godoc for the new method

Please add documentation that describes the method's purpose, parameters, return values, and potential errors.

Consider adding:

+// GetStreamPriceUpdate returns a StreamPriceUpdate for the given market ID.
+// It includes the current market price and a snapshot flag for streaming purposes.
+// Returns an error if the market price does not exist.
+// Parameters:
+// - ctx: The SDK context
+// - id: The market ID to fetch the price for
+// - snapshot: Flag indicating if this is a snapshot update

197-211: Consider adding telemetry metrics

Since this method is part of a streaming service, it would be valuable to track its usage and performance metrics, similar to other methods in this file.

Consider adding:

 func (k Keeper) GetStreamPriceUpdate(
 	ctx sdk.Context,
 	id uint32,
 	snapshot bool,
 ) (
 	val types.StreamPriceUpdate,
 	err error,
 ) {
+	defer telemetry.ModuleMeasureSince(
+		types.ModuleName,
+		time.Now(),
+		metrics.GetStreamPriceUpdate,
+		metrics.Latency,
+	)
 	price, err := k.GetMarketPrice(ctx, id)
 	if err != nil {
 		return types.StreamPriceUpdate{}, err
 	}
+	telemetry.IncrCounterWithLabels(
+		[]string{types.ModuleName, metrics.StreamPriceUpdates},
+		1,
+		[]metrics.Label{
+			pricefeedmetrics.GetLabelForMarketId(id),
+			{"snapshot", lib.BoolToString(snapshot)},
+		},
+	)
 	return types.StreamPriceUpdate{
 		MarketId: id,
 		Price:    price,
 		Snapshot: snapshot,
 	}, nil
 }
protocol/streaming/full_node_streaming_manager.go (3)

244-245: Correct variable name in comments

The comments refer to subaccountId, but the code is handling marketId. Update the comments to reference marketId for clarity.

Apply this diff:

-        // if subaccountId exists in the map, append the subscription id to the slice
+        // if marketId exists in the map, append the subscription id to the slice
        // otherwise, create a new slice with the subscription id

410-426: Optimize slice allocation in toPriceStreamUpdates

Preallocate the streamUpdates slice with the capacity of len(priceUpdates) to improve performance by reducing the number of allocations during appends.

Apply this diff:

func toPriceStreamUpdates(
    priceUpdates []*pricestypes.StreamPriceUpdate,
    blockHeight uint32,
    execMode sdk.ExecMode,
) []clobtypes.StreamUpdate {
-   streamUpdates := make([]clobtypes.StreamUpdate, 0)
+   streamUpdates := make([]clobtypes.StreamUpdate, 0, len(priceUpdates))
    for _, update := range priceUpdates {
        streamUpdates = append(streamUpdates, clobtypes.StreamUpdate{
            UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{
                PriceUpdate: update,
            },
            BlockHeight: blockHeight,
            ExecMode:    uint32(execMode),
        })
    }
    return streamUpdates
}

Line range hint 386-394: Optimize slice allocation in toSubaccountStreamUpdates

Similarly, preallocate the streamUpdates slice with the capacity of len(saUpdates) in the toSubaccountStreamUpdates function to enhance performance.

Apply this diff:

func toSubaccountStreamUpdates(
    saUpdates []*satypes.StreamSubaccountUpdate,
    blockHeight uint32,
    execMode sdk.ExecMode,
) []clobtypes.StreamUpdate {
-   streamUpdates := make([]clobtypes.StreamUpdate, 0)
+   streamUpdates := make([]clobtypes.StreamUpdate, 0, len(saUpdates))
    for _, saUpdate := range saUpdates {
        streamUpdates = append(streamUpdates, clobtypes.StreamUpdate{
            UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
                SubaccountUpdate: saUpdate,
            },
            BlockHeight: blockHeight,
            ExecMode:    uint32(execMode),
        })
    }
    return streamUpdates
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e700de9 and 6c49a20.

📒 Files selected for processing (8)
  • protocol/streaming/full_node_streaming_manager.go (14 hunks)
  • protocol/streaming/noop_streaming_manager.go (3 hunks)
  • protocol/streaming/types/interface.go (3 hunks)
  • protocol/streaming/ws/websocket_server.go (5 hunks)
  • protocol/x/clob/keeper/grpc_stream_orderbook.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (4 hunks)
  • protocol/x/clob/types/expected_keepers.go (1 hunks)
  • protocol/x/prices/keeper/market_price.go (1 hunks)
🔇 Additional comments (12)
protocol/x/clob/keeper/grpc_stream_orderbook.go (1)

14-14: LGTM! Verify integration points.

The addition of req.GetMarketIds() aligns with the PR objective of supporting market price subscriptions.

Let's verify the integration points:

✅ Verification successful

Integration verified and working as expected

The verification confirms that:

  • The StreamOrderbookUpdatesRequest struct has a MarketIds field for price updates
  • The FullNodeStreamingManager.Subscribe method accepts marketIds []uint32 as its third parameter, matching the request type

The integration points are properly aligned, and the change is safe.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that the request type has GetMarketIds() method and the streaming manager's Subscribe method accepts marketIds

# Check if GetMarketIds() exists on the request type
ast-grep --pattern 'type StreamOrderbookUpdatesRequest struct {
  $$$
  MarketIds $_ `$$$`
  $$$
}'

# Check if the Subscribe method signature in the streaming manager matches
ast-grep --pattern 'type FullNodeStreamingManager interface {
  $$$
  Subscribe($_, $_, $_, $_) $_
  $$$
}'

Length of output: 4144


Script:

#!/bin/bash
# Let's verify the request type definition and its generated code
# First, find the proto file containing StreamOrderbookUpdatesRequest
fd -e proto -x rg -l "StreamOrderbookUpdatesRequest"

# Then check the generated Go code for this request type
fd -e pb.go -x rg -A 5 "type StreamOrderbookUpdatesRequest struct"

Length of output: 606

protocol/streaming/types/interface.go (2)

6-6: LGTM!

The import of the prices types package is necessary for the new price streaming functionality and follows the project's import naming convention.


18-18: Verify interface implementation updates.

The addition of marketIds []uint32 parameter is consistent with the PR objective. However, this interface change requires updates in all implementing types.

✅ Verification successful

Interface implementations are already updated

The verification shows that all implementations of the FullNodeStreamingManager interface have been properly updated to include the new marketIds []uint32 parameter:

  • NoopGrpcStreamingManager.Subscribe() in protocol/streaming/noop_streaming_manager.go has the parameter (using _ as it's a no-op implementation)
  • FullNodeStreamingManagerImpl.Subscribe() in protocol/streaming/full_node_streaming_manager.go has the parameter explicitly defined
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for types implementing FullNodeStreamingManager interface
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) Subscribe(clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, $$$) $$$'

Length of output: 141


Script:

#!/bin/bash
# Let's first find all files that might contain interface implementations
rg -l "FullNodeStreamingManager" .

# Then search for any Subscribe method implementations
rg "func.*Subscribe.*\(" -A 5

Length of output: 1958

protocol/streaming/noop_streaming_manager.go (4)

7-7: LGTM: Import addition is appropriate

The new import for pricestypes aligns with the added price streaming functionality.


26-26: LGTM: Subscribe method signature update is consistent

The addition of the market IDs parameter maintains the no-op implementation pattern while aligning with the interface changes.


72-72: LGTM: InitializeNewStreams parameter addition is appropriate

The addition of the priceSnapshots parameter maintains consistency with the new price streaming capabilities while preserving the no-op implementation pattern.


5-5: Verify interface implementation completeness

Let's ensure that the NoopGrpcStreamingManager implements all methods of the FullNodeStreamingManager interface with the correct signatures.

✅ Verification successful

Interface implementation is complete and correct

The NoopGrpcStreamingManager correctly implements all methods of the FullNodeStreamingManager interface:

  • Enabled()
  • Stop()
  • Subscribe()
  • InitializeNewStreams()
  • GetSubaccountSnapshotsForInitStreams()
  • GetPriceSnapshotsForInitStreams()
  • SendOrderbookUpdates()
  • SendOrderbookFillUpdate()
  • SendTakerOrderStatus()
  • SendSubaccountUpdate()
  • GetStagedFinalizeBlockEvents()
  • TracksSubaccountId()
  • StreamBatchUpdatesAfterFinalizeBlock()
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify interface implementation completeness

# Find and display the interface definition
ast-grep --pattern 'type FullNodeStreamingManager interface {
  $$$
}'

# Find all methods in the NoopGrpcStreamingManager
ast-grep --pattern 'func (sm *NoopGrpcStreamingManager) $$$'

Length of output: 14106


Script:

#!/bin/bash
# Let's search for the NoopGrpcStreamingManager implementation specifically
rg -A 2 "type NoopGrpcStreamingManager struct" 

# Find all method implementations
rg "func \(sm \*NoopGrpcStreamingManager\)" -A 2

Length of output: 3816

protocol/streaming/ws/websocket_server.go (2)

19-22: LGTM! Well-defined constants for query parameters.

The constants follow Go naming conventions and provide clear, self-documenting values for the query parameters.


Line range hint 69-89: Verify rate limiting for WebSocket connections.

The addition of new query parameters increases the attack surface for potential DoS attacks. Ensure that appropriate rate limiting is in place for WebSocket connections and parameter parsing.

protocol/x/prices/keeper/market_price.go (1)

197-211: Verify integration with WebSocket server

Let's verify how this method is integrated with the WebSocket server and ensure proper error handling in the calling code.

✅ Verification successful

Let me gather more information about the WebSocket server implementation and how it handles price updates.


Based on the gathered information, I can now provide the final response:

GetStreamPriceUpdate implementation is safe and correctly integrated

The changes are safe because:

  • The method is correctly used in the codebase, primarily by the CLOB module through its expected keeper interface
  • The underlying GetMarketPrice already handles error cases, and the WebSocket server has proper connection handling
  • The method maintains consistency with the existing pattern of price update streaming
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check usage of GetStreamPriceUpdate and error handling in calling code

# Find all usages of GetStreamPriceUpdate
echo "Searching for GetStreamPriceUpdate usage..."
rg -A 5 "GetStreamPriceUpdate"

# Look for WebSocket handlers that might use this
echo "Searching for WebSocket handlers..."
rg -B 5 -A 10 "websocket.Handler|ws.Handler|HandleFunc"

# Check for error handling patterns in related files
echo "Checking error handling patterns..."
ast-grep --pattern 'if err := $_.GetStreamPriceUpdate($$$); err != nil {
  $$$
}'

Length of output: 5697


Script:

#!/bin/bash
# Check WebSocket server implementation and price update handling

# Look for WebSocket handler implementation
echo "Checking WebSocket handler implementation..."
rg -A 15 "func.*Handler" protocol/streaming/ws/

# Look for price update related code in WebSocket
echo "Checking price update handling in WebSocket..."
rg -A 10 "StreamPriceUpdate|PriceUpdate" protocol/streaming/ws/

# Check for any error handling related to market prices
echo "Looking for market price error handling..."
rg -A 5 "GetMarketPrice.*err|price.*err" protocol/x/prices/

Length of output: 5698

protocol/x/clob/keeper/keeper.go (2)

25-25: LGTM!

The addition of the prices types import is necessary for the new price streaming functionality.


330-347: LGTM! Verify streaming manager implementation.

The implementation follows the established pattern and correctly integrates with the prices keeper. The method is well-structured with proper safety checks.

Let's verify the streaming manager's implementation of the new price snapshots method:

✅ Verification successful

Let me verify the implementation by checking the concrete type that implements this interface.


Implementation verified and follows established patterns

The streaming manager's implementation is correct and well-structured:

  • Properly implements the interface defined in protocol/streaming/types/interface.go
  • Uses appropriate locking mechanisms for thread safety
  • Efficiently collects price snapshots only for uninitialized subscriptions
  • Avoids duplicate snapshots for the same market ID
  • Correctly uses the provided callback function to fetch price updates
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the streaming manager implementation matches the expected interface

# Search for the GetPriceSnapshotsForInitStreams method definition
ast-grep --pattern 'type FullNodeStreamingManager interface {
  $$$
  GetPriceSnapshotsForInitStreams($_) $_
  $$$
}'

Length of output: 4052


Script:

#!/bin/bash
# Search for the concrete implementation of GetPriceSnapshotsForInitStreams
ast-grep --pattern 'func ($_ $_) GetPriceSnapshotsForInitStreams(
  getPriceSnapshot func(marketId uint32) *pricestypes.StreamPriceUpdate,
) map[uint32]*pricestypes.StreamPriceUpdate {
  $$$
}'

Length of output: 2135

Comment on lines +157 to +176
// parseUint32 is a helper function to parse the uint32 from the query parameters.
func parseUint32(r *http.Request, queryParam string) ([]uint32, error) {
param := r.URL.Query().Get(queryParam)
if param == "" {
return []uint32{}, nil
}
idStrs := strings.Split(clobPairIdsParam, ",")
clobPairIds := make([]uint32, 0)
idStrs := strings.Split(param, ",")
ids := make([]uint32, 0)
for _, idStr := range idStrs {
id, err := strconv.Atoi(idStr)
if err != nil {
return nil, fmt.Errorf("invalid clobPairId: %s", idStr)
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
if id < 0 || id > math.MaxInt32 {
return nil, fmt.Errorf("invalid clob pair id: %s", idStr)
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
clobPairIds = append(clobPairIds, uint32(id))
ids = append(ids, uint32(id))
}

return clobPairIds, nil
return ids, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider additional input validation in parseUint32.

While the function handles basic parsing and range validation, consider adding the following safeguards:

  1. Duplicate ID validation
  2. Maximum input length check
  3. Empty string element validation after split
  4. Explicit overflow handling
 func parseUint32(r *http.Request, queryParam string) ([]uint32, error) {
 	param := r.URL.Query().Get(queryParam)
 	if param == "" {
 		return []uint32{}, nil
 	}
+	// Add maximum length check
+	if len(param) > 1000 {
+		return nil, fmt.Errorf("%s parameter too long: max 1000 characters", queryParam)
+	}
+
 	idStrs := strings.Split(param, ",")
+	// Add maximum number of IDs check
+	if len(idStrs) > 100 {
+		return nil, fmt.Errorf("too many %s values: max 100", queryParam)
+	}
+
 	ids := make([]uint32, 0)
+	seen := make(map[uint32]bool)
 	for _, idStr := range idStrs {
+		// Check for empty elements
+		if idStr == "" {
+			return nil, fmt.Errorf("empty value in %s list", queryParam)
+		}
+
 		id, err := strconv.Atoi(idStr)
 		if err != nil {
 			return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
 		}
 		if id < 0 || id > math.MaxInt32 {
 			return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
 		}
-		ids = append(ids, uint32(id))
+		
+		// Check for duplicates
+		uid := uint32(id)
+		if seen[uid] {
+			return nil, fmt.Errorf("duplicate %s: %d", queryParam, id)
+		}
+		seen[uid] = true
+		ids = append(ids, uid)
 	}
 	return ids, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// parseUint32 is a helper function to parse the uint32 from the query parameters.
func parseUint32(r *http.Request, queryParam string) ([]uint32, error) {
param := r.URL.Query().Get(queryParam)
if param == "" {
return []uint32{}, nil
}
idStrs := strings.Split(clobPairIdsParam, ",")
clobPairIds := make([]uint32, 0)
idStrs := strings.Split(param, ",")
ids := make([]uint32, 0)
for _, idStr := range idStrs {
id, err := strconv.Atoi(idStr)
if err != nil {
return nil, fmt.Errorf("invalid clobPairId: %s", idStr)
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
if id < 0 || id > math.MaxInt32 {
return nil, fmt.Errorf("invalid clob pair id: %s", idStr)
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
clobPairIds = append(clobPairIds, uint32(id))
ids = append(ids, uint32(id))
}
return clobPairIds, nil
return ids, nil
// parseUint32 is a helper function to parse the uint32 from the query parameters.
func parseUint32(r *http.Request, queryParam string) ([]uint32, error) {
param := r.URL.Query().Get(queryParam)
if param == "" {
return []uint32{}, nil
}
// Add maximum length check
if len(param) > 1000 {
return nil, fmt.Errorf("%s parameter too long: max 1000 characters", queryParam)
}
idStrs := strings.Split(param, ",")
// Add maximum number of IDs check
if len(idStrs) > 100 {
return nil, fmt.Errorf("too many %s values: max 100", queryParam)
}
ids := make([]uint32, 0)
seen := make(map[uint32]bool)
for _, idStr := range idStrs {
// Check for empty elements
if idStr == "" {
return nil, fmt.Errorf("empty value in %s list", queryParam)
}
id, err := strconv.Atoi(idStr)
if err != nil {
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
if id < 0 || id > math.MaxInt32 {
return nil, fmt.Errorf("invalid %s: %s", queryParam, idStr)
}
// Check for duplicates
uid := uint32(id)
if seen[uid] {
return nil, fmt.Errorf("duplicate %s: %d", queryParam, id)
}
seen[uid] = true
ids = append(ids, uid)
}
return ids, nil
}

Comment on lines 197 to 218
// GetStreamPriceUpdate returns a stream price update from its id.
func (k Keeper) GetStreamPriceUpdate(
ctx sdk.Context,
id uint32,
snapshot bool,
) (
val types.StreamPriceUpdate,
) {
price, _ := k.GetMarketPrice(ctx, id)
return types.StreamPriceUpdate{
MarketId: id,
Price: price,
Snapshot: snapshot,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential errors from GetMarketPrice

The current implementation silently ignores errors from GetMarketPrice, which could lead to returning invalid or zero-value prices for non-existent markets. This is particularly concerning as GetMarketPrice can return ErrMarketPriceDoesNotExist.

Consider this implementation:

 func (k Keeper) GetStreamPriceUpdate(
 	ctx sdk.Context,
 	id uint32,
 	snapshot bool,
 ) (
-	val types.StreamPriceUpdate,
+	val types.StreamPriceUpdate,
+	err error,
 ) {
-	price, _ := k.GetMarketPrice(ctx, id)
+	price, err := k.GetMarketPrice(ctx, id)
+	if err != nil {
+		return types.StreamPriceUpdate{}, err
+	}
 	return types.StreamPriceUpdate{
 		MarketId: id,
 		Price:    price,
 		Snapshot: snapshot,
-	}
+	}, nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// GetStreamPriceUpdate returns a stream price update from its id.
func (k Keeper) GetStreamPriceUpdate(
ctx sdk.Context,
id uint32,
snapshot bool,
) (
val types.StreamPriceUpdate,
) {
price, _ := k.GetMarketPrice(ctx, id)
return types.StreamPriceUpdate{
MarketId: id,
Price: price,
Snapshot: snapshot,
}
}
// GetStreamPriceUpdate returns a stream price update from its id.
func (k Keeper) GetStreamPriceUpdate(
ctx sdk.Context,
id uint32,
snapshot bool,
) (
val types.StreamPriceUpdate,
err error,
) {
price, err := k.GetMarketPrice(ctx, id)
if err != nil {
return types.StreamPriceUpdate{}, err
}
return types.StreamPriceUpdate{
MarketId: id,
Price: price,
Snapshot: snapshot,
}, nil
}

Comment on lines +1124 to +1134
priceUpdates = append(priceUpdates, priceUpdate)
} else {
sm.logger.Error(
fmt.Sprintf(
"Price update not found for market id %v. This should not happen.",
marketId,
),
)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle missing price updates more gracefully

When a price update is not found for a market ID, the code logs an error stating "This should not happen." Consider handling this scenario more gracefully by implementing appropriate error handling or fallback logic to ensure robustness.

) (
val types.StreamPriceUpdate,
) {
price, _ := k.GetMarketPrice(ctx, id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log error here or other more graceful error handling? Otherwise price is an empty object

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (5)
protocol/streaming/types/interface.go (2)

35-37: Add documentation for the new method

While the implementation follows existing patterns, please add documentation to explain:

  1. The purpose of this method
  2. The expected behavior of the getPriceSnapshot function parameter
  3. Error handling expectations (if any)

Consider adding a comment block like:

// GetPriceSnapshotsForInitStreams retrieves price snapshots for stream initialization.
// Parameters:
//   - getPriceSnapshot: function that returns a price snapshot for a given market ID
// Returns:
//   - map of market IDs to their corresponding price snapshots

Line range hint 10-65: Consider interface segregation for better maintainability

As the interface grows with different types of streaming functionality (orderbook, subaccount, and now prices), consider splitting it into more focused interfaces following the Interface Segregation Principle. This could improve maintainability and make the responsibilities clearer.

For example:

  • IOrderbookStreamingManager
  • ISubaccountStreamingManager
  • IPriceStreamingManager
  • IStreamingManagerCore (for common functionality)
protocol/streaming/ws/websocket_server.go (2)

Line range hint 69-89: Consider reducing code duplication in parameter parsing.

The error handling code for clobPairIds and marketIds is identical. Consider extracting the common error handling logic into a helper function.

+func handleParseError(w http.ResponseWriter, logger log.Logger, paramName string, err error) {
+    logger.Error(
+        fmt.Sprintf("Error parsing %s", paramName),
+        "err", err,
+    )
+    http.Error(w, err.Error(), http.StatusBadRequest)
+}

 func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
     // ...
     clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM)
     if err != nil {
-        ws.logger.Error(
-            "Error parsing clobPairIds",
-            "err", err,
-        )
-        http.Error(w, err.Error(), http.StatusBadRequest)
+        handleParseError(w, ws.logger, "clobPairIds", err)
         return
     }

     marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM)
     if err != nil {
-        ws.logger.Error(
-            "Error parsing marketIds",
-            "err", err,
-        )
-        http.Error(w, err.Error(), http.StatusBadRequest)
+        handleParseError(w, ws.logger, "marketIds", err)
         return
     }

Also applies to: 113-113


Line range hint 25-31: Consider implementing security measures for the WebSocket server.

The current implementation allows all connections without restrictions. Consider:

  1. Implementing proper origin validation instead of allowing all connections
  2. Adding rate limiting for connections
  3. Setting maximum concurrent connection limits

Example implementation for origin validation:

 var upgrader = websocket.Upgrader{
     ReadBufferSize:  1024,
     WriteBufferSize: 1024,
     CheckOrigin: func(r *http.Request) bool {
-        return true // Allow all connections by default
+        origin := r.Header.Get("Origin")
+        // Add your allowed origins
+        allowedOrigins := []string{"https://api.example.com", "https://app.example.com"}
+        for _, allowed := range allowedOrigins {
+            if origin == allowed {
+                return true
+            }
+        }
+        return false
     },
 }
protocol/streaming/full_node_streaming_manager.go (1)

244-245: Fix copy-pasted comment

The comment refers to "subaccountId" but should refer to "marketId" instead.

-		// if subaccountId exists in the map, append the subscription id to the slice
-		// otherwise, create a new slice with the subscription id
+		// if marketId exists in the map, append the subscription id to the slice
+		// otherwise, create a new slice with the subscription id
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 6c49a20 and 262c536.

📒 Files selected for processing (8)
  • protocol/streaming/full_node_streaming_manager.go (14 hunks)
  • protocol/streaming/noop_streaming_manager.go (3 hunks)
  • protocol/streaming/types/interface.go (3 hunks)
  • protocol/streaming/ws/websocket_server.go (5 hunks)
  • protocol/x/clob/keeper/grpc_stream_orderbook.go (1 hunks)
  • protocol/x/clob/keeper/keeper.go (4 hunks)
  • protocol/x/clob/types/expected_keepers.go (1 hunks)
  • protocol/x/prices/keeper/market_price.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • protocol/streaming/noop_streaming_manager.go
  • protocol/x/clob/keeper/grpc_stream_orderbook.go
  • protocol/x/clob/types/expected_keepers.go
  • protocol/x/prices/keeper/market_price.go
🔇 Additional comments (10)
protocol/streaming/types/interface.go (3)

6-6: LGTM: Import addition is appropriate

The new import for price types is necessary for the price streaming functionality and follows the project's naming conventions.


28-28: LGTM: Price snapshots parameter addition

The new parameter for price snapshots follows the existing pattern of snapshot parameters in the method signature.

✅ Verification successful

LGTM: Price snapshots parameter is properly implemented

The verification confirms that the priceSnapshots parameter is correctly implemented across the codebase:

  • Interface definition in protocol/streaming/types/interface.go
  • Full implementation in protocol/streaming/full_node_streaming_manager.go which properly uses the snapshots
  • No-op implementation in protocol/streaming/noop_streaming_manager.go
  • Proper initialization and passing of price snapshots in protocol/x/clob/keeper/keeper.go
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that all implementations properly initialize price snapshots
ast-grep --pattern 'func ($_ $_) InitializeNewStreams($_, $_, priceSnapshots map[uint32]*pricestypes.StreamPriceUpdate, $_, $_) {
  $$$
}'

Length of output: 140


Script:

#!/bin/bash
# First, let's find all files that might contain InitializeNewStreams implementations
rg "InitializeNewStreams" -l

# Then let's see the actual implementations with context
rg "InitializeNewStreams" -A 10

Length of output: 8333


18-18: LGTM: Market IDs parameter addition

The new parameter for market IDs follows consistent typing and naming patterns.

✅ Verification successful

Based on the search results, I can now provide a final response since we have found both the interface definition and its implementations. The marketIds parameter has been properly implemented in both the noop and full node implementations of the streaming manager.

LGTM: Market IDs parameter properly implemented

The new marketIds []uint32 parameter has been correctly implemented in both streaming manager implementations:

  • NoopGrpcStreamingManager: Includes parameter in signature with noop implementation
  • FullNodeStreamingManagerImpl: Includes parameter and performs validation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that all implementations of the interface properly handle the new parameter
ast-grep --pattern 'type $_ struct {
  $$$
}

func ($_ $_) Subscribe(clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, srv $_) error {
  $$$
}'

Length of output: 176


Script:

#!/bin/bash
# Search for Subscribe method implementations with broader pattern
rg -U "func.*Subscribe.*\(" -A 5

# Also search for interface definition to understand the contract
rg -U "type.*interface.*{" -A 10

Length of output: 1481


Script:

#!/bin/bash
# Search for interface definition with fixed regex
rg -U "type.*interface" -A 10

# Search for Subscribe implementations with file context
rg -U --context=10 "func.*Subscribe.*marketIds.*\)"

Length of output: 135379


Script:

#!/bin/bash
# Search for Subscribe method implementations with specific file context
rg -U --context=10 "func.*Subscribe.*marketIds.*\)" protocol/streaming/

# Also search for interface definition in streaming types
rg -U --context=10 "type.*interface" protocol/streaming/types/

Length of output: 2298


Script:

#!/bin/bash
# Search for implementations of Subscribe method in streaming manager files
rg -U --context=10 "func \(.*\) Subscribe\(" protocol/streaming/full_node_streaming_manager.go protocol/streaming/noop_streaming_manager.go

Length of output: 3203

protocol/streaming/ws/websocket_server.go (2)

19-22: LGTM! Well-defined constants.

The constants are appropriately named and follow Go naming conventions.


157-176: ⚠️ Potential issue

Add comprehensive input validation as suggested in the past review.

The current implementation lacks important validations that were suggested in a previous review. These validations are crucial for robust error handling and security:

  1. Maximum input length check to prevent DoS attacks
  2. Empty string element validation
  3. Duplicate ID validation
  4. Explicit overflow handling

The previous review provided a detailed implementation that should be incorporated.

protocol/x/clob/keeper/keeper.go (2)

25-25: LGTM: Import addition is appropriate

The addition of the prices types import aligns with the new price streaming functionality.


330-347: LGTM: Well-structured implementation following existing patterns

The implementation properly follows the established pattern for snapshot retrieval and streaming initialization. The method correctly validates the context mode and uses appropriate keeper methods for data retrieval.

Let's verify the streaming manager implementation:

✅ Verification successful

Streaming manager implementation verified and properly integrated

The implementation in FullNodeStreamingManagerImpl correctly:

  • Maintains thread safety with proper locking
  • Efficiently collects price snapshots only for uninitialized subscriptions
  • Avoids duplicate snapshots for the same market ID
  • Returns the expected map of market IDs to price updates
  • Properly uses the provided callback function to fetch price data

The noop implementation appropriately returns nil, following the expected pattern for no-op managers.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the streaming manager implementation of GetPriceSnapshotsForInitStreams

# Search for the corresponding method in the streaming manager
ast-grep --pattern 'GetPriceSnapshotsForInitStreams($_) {
  $$$
}'

Length of output: 68


Script:

#!/bin/bash
# Let's try a broader search for the streaming manager implementation
# First, find files that might contain the streaming manager
rg -l "StreamingManager" --type go

# Then search for any mentions of GetPriceSnapshotsForInitStreams
rg "GetPriceSnapshotsForInitStreams" --type go

Length of output: 1294


Script:

#!/bin/bash
# Let's examine the implementation in the full node streaming manager
rg -A 10 "GetPriceSnapshotsForInitStreams" protocol/streaming/full_node_streaming_manager.go

# Also check the interface definition
rg -A 5 "GetPriceSnapshotsForInitStreams" protocol/streaming/types/interface.go

Length of output: 924


Script:

#!/bin/bash
# Let's see the complete implementation of GetPriceSnapshotsForInitStreams in the full node manager
rg -A 20 "GetPriceSnapshotsForInitStreams" protocol/streaming/full_node_streaming_manager.go

# Also check the noop implementation for comparison
rg -A 10 "GetPriceSnapshotsForInitStreams" protocol/streaming/noop_streaming_manager.go

Length of output: 1349

protocol/streaming/full_node_streaming_manager.go (3)

10-10: LGTM: Clean structural additions for price streaming support

The new imports and field additions follow the existing patterns consistently.

Also applies to: 56-57, 85-86


410-426: LGTM: Clean implementation of price update conversion

The function follows the established pattern of other conversion functions in the codebase.


1123-1133: Improve error handling for missing price updates

The current error handling:

  1. Only logs an error without providing actionable information
  2. Continues execution which could lead to incomplete data being sent to clients

This issue was previously identified in a past review. Consider implementing proper error handling:

  1. Add context about why the price update might be missing
  2. Consider if continuing with incomplete data is appropriate
  3. Add metrics to track these occurrences

@jayy04 jayy04 merged commit fefd10e into main Nov 22, 2024
22 checks passed
@jayy04 jayy04 deleted the jy/ct-1321 branch November 22, 2024 17:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

2 participants