Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase FNS default numbers and close connections properly with reasons #2634

Merged
merged 3 commits into from
Dec 10, 2024

Conversation

jayy04
Copy link
Contributor

@jayy04 jayy04 commented Dec 9, 2024

Changelist

❯ wscat -c 'ws://127.0.0.1:9092/ws?clobPairIds=d'

Connected (press CTRL+C to quit)
Disconnected (code: 1003, reason: "invalid clobPairIds: d")

Test Plan

[Describe how this PR was tested (if applicable)]

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

  • New Features

    • Enhanced error handling in WebSocket connections, providing clearer closure messages for specific errors.
    • Introduced a new constant for managing write deadlines during WebSocket operations.
  • Improvements

    • Updated default values for gRPC streaming configuration, increasing maximum batch size and channel buffer size.
  • Bug Fixes

    • Improved error handling logic to ensure proper closure of WebSocket connections on parsing errors.

@jayy04 jayy04 requested a review from a team as a code owner December 9, 2024 20:59
Copy link
Contributor

coderabbitai bot commented Dec 9, 2024

Walkthrough

The changes in this pull request involve modifications to two files: flags.go and websocket_server.go. In flags.go, the default values for two constants related to gRPC streaming are increased from 10000 to 100000. In websocket_server.go, error handling in the Handler method is enhanced by introducing a new function for closing WebSocket connections with a reason, along with a new constant for write deadlines. These updates improve the configuration options for gRPC streaming and refine error management in WebSocket handling.

Changes

File Path Change Summary
protocol/app/flags/flags.go Updated constants DefaultGrpcStreamingMaxBatchSize and DefaultGrpcStreamingMaxChannelBufferSize from 10000 to 100000.
protocol/streaming/ws/websocket_server.go Enhanced error handling in Handler method; added function sendCloseWithReason and constant CLOSE_DEADLINE.
protocol/app/flags/flags_test.go Updated expected values for expectedGrpcStreamingBatchSize and expectedGrpcStreamingMaxChannelBufferSize from 10000 to 100000.

Possibly related PRs

Suggested labels

indexer

Suggested reviewers

  • dydxwill

🐰 In the code where the flags do play,
The numbers grew big, brightening the way.
With webs now closed, a reason to share,
Errors handled with utmost care.
So hop along, let the changes ring,
For better streaming, let the code sing! 🎶


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Experiment)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@jayy04
Copy link
Contributor Author

jayy04 commented Dec 9, 2024

@Mergifyio backport release/protocol/v8.x

Copy link
Contributor

mergify bot commented Dec 9, 2024

backport release/protocol/v8.x

✅ Backports have been created

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
protocol/streaming/ws/websocket_server.go (1)

119-128: Consider adding error logging in sendCloseWithReason

While the implementation is correct, it would be beneficial to log any errors that occur during the close operation, as they could indicate networking issues or help with debugging.

 func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
     closeMessage := websocket.FormatCloseMessage(closeCode, reason)
     // Set a write deadline to avoid blocking indefinitely
     conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE))
-    return conn.WriteControl(
+    err := conn.WriteControl(
         websocket.CloseMessage,
         closeMessage,
         time.Now().Add(CLOSE_DEADLINE),
     )
+    if err != nil {
+        // Use the logger from the WebsocketServer struct
+        // This would require modifying the function signature to accept the logger
+        logger.Error("Failed to send close message", "error", err)
+    }
+    return err
 }
protocol/app/flags/flags.go (1)

75-76: Consider documenting the rationale for buffer size increase

While the increased buffer sizes align well with the improved WebSocket connection handling, it would be helpful to document:

  1. The reasoning behind choosing 100,000 as the new default
  2. Any performance benchmarks that led to this decision
  3. Guidelines for operators who might need to tune these values
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between fc5ff36 and 0ef04b2.

📒 Files selected for processing (2)
  • protocol/app/flags/flags.go (1 hunks)
  • protocol/streaming/ws/websocket_server.go (3 hunks)
🔇 Additional comments (3)
protocol/streaming/ws/websocket_server.go (2)

22-23: LGTM: Reasonable timeout for connection closure

The 5-second deadline for closing WebSocket connections is a reasonable default that balances between giving enough time for the close message to be sent while preventing indefinite blocking.


73-74: Improved error handling with proper WebSocket closure

The error handling now properly closes WebSocket connections with specific error reasons instead of just logging errors. This is a significant improvement as it:

  1. Provides better feedback to clients about why their connection was terminated
  2. Ensures proper cleanup of WebSocket resources
  3. Follows WebSocket protocol best practices

Also applies to: 81-82, 89-90, 114-114

protocol/app/flags/flags.go (1)

75-76: Verify memory impact of increased buffer sizes

The 10x increase in both DefaultGrpcStreamingMaxBatchSize and DefaultGrpcStreamingMaxChannelBufferSize could significantly impact memory usage, especially under high load.

Please ensure that:

  1. The system has sufficient memory to handle these larger buffers
  2. The increased values align with expected message volumes
  3. There's proper monitoring in place for memory usage

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
protocol/streaming/ws/websocket_server.go (2)

73-76: Enhance error logging and error code specificity

While the error handling is improved with proper connection closure, consider these enhancements:

  1. Add more context to error logs:
  2. Use specific close codes for different error types
-ws.logger.Error("Error parsing clobPairIds", "err", err)
+ws.logger.Error("Failed to parse clobPairIds query parameter", 
+    "err", err,
+    "raw_value", r.URL.Query().Get(CLOB_PAIR_IDS_QUERY_PARAM))

