diff --git a/protocol/app/app.go b/protocol/app/app.go index 6028530e97..656713122d 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1958,7 +1958,8 @@ func getGrpcStreamingManagerFromOptions( return streaming.NewGrpcStreamingManager( logger, appFlags.GrpcStreamingFlushIntervalMs, - appFlags.GrpcStreamingMaxBufferSize, + appFlags.GrpcStreamingMaxBatchSize, + appFlags.GrpcStreamingMaxChannelBufferSize, ) } return streaming.NewNoopGrpcStreamingManager() diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 75708ef92b..640de83bbe 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -21,9 +21,10 @@ type Flags struct { GrpcEnable bool // Grpc Streaming - GrpcStreamingEnabled bool - GrpcStreamingFlushIntervalMs uint32 - GrpcStreamingMaxBufferSize uint32 + GrpcStreamingEnabled bool + GrpcStreamingFlushIntervalMs uint32 + GrpcStreamingMaxBatchSize uint32 + GrpcStreamingMaxChannelBufferSize uint32 VEOracleEnabled bool // Slinky Vote Extensions } @@ -40,9 +41,10 @@ const ( GrpcEnable = "grpc.enable" // Grpc Streaming - GrpcStreamingEnabled = "grpc-streaming-enabled" - GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" - GrpcStreamingMaxBufferSize = "grpc-streaming-max-buffer-size" + GrpcStreamingEnabled = "grpc-streaming-enabled" + GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" + GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size" + GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size" // Slinky VEs enabled VEOracleEnabled = "slinky-vote-extension-oracle-enabled" @@ -55,9 +57,10 @@ const ( DefaultNonValidatingFullNode = false DefaultDdErrorTrackingFormat = false - DefaultGrpcStreamingEnabled = false - DefaultGrpcStreamingFlushIntervalMs = 50 - DefaultGrpcStreamingMaxBufferSize = 10000 + DefaultGrpcStreamingEnabled = false + DefaultGrpcStreamingFlushIntervalMs = 50 + DefaultGrpcStreamingMaxBatchSize = 10000 + DefaultGrpcStreamingMaxChannelBufferSize = 10000 DefaultVEOracleEnabled = true ) @@ -99,9 +102,14 @@ func AddFlagsToCmd(cmd *cobra.Command) { "Flush interval (in ms) for grpc streaming", ) cmd.Flags().Uint32( - GrpcStreamingMaxBufferSize, - DefaultGrpcStreamingMaxBufferSize, - "Maximum buffer size before grpc streaming cancels all connections", + GrpcStreamingMaxBatchSize, + DefaultGrpcStreamingMaxBatchSize, + "Maximum batch size before grpc streaming cancels all subscriptions", + ) + cmd.Flags().Uint32( + GrpcStreamingMaxChannelBufferSize, + DefaultGrpcStreamingMaxChannelBufferSize, + "Maximum per-subscription channel size before grpc streaming cancels a singular subscription", ) cmd.Flags().Bool( VEOracleEnabled, @@ -122,12 +130,15 @@ func (f *Flags) Validate() error { if !f.GrpcEnable { return fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server") } - if f.GrpcStreamingMaxBufferSize == 0 { - return fmt.Errorf("grpc streaming buffer size must be positive number") + if f.GrpcStreamingMaxBatchSize == 0 { + return fmt.Errorf("grpc streaming batch size must be positive number") } if f.GrpcStreamingFlushIntervalMs == 0 { return fmt.Errorf("grpc streaming flush interval must be positive number") } + if f.GrpcStreamingMaxChannelBufferSize == 0 { + return fmt.Errorf("grpc streaming channel size must be positive number") + } } return nil } @@ -148,9 +159,10 @@ func GetFlagValuesFromOptions( GrpcAddress: config.DefaultGRPCAddress, GrpcEnable: true, - GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, - GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, - GrpcStreamingMaxBufferSize: DefaultGrpcStreamingMaxBufferSize, + GrpcStreamingEnabled: DefaultGrpcStreamingEnabled, + GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, + GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize, + GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize, VEOracleEnabled: true, } @@ -204,9 +216,15 @@ func GetFlagValuesFromOptions( } } - if option := appOpts.Get(GrpcStreamingMaxBufferSize); option != nil { + if option := appOpts.Get(GrpcStreamingMaxBatchSize); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.GrpcStreamingMaxBatchSize = v + } + } + + if option := appOpts.Get(GrpcStreamingMaxChannelBufferSize); option != nil { if v, err := cast.ToUint32E(option); err == nil { - result.GrpcStreamingMaxBufferSize = v + result.GrpcStreamingMaxChannelBufferSize = v } } diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index c3b5a7bc59..5a007d3d1a 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -35,8 +35,11 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingFlushIntervalMs): { flagName: flags.GrpcStreamingFlushIntervalMs, }, - fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBufferSize): { - flagName: flags.GrpcStreamingMaxBufferSize, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBatchSize): { + flagName: flags.GrpcStreamingMaxBatchSize, + }, + fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): { + flagName: flags.GrpcStreamingMaxChannelBufferSize, }, } @@ -69,11 +72,12 @@ func TestValidate(t *testing.T) { }, "success - gRPC streaming enabled for validating nodes": { flags: flags.Flags{ - NonValidatingFullNode: false, - GrpcEnable: true, - GrpcStreamingEnabled: true, - GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBufferSize: 10000, + NonValidatingFullNode: false, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 10000, + GrpcStreamingMaxChannelBufferSize: 10000, }, }, "failure - gRPC disabled": { @@ -90,15 +94,15 @@ func TestValidate(t *testing.T) { }, expectedErr: fmt.Errorf("grpc.enable must be set to true - grpc streaming requires gRPC server"), }, - "failure - gRPC streaming enabled with zero buffer size": { + "failure - gRPC streaming enabled with zero batch size": { flags: flags.Flags{ NonValidatingFullNode: true, GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBufferSize: 0, + GrpcStreamingMaxBatchSize: 0, }, - expectedErr: fmt.Errorf("grpc streaming buffer size must be positive number"), + expectedErr: fmt.Errorf("grpc streaming batch size must be positive number"), }, "failure - gRPC streaming enabled with zero flush interval ms": { flags: flags.Flags{ @@ -106,10 +110,21 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 0, - GrpcStreamingMaxBufferSize: 10000, + GrpcStreamingMaxBatchSize: 10000, }, expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"), }, + "failure - gRPC streaming enabled with zero channel size ms": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 10000, + GrpcStreamingMaxChannelBufferSize: 0, + }, + expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"), + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -129,44 +144,48 @@ func TestGetFlagValuesFromOptions(t *testing.T) { optsMap map[string]any // Expectations. - expectedNonValidatingFullNodeFlag bool - expectedDdAgentHost string - expectedDdTraceAgentPort uint16 - expectedGrpcAddress string - expectedGrpcEnable bool - expectedGrpcStreamingEnable bool - expectedGrpcStreamingFlushMs uint32 - expectedGrpcStreamingBufferSize uint32 + expectedNonValidatingFullNodeFlag bool + expectedDdAgentHost string + expectedDdTraceAgentPort uint16 + expectedGrpcAddress string + expectedGrpcEnable bool + expectedGrpcStreamingEnable bool + expectedGrpcStreamingFlushMs uint32 + expectedGrpcStreamingBatchSize uint32 + expectedGrpcStreamingMaxChannelBufferSize uint32 }{ "Sets to default if unset": { - expectedNonValidatingFullNodeFlag: false, - expectedDdAgentHost: "", - expectedDdTraceAgentPort: 8126, - expectedGrpcAddress: "localhost:9090", - expectedGrpcEnable: true, - expectedGrpcStreamingEnable: false, - expectedGrpcStreamingFlushMs: 50, - expectedGrpcStreamingBufferSize: 10000, + expectedNonValidatingFullNodeFlag: false, + expectedDdAgentHost: "", + expectedDdTraceAgentPort: 8126, + expectedGrpcAddress: "localhost:9090", + expectedGrpcEnable: true, + expectedGrpcStreamingEnable: false, + expectedGrpcStreamingFlushMs: 50, + expectedGrpcStreamingBatchSize: 10000, + expectedGrpcStreamingMaxChannelBufferSize: 10000, }, "Sets values from options": { optsMap: map[string]any{ - flags.NonValidatingFullNodeFlag: true, - flags.DdAgentHost: "agentHostTest", - flags.DdTraceAgentPort: uint16(777), - flags.GrpcEnable: false, - flags.GrpcAddress: "localhost:9091", - flags.GrpcStreamingEnabled: "true", - flags.GrpcStreamingFlushIntervalMs: uint32(408), - flags.GrpcStreamingMaxBufferSize: uint32(650), + flags.NonValidatingFullNodeFlag: true, + flags.DdAgentHost: "agentHostTest", + flags.DdTraceAgentPort: uint16(777), + flags.GrpcEnable: false, + flags.GrpcAddress: "localhost:9091", + flags.GrpcStreamingEnabled: "true", + flags.GrpcStreamingFlushIntervalMs: uint32(408), + flags.GrpcStreamingMaxBatchSize: uint32(650), + flags.GrpcStreamingMaxChannelBufferSize: uint32(972), }, - expectedNonValidatingFullNodeFlag: true, - expectedDdAgentHost: "agentHostTest", - expectedDdTraceAgentPort: 777, - expectedGrpcEnable: false, - expectedGrpcAddress: "localhost:9091", - expectedGrpcStreamingEnable: true, - expectedGrpcStreamingFlushMs: 408, - expectedGrpcStreamingBufferSize: 650, + expectedNonValidatingFullNodeFlag: true, + expectedDdAgentHost: "agentHostTest", + expectedDdTraceAgentPort: 777, + expectedGrpcEnable: false, + expectedGrpcAddress: "localhost:9091", + expectedGrpcStreamingEnable: true, + expectedGrpcStreamingFlushMs: 408, + expectedGrpcStreamingBatchSize: 650, + expectedGrpcStreamingMaxChannelBufferSize: 972, }, } @@ -216,8 +235,13 @@ func TestGetFlagValuesFromOptions(t *testing.T) { ) require.Equal( t, - tc.expectedGrpcStreamingBufferSize, - flags.GrpcStreamingMaxBufferSize, + tc.expectedGrpcStreamingBatchSize, + flags.GrpcStreamingMaxBatchSize, + ) + require.Equal( + t, + tc.expectedGrpcStreamingMaxChannelBufferSize, + flags.GrpcStreamingMaxChannelBufferSize, ) }) } diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index f92b19d4a5..e27406dc86 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -69,11 +69,13 @@ const ( GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count" + GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" + GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" + GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 9ad4dfa5f6..6f447b755a 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -34,11 +34,14 @@ type GrpcStreamingManagerImpl struct { streamUpdateCache map[uint32][]clobtypes.StreamUpdate numUpdatesInCache uint32 - maxUpdatesInCache uint32 + maxUpdatesInCache uint32 + maxSubscriptionChannelSize uint32 } // OrderbookSubscription represents a active subscription to the orderbook updates stream. type OrderbookSubscription struct { + subscriptionId uint32 + // Initialize the subscription with orderbook snapshots. initialize sync.Once @@ -47,12 +50,16 @@ type OrderbookSubscription struct { // Stream srv clobtypes.Query_StreamOrderbookUpdatesServer + + // Channel to buffer writes before the stream + updatesChannel chan []clobtypes.StreamUpdate } func NewGrpcStreamingManager( logger log.Logger, flushIntervalMs uint32, maxUpdatesInCache uint32, + maxSubscriptionChannelSize uint32, ) *GrpcStreamingManagerImpl { logger = logger.With(log.ModuleKey, "grpc-streaming") grpcStreamingManager := &GrpcStreamingManagerImpl{ @@ -65,16 +72,21 @@ func NewGrpcStreamingManager( streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate), numUpdatesInCache: 0, - maxUpdatesInCache: maxUpdatesInCache, + maxUpdatesInCache: maxUpdatesInCache, + maxSubscriptionChannelSize: maxSubscriptionChannelSize, } - // Start the goroutine for pushing order updates through + // Start the goroutine for pushing order updates through. + // Sender goroutine for the subscription channels. go func() { for { select { case <-grpcStreamingManager.ticker.C: grpcStreamingManager.FlushStreamUpdates() case <-grpcStreamingManager.done: + grpcStreamingManager.logger.Info( + "GRPC Stream poller goroutine shutting down", + ) return } } @@ -96,6 +108,12 @@ func (sm *GrpcStreamingManagerImpl) EmitMetrics() { metrics.GrpcStreamSubscriberCount, float32(len(sm.orderbookSubscriptions)), ) + for _, subscription := range sm.orderbookSubscriptions { + metrics.AddSample( + metrics.GrpcSubscriptionChannelLength, + float32(len(subscription.updatesChannel)), + ) + } } // Subscribe subscribes to the orderbook updates stream. @@ -112,25 +130,71 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( return clobtypes.ErrInvalidGrpcStreamingRequest } + sm.Lock() subscription := &OrderbookSubscription{ - clobPairIds: clobPairIds, - srv: srv, + subscriptionId: sm.nextSubscriptionId, + clobPairIds: clobPairIds, + srv: srv, + updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), } - sm.Lock() - defer sm.Unlock() - - sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription + sm.logger.Info( + fmt.Sprintf( + "New subscription id %+v for clob pair ids: %+v", + subscription.subscriptionId, + clobPairIds, + ), + ) + sm.orderbookSubscriptions[subscription.subscriptionId] = subscription sm.nextSubscriptionId++ sm.EmitMetrics() - return nil + sm.Unlock() + + // Use current goroutine to consistently poll subscription channel for updates + // to send through stream. + for updates := range subscription.updatesChannel { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) + err = subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updates, + }, + ) + if err != nil { + // On error, remove the subscription from the streaming manager + sm.logger.Error( + fmt.Sprintf( + "Error sending out update for grpc streaming subscription %+v. Dropping subsciption connection.", + subscription.subscriptionId, + ), + "err", err, + ) + delete(sm.orderbookSubscriptions, subscription.subscriptionId) + break + } + } + + sm.logger.Info( + fmt.Sprintf( + "Terminating poller for subscription id %+v", + subscription.subscriptionId, + ), + ) + return err } // removeSubscription removes a subscription from the grpc streaming manager. -// The streaming manager's lock should already be acquired from the thread running this. +// The streaming manager's lock should already be acquired before calling this. func (sm *GrpcStreamingManagerImpl) removeSubscription( subscriptionIdToRemove uint32, ) { + subscription := sm.orderbookSubscriptions[subscriptionIdToRemove] + if subscription == nil { + return + } + close(subscription.updatesChannel) delete(sm.orderbookSubscriptions, subscriptionIdToRemove) sm.logger.Info( fmt.Sprintf("Removed grpc streaming subscription id %+v", subscriptionIdToRemove), @@ -189,27 +253,32 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot( } if len(v1updates) > 0 { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: true, - }, - }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), + streamUpdates := []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: true, }, }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, - ); err != nil { + } + metrics.IncrCounter( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + ) + select { + case subscription.updatesChannel <- streamUpdates: + default: sm.logger.Error( - fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id), - "err", err, + fmt.Sprintf( + "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", + id, + ), ) - idsToRemove = append(idsToRemove, id) + idsToRemove = append(idsToRemove, subscription.subscriptionId) } } } @@ -312,6 +381,11 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( sm.Lock() defer sm.Unlock() + metrics.IncrCounter( + metrics.GrpcAddUpdateToBufferCount, + 1, + ) + for clobPairId, streamUpdates := range updatesByClobPairId { sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...) } @@ -327,6 +401,7 @@ func (sm *GrpcStreamingManagerImpl) AddUpdatesToCache( clear(sm.streamUpdateCache) sm.numUpdatesInCache = 0 } + sm.EmitMetrics() } // FlushStreamUpdates takes in a map of clob pair id to stream updates and emits them to subscribers. @@ -340,12 +415,8 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { sm.Lock() defer sm.Unlock() - metrics.IncrCounter( - metrics.GrpcEmitProtocolUpdateCount, - 1, - ) - - // Send updates to subscribers. + // Non-blocking send updates through subscriber's buffered channel. + // If the buffer is full, drop the subscription. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) @@ -357,32 +428,30 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { if len(streamUpdatesForSubscription) > 0 { metrics.IncrCounter( - metrics.GrpcSendResponseToSubscriberCount, + metrics.GrpcAddToSubscriptionChannelCount, 1, ) - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: streamUpdatesForSubscription, - }, - ); err != nil { - sm.logger.Error( - fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id), - "err", err, - ) + select { + case subscription.updatesChannel <- streamUpdatesForSubscription: + default: idsToRemove = append(idsToRemove, id) } } } - // Clean up subscriptions that have been closed. - // If a Send update has failed for any clob pair id, the whole subscription will be removed. + clear(sm.streamUpdateCache) + sm.numUpdatesInCache = 0 + for _, id := range idsToRemove { + sm.logger.Error( + fmt.Sprintf( + "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", + id, + ), + ) sm.removeSubscription(id) } - clear(sm.streamUpdateCache) - sm.numUpdatesInCache = 0 - sm.EmitMetrics() }