Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRPC Full node streaming - batching protos #1626

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 42 additions & 48 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,35 +267,17 @@ export interface StreamOrderbookUpdatesRequestSDKType {
*/

export interface StreamOrderbookUpdatesResponse {
/** Orderbook updates for the clob pair. */
/** Batch of updates for the clob pair. */
updates: StreamUpdate[];
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponseSDKType {
/** Orderbook updates for the clob pair. */
/** Batch of updates for the clob pair. */
Comment on lines +270 to +279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to centralize block height and exec mode data.

The restructuring of blockHeight and execMode from StreamOrderbookUpdatesResponse to StreamUpdate centralizes these properties, which could simplify data management and potentially improve data consistency across different parts of the system. Ensure that all parts of the system that interact with these properties are updated accordingly to accommodate this change.

Also applies to: 290-310

updates: StreamUpdateSDKType[];
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}
/**
* StreamUpdate is an update that will be pushed through the
Expand All @@ -305,6 +287,12 @@ export interface StreamOrderbookUpdatesResponseSDKType {
export interface StreamUpdate {
orderbookUpdate?: StreamOrderbookUpdate;
orderFill?: StreamOrderbookFill;
/** Block height of the update. */

blockHeight: number;
/** Exec mode of the update. */

execMode: number;
}
/**
* StreamUpdate is an update that will be pushed through the
Expand All @@ -314,6 +302,12 @@ export interface StreamUpdate {
export interface StreamUpdateSDKType {
orderbook_update?: StreamOrderbookUpdateSDKType;
order_fill?: StreamOrderbookFillSDKType;
/** Block height of the update. */

block_height: number;
/** Exec mode of the update. */

exec_mode: number;
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
Expand All @@ -328,8 +322,8 @@ export interface StreamOrderbookUpdate {
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

Expand All @@ -348,8 +342,8 @@ export interface StreamOrderbookUpdateSDKType {
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

Expand All @@ -363,7 +357,7 @@ export interface StreamOrderbookUpdateSDKType {
export interface StreamOrderbookFill {
/**
* Clob match. Provides information on which orders were matched
* and the type of order. Fill amounts here are relative.
* and the type of order.
*/
clobMatch?: ClobMatch;
/**
Expand All @@ -384,7 +378,7 @@ export interface StreamOrderbookFill {
export interface StreamOrderbookFillSDKType {
/**
* Clob match. Provides information on which orders were matched
* and the type of order. Fill amounts here are relative.
* and the type of order.
*/
clob_match?: ClobMatchSDKType;
/**
Expand Down Expand Up @@ -1159,9 +1153,7 @@ export const StreamOrderbookUpdatesRequest = {

function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
blockHeight: 0,
execMode: 0
updates: []
};
}

Expand All @@ -1171,14 +1163,6 @@ export const StreamOrderbookUpdatesResponse = {
StreamUpdate.encode(v!, writer.uint32(10).fork()).ldelim();
}

if (message.blockHeight !== 0) {
writer.uint32(16).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(24).uint32(message.execMode);
}

return writer;
},

Expand All @@ -1195,14 +1179,6 @@ export const StreamOrderbookUpdatesResponse = {
message.updates.push(StreamUpdate.decode(reader, reader.uint32()));
break;

case 2:
message.blockHeight = reader.uint32();
break;

case 3:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1215,8 +1191,6 @@ export const StreamOrderbookUpdatesResponse = {
fromPartial(object: DeepPartial<StreamOrderbookUpdatesResponse>): StreamOrderbookUpdatesResponse {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => StreamUpdate.fromPartial(e)) || [];
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand All @@ -1225,7 +1199,9 @@ export const StreamOrderbookUpdatesResponse = {
function createBaseStreamUpdate(): StreamUpdate {
return {
orderbookUpdate: undefined,
orderFill: undefined
orderFill: undefined,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -1239,6 +1215,14 @@ export const StreamUpdate = {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(18).fork()).ldelim();
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -1259,6 +1243,14 @@ export const StreamUpdate = {
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1272,6 +1264,8 @@ export const StreamUpdate = {
const message = createBaseStreamUpdate();
message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined;
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
23 changes: 11 additions & 12 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,8 @@ message StreamOrderbookUpdatesRequest {
// StreamOrderbookUpdatesResponse is a response message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesResponse {
// Orderbook updates for the clob pair.
// Batch of updates for the clob pair.
repeated StreamUpdate updates = 1 [ (gogoproto.nullable) = false ];

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 2;

// Exec mode of the updates.
uint32 exec_mode = 3;
}

// StreamUpdate is an update that will be pushed through the
Expand All @@ -190,6 +183,12 @@ message StreamUpdate {
StreamOrderbookUpdate orderbook_update = 1;
StreamOrderbookFill order_fill = 2;
}

// Block height of the update.
uint32 block_height = 3;

// Exec mode of the update.
uint32 exec_mode = 4;
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
Expand All @@ -201,8 +200,8 @@ message StreamOrderbookUpdate {
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// This is true for the initial response and false for all subsequent updates.
// Note that if the snapshot is true, then all previous entries should be
// All updates should be ignored until snapshot is recieved.
// If the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;
}
Expand All @@ -211,13 +210,13 @@ message StreamOrderbookUpdate {
// the full node GRPC stream.
message StreamOrderbookFill {
// Clob match. Provides information on which orders were matched
// and the type of order. Fill amounts here are relative.
// and the type of order.
ClobMatch clob_match = 1;

// All orders involved in the specified clob match. Used to look up
// price of a match through a given maker order id.
repeated Order orders = 2 [ (gogoproto.nullable) = false ];

// Resulting fill amounts for each order in the orders array.
repeated uint64 fill_amounts = 3 [ (gogoproto.nullable) = false ];
repeated uint64 fill_amounts = 3;
}
14 changes: 5 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
Snapshot: snapshot,
},
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
}
}

sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

Expand Down Expand Up @@ -166,22 +166,20 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
metrics.IncrCounter(
metrics.GrpcEmitProtocolUpdateCount,
Expand All @@ -208,9 +206,7 @@ func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
)
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
Updates: streamUpdatesForSubscription,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
Expand Down
Loading
Loading