diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index d76764d0ae..e27406dc86 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -70,6 +70,7 @@ const ( GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" + GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 46240121e6..6de6887bf5 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -87,14 +87,11 @@ func NewGrpcStreamingManager( grpcStreamingManager.FlushStreamUpdates() case <-grpcStreamingManager.done: grpcStreamingManager.logger.Info( - "GRPC Stream poller goroutine returning", + "GRPC Stream poller goroutine shutting down", ) return } } - grpcStreamingManager.logger.Error( - "Should never see this ever", - ) }() return grpcStreamingManager @@ -158,6 +155,10 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) err = subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ Updates: updates, @@ -254,27 +255,32 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot( } if len(v1updates) > 0 { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: true, - }, - }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), + streamUpdates := []clobtypes.StreamUpdate{ + { + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: true, }, }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), }, - ); err != nil { + } + metrics.IncrCounter( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + ) + select { + case subscription.updatesChannel <- streamUpdates: + default: sm.logger.Error( - fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id), - "err", err, + fmt.Sprintf( + "GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.", + id, + ), ) - idsToRemove = append(idsToRemove, id) + idsToRemove = append(idsToRemove, subscription.subscriptionId) } } } @@ -424,7 +430,7 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() { if len(streamUpdatesForSubscription) > 0 { metrics.IncrCounter( - metrics.GrpcSendResponseToSubscriberCount, + metrics.GrpcAddToSubscriptionChannelCount, 1, ) select {