Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orderbook Fills emission #1448

Merged
merged 12 commits into from
May 8, 2024
12 changes: 12 additions & 0 deletions protocol/lib/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,15 @@ func MergeAllMapsMustHaveDistinctKeys[K comparable, V any](maps ...map[K]V) map[
}
return combinedMap
}

// MergeMaps merges all the maps into a single map.
// Does not require maps to have distinct keys.
func MergeMaps[K comparable, V any](maps ...map[K]V) map[K]V {
combinedMap := make(map[K]V)
for _, m := range maps {
for k, v := range m {
combinedMap[k] = v
}
}
return combinedMap
}
1 change: 1 addition & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
18 changes: 18 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 0 additions & 18 deletions protocol/mocks/PricesKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,71 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
}
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar looking functions - worth to create helper?

nbd - will let you make the call

ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

// Group fills by clob pair id.
jonfung-dydx marked this conversation as resolved.
Show resolved Hide resolved
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
for _, orderbookFill := range orderbookFills {
// Fetch the clob pair id from the first order in `OrderBookMatchFill`.
// We can assume there must be an order, and that all orders share the same
// clob pair id.
clobPairId := orderbookFill.Orders[0].OrderId.ClobPairId
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := updatesByClobPairId[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}
}

if len(streamUpdatesForSubscription) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
func (sm *GrpcStreamingManagerImpl) GetUninitializedClobPairIds() []uint32 {
sm.Lock()
Expand Down
8 changes: 8 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
) {
}

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) GetUninitializedClobPairIds() []uint32 {
return []uint32{}
}
6 changes: 6 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ type GrpcStreamingManager interface {
blockHeight uint32,
execMode sdk.ExecMode,
)
SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
)
}
6 changes: 6 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,9 @@ func (f *FakeMemClobKeeper) SendOrderbookUpdates(
snapshot bool,
) {
}

func (f *FakeMemClobKeeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
}
Comment on lines +522 to +526
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method SendOrderbookFillUpdates is currently empty.

This method is defined but does not contain any implementation. If this is intentional (e.g., for a mock or placeholder), it should be documented clearly. Otherwise, it should be implemented or removed if not needed.

39 changes: 0 additions & 39 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,45 +175,6 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Axed this whole section to send out order updates.
Reason being:

  • PCS check state is set to deliver state, so no additional updates are needed.
  • since fills are directly emitted immediately on orderbook update, matched orders don't need to be updated

if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
orderIdsToSend := make(map[types.OrderId]bool)

// Send an update for reverted local operations.
for _, operation := range localValidatorOperationsQueue {
if match := operation.GetMatch(); match != nil {
// For normal order matches, we send an update for the taker and maker orders.
if matchedOrders := match.GetMatchOrders(); matchedOrders != nil {
orderIdsToSend[matchedOrders.TakerOrderId] = true
for _, fill := range matchedOrders.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
// For liquidation matches, we send an update for the maker orders.
if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil {
for _, fill := range matchedLiquidation.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
}
}

// Send an update for orders that were proposed.
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
orderIdsToSend[orderId] = true
}

// Send update.
for orderId := range orderIdsToSend {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
16 changes: 16 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,19 @@ func (k Keeper) SendOrderbookUpdates(
ctx.ExecMode(),
)
}

// SendOrderbookFillUpdates sends the orderbook fills to the gRPC streaming manager.
func (k Keeper) SendOrderbookFillUpdates(
ctx sdk.Context,
orderbookFills []types.StreamOrderbookFill,
) {
if len(orderbookFills) == 0 {
return
}
k.GetGrpcStreamingManager().SendOrderbookFillUpdates(
ctx,
orderbookFills,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
37 changes: 14 additions & 23 deletions protocol/x/clob/keeper/order_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,19 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) {
[]byte(types.OrderAmountFilledKeyPrefix),
)
memStore.Delete(orderId.ToStateKey())

// If grpc stream is on, zero out the fill amount.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero'ing out fill amounts when fill amounts are removed. I added this to the innermost statewrite/remove method. I could have added it to each of the 3-4 callsites but this might be cleaner. Let me know if the other approach is preferred.

if k.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
0, // Total filled quantums is zero because it's been pruned from state.
); success {
allUpdates.AddUpdateMessage(orderId, message)
}
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
}

// PruneStateFillAmountsForShortTermOrders prunes Short-Term order fill amounts from state that are pruneable
Expand All @@ -286,27 +299,5 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders(
blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight())

// Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`.
prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight)

// Send an orderbook update for each pruned order for grpc streams.
// This is needed because short term orders are pruned in PrepareCheckState using
// keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount
// pruning when there's replacement.
// Long-term fix would be to add logic to keep them in sync.
// TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync.
if k.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
for _, orderId := range prunedOrderIds {
if _, exists := k.MemClob.GetOrder(ctx, orderId); exists {
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
orderId,
0, // Total filled quantums is zero because it's been pruned from state.
); success {
allUpdates.AddUpdateMessage(orderId, message)
}
}
}
k.SendOrderbookUpdates(ctx, allUpdates, false)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need since this zero'ing out fills is punted to the innermost state remove method

k.PruneOrdersForBlockHeight(ctx, blockHeight)
}
Loading
Loading