Skip to content

Commit

Permalink
Full node streaming batch size reset to 2000, properly zero out cache…
Browse files Browse the repository at this point in the history
… on flush (#2068)
  • Loading branch information
jonfung-dydx committed Aug 15, 2024
1 parent c23eb39 commit 3da874b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 1000000
DefaultGrpcStreamingMaxChannelBufferSize = 1000000
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000

DefaultVEOracleEnabled = true
)
Expand Down
12 changes: 6 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxChannelBufferSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
},
},
"failure - gRPC disabled": {
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 0,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
},
expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"),
},
Expand All @@ -120,7 +120,7 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 0,
},
expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"),
Expand Down Expand Up @@ -162,8 +162,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 1000000,
expectedGrpcStreamingMaxChannelBufferSize: 1000000,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
},
"Sets values from options": {
optsMap: map[string]any{
Expand Down
7 changes: 4 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
for id := range sm.orderbookSubscriptions {
sm.removeSubscription(id)
}
clear(sm.streamUpdateCache)
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
}
sm.EmitMetrics()
}
Expand Down Expand Up @@ -502,8 +503,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
}
}

clear(sm.streamUpdateCache)
clear(sm.streamUpdateSubscriptionCache)
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil

for _, id := range idsToRemove {
sm.logger.Error(
Expand Down

0 comments on commit 3da874b

Please sign in to comment.