Skip to content

Commit

Permalink
Full Node Streaming - reorder protobuf single length fields before va…
Browse files Browse the repository at this point in the history
…riable length fields (backport #2227) (#2231)

Co-authored-by: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com>
  • Loading branch information
mergify[bot] and jonfung-dydx authored Sep 10, 2024
1 parent e4327b8 commit 18d44cf
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 299 deletions.
106 changes: 52 additions & 54 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,73 +293,71 @@ export interface StreamOrderbookUpdatesResponseSDKType {
*/

export interface StreamUpdate {
orderbookUpdate?: StreamOrderbookUpdate;
orderFill?: StreamOrderbookFill;
takerOrder?: StreamTakerOrder;
subaccountUpdate?: StreamSubaccountUpdate;
/** Block height of the update. */

blockHeight: number;
/** Exec mode of the update. */

execMode: number;
orderbookUpdate?: StreamOrderbookUpdate;
orderFill?: StreamOrderbookFill;
takerOrder?: StreamTakerOrder;
subaccountUpdate?: StreamSubaccountUpdate;
}
/**
* StreamUpdate is an update that will be pushed through the
* GRPC stream.
*/

export interface StreamUpdateSDKType {
orderbook_update?: StreamOrderbookUpdateSDKType;
order_fill?: StreamOrderbookFillSDKType;
taker_order?: StreamTakerOrderSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
/** Block height of the update. */

block_height: number;
/** Exec mode of the update. */

exec_mode: number;
orderbook_update?: StreamOrderbookUpdateSDKType;
order_fill?: StreamOrderbookFillSDKType;
taker_order?: StreamTakerOrderSDKType;
subaccount_update?: StreamSubaccountUpdateSDKType;
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
* the full node GRPC stream.
*/

export interface StreamOrderbookUpdate {
/**
* Orderbook updates for the clob pair. Can contain order place, removals,
* or updates.
*/
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* Orderbook updates for the clob pair. Can contain order place, removals,
* or updates.
*/

updates: OffChainUpdateV1[];
}
/**
* StreamOrderbookUpdate provides information on an orderbook update. Used in
* the full node GRPC stream.
*/

export interface StreamOrderbookUpdateSDKType {
/**
* Orderbook updates for the clob pair. Can contain order place, removals,
* or updates.
*/
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* All updates should be ignored until snapshot is recieved.
* If the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* Orderbook updates for the clob pair. Can contain order place, removals,
* or updates.
*/

updates: OffChainUpdateV1SDKType[];
}
/**
* StreamOrderbookFill provides information on an orderbook fill. Used in
Expand Down Expand Up @@ -1305,39 +1303,39 @@ export const StreamOrderbookUpdatesResponse = {

function createBaseStreamUpdate(): StreamUpdate {
return {
blockHeight: 0,
execMode: 0,
orderbookUpdate: undefined,
orderFill: undefined,
takerOrder: undefined,
subaccountUpdate: undefined,
blockHeight: 0,
execMode: 0
subaccountUpdate: undefined
};
}

export const StreamUpdate = {
encode(message: StreamUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.orderbookUpdate !== undefined) {
StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(10).fork()).ldelim();
if (message.blockHeight !== 0) {
writer.uint32(8).uint32(message.blockHeight);
}

if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(18).fork()).ldelim();
if (message.execMode !== 0) {
writer.uint32(16).uint32(message.execMode);
}

if (message.takerOrder !== undefined) {
StreamTakerOrder.encode(message.takerOrder, writer.uint32(26).fork()).ldelim();
if (message.orderbookUpdate !== undefined) {
StreamOrderbookUpdate.encode(message.orderbookUpdate, writer.uint32(26).fork()).ldelim();
}

if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(34).fork()).ldelim();
if (message.orderFill !== undefined) {
StreamOrderbookFill.encode(message.orderFill, writer.uint32(34).fork()).ldelim();
}

if (message.blockHeight !== 0) {
writer.uint32(40).uint32(message.blockHeight);
if (message.takerOrder !== undefined) {
StreamTakerOrder.encode(message.takerOrder, writer.uint32(42).fork()).ldelim();
}

if (message.execMode !== 0) {
writer.uint32(48).uint32(message.execMode);
if (message.subaccountUpdate !== undefined) {
StreamSubaccountUpdate.encode(message.subaccountUpdate, writer.uint32(50).fork()).ldelim();
}

return writer;
Expand All @@ -1353,27 +1351,27 @@ export const StreamUpdate = {

switch (tag >>> 3) {
case 1:
message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32());
message.blockHeight = reader.uint32();
break;

case 2:
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
message.execMode = reader.uint32();
break;

case 3:
message.takerOrder = StreamTakerOrder.decode(reader, reader.uint32());
message.orderbookUpdate = StreamOrderbookUpdate.decode(reader, reader.uint32());
break;

case 4:
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
message.orderFill = StreamOrderbookFill.decode(reader, reader.uint32());
break;

case 5:
message.blockHeight = reader.uint32();
message.takerOrder = StreamTakerOrder.decode(reader, reader.uint32());
break;

case 6:
message.execMode = reader.uint32();
message.subaccountUpdate = StreamSubaccountUpdate.decode(reader, reader.uint32());
break;

default:
Expand All @@ -1387,32 +1385,32 @@ export const StreamUpdate = {

fromPartial(object: DeepPartial<StreamUpdate>): StreamUpdate {
const message = createBaseStreamUpdate();
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
message.orderbookUpdate = object.orderbookUpdate !== undefined && object.orderbookUpdate !== null ? StreamOrderbookUpdate.fromPartial(object.orderbookUpdate) : undefined;
message.orderFill = object.orderFill !== undefined && object.orderFill !== null ? StreamOrderbookFill.fromPartial(object.orderFill) : undefined;
message.takerOrder = object.takerOrder !== undefined && object.takerOrder !== null ? StreamTakerOrder.fromPartial(object.takerOrder) : undefined;
message.subaccountUpdate = object.subaccountUpdate !== undefined && object.subaccountUpdate !== null ? StreamSubaccountUpdate.fromPartial(object.subaccountUpdate) : undefined;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

};

function createBaseStreamOrderbookUpdate(): StreamOrderbookUpdate {
return {
updates: [],
snapshot: false
snapshot: false,
updates: []
};
}

export const StreamOrderbookUpdate = {
encode(message: StreamOrderbookUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.updates) {
OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim();
if (message.snapshot === true) {
writer.uint32(8).bool(message.snapshot);
}

if (message.snapshot === true) {
writer.uint32(16).bool(message.snapshot);
for (const v of message.updates) {
OffChainUpdateV1.encode(v!, writer.uint32(18).fork()).ldelim();
}

return writer;
Expand All @@ -1428,11 +1426,11 @@ export const StreamOrderbookUpdate = {

switch (tag >>> 3) {
case 1:
message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32()));
message.snapshot = reader.bool();
break;

case 2:
message.snapshot = reader.bool();
message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32()));
break;

default:
Expand All @@ -1446,8 +1444,8 @@ export const StreamOrderbookUpdate = {

fromPartial(object: DeepPartial<StreamOrderbookUpdate>): StreamOrderbookUpdate {
const message = createBaseStreamOrderbookUpdate();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
return message;
}

Expand Down
32 changes: 16 additions & 16 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,35 +182,35 @@ message StreamOrderbookUpdatesResponse {
// StreamUpdate is an update that will be pushed through the
// GRPC stream.
message StreamUpdate {
// Block height of the update.
uint32 block_height = 1;

// Exec mode of the update.
uint32 exec_mode = 2;

// Contains one of an StreamOrderbookUpdate,
// StreamOrderbookFill, StreamTakerOrderStatus.
oneof update_message {
StreamOrderbookUpdate orderbook_update = 1;
StreamOrderbookFill order_fill = 2;
StreamTakerOrder taker_order = 3;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 4;
StreamOrderbookUpdate orderbook_update = 3;
StreamOrderbookFill order_fill = 4;
StreamTakerOrder taker_order = 5;
dydxprotocol.subaccounts.StreamSubaccountUpdate subaccount_update = 6;
}

// Block height of the update.
uint32 block_height = 5;

// Exec mode of the update.
uint32 exec_mode = 6;
}

// StreamOrderbookUpdate provides information on an orderbook update. Used in
// the full node GRPC stream.
message StreamOrderbookUpdate {
// Orderbook updates for the clob pair. Can contain order place, removals,
// or updates.
repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 1
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// All updates should be ignored until snapshot is recieved.
// If the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;
bool snapshot = 1;

// Orderbook updates for the clob pair. Can contain order place, removals,
// or updates.
repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 2
[ (gogoproto.nullable) = false ];
}

// StreamOrderbookFill provides information on an orderbook fill. Used in
Expand Down
Loading

0 comments on commit 18d44cf

Please sign in to comment.