Skip to content

Commit

Permalink
more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Jun 12, 2024
1 parent eb8a11f commit ef874ab
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
Expand Down
48 changes: 27 additions & 21 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,11 @@ func NewGrpcStreamingManager(
grpcStreamingManager.FlushStreamUpdates()
case <-grpcStreamingManager.done:
grpcStreamingManager.logger.Info(
"GRPC Stream poller goroutine returning",
"GRPC Stream poller goroutine shutting down",
)
return
}
}
grpcStreamingManager.logger.Error(
"Should never see this ever",
)
}()

return grpcStreamingManager
Expand Down Expand Up @@ -158,6 +155,10 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
metrics.IncrCounter(
metrics.GrpcSendResponseToSubscriberCount,
1,
)
err = subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Expand Down Expand Up @@ -254,27 +255,32 @@ func (sm *GrpcStreamingManagerImpl) SendSnapshot(
}

if len(v1updates) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: true,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
streamUpdates := []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: true,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
}
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdates:
default:
sm.logger.Error(
fmt.Sprintf("Error sending out update for grpc streaming subscription %+v", id),
"err", err,
fmt.Sprintf(
"GRPC Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
id,
),
)
idsToRemove = append(idsToRemove, id)
idsToRemove = append(idsToRemove, subscription.subscriptionId)
}
}
}
Expand Down Expand Up @@ -424,7 +430,7 @@ func (sm *GrpcStreamingManagerImpl) FlushStreamUpdates() {

if len(streamUpdatesForSubscription) > 0 {
metrics.IncrCounter(
metrics.GrpcSendResponseToSubscriberCount,
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
Expand Down

0 comments on commit ef874ab

Please sign in to comment.