diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index aadc805696..b166f88352 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -72,8 +72,8 @@ const ( DefaultGrpcStreamingEnabled = false DefaultGrpcStreamingFlushIntervalMs = 50 - DefaultGrpcStreamingMaxBatchSize = 2000 - DefaultGrpcStreamingMaxChannelBufferSize = 2000 + DefaultGrpcStreamingMaxBatchSize = 10000 + DefaultGrpcStreamingMaxChannelBufferSize = 10000 DefaultWebsocketStreamingEnabled = false DefaultWebsocketStreamingPort = 9092 DefaultFullNodeStreamingSnapshotInterval = 0 diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 0ad03872fa..8b3ed1a8f4 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -89,8 +89,8 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBatchSize: 2000, - GrpcStreamingMaxChannelBufferSize: 2000, + GrpcStreamingMaxBatchSize: 10000, + GrpcStreamingMaxChannelBufferSize: 10000, WebsocketStreamingEnabled: false, }, }, @@ -100,8 +100,8 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBatchSize: 2000, - GrpcStreamingMaxChannelBufferSize: 2000, + GrpcStreamingMaxBatchSize: 10000, + GrpcStreamingMaxChannelBufferSize: 10000, WebsocketStreamingEnabled: true, WebsocketStreamingPort: 8989, }, @@ -119,9 +119,9 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, OptimisticExecutionEnabled: true, - GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxBatchSize: 10000, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxChannelBufferSize: 2000, + GrpcStreamingMaxChannelBufferSize: 10000, WebsocketStreamingPort: 8989, }, }, @@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, expectedGrpcStreamingFlushMs: 50, - expectedGrpcStreamingBatchSize: 2000, - expectedGrpcStreamingMaxChannelBufferSize: 2000, + expectedGrpcStreamingBatchSize: 10000, + expectedGrpcStreamingMaxChannelBufferSize: 10000, expectedWebsocketEnabled: false, expectedWebsocketPort: 9092, expectedFullNodeStreamingSnapshotInterval: 0, diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index c84abc2fb1..18ef5fab1b 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -73,9 +73,8 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" - GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" + GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" @@ -86,6 +85,7 @@ const ( GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count" GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count" GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count" + SubscriptionId = "subscription_id" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index f63beac347..6cf24bd0b5 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -153,7 +153,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { } func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { - metrics.SetGauge( + metrics.AddSample( metrics.GrpcStreamNumUpdatesBuffered, float32(len(sm.streamUpdateCache)), ) @@ -162,9 +162,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { float32(len(sm.orderbookSubscriptions)), ) for _, subscription := range sm.orderbookSubscriptions { - metrics.AddSample( + metrics.AddSampleWithLabels( metrics.GrpcSubscriptionChannelLength, float32(len(subscription.updatesChannel)), + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) } } @@ -234,9 +235,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcSendResponseToSubscriberCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) err = subscription.messageSender.Send( &clobtypes.StreamOrderbookUpdatesResponse{ @@ -372,9 +374,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( return } + metrics.IncrCounterWithLabels( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)), + ) + select { case subscription.updatesChannel <- streamUpdates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() sm.logger.Error( fmt.Sprintf( "Streaming subscription id %+v channel full capacity. Dropping subscription connection.", @@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( return } + metrics.IncrCounter( + metrics.GrpcSendSubaccountUpdateCount, + 1, + ) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ @@ -705,9 +720,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) + sm.EmitMetrics() // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() - sm.EmitMetrics() } // AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. @@ -726,8 +741,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) - sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() + sm.RemoveSubscriptionsAndClearBufferIfFull() } // RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. @@ -743,6 +758,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull( } sm.streamUpdateCache = nil sm.streamUpdateSubscriptionCache = nil + sm.EmitMetrics() } } @@ -778,13 +794,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { // If the buffer is full, drop the subscription. for id, updates := range subscriptionUpdates { if subscription, ok := sm.orderbookSubscriptions[id]; ok { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcAddToSubscriptionChannelCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)), ) select { case subscription.updatesChannel <- updates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() idsToRemove = append(idsToRemove, id) } } diff --git a/protocol/streaming/ws/websocket_server.go b/protocol/streaming/ws/websocket_server.go index 0b66804595..c94e84208e 100644 --- a/protocol/streaming/ws/websocket_server.go +++ b/protocol/streaming/ws/websocket_server.go @@ -56,6 +56,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { } defer conn.Close() + // Set ws max message size to 10 mb. + conn.SetReadLimit(10 * 1024 * 1024) + // Parse clobPairIds from query parameters clobPairIds, err := parseClobPairIds(r) if err != nil {