Skip to content

Commit

Permalink
Full Node Streaming Recurring snapshots (#2079)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 4, 2024
1 parent 2e0e1f0 commit 34bc8fa
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 10 deletions.
6 changes: 5 additions & 1 deletion protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2020,12 +2020,16 @@ func getFullNodeStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.FullNodeStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
logger.Info("GRPC streaming is enabled", log.ModuleKey, "full-node-streaming")
if appFlags.FullNodeStreamingSnapshotInterval > 0 {
logger.Info("Interval snapshots enabled", log.ModuleKey, "full-node-streaming")
}
return streaming.NewFullNodeStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
)
}
return streaming.NewNoopGrpcStreamingManager()
Expand Down
20 changes: 20 additions & 0 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Flags struct {
GrpcStreamingFlushIntervalMs uint32
GrpcStreamingMaxBatchSize uint32
GrpcStreamingMaxChannelBufferSize uint32
FullNodeStreamingSnapshotInterval uint32

VEOracleEnabled bool // Slinky Vote Extensions
// Optimistic block execution
Expand All @@ -47,6 +48,7 @@ const (
GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms"
GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size"
GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size"
FullNodeStreamingSnapshotInterval = "fns-snapshot-interval"

// Slinky VEs enabled
VEOracleEnabled = "slinky-vote-extension-oracle-enabled"
Expand All @@ -66,6 +68,7 @@ const (
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000
DefaultFullNodeStreamingSnapshotInterval = 0

DefaultVEOracleEnabled = true
DefaultOptimisticExecutionEnabled = false
Expand Down Expand Up @@ -117,6 +120,12 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingMaxChannelBufferSize,
"Maximum per-subscription channel size before grpc streaming cancels a singular subscription",
)
cmd.Flags().Uint32(
FullNodeStreamingSnapshotInterval,
DefaultFullNodeStreamingSnapshotInterval,
"If set to positive number, number of blocks between each periodic snapshot will be sent out. "+
"Defaults to zero for regular behavior of one initial snapshot.",
)
cmd.Flags().Bool(
VEOracleEnabled,
DefaultVEOracleEnabled,
Expand Down Expand Up @@ -155,6 +164,10 @@ func (f *Flags) Validate() error {
return fmt.Errorf("grpc streaming channel size must be positive number")
}
}
if f.FullNodeStreamingSnapshotInterval > 0 && f.FullNodeStreamingSnapshotInterval < 50 {
return fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero")
}

return nil
}

Expand All @@ -178,6 +191,7 @@ func GetFlagValuesFromOptions(
GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs,
GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize,
GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize,
FullNodeStreamingSnapshotInterval: DefaultFullNodeStreamingSnapshotInterval,

VEOracleEnabled: true,
OptimisticExecutionEnabled: DefaultOptimisticExecutionEnabled,
Expand Down Expand Up @@ -244,6 +258,12 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(FullNodeStreamingSnapshotInterval); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.FullNodeStreamingSnapshotInterval = v
}
}

