Skip to content

Commit

Permalink
Full node streaming batch size reset to 2000, properly zero out cache…
Browse files Browse the repository at this point in the history
… on flush (#2068)
  • Loading branch information
jonfung-dydx committed Oct 18, 2024
1 parent 84746ef commit c643a29
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 1000000
DefaultGrpcStreamingMaxChannelBufferSize = 1000000
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000

DefaultVEOracleEnabled = true
DefaultOptimisticExecutionEnabled = false
Expand Down
12 changes: 6 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxChannelBufferSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
},
},
"success - optimistic execution": {
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 0,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
},
expectedErr: fmt.Errorf("grpc streaming flush interval must be positive number"),
},
Expand All @@ -140,7 +140,7 @@ func TestValidate(t *testing.T) {
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 1000000,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 0,
},
expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"),
Expand Down Expand Up @@ -183,8 +183,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcEnable: true,
expectedGrpcStreamingEnable: false,
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 1000000,
expectedGrpcStreamingMaxChannelBufferSize: 1000000,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
expectedOptimisticExecutionEnabled: false,
},
"Sets values from options": {
Expand Down
7 changes: 4 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
for id := range sm.orderbookSubscriptions {
sm.removeSubscription(id)
}
clear(sm.streamUpdateCache)
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
}
sm.EmitMetrics()
}
Expand Down Expand Up @@ -502,8 +503,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
}
}

clear(sm.streamUpdateCache)
clear(sm.streamUpdateSubscriptionCache)
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil

for _, id := range idsToRemove {
sm.logger.Error(
Expand Down

0 comments on commit c643a29

Please sign in to comment.