FNS polish - metrics, max msg size, default flag values #2517

Merged: 3 commits, Oct 18, 2024
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
@@ -72,8 +72,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
- DefaultGrpcStreamingMaxBatchSize = 2000
- DefaultGrpcStreamingMaxChannelBufferSize = 2000
+ DefaultGrpcStreamingMaxBatchSize = 10000
+ DefaultGrpcStreamingMaxChannelBufferSize = 10000
DefaultWebsocketStreamingEnabled = false
DefaultWebsocketStreamingPort = 9092
DefaultFullNodeStreamingSnapshotInterval = 0
16 changes: 8 additions & 8 deletions protocol/app/flags/flags_test.go
@@ -89,8 +89,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
- GrpcStreamingMaxBatchSize: 2000,
- GrpcStreamingMaxChannelBufferSize: 2000,
+ GrpcStreamingMaxBatchSize: 10000,
+ GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingEnabled: false,
},
},
@@ -100,8 +100,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
- GrpcStreamingMaxBatchSize: 2000,
- GrpcStreamingMaxChannelBufferSize: 2000,
+ GrpcStreamingMaxBatchSize: 10000,
+ GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingEnabled: true,
WebsocketStreamingPort: 8989,
},
@@ -119,9 +119,9 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
OptimisticExecutionEnabled: true,
- GrpcStreamingMaxBatchSize: 2000,
+ GrpcStreamingMaxBatchSize: 10000,
GrpcStreamingFlushIntervalMs: 100,
- GrpcStreamingMaxChannelBufferSize: 2000,
+ GrpcStreamingMaxChannelBufferSize: 10000,
WebsocketStreamingPort: 8989,
},
},
@@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
- expectedGrpcStreamingBatchSize: 2000,
- expectedGrpcStreamingMaxChannelBufferSize: 2000,
+ expectedGrpcStreamingBatchSize: 10000,
+ expectedGrpcStreamingMaxChannelBufferSize: 10000,
expectedWebsocketEnabled: false,
expectedWebsocketPort: 9092,
expectedFullNodeStreamingSnapshotInterval: 0,
4 changes: 2 additions & 2 deletions protocol/lib/metrics/metric_keys.go
@@ -73,9 +73,8 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
- GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
+ GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
- GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
@@ -86,6 +85,7 @@ const (
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"
+ SubscriptionId = "subscription_id"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
31 changes: 25 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
@@ -153,7 +153,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
}

func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
- metrics.SetGauge(
+ metrics.AddSample(
metrics.GrpcStreamNumUpdatesBuffered,
float32(len(sm.streamUpdateCache)),
Comment on lines +156 to 158

⚠️ Potential issue

Use SetGauge for gauge metrics instead of AddSample

In the EmitMetrics method, you replaced metrics.SetGauge with metrics.AddSample for the GrpcStreamNumUpdatesBuffered metric. Since this metric represents the current number of updates buffered, which is a gauge (a value that can go up or down), it would be more appropriate to continue using metrics.SetGauge to accurately reflect the current state rather than accumulating samples.

Apply this diff to revert to using SetGauge:

- metrics.AddSample(
+ metrics.SetGauge(
    metrics.GrpcStreamNumUpdatesBuffered,
    float32(len(sm.streamUpdateCache)),
)

)
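The gauge-versus-sample distinction raised in the review comment above can be illustrated with a small sketch. The `gauge` and `sampler` types below are hypothetical stand-ins written for this example. They are not the `metrics.SetGauge` or `metrics.AddSample` helpers from this repository, and a real metrics backend may aggregate samples differently.

```go
package main

import "fmt"

// gauge keeps only the most recently set value (hypothetical stand-in).
type gauge struct{ value float32 }

// Set overwrites the stored value.
func (g *gauge) Set(v float32) { g.value = v }

// sampler keeps every observation, so a backend could later derive
// min, max, mean, or percentiles for a reporting window (hypothetical stand-in).
type sampler struct{ values []float32 }

// Add appends one observation.
func (s *sampler) Add(v float32) { s.values = append(s.values, v) }

// Max returns the largest observation, or 0 if nothing was recorded.
func (s *sampler) Max() float32 {
	var m float32
	for i, v := range s.values {
		if i == 0 || v > m {
			m = v
		}
	}
	return m
}

func main() {
	g, s := &gauge{}, &sampler{}
	// Simulated buffer lengths observed between two reporting intervals.
	for _, n := range []float32{10, 950, 3} {
		g.Set(n)
		s.Add(n)
	}
	fmt.Println("gauge reports:", g.value)
	fmt.Println("sampler max:", s.Max(), "from", len(s.values), "observations")
}
```

Which behavior is preferable depends on how the metric is consumed. The sketch only shows why the two calls are not interchangeable: a last-value gauge can miss a short spike that a sample-based metric still records.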
@@ -162,9 +162,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
float32(len(sm.orderbookSubscriptions)),
)
for _, subscription := range sm.orderbookSubscriptions {
- metrics.AddSample(
+ metrics.AddSampleWithLabels(
metrics.GrpcSubscriptionChannelLength,
float32(len(subscription.updatesChannel)),
+ metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
Comment on lines +165 to +168

⚠️ Potential issue

Use SetGaugeWithLabels for current values

For the GrpcSubscriptionChannelLength metric, you're now using metrics.AddSampleWithLabels. If the intention is to record the current length of each subscription's update channel (which is a gauge), consider using metrics.SetGaugeWithLabels to set the value directly. This provides an accurate snapshot of each channel's length at the time of metric emission.

Apply this diff to use SetGaugeWithLabels:

- metrics.AddSampleWithLabels(
+ metrics.SetGaugeWithLabels(
    metrics.GrpcSubscriptionChannelLength,
    float32(len(subscription.updatesChannel)),
    metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)

)
}
}
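As a rough illustration of what labeling by `subscription_id` buys in the hunk above, the sketch below records one value per (metric, label) pair. The `label` and `labeledGauges` types, the `seriesKey` helper, and the `channel_length` metric name are all hypothetical and written for this example. They do not reproduce the repository's `metrics` package.

```go
package main

import (
	"fmt"
	"sort"
)

// label is a hypothetical name/value pair attached to a metric.
type label struct {
	Name  string
	Value string
}

// labeledGauges stores the latest value per (metric, label) series.
type labeledGauges map[string]float32

// seriesKey builds a unique key for one labeled series.
func seriesKey(metric string, l label) string {
	return metric + "{" + l.Name + "=" + l.Value + "}"
}

// Set records v for the series identified by metric and l.
func (g labeledGauges) Set(metric string, v float32, l label) {
	g[seriesKey(metric, l)] = v
}

func main() {
	// Hypothetical subscriptions: id -> buffered channel of updates.
	subs := map[uint32]chan int{
		1: make(chan int, 4),
		2: make(chan int, 4),
	}
	subs[2] <- 42 // subscription 2 has one pending update

	g := labeledGauges{}
	for id, ch := range subs {
		// "channel_length" is an illustrative metric name only.
		g.Set("channel_length", float32(len(ch)),
			label{Name: "subscription_id", Value: fmt.Sprint(id)})
	}

	// Print in a stable order, since map iteration order is random.
	keys := make([]string, 0, len(g))
	for k := range g {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, g[k])
	}
}
```

Without the label, every subscription would write to the same series and a single slow consumer would be hard to identify. The trade-off is one series per subscription id, which can grow if subscriptions are created and dropped frequently.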
@@ -234,9 +235,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
- metrics.IncrCounter(
+ metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
+ metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
err = subscription.messageSender.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
@@ -372,9 +374,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
return
}

+ metrics.IncrCounterWithLabels(
+ metrics.GrpcAddToSubscriptionChannelCount,
+ 1,
+ metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)),
+ )
+
select {
case subscription.updatesChannel <- streamUpdates:
default:
+ // Buffer is full. Emit metric and drop subscription.
+ sm.EmitMetrics()
Comment on lines +386 to +387

⚠️ Potential issue

Avoid emitting metrics inside tight loops or error paths

Calling sm.EmitMetrics() inside the default case when the subscription channel is full could lead to performance issues, especially under high load. Emitting metrics is an I/O operation and might be resource-intensive if called frequently. Consider removing this call or ensuring it's throttled appropriately.
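One way the throttling mentioned in this comment could look is sketched below. The `throttle` type and the `countEmissions` helper are hypothetical and exist only for this example. Nothing here is taken from the PR, and the sketch is not safe for concurrent use without an external lock.

```go
package main

import (
	"fmt"
	"time"
)

// throttle lets an action run at most once per interval.
// Hypothetical helper; not safe for concurrent use without a lock.
type throttle struct {
	interval time.Duration
	last     time.Time
}

// Allow reports whether the action may run at time now and, if so,
// records now as the time of the latest run.
func (t *throttle) Allow(now time.Time) bool {
	if !t.last.IsZero() && now.Sub(t.last) < t.interval {
		return false
	}
	t.last = now
	return true
}

// countEmissions simulates n "channel full" events spaced gapMs apart and
// returns how many of them would trigger an emission under the throttle.
func countEmissions(n, gapMs, intervalMs int) int {
	th := &throttle{interval: time.Duration(intervalMs) * time.Millisecond}
	start := time.Date(2024, 10, 18, 0, 0, 0, 0, time.UTC)
	emitted := 0
	for i := 0; i < n; i++ {
		now := start.Add(time.Duration(i*gapMs) * time.Millisecond)
		if th.Allow(now) {
			emitted++ // stand-in for an EmitMetrics-style call
		}
	}
	return emitted
}

func main() {
	fmt.Println("emissions for 5 events, 300ms apart, 1s interval:",
		countEmissions(5, 300, 1000))
}
```

Passing the current time into `Allow` keeps the helper deterministic in tests. A production version would more likely call `time.Now()` internally and guard `last` with a mutex.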

sm.logger.Error(
fmt.Sprintf(
"Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
@@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
return
}

+ metrics.IncrCounter(
+ metrics.GrpcSendSubaccountUpdateCount,
+ 1,
+ )
+
Comment on lines +412 to +416

🛠️ Refactor suggestion

Add labels to GrpcSendSubaccountUpdateCount for better granularity

Currently, metrics.GrpcSendSubaccountUpdateCount is incremented without any labels. Adding labels such as the subaccount ID can provide more granular insights into metric data, which can be valuable for debugging and monitoring.

Apply this diff to include the subaccount ID as a label:

metrics.IncrCounter(
-   metrics.GrpcSendSubaccountUpdateCount,
-   1,
+   metrics.GrpcSendSubaccountUpdateCount,
+   1,
+   metrics.GetLabelForStringValue(metrics.SubaccountId, subaccountUpdate.SubaccountId.String()),
)

Committable suggestion was skipped due to low confidence.

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
@@ -705,9 +720,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(

sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds)

+ sm.EmitMetrics()
// Remove all subscriptions and wipe the buffer if buffer overflows.
sm.RemoveSubscriptionsAndClearBufferIfFull()
- sm.EmitMetrics()
}

// AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache.
@@ -726,8 +741,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(

sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds)

- sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
+ sm.RemoveSubscriptionsAndClearBufferIfFull()
}

// RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows.
@@ -743,6 +758,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull(
}
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
+ sm.EmitMetrics()
}
}
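One reading of the reordering in the hunks above is that emitting before the overflow check records the buffer size at its peak, while emitting only after the check would record the already-cleared size. That is an interpretation of the diff, not something the PR states. The sketch below uses two hypothetical functions, `emitThenClear` and `clearThenEmit`, and an assumed "greater than" overflow rule to show the difference.

```go
package main

import "fmt"

// emitThenClear returns the buffer sizes that would be recorded when the
// size is sampled before the overflow check, and once more after a clear.
func emitThenClear(cache []int, maxSize int) []int {
	recorded := []int{len(cache)}
	if len(cache) > maxSize {
		cache = nil // wipe the buffer on overflow
		recorded = append(recorded, len(cache))
	}
	return recorded
}

// clearThenEmit returns the sizes recorded when the overflow check runs
// first and the size is sampled only afterwards.
func clearThenEmit(cache []int, maxSize int) []int {
	if len(cache) > maxSize {
		cache = nil
	}
	return []int{len(cache)}
}

func main() {
	overflowing := []int{1, 2, 3} // three buffered updates, limit of two
	fmt.Println("emit, then clear:", emitThenClear(overflowing, 2))
	fmt.Println("clear, then emit:", clearThenEmit(overflowing, 2))
}
```

In the first ordering the overflow itself is visible in the recorded values. In the second, the metric only ever shows an empty buffer at the moment of overflow.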

@@ -778,13 +794,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
- metrics.IncrCounter(
+ metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
+ metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)),
Comment on lines +797 to +800

⚠️ Potential issue

Avoid duplicate metric increments to prevent misleading data

You're incrementing metrics.GrpcAddToSubscriptionChannelCount in both sendStreamUpdates and FlushStreamUpdatesWithLock. This could result in counting the same event twice, leading to inaccurate metrics. Decide on a single place to increment this counter to ensure the metric reflects the actual number of events accurately.

)
select {
case subscription.updatesChannel <- updates:
default:
+ // Buffer is full. Emit metric and drop subscription.
+ sm.EmitMetrics()
Comment on lines +805 to +806

⚠️ Potential issue

Limit metric emissions in high-frequency error scenarios

Similar to previous comments, calling sm.EmitMetrics() when a subscription's channel is full within a select default case can lead to performance degradation under high load. Consider removing this call or implementing a rate-limiting mechanism to prevent excessive metric emissions.

idsToRemove = append(idsToRemove, id)
}
}
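The `select` with a `default` case in this hunk is Go's idiom for a non-blocking channel send. The sketch below isolates that pattern. The `trySend` and `flush` helpers and the string-typed updates are hypothetical simplifications for this example and are not the manager's actual code.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		// The buffer is full, so the send would block.
		return false
	}
}

// flush offers one update to every subscription and returns the ids whose
// channel was full, i.e. the subscriptions a caller would drop.
func flush(subs map[uint32]chan string, update string) []uint32 {
	var idsToRemove []uint32
	for id, ch := range subs {
		if !trySend(ch, update) {
			idsToRemove = append(idsToRemove, id)
		}
	}
	return idsToRemove
}

func main() {
	subs := map[uint32]chan string{
		1: make(chan string, 2), // larger buffer
		2: make(chan string, 1), // slow consumer with a tiny buffer
	}
	fmt.Println("dropped after first flush:", flush(subs, "update-1"))
	fmt.Println("dropped after second flush:", flush(subs, "update-2"))
}
```

Dropping a subscriber whose buffer is full keeps one slow consumer from stalling the sender. The cost is that the channel capacity (the `GrpcStreamingMaxChannelBufferSize` setting in this PR) determines how much lag a subscriber can accumulate before it is disconnected.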
3 changes: 3 additions & 0 deletions protocol/streaming/ws/websocket_server.go
@@ -56,6 +56,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
}
defer conn.Close()

+ // Set ws max message size to 10 mb.
+ conn.SetReadLimit(10 * 1024 * 1024)
+
// Parse clobPairIds from query parameters
clobPairIds, err := parseClobPairIds(r)
if err != nil {