Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyaoy committed Jun 11, 2024
1 parent 8ed0ebf commit 7f9416c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ describe('stateful-order-replacement-handler', () => {
emitSubaccountMessage: boolean,
transactionIndex: number,
) => {
await OrderTable.create({
...testConstants.defaultOrder,
clientId: '0',
orderFlags: ORDER_FLAG_LONG_TERM.toString(),
});
config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS = emitSubaccountMessage;
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
Expand All @@ -111,6 +116,14 @@ describe('stateful-order-replacement-handler', () => {

await onMessage(kafkaMessage);

const oldOrder: OrderFromDatabase | undefined = await OrderTable.findById(oldOrderUuid);
expect(oldOrder).toBeDefined();
expect(oldOrder).toEqual(expect.objectContaining({
status: OrderStatus.CANCELED,
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
}));

const newOrder: OrderFromDatabase | undefined = await OrderTable.findById(newOrderUuid);
expect(newOrder).toEqual({
id: newOrderUuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';

export class StatefulOrderReplacementHandler
extends AbstractStatefulOrderHandler<StatefulOrderEventV1> {
eventType: string = 'StatefulOrderEvent';

public getOrderId(): string {
private getOrderId(): string {
const orderId = OrderTable.orderIdToUuid(this.event.orderReplacement!.order!.orderId!);
return orderId;
}
Expand Down Expand Up @@ -74,31 +73,31 @@ export class StatefulOrderReplacementHandler
},
));

if (config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS) {
const perpetualMarket: PerpetualMarketFromDatabase = perpetualMarketRefresher
.getPerpetualMarketFromClobPairId(order.orderId!.clobPairId.toString())!;
const dbOrder: OrderFromDatabase = OrderModel.fromJson(resultRow.order) as OrderFromDatabase;
const redisOrder: RedisOrder = convertToRedisOrder(order, perpetualMarket);
const subaccountContent: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
dbOrder,
perpetualMarket,
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
this.block.height.toString(),
);
// send subaccount websocket message
const perpetualMarket: PerpetualMarketFromDatabase = perpetualMarketRefresher
.getPerpetualMarketFromClobPairId(order.orderId!.clobPairId.toString())!;
const dbOrder: OrderFromDatabase = OrderModel.fromJson(resultRow.order) as OrderFromDatabase;
const redisOrder: RedisOrder = convertToRedisOrder(order, perpetualMarket);
const subaccountContent: SubaccountMessageContents = generateSubaccountMessageContents(
redisOrder,
dbOrder,
perpetualMarket,
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED,
this.block.height.toString(),
);

const subaccountIdProto: SubaccountId = {
owner: this.getSubaccountId().owner,
number: this.getSubaccountId().number,
};
kafkaEvents.push(this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(subaccountContent),
subaccountIdProto,
this.getOrderId(),
false,
subaccountContent,
));

const subaccountIdProto: SubaccountId = {
owner: this.getSubaccountId().owner,
number: this.getSubaccountId().number,
};
kafkaEvents.push(this.generateConsolidatedSubaccountKafkaEvent(
JSON.stringify(subaccountContent),
subaccountIdProto,
this.getOrderId(),
false,
subaccountContent,
));
}
return kafkaEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,37 @@ DECLARE
order_record orders%ROWTYPE;
subaccount_record subaccounts%ROWTYPE;
BEGIN
/* For order replacement, remove old order first and don't return immediately */
IF event_data->'orderReplacement' IS NOT NULL THEN
order_id = event_data->'orderReplacement'->'oldOrderId';
order_record."status" = 'CANCELED';

clob_pair_id = (order_id->'clobPairId')::bigint;
perpetual_market_record = dydx_get_perpetual_market_for_clob_pair(clob_pair_id);

subaccount_id = dydx_uuid_from_subaccount_id(order_id->'subaccountId');
SELECT * INTO subaccount_record FROM subaccounts WHERE "id" = subaccount_id;
IF NOT FOUND THEN
RAISE EXCEPTION 'Subaccount for order not found: %', order_;
END IF;

order_record."id" = dydx_uuid_from_order_id(order_id);
order_record."updatedAt" = block_time;
order_record."updatedAtHeight" = block_height;
UPDATE orders
SET
"status" = order_record."status",
"updatedAt" = order_record."updatedAt",
"updatedAtHeight" = order_record."updatedAtHeight"
WHERE "id" = order_record."id"
RETURNING * INTO order_record;

IF NOT FOUND THEN
RAISE NOTICE 'Unable to cancel replaced order because not found with orderId: %', dydx_uuid_from_order_id(order_id);
END IF;
END IF;
order_record := NULL; /* Reset order_record so the order place below doesn't carry over any values set above. */

/** TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent. */
IF event_data->'orderPlace' IS NOT NULL OR event_data->'longTermOrderPlacement' IS NOT NULL OR event_data->'conditionalOrderPlacement' IS NOT NULL OR event_data->'orderReplacement' IS NOT NULL THEN
order_ = coalesce(event_data->'orderPlace'->'order', event_data->'longTermOrderPlacement'->'order', event_data->'conditionalOrderPlacement'->'order', event_data->'orderReplacement'->'order');
Expand Down Expand Up @@ -50,6 +81,7 @@ BEGIN
order_record."reduceOnly" = (order_->>'reduceOnly')::boolean;
order_record."orderFlags" = (order_->'orderId'->'orderFlags')::bigint;
order_record."goodTilBlockTime" = to_timestamp((order_->'goodTilBlockTime')::double precision);

order_record."clientMetadata" = (order_->'clientMetadata')::bigint;
order_record."createdAtHeight" = block_height;
order_record."updatedAt" = block_time;
Expand Down Expand Up @@ -139,46 +171,7 @@ BEGIN
dydx_to_jsonb(subaccount_record)
);
ELSE
RAISE EXCEPTION 'Unkonwn sub-event type %', event_data;
END IF;

/* For order replacement, remove old order */
IF event_data->'orderReplacement' IS NOT NULL THEN
order_id = event_data->'orderReplacement'->'oldOrderId';
order_record."status" = 'CANCELED';

clob_pair_id = (order_id->'clobPairId')::bigint;
perpetual_market_record = dydx_get_perpetual_market_for_clob_pair(clob_pair_id);

subaccount_id = dydx_uuid_from_subaccount_id(order_id->'subaccountId');
SELECT * INTO subaccount_record FROM subaccounts WHERE "id" = subaccount_id;
IF NOT FOUND THEN
RAISE EXCEPTION 'Subaccount for order not found: %', order_;
END IF;

order_record."id" = dydx_uuid_from_order_id(order_id);
order_record."updatedAt" = block_time;
order_record."updatedAtHeight" = block_height;
UPDATE orders
SET
"status" = order_record."status",
"updatedAt" = order_record."updatedAt",
"updatedAtHeight" = order_record."updatedAtHeight"
WHERE "id" = order_record."id"
RETURNING * INTO order_record;

IF NOT FOUND THEN
RAISE EXCEPTION 'Unable to update order status with orderId: %', dydx_uuid_from_order_id(order_id);
END IF;

RETURN jsonb_build_object(
'order',
dydx_to_jsonb(order_record),
'perpetual_market',
dydx_to_jsonb(perpetual_market_record),
'subaccount',
dydx_to_jsonb(subaccount_record)
);
RAISE EXCEPTION 'Unknown sub-event type %', event_data;
END IF;
END;
$$ LANGUAGE plpgsql;

0 comments on commit 7f9416c

Please sign in to comment.