Skip to content

Commit

Permalink
emit stream fills when clob match appended to opqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Apr 30, 2024
1 parent a0ee1e1 commit 5b1a471
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 20 deletions.
5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 0 additions & 18 deletions protocol/mocks/PricesKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,63 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
) {
// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
for _, orderbookFill := range orderbookFills {
// Fetch the clob pair id from the first order in `OrderBookMatchFill`.
// We can assume there must be an order, and that all orders share the same
// clob pair id.
clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
}

if len(streamUpdatesForSubscription) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ExecMode: uint32(ctx.ExecMode()),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
Expand Down
6 changes: 6 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
4 changes: 4 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ type GrpcStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
)
}
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates(
snapshot bool,
) {
}

func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
}
14 changes: 14 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,17 @@ func (k Keeper) SendOrderbookUpdates(
ctx.ExecMode(),
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
if len(orderbookFills) == 0 {
return
}
k.GetGrpcStreamingManager().SendOrderbookFillUpdates(
ctx,
orderbookFills,
)
}
8 changes: 7 additions & 1 deletion protocol/x/clob/memclob/memclob.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,13 @@ func (m *MemClobPriceTimePriority) mustUpdateMemclobStateWithMatches(
}

// Add the new matches to the operations queue.
m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders)
internalOperation := m.operationsToPropose.MustAddMatchToOperationsQueue(takerOrder, makerFillWithOrders)
// If orderbook updates are on, send an orderbook update with the fill to grpc streams.
if m.generateOrderbookUpdates {
clobMatch := internalOperation.GetMatch()
orderbookMatchFill := m.GenerateStreamOrderbookFill(ctx, *clobMatch, takerOrder, makerFillWithOrders)
m.clobKeeper.SendOrderbookFillUpdates(ctx, []types.StreamOrderbookFill{orderbookMatchFill})
}

// Build a slice of all subaccounts which had matches this matching loop, and sort them for determinism.
allSubaccounts := lib.GetSortedKeys[satypes.SortedSubaccountIds](subaccountTotalMatchedQuantums)
Expand Down
31 changes: 31 additions & 0 deletions protocol/x/clob/memclob/memclob_grpc_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,37 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

// GenerateStreamOrderbookFill wraps a clob match into the `StreamOrderbookFill`
// data structure which provides prices and fill amounts alongside clob match.
func (m *MemClobPriceTimePriority) GenerateStreamOrderbookFill(
ctx sdk.Context,
clobMatch types.ClobMatch,
takerOrder types.MatchableOrder,
makerFillWithOrders []types.MakerFillWithOrder,
) types.StreamOrderbookFill {
ordersInClobMatch := []types.Order{}
fillAmounts := []uint32{}

for _, makerFillWithOrder := range makerFillWithOrders {
ordersInClobMatch = append(ordersInClobMatch, makerFillWithOrder.Order)
fillAmount := m.GetOrderFilledAmount(ctx, makerFillWithOrder.Order.OrderId)
fillAmounts = append(fillAmounts, uint32(fillAmount))
}
// If taker order is not a liquidation order, has to be a regular
// taker order. Add the taker order to the orders array.
if !takerOrder.IsLiquidation() {
order := takerOrder.MustGetOrder()
ordersInClobMatch = append(ordersInClobMatch, order)
fillAmount := m.GetOrderFilledAmount(ctx, order.OrderId)
fillAmounts = append(fillAmounts, uint32(fillAmount))
}
return types.StreamOrderbookFill{
ClobMatch: &clobMatch,
Orders: ordersInClobMatch,
FillAmounts: fillAmounts,
}
}

// GetOffchainUpdatesForOrderbookSnapshot returns the offchain updates for the orderbook snapshot.
// This is used by the gRPC streaming server to send the orderbook snapshot to the client.
func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot(
Expand Down
4 changes: 4 additions & 0 deletions protocol/x/clob/types/mem_clob_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,8 @@ type MemClobKeeper interface {
offchainUpdates *OffchainUpdates,
snapshot bool,
)
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []StreamOrderbookFill,
)
}
3 changes: 2 additions & 1 deletion protocol/x/clob/types/operations_to_propose.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (o *OperationsToPropose) MustAddStatefulOrderPlacementToOperationsQueue(
func (o *OperationsToPropose) MustAddMatchToOperationsQueue(
takerMatchableOrder MatchableOrder,
makerFillsWithOrders []MakerFillWithOrder,
) {
) InternalOperation {
makerFills := lib.MapSlice(
makerFillsWithOrders,
func(mfwo MakerFillWithOrder) MakerFill {
Expand Down Expand Up @@ -234,6 +234,7 @@ func (o *OperationsToPropose) MustAddMatchToOperationsQueue(
o.OperationsQueue,
matchOperation,
)
return matchOperation
}

// AddZeroFillDeleveragingToOperationsQueue adds a zero-fill deleveraging match operation to the
Expand Down

0 comments on commit 5b1a471

Please sign in to comment.