Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase FNS default numbers and close connections properly with reasons #2634

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 10000
DefaultGrpcStreamingMaxChannelBufferSize = 10000
DefaultGrpcStreamingMaxBatchSize = 100_000
DefaultGrpcStreamingMaxChannelBufferSize = 100_000
DefaultWebsocketStreamingEnabled = false
DefaultWebsocketStreamingPort = 9092
DefaultFullNodeStreamingSnapshotInterval = 0
Expand Down
4 changes: 2 additions & 2 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 10000,
expectedGrpcStreamingMaxChannelBufferSize: 10000,
expectedGrpcStreamingBatchSize: 100_000,
expectedGrpcStreamingMaxChannelBufferSize: 100_000,
expectedWebsocketEnabled: false,
expectedWebsocketPort: 9092,
expectedFullNodeStreamingSnapshotInterval: 0,
Expand Down
45 changes: 30 additions & 15 deletions protocol/streaming/ws/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
const (
CLOB_PAIR_IDS_QUERY_PARAM = "clobPairIds"
MARKET_IDS_QUERY_PARAM = "marketIds"

CLOSE_DEADLINE = 5 * time.Second
)

var upgrader = websocket.Upgrader{
Expand Down Expand Up @@ -68,33 +70,30 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
// Parse clobPairIds from query parameters
clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM)
if err != nil {
ws.logger.Error(
"Error parsing clobPairIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing clobPairIds", "err", err)
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
ws.logger.Error("Error sending close message", "err", err)
}
return
}

// Parse marketIds from query parameters
marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM)
if err != nil {
ws.logger.Error(
"Error parsing marketIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing marketIds", "err", err)
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
ws.logger.Error("Error sending close message", "err", err)
}
return
}

// Parse subaccountIds from query parameters
subaccountIds, err := parseSubaccountIds(r)
if err != nil {
ws.logger.Error(
"Error parsing subaccountIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing subaccountIds", "err", err)
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
ws.logger.Error("Error sending close message", "err", err)
}
return
}

Expand All @@ -118,10 +117,26 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
"Ending handler for websocket connection",
"err", err,
)
if err := sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error()); err != nil {
ws.logger.Error("Error sending close message", "err", err)
}
return
}
}

func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
closeMessage := websocket.FormatCloseMessage(closeCode, reason)
// Set a write deadline to avoid blocking indefinitely
if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
return err
}
return conn.WriteControl(
websocket.CloseMessage,
closeMessage,
time.Now().Add(CLOSE_DEADLINE),
)
}
Comment on lines +127 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve robustness of connection closure handling

The function handles basic connection closure, but could be enhanced for better reliability:

  1. The same deadline is used twice unnecessarily
  2. Write deadline should be cleaned up after operation
  3. Consider adding context for cancellation support
  4. Error handling could be more comprehensive
-func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
+func sendCloseWithReason(ctx context.Context, conn *websocket.Conn, closeCode int, reason string) error {
     closeMessage := websocket.FormatCloseMessage(closeCode, reason)
+    deadline := time.Now().Add(CLOSE_DEADLINE)
     
-    // Set a write deadline to avoid blocking indefinitely
-    if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
+    // Set write deadline and ensure it's cleared
+    if err := conn.SetWriteDeadline(deadline); err != nil {
         return fmt.Errorf("failed to set write deadline: %w", err)
     }
+    defer conn.SetWriteDeadline(time.Time{})
     
-    return conn.WriteControl(
+    // Use context for cancellation support
+    done := make(chan error, 1)
+    go func() {
+        done <- conn.WriteControl(
             websocket.CloseMessage,
             closeMessage,
-            time.Now().Add(CLOSE_DEADLINE),
-    )
+            deadline,
+        )
+    }()
+    
+    select {
+    case err := <-done:
+        return err
+    case <-ctx.Done():
+        return ctx.Err()
+    }
 }

Update the callers to pass context:

-if err := sendCloseWithReason(conn, closeCode, err.Error()); err != nil {
+if err := sendCloseWithReason(r.Context(), conn, closeCode, err.Error()); err != nil {

Committable suggestion skipped: line range outside the PR's diff.


// parseSubaccountIds is a helper function to parse the subaccountIds from the query parameters.
func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) {
subaccountIdsParam := r.URL.Query().Get("subaccountIds")
Expand Down
Loading