Skip to content

Commit

Permalink
Add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Sep 6, 2024
1 parent cb839c1 commit a3401b8
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 46 deletions.
8 changes: 4 additions & 4 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 66 additions & 28 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streaming
import (
"fmt"
"sync"
"sync/atomic"
"time"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
Expand Down Expand Up @@ -55,8 +56,8 @@ type FullNodeStreamingManagerImpl struct {
type OrderbookSubscription struct {
subscriptionId uint32

// Initialize the subscription with orderbook snapshots.
initialize *sync.Once
// Whether the subscription is initialized with snapshot.
initialized *atomic.Bool

// Clob pair ids to subscribe to.
clobPairIds []uint32
Expand All @@ -75,6 +76,10 @@ type OrderbookSubscription struct {
nextSnapshotBlock uint32
}

func (sub *OrderbookSubscription) IsInitialized() bool {
return sub.initialized.Load()
}

func NewFullNodeStreamingManager(
logger log.Logger,
flushIntervalMs uint32,
Expand Down Expand Up @@ -159,7 +164,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
initialize: &sync.Once{},
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
messageSender: messageSender,
Expand Down Expand Up @@ -674,9 +679,33 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
sm.EmitMetrics()
}

func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
sm.Lock()
defer sm.Unlock()

ret := make(map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate)
for _, subscription := range sm.orderbookSubscriptions {
// If the subscription has been initialized, no need to grab the subaccount snapshot.
if alreadyInitialized := subscription.initialized.Load(); alreadyInitialized {
continue
}

for _, subaccountId := range subscription.subaccountIds {
if _, exists := ret[subaccountId]; exists {
continue
}

ret[subaccountId] = getSubaccountSnapshot(subaccountId)
}
}
return ret
}

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand All @@ -690,31 +719,40 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)

for subscriptionId, subscription := range sm.orderbookSubscriptions {
// If the snapshot block interval is enabled, reset the sync.Once in order to
// re-send snapshots out.
if sm.snapshotBlockInterval > 0 &&
blockHeight == subscription.nextSnapshotBlock {
subscription.initialize = &sync.Once{}
}

subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
for _, clobPairId := range subscription.clobPairIds {
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId))
}
allUpdates.Append(updatesByClobPairId[clobPairId])
}
saUpdates := []*satypes.StreamSubaccountUpdate{}
for _, subaccountId := range subscription.subaccountIds {
saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId))
if alreadyInitialized := subscription.initialized.Swap(true); !alreadyInitialized {
allUpdates := clobtypes.NewOffchainUpdates()
for _, clobPairId := range subscription.clobPairIds {
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId))
}
sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)
if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
allUpdates.Append(updatesByClobPairId[clobPairId])
}

saUpdates := []*satypes.StreamSubaccountUpdate{}
for _, subaccountId := range subscription.subaccountIds {
// The subaccount snapshot may not exist due to the following race condition
// 1. At beginning of PrepareCheckState we get snapshot for all subscribed subaccounts.
// 2. A new subaccount is subscribed to by a new subscription.
// 3. InitializeNewStreams is called.
// Then the new subaccount would not be included in the snapshot.
// We are okay with this behavior.
if saUpdate, ok := subaccountSnapshots[subaccountId]; ok {
saUpdates = append(saUpdates, saUpdate)
}
},
)
}

sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)

if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
}
}

// If the snapshot block interval is enabled and the next block is a snapshot block,
// reset the `atomic.Bool` so snapshots are sent for the next block.
if sm.snapshotBlockInterval > 0 &&
blockHeight+1 == subscription.nextSnapshotBlock {
subscription.initialized = &atomic.Bool{} // False by default.
}
}
}
8 changes: 7 additions & 1 deletion protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId)
return false
}

func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate {
return nil
}

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
Expand Down
5 changes: 4 additions & 1 deletion protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ type FullNodeStreamingManager interface {
// L3+ Orderbook updates.
InitializeNewStreams(
getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
GetSubaccountSnapshotsForInitStreams(
getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
Expand Down
16 changes: 15 additions & 1 deletion protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/keeper"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// PreBlocker executes all ABCI PreBlock logic respective to the clob module.
Expand Down Expand Up @@ -123,6 +124,15 @@ func PrepareCheckState(
log.BlockHeight, ctx.BlockHeight()+1,
)

// We just committed block `h`, preparing `CheckState` of `h+1`
// Before we modify the `CheckState`, we first take the snapshot of
// the subscribed subaccounts at the end of block `h`. This we send finalized state of
// the subaccounts below in `InitializeNewStreams`.
var subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
if keeper.GetFullNodeStreamingManager().Enabled() {
subaccountSnapshots = keeper.GetSubaccountSnapshotsForInitStreams(ctx)
}

// Prune any rate limiting information that is no longer relevant.
keeper.PruneRateLimits(ctx)

Expand Down Expand Up @@ -239,7 +249,11 @@ func PrepareCheckState(
)

// Initialize new streams with orderbook snapshots, if any.
keeper.InitializeNewStreams(ctx)
keeper.InitializeNewStreams(
ctx,
// Use the subaccount snapshot at the top of function to initialize the streams.
subaccountSnapshots,
)

// Set per-orderbook gauges.
keeper.MemClob.SetMemclobGauges(ctx)
Expand Down
36 changes: 26 additions & 10 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package keeper
import (
"errors"
"fmt"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"sync/atomic"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
Expand Down Expand Up @@ -251,9 +252,31 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) {
k.antehandler = anteHandler
}

func (k Keeper) GetSubaccountSnapshotsForInitStreams(
ctx sdk.Context,
) (
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
) {
lib.AssertCheckTxMode(ctx)

return k.GetFullNodeStreamingManager().GetSubaccountSnapshotsForInitStreams(
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate {
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(
ctx,
subaccountId,
true,
)
return &subaccountUpdate
},
)
}

// InitializeNewStreams initializes new streams for all uninitialized clob pairs
// by sending the corresponding orderbook snapshots.
func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
func (k Keeper) InitializeNewStreams(
ctx sdk.Context,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
) {
streamingManager := k.GetFullNodeStreamingManager()

streamingManager.InitializeNewStreams(
Expand All @@ -263,14 +286,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) {
clobPairId,
)
},
func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate {
subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate(
ctx,
subaccountId,
true,
)
return &subaccountUpdate
},
subaccountSnapshots,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
Expand Down
5 changes: 4 additions & 1 deletion protocol/x/clob/types/clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ type ClobKeeper interface {
) error
UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error
// full node streaming
InitializeNewStreams(ctx sdk.Context)
InitializeNewStreams(
ctx sdk.Context,
subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
)
SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *OffchainUpdates,
Expand Down

0 comments on commit a3401b8

Please sign in to comment.