-if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
+// Use specific close codes for different error types
+closeCode := websocket.CloseInvalidFramePayloadData
+if strings.Contains(err.Error(), "invalid format") {
+    closeCode = websocket.CloseUnsupportedData
+}
+if err := sendCloseWithReason(conn, closeCode, err.Error()); err != nil {

Also applies to: 83-86, 93-96, 120-122


Line range hint 1-139: Consider adding WebSocket server robustness features

The current implementation could benefit from additional features for production readiness:

  1. Rate limiting and connection limits to prevent DoS
  2. Ping/pong mechanism for connection health monitoring
  3. Graceful shutdown handling in the Handler

Consider implementing:

  1. A connection limit using a semaphore
  2. Regular ping/pong checks with timeout
  3. Context-based cancellation for graceful shutdown

Example ping/pong implementation:

const (
    // Add to existing constants
    pingPeriod = 30 * time.Second
    pongWait = 60 * time.Second
)

func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
    // After upgrading connection
    conn.SetReadDeadline(time.Now().Add(pongWait))
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    // Start ping ticker in a separate goroutine
    ticker := time.NewTicker(pingPeriod)
    defer ticker.Stop()
    go func() {
        for {
            select {
            case <-ticker.C:
                if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
                    return
                }
            }
        }
    }()
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 0ef04b2 and b62cca1.

📒 Files selected for processing (1)
  • protocol/streaming/ws/websocket_server.go (3 hunks)
🔇 Additional comments (1)
protocol/streaming/ws/websocket_server.go (1)

22-23: LGTM: Reasonable timeout value for connection closure

The 5-second deadline is appropriate for network operations and follows Go naming conventions.

Comment on lines +127 to +138
func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
closeMessage := websocket.FormatCloseMessage(closeCode, reason)
// Set a write deadline to avoid blocking indefinitely
if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
return err
}
return conn.WriteControl(
websocket.CloseMessage,
closeMessage,
time.Now().Add(CLOSE_DEADLINE),
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve robustness of connection closure handling

The function handles basic connection closure, but could be enhanced for better reliability:

  1. The same deadline is used twice unnecessarily
  2. Write deadline should be cleaned up after operation
  3. Consider adding context for cancellation support
  4. Error handling could be more comprehensive
-func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
+func sendCloseWithReason(ctx context.Context, conn *websocket.Conn, closeCode int, reason string) error {
     closeMessage := websocket.FormatCloseMessage(closeCode, reason)
+    deadline := time.Now().Add(CLOSE_DEADLINE)
     
-    // Set a write deadline to avoid blocking indefinitely
-    if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
+    // Set write deadline and ensure it's cleared
+    if err := conn.SetWriteDeadline(deadline); err != nil {
         return fmt.Errorf("failed to set write deadline: %w", err)
     }
+    defer conn.SetWriteDeadline(time.Time{})
     
-    return conn.WriteControl(
+    // Use context for cancellation support
+    done := make(chan error, 1)
+    go func() {
+        done <- conn.WriteControl(
             websocket.CloseMessage,
             closeMessage,
-            time.Now().Add(CLOSE_DEADLINE),
-    )
+            deadline,
+        )
+    }()
+    
+    select {
+    case err := <-done:
+        return err
+    case <-ctx.Done():
+        return ctx.Err()
+    }
 }

Update the callers to pass context:

-if err := sendCloseWithReason(conn, closeCode, err.Error()); err != nil {
+if err := sendCloseWithReason(r.Context(), conn, closeCode, err.Error()); err != nil {

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
protocol/app/flags/flags_test.go (1)

Line range hint 82-89: Update gRPC streaming values in TestValidate test cases

The test cases in TestValidate still use the old value of 10000 for GrpcStreamingMaxBatchSize and GrpcStreamingMaxChannelBufferSize. Consider updating these to match the new default value of 100000 for consistency.

Here's an example of the affected test cases that need updating:

  • "success - gRPC streaming enabled for validating nodes"
  • "success - both grpc and websocket streaming enabled for validating nodes"

Apply this diff to update the values:

 "success - gRPC streaming enabled for validating nodes": {
   flags: flags.Flags{
     NonValidatingFullNode:             false,
     GrpcEnable:                        true,
     GrpcStreamingEnabled:              true,
     GrpcStreamingFlushIntervalMs:      100,
-    GrpcStreamingMaxBatchSize:         10000,
-    GrpcStreamingMaxChannelBufferSize: 10000,
+    GrpcStreamingMaxBatchSize:         100000,
+    GrpcStreamingMaxChannelBufferSize: 100000,
     WebsocketStreamingEnabled:         false,
   },
 },

Also applies to: 108-116

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b62cca1 and 71e72c6.

📒 Files selected for processing (1)
  • protocol/app/flags/flags_test.go (1 hunks)
🔇 Additional comments (1)
protocol/app/flags/flags_test.go (1)

260-261: LGTM! Default values correctly updated.

The test case correctly reflects the new default values for gRPC streaming configuration:

  • expectedGrpcStreamingBatchSize: 100_000
  • expectedGrpcStreamingMaxChannelBufferSize: 100_000

@jayy04 jayy04 merged commit bed7a87 into main Dec 10, 2024
22 checks passed
@jayy04 jayy04 deleted the jy/fns-default branch December 10, 2024 18:15
mergify bot pushed a commit that referenced this pull request Dec 10, 2024
jayy04 added a commit that referenced this pull request Dec 10, 2024
…ons (backport #2634) (#2639)

Co-authored-by: jayy04 <103467857+jayy04@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

3 participants