From c643a298943d200d573cf93b21f8985e5252a11f Mon Sep 17 00:00:00 2001 From: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com> Date: Fri, 9 Aug 2024 09:10:35 -0700 Subject: [PATCH] Full node streaming batch size reset to 2000, properly zero out cache on flush (#2068) --- protocol/app/flags/flags.go | 4 ++-- protocol/app/flags/flags_test.go | 12 ++++++------ protocol/streaming/full_node_streaming_manager.go | 7 ++++--- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 74d72e46a9..bdbe34a514 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -64,8 +64,8 @@ const ( DefaultGrpcStreamingEnabled = false DefaultGrpcStreamingFlushIntervalMs = 50 - DefaultGrpcStreamingMaxBatchSize = 1000000 - DefaultGrpcStreamingMaxChannelBufferSize = 1000000 + DefaultGrpcStreamingMaxBatchSize = 2000 + DefaultGrpcStreamingMaxChannelBufferSize = 2000 DefaultVEOracleEnabled = true DefaultOptimisticExecutionEnabled = false diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 3aac1ebed7..8260efe313 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -80,8 +80,8 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBatchSize: 1000000, - GrpcStreamingMaxChannelBufferSize: 1000000, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxChannelBufferSize: 2000, }, }, "success - optimistic execution": { @@ -130,7 +130,7 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 0, - GrpcStreamingMaxBatchSize: 1000000, + GrpcStreamingMaxBatchSize: 2000, }, expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"), }, @@ -140,7 +140,7 @@ func TestValidate(t *testing.T) { GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBatchSize: 1000000, + GrpcStreamingMaxBatchSize: 2000, GrpcStreamingMaxChannelBufferSize: 0, }, expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"), @@ -183,8 +183,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, expectedGrpcStreamingFlushMs: 50, - expectedGrpcStreamingBatchSize: 1000000, - expectedGrpcStreamingMaxChannelBufferSize: 1000000, + expectedGrpcStreamingBatchSize: 2000, + expectedGrpcStreamingMaxChannelBufferSize: 2000, expectedOptimisticExecutionEnabled: false, }, "Sets values from options": { diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 473b611b9c..fb10dfb3ec 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -453,7 +453,8 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( for id := range sm.orderbookSubscriptions { sm.removeSubscription(id) } - clear(sm.streamUpdateCache) + sm.streamUpdateCache = nil + sm.streamUpdateSubscriptionCache = nil } sm.EmitMetrics() } @@ -502,8 +503,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { } } - clear(sm.streamUpdateCache) - clear(sm.streamUpdateSubscriptionCache) + sm.streamUpdateCache = nil + sm.streamUpdateSubscriptionCache = nil for _, id := range idsToRemove { sm.logger.Error(