diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 34b7eaaf364..e23aa2aee1a 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -490,19 +490,11 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes. return exists } -// SendOrderbookUpdates groups updates by their clob pair ids and -// sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( +func getStreamUpdatesFromOffchainUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, execMode sdk.ExecMode, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookUpdatesLatency, - time.Now(), - ) - +) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) { // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) for _, message := range offchainUpdates.Messages { @@ -514,8 +506,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - streamUpdates := make([]clobtypes.StreamUpdate, 0) - clobPairIds := make([]uint32, 0) + streamUpdates = make([]clobtypes.StreamUpdate, 0) + clobPairIds = make([]uint32, 0) for clobPairId, update := range updates { v1updates, err := streaming_util.GetOffchainUpdatesV1(update) if err != nil { @@ -535,26 +527,39 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( clobPairIds = append(clobPairIds, clobPairId) } - sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) + return streamUpdates, clobPairIds } -// SendOrderbookFillUpdates groups fills by their clob pair ids and +// SendOrderbookUpdates groups updates by their clob pair ids and // sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( - orderbookFills []clobtypes.StreamOrderbookFill, +func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( + offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, execMode sdk.ExecMode, - perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, ) { defer metrics.ModuleMeasureSince( metrics.FullNodeGrpc, - metrics.GrpcSendOrderbookFillsLatency, + metrics.GrpcSendOrderbookUpdatesLatency, time.Now(), ) + streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode) + + sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) +} + +func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills( + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) ( + streamUpdates []clobtypes.StreamUpdate, + clobPairIds []uint32, +) { // Group fills by clob pair id. - streamUpdates := make([]clobtypes.StreamUpdate, 0) - clobPairIds := make([]uint32, 0) + streamUpdates = make([]clobtypes.StreamUpdate, 0) + clobPairIds = make([]uint32, 0) for _, orderbookFill := range orderbookFills { // If this is a deleveraging fill, fetch the clob pair id from the deleveraged // perpetual id. @@ -577,6 +582,29 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( streamUpdates = append(streamUpdates, streamUpdate) clobPairIds = append(clobPairIds, clobPairId) } + return streamUpdates, clobPairIds +} + +// SendOrderbookFillUpdates groups fills by their clob pair ids and +// sends messages to the subscribers. +func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( + orderbookFills []clobtypes.StreamOrderbookFill, + blockHeight uint32, + execMode sdk.ExecMode, + perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, +) { + defer metrics.ModuleMeasureSince( + metrics.FullNodeGrpc, + metrics.GrpcSendOrderbookFillsLatency, + time.Now(), + ) + + streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills( + orderbookFills, + blockHeight, + execMode, + perpetualIdToClobPairId, + ) sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds) } @@ -609,6 +637,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( ) } +func getStreamUpdatesForSubaccountUpdates( + subaccountUpdates []satypes.StreamSubaccountUpdate, + blockHeight uint32, + execMode sdk.ExecMode, +) ( + streamUpdates []clobtypes.StreamUpdate, + subaccountIds []*satypes.SubaccountId, +) { + // Group subaccount updates by subaccount id. + streamUpdates = make([]clobtypes.StreamUpdate, 0) + subaccountIds = make([]*satypes.SubaccountId, 0) + for _, subaccountUpdate := range subaccountUpdates { + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ + SubaccountUpdate: &subaccountUpdate, + }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), + } + streamUpdates = append(streamUpdates, streamUpdate) + subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId) + } + return streamUpdates, subaccountIds +} + // SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and // sends messages to the subscribers. func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( @@ -626,20 +679,11 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") } - // Group subaccount updates by subaccount id. - streamUpdates := make([]clobtypes.StreamUpdate, 0) - subaccountIds := make([]*satypes.SubaccountId, 0) - for _, subaccountUpdate := range subaccountUpdates { - streamUpdate := clobtypes.StreamUpdate{ - UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ - SubaccountUpdate: &subaccountUpdate, - }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - } - streamUpdates = append(streamUpdates, streamUpdate) - subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId) - } + streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( + subaccountUpdates, + blockHeight, + execMode, + ) sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds) } @@ -796,6 +840,42 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( return ret } +func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCache( + orderbookStreamUpdates []clobtypes.StreamUpdate, + orderbookClobPairIds []uint32, + fillStreamUpdates []clobtypes.StreamUpdate, + fillClobPairIds []uint32, + subaccountStreamUpdates []clobtypes.StreamUpdate, + subaccountIds []*satypes.SubaccountId, +) { + // Add orderbook updates to cache. + sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...) + for _, clobPairId := range orderbookClobPairIds { + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + ) + } + + // Add fill updates to cache. + sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...) + for _, clobPairId := range fillClobPairIds { + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + ) + } + + // Add subaccount updates to cache. + sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...) + for _, subaccountId := range subaccountIds { + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.subaccountIdToSubscriptionIdMapping[*subaccountId], + ) + } +} + // Grpc Streaming logic after consensus agrees on a block. // - Stream all events staged during `FinalizeBlock`. // - Stream orderbook updates to sync fills in local ops queue. @@ -809,28 +889,38 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx) - // TODO(CT-1190): Stream below in a single batch. - // Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock. - sm.SendOrderbookUpdates( + orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates( orderBookUpdatesToSyncLocalOpsQueue, uint32(ctx.BlockHeight()), ctx.ExecMode(), ) - // Send finalized fills from FinalizeBlock. - sm.SendOrderbookFillUpdates( + fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( finalizedFills, uint32(ctx.BlockHeight()), ctx.ExecMode(), perpetualIdToClobPairId, ) - // Send finalized subaccount updates from FinalizeBlock. - sm.SendFinalizedSubaccountUpdates( + sm.Lock() + defer sm.Unlock() + + subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( finalizedSubaccountUpdates, uint32(ctx.BlockHeight()), ctx.ExecMode(), ) + + sm.addBatchUpdatesToCache( + orderbookStreamUpdates, + orderbookClobPairIds, + fillStreamUpdates, + fillClobPairIds, + subaccountStreamUpdates, + subaccountIds, + ) + + sm.FlushStreamUpdatesWithLock() } // getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.