if option := appOpts.Get(VEOracleEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.VEOracleEnabled = v
Expand Down
48 changes: 42 additions & 6 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBatchSize): {
flagName: flags.GrpcStreamingMaxBatchSize,
},
fmt.Sprintf("Has %s flag", flags.FullNodeStreamingSnapshotInterval): {
flagName: flags.FullNodeStreamingSnapshotInterval,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): {
flagName: flags.GrpcStreamingMaxChannelBufferSize,
},
Expand All @@ -60,12 +63,13 @@ func TestValidate(t *testing.T) {
}{
"success (default values)": {
flags: flags.Flags{
NonValidatingFullNode: flags.DefaultNonValidatingFullNode,
DdAgentHost: flags.DefaultDdAgentHost,
DdTraceAgentPort: flags.DefaultDdTraceAgentPort,
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,
OptimisticExecutionEnabled: false,
NonValidatingFullNode: flags.DefaultNonValidatingFullNode,
DdAgentHost: flags.DefaultDdAgentHost,
DdTraceAgentPort: flags.DefaultDdTraceAgentPort,
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,
FullNodeStreamingSnapshotInterval: flags.DefaultFullNodeStreamingSnapshotInterval,
OptimisticExecutionEnabled: false,
},
},
"success - full node & gRPC disabled": {
Expand Down Expand Up @@ -145,6 +149,29 @@ func TestValidate(t *testing.T) {
},
expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"),
},
"failure - full node streaming enabled with <= 49 snapshot interval": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
FullNodeStreamingSnapshotInterval: 49,
},
expectedErr: fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero"),
},
"success - full node streaming enabled with 50 snapshot interval": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
FullNodeStreamingSnapshotInterval: 50,
},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -173,6 +200,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs uint32
expectedGrpcStreamingBatchSize uint32
expectedGrpcStreamingMaxChannelBufferSize uint32
expectedFullNodeStreamingSnapshotInterval uint32
expectedOptimisticExecutionEnabled bool
}{
"Sets to default if unset": {
Expand All @@ -185,6 +213,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
expectedFullNodeStreamingSnapshotInterval: 0,
expectedOptimisticExecutionEnabled: false,
},
"Sets values from options": {
Expand All @@ -198,6 +227,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags.GrpcStreamingFlushIntervalMs: uint32(408),
flags.GrpcStreamingMaxBatchSize: uint32(650),
flags.GrpcStreamingMaxChannelBufferSize: uint32(972),
flags.FullNodeStreamingSnapshotInterval: uint32(123),
flags.OptimisticExecutionEnabled: "true",
},
expectedNonValidatingFullNodeFlag: true,
Expand All @@ -209,6 +239,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs: 408,
expectedGrpcStreamingBatchSize: 650,
expectedGrpcStreamingMaxChannelBufferSize: 972,
expectedFullNodeStreamingSnapshotInterval: 123,
expectedOptimisticExecutionEnabled: true,
},
}
Expand Down Expand Up @@ -262,6 +293,11 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcStreamingBatchSize,
flags.GrpcStreamingMaxBatchSize,
)
require.Equal(
t,
tc.expectedFullNodeStreamingSnapshotInterval,
flags.FullNodeStreamingSnapshotInterval,
)
require.Equal(
t,
tc.expectedGrpcStreamingMaxChannelBufferSize,
Expand Down
28 changes: 25 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package streaming

import (
"fmt"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"sync"
"time"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand Down Expand Up @@ -44,14 +45,18 @@ type FullNodeStreamingManagerImpl struct {

maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32

// Block interval in which snapshot info should be sent out in.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
subscriptionId uint32

// Initialize the subscription with orderbook snapshots.
initialize sync.Once
initialize *sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
Expand All @@ -64,13 +69,18 @@ type OrderbookSubscription struct {

// Channel to buffer writes before the stream
updatesChannel chan []clobtypes.StreamUpdate

// If interval snapshots are turned on, the next block height at which
// a snapshot should be sent out.
nextSnapshotBlock uint32
}

func NewFullNodeStreamingManager(
logger log.Logger,
flushIntervalMs uint32,
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
) *FullNodeStreamingManagerImpl {
logger = logger.With(log.ModuleKey, "full-node-streaming")
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
Expand All @@ -87,6 +97,7 @@ func NewFullNodeStreamingManager(

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -149,6 +160,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
initialize: &sync.Once{},
clobPairIds: clobPairIds,
subaccountIds: sIds,
messageSender: messageSender,
Expand Down Expand Up @@ -674,7 +686,15 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
sm.FlushStreamUpdatesWithLock()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)

for subscriptionId, subscription := range sm.orderbookSubscriptions {
// If the snapshot block interval is enabled, reset the sync.Once in order to
// re-send snapshots out.
if sm.snapshotBlockInterval > 0 &&
blockHeight == subscription.nextSnapshotBlock {
subscription.initialize = &sync.Once{}
}

subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
Expand All @@ -688,8 +708,10 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
for _, subaccountId := range subscription.subaccountIds {
saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId))
}

sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)
if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
}
},
)
}
Expand Down

0 comments on commit 34bc8fa

Please sign in to comment.