-
Notifications
You must be signed in to change notification settings - Fork 122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FNS polish - metrics, max msg size, default flag values #2517
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -153,7 +153,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { | |||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { | ||||||||||||||||||
metrics.SetGauge( | ||||||||||||||||||
metrics.AddSample( | ||||||||||||||||||
metrics.GrpcStreamNumUpdatesBuffered, | ||||||||||||||||||
float32(len(sm.streamUpdateCache)), | ||||||||||||||||||
) | ||||||||||||||||||
|
@@ -162,9 +162,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { | |||||||||||||||||
float32(len(sm.orderbookSubscriptions)), | ||||||||||||||||||
) | ||||||||||||||||||
for _, subscription := range sm.orderbookSubscriptions { | ||||||||||||||||||
metrics.AddSample( | ||||||||||||||||||
metrics.AddSampleWithLabels( | ||||||||||||||||||
metrics.GrpcSubscriptionChannelLength, | ||||||||||||||||||
float32(len(subscription.updatesChannel)), | ||||||||||||||||||
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), | ||||||||||||||||||
Comment on lines
+165
to
+168
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Use `SetGaugeWithLabels` for the `GrpcSubscriptionChannelLength` metric. Apply this diff to use `SetGaugeWithLabels`:
- metrics.AddSampleWithLabels(
+ metrics.SetGaugeWithLabels(
metrics.GrpcSubscriptionChannelLength,
float32(len(subscription.updatesChannel)),
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
) 📝 Committable suggestion
Suggested change
|
||||||||||||||||||
) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -234,9 +235,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( | |||||||||||||||||
// Use current goroutine to consistently poll subscription channel for updates | ||||||||||||||||||
// to send through stream. | ||||||||||||||||||
for updates := range subscription.updatesChannel { | ||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||
metrics.IncrCounterWithLabels( | ||||||||||||||||||
metrics.GrpcSendResponseToSubscriberCount, | ||||||||||||||||||
1, | ||||||||||||||||||
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), | ||||||||||||||||||
) | ||||||||||||||||||
err = subscription.messageSender.Send( | ||||||||||||||||||
&clobtypes.StreamOrderbookUpdatesResponse{ | ||||||||||||||||||
|
@@ -372,9 +374,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( | |||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
metrics.IncrCounterWithLabels( | ||||||||||||||||||
metrics.GrpcAddToSubscriptionChannelCount, | ||||||||||||||||||
1, | ||||||||||||||||||
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)), | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
select { | ||||||||||||||||||
case subscription.updatesChannel <- streamUpdates: | ||||||||||||||||||
default: | ||||||||||||||||||
// Buffer is full. Emit metric and drop subscription. | ||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
Comment on lines
+386
to
+387
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Avoid emitting metrics inside tight loops or error paths. Calling `sm.EmitMetrics()` … [rest of comment truncated] |
||||||||||||||||||
sm.logger.Error( | ||||||||||||||||||
fmt.Sprintf( | ||||||||||||||||||
"Streaming subscription id %+v channel full capacity. Dropping subscription connection.", | ||||||||||||||||||
|
@@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( | |||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||
metrics.GrpcSendSubaccountUpdateCount, | ||||||||||||||||||
1, | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
Comment on lines
+412
to
+416
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion: Add labels to `GrpcSendSubaccountUpdateCount`. Currently, `GrpcSendSubaccountUpdateCount` is incremented without any labels. Apply this diff to include the subaccount ID as a label:
metrics.IncrCounter(
- metrics.GrpcSendSubaccountUpdateCount,
- 1,
+ metrics.GrpcSendSubaccountUpdateCount,
+ 1,
+ metrics.GetLabelForStringValue(metrics.SubaccountId, subaccountUpdate.SubaccountId.String()),
)
|
||||||||||||||||||
// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. | ||||||||||||||||||
stagedEvent := clobtypes.StagedFinalizeBlockEvent{ | ||||||||||||||||||
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ | ||||||||||||||||||
|
@@ -705,9 +720,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( | |||||||||||||||||
|
||||||||||||||||||
sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) | ||||||||||||||||||
|
||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
// Remove all subscriptions and wipe the buffer if buffer overflows. | ||||||||||||||||||
sm.RemoveSubscriptionsAndClearBufferIfFull() | ||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. | ||||||||||||||||||
|
@@ -726,8 +741,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( | |||||||||||||||||
|
||||||||||||||||||
sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) | ||||||||||||||||||
|
||||||||||||||||||
sm.RemoveSubscriptionsAndClearBufferIfFull() | ||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
sm.RemoveSubscriptionsAndClearBufferIfFull() | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. | ||||||||||||||||||
|
@@ -743,6 +758,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull( | |||||||||||||||||
} | ||||||||||||||||||
sm.streamUpdateCache = nil | ||||||||||||||||||
sm.streamUpdateSubscriptionCache = nil | ||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -778,13 +794,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { | |||||||||||||||||
// If the buffer is full, drop the subscription. | ||||||||||||||||||
for id, updates := range subscriptionUpdates { | ||||||||||||||||||
if subscription, ok := sm.orderbookSubscriptions[id]; ok { | ||||||||||||||||||
metrics.IncrCounter( | ||||||||||||||||||
metrics.IncrCounterWithLabels( | ||||||||||||||||||
metrics.GrpcAddToSubscriptionChannelCount, | ||||||||||||||||||
1, | ||||||||||||||||||
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)), | ||||||||||||||||||
Comment on lines
+797
to
+800
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Avoid duplicate metric increments to prevent misleading data. You're incrementing `metrics.GrpcAddToSubscriptionChannelCount` … [rest of comment truncated] |
||||||||||||||||||
) | ||||||||||||||||||
select { | ||||||||||||||||||
case subscription.updatesChannel <- updates: | ||||||||||||||||||
default: | ||||||||||||||||||
// Buffer is full. Emit metric and drop subscription. | ||||||||||||||||||
sm.EmitMetrics() | ||||||||||||||||||
Comment on lines
+805
to
+806
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Limit metric emissions in high-frequency error scenarios. Similar to previous comments, calling `sm.EmitMetrics()` … [rest of comment truncated] |
||||||||||||||||||
idsToRemove = append(idsToRemove, id) | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use `SetGauge` for gauge metrics instead of `AddSample`
In the `EmitMetrics` method, you replaced `metrics.SetGauge` with `metrics.AddSample` for the `GrpcStreamNumUpdatesBuffered` metric. Since this metric represents the current number of updates buffered, which is a gauge (a value that can go up or down), it would be more appropriate to continue using `metrics.SetGauge` to accurately reflect the current state rather than accumulating samples.
Apply this diff to revert to using `SetGauge`:
📝 Committable suggestion