Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config var to exclude specific stateful order ids from being processed. #2513

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import Long from 'long';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('conditionalOrderPlacementHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -59,6 +62,7 @@ describe('conditionalOrderPlacementHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -226,4 +230,22 @@ describe('conditionalOrderPlacementHandler', () => {
order!,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!);
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeUndefined();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler';
import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('conditionalOrderTriggeredHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -53,6 +56,7 @@ describe('conditionalOrderTriggeredHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -163,4 +167,40 @@ describe('conditionalOrderTriggeredHandler', () => {
`Unable to update order status with orderId: ${orderId}`,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order trigger event (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid(
testConstants.defaultOrderGoodTilBlockTime.subaccountId,
'0',
testConstants.defaultOrderGoodTilBlockTime.clobPairId,
testConstants.defaultOrderGoodTilBlockTime.orderFlags,
);
await OrderTable.create({
...testConstants.defaultOrderGoodTilBlockTime,
orderFlags: conditionalOrderId.orderFlags.toString(),
status: OrderStatus.UNTRIGGERED,
triggerPrice: '1000',
clientId: '0',
});
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);

expect(order).toBeDefined();
expect(order).toEqual(expect.objectContaining({
status: OrderStatus.OPEN,
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
}));
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderPlacementHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -59,6 +62,7 @@ describe('statefulOrderPlacementHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -250,4 +254,26 @@ describe('statefulOrderPlacementHandler', () => {
});
// TODO[IND-20]: Add tests for vulcan messages
});

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement as txn event', defaultStatefulOrderEvent, 0],
['stateful long term order placement as txn event', defaultStatefulOrderLongTermEvent, 0],
['stateful order placement as block event', defaultStatefulOrderEvent, -1],
['stateful long term order placement as block event', defaultStatefulOrderLongTermEvent, -1],
])('successfully skips order with %s', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!);
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeUndefined();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ import { StatefulOrderRemovalHandler } from '../../../src/handlers/stateful-orde
import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderRemovalHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -52,6 +55,7 @@ describe('statefulOrderRemovalHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -153,4 +157,35 @@ describe('statefulOrderRemovalHandler', () => {
`Unable to update order status with orderId: ${orderId}`,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order removal event (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid(
testConstants.defaultOrder.subaccountId,
'0',
testConstants.defaultOrder.clobPairId,
testConstants.defaultOrder.orderFlags,
);
await OrderTable.create({
...testConstants.defaultOrder,
clientId: '0',
});
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeDefined();
expect(order).toEqual(expect.objectContaining({
...testConstants.defaultOrder,
clientId: '0',
}));
});
});
8 changes: 8 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
parseSchema,
baseConfigSchema,
parseBoolean,
parseString,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
Expand All @@ -23,6 +24,13 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
// Config var to skip processing stateful order events with specific uuids.
// Order UUIDs should be in a string delimited by commas.
// Only set if invalid order events are being included in a block and preventing ender from
// progressing.
SKIP_STATEFUL_ORDER_UUIDS: parseString({
default: '',
}),
};

export default parseSchema(configSchema);
20 changes: 18 additions & 2 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import { KafkaPublisher } from './kafka-publisher';
import { SyncHandlers, SYNCHRONOUS_SUBTYPES } from './sync-handlers';
import {
ConsolidatedKafkaEvent,
DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents,
DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents, SKIPPED_EVENT_SUBTYPE,
} from './types';

const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
Expand Down Expand Up @@ -216,12 +216,28 @@ export class BlockProcessor {
);
validator.validate();
this.sqlEventPromises[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
const handlers: Handler<EventMessage>[] = validator.createHandlers(
let handlers: Handler<EventMessage>[] = validator.createHandlers(
eventProto.indexerTendermintEvent,
this.txId,
this.messageReceivedTimestamp,
);

if (validator.shouldExcludeEvent()) {
// If the event should be excluded from being processed, set the subtype to a special value
// for skipped events.
this.block.events[eventProto.blockEventIndex] = {
...this.block.events[eventProto.blockEventIndex],
subtype: SKIPPED_EVENT_SUBTYPE,
};
// Set handlers to empty array if event is to be skipped.
handlers = [];
logger.info({
at: 'onMessage#shouldExcludeEvent',
message: 'Excluded event from processing',
eventProto,
});
}

_.map(handlers, (handler: Handler<EventMessage>) => {
if (SYNCHRONOUS_SUBTYPES.includes(eventProto.type as DydxIndexerSubtypes)) {
this.syncHandlers.addHandler(eventProto.type, handler);
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export enum DydxIndexerSubtypes {
UPSERT_VAULT = 'upsert_vault',
}

export const SKIPPED_EVENT_SUBTYPE = 'skipped_event';

// Generic interface used for creating the Handler objects
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type EventMessage = any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ BEGIN
rval[i] = dydx_funding_handler(block_height, block_time, event_data, event_index, transaction_index);
WHEN '"upsert_vault"'::jsonb THEN
rval[i] = dydx_vault_upsert_handler(block_time, event_data);
WHEN '"skipped_event"'::jsonb THEN
rval[i] = jsonb_build_object();
ELSE
NULL;
END CASE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ BEGIN
rval[i] = dydx_trading_rewards_handler(block_height, block_time, event_data, event_index, transaction_index, jsonb_array_element_text(block->'txHashes', transaction_index));
WHEN '"register_affiliate"'::jsonb THEN
rval[i] = dydx_register_affiliate_handler(block_height, event_data);
WHEN '"skipped_event"'::jsonb THEN
rval[i] = jsonb_build_object();
ELSE
NULL;
END CASE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { OrderTable } from '@dydxprotocol-indexer/postgres';
import { ORDER_FLAG_CONDITIONAL, ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import {
IndexerTendermintEvent,
Expand All @@ -13,6 +14,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import Long from 'long';

import config from '../config';
import { Handler, HandlerInitializer } from '../handlers/handler';
import { ConditionalOrderPlacementHandler } from '../handlers/stateful-order/conditional-order-placement-handler';
import { ConditionalOrderTriggeredHandler } from '../handlers/stateful-order/conditional-order-triggered-handler';
Expand Down Expand Up @@ -233,4 +235,40 @@ export class StatefulOrderValidator extends Validator<StatefulOrderEventV1> {

return [handler];
}

/**
* Skip order uuids in config env var.
*/
public shouldExcludeEvent(): boolean {
const orderUUIDsToSkip: string[] = config.SKIP_STATEFUL_ORDER_UUIDS.split(',');
if (orderUUIDsToSkip.length === 0) {
return false;
}

const orderUUIDStoSkipSet: Set<string> = new Set(orderUUIDsToSkip);
if (orderUUIDStoSkipSet.has(this.getOrderUUId())) {
return true;
}

return false;
}
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets order uuid for the event being validated.
* Assumes events are valid.
*/
private getOrderUUId(): string {
if (this.event.orderPlace !== undefined) {
return OrderTable.orderIdToUuid(this.event.orderPlace.order!.orderId!);
} else if (this.event.orderRemoval !== undefined) {
return OrderTable.orderIdToUuid(this.event.orderRemoval.removedOrderId!);
} else if (this.event.conditionalOrderPlacement !== undefined) {
return OrderTable.orderIdToUuid(this.event.conditionalOrderPlacement.order!.orderId!);
} else if (this.event.conditionalOrderTriggered !== undefined) {
return OrderTable.orderIdToUuid(this.event.conditionalOrderTriggered.triggeredOrderId!);
} else if (this.event.longTermOrderPlacement !== undefined) {
return OrderTable.orderIdToUuid(this.event.longTermOrderPlacement.order!.orderId!);
}
return '';
}
vincentwschau marked this conversation as resolved.
Show resolved Hide resolved
}
9 changes: 9 additions & 0 deletions indexer/services/ender/src/validators/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ export abstract class Validator<T extends object> {
txId: number,
messageReceivedTimestamp: string,
): Handler<EventMessage>[];

/**
* Allows aribtrary logic to exclude events from being processed.
* Defaults to no events being excluded.
* @returns
*/
public shouldExcludeEvent(): boolean {
return false;
}
}
Loading