From 7197a1ec02659130d915f1ee3e548ec512749ff7 Mon Sep 17 00:00:00 2001 From: Jerry Fan Date: Fri, 6 Sep 2024 12:16:16 -0400 Subject: [PATCH 1/2] add wallet total volume update roundtable --- .../__tests__/stores/wallet-table.test.ts | 106 ++++++++++++- ...240906134410_add_fills_created_at_index.ts | 17 ++ .../postgres/src/stores/wallet-table.ts | 46 ++++++ .../tasks/update-wallet-total-volume.test.ts | 150 ++++++++++++++++++ indexer/services/roundtable/src/config.ts | 4 + indexer/services/roundtable/src/index.ts | 8 + .../src/tasks/update-wallet-total-volume.ts | 54 +++++++ 7 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts create mode 100644 indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts create mode 100644 indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts diff --git a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts index d5c01982ce..185a079903 100644 --- a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts @@ -1,7 +1,25 @@ import { WalletFromDatabase } from '../../src/types'; import { clearData, migrate, teardown } from '../../src/helpers/db-helpers'; -import { defaultWallet2, defaultWallet3 } from '../helpers/constants'; +import { DateTime } from 'luxon'; +import { + defaultFill, + defaultOrder, + defaultSubaccount, + defaultTendermintEventId, + defaultTendermintEventId2, + defaultTendermintEventId3, + defaultTendermintEventId4, + defaultWallet, + defaultWallet2, + defaultWallet3, + isolatedMarketOrder, + isolatedSubaccount, +} from '../helpers/constants'; +import * as FillTable from '../../src/stores/fill-table'; +import * as OrderTable from '../../src/stores/order-table'; import * as WalletTable from '../../src/stores/wallet-table'; +import * as SubaccountTable from '../../src/stores/subaccount-table'; +import { seedData } from '../helpers/mock-generators'; describe('Wallet store', () => { beforeAll(async () => { @@ -92,4 +110,90 @@ describe('Wallet store', () => { expect(wallets.length).toEqual(1); expect(wallets[0]).toEqual(expect.objectContaining(defaultWallet3)); }); + + it('Successfully updates totalVolume for time window multiple times', async () => { + const firstFillTime = await populateWalletSubaccountFill(); + + // Update totalVolume for a time window that covers all fills + await WalletTable.updateTotalVolume( + firstFillTime.toISO(), + firstFillTime.plus({ hours: 1 }).toISO(), + ); + let wallet = await WalletTable.findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '103', + })); + + // Update totalVolume for a time window that excludes some fills + // For convenience, we will reuse the existing fills data. The total volume calculated in this + // window should be added to the total volume above. + await WalletTable.updateTotalVolume( + firstFillTime.toISO(), + firstFillTime.plus({ minutes: 2 }).toISO(), // windowEntTs is exclusive -> filters out 1 fill + ); + wallet = await WalletTable.findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '205', // 103 + 102 + })); + }); }); + +/** + * Helper function to add entries into wallet, subaccount, fill tables. + * Create a wallet with 2 subaccounts; one subaccount has 3 fills and the other has 1 fill. + * The fills are at t=0,1,2 and t=1 for the subaccounts respectively. + * This setup allows us to test that the totalVolume is correctly calculated for a time window. + * @returns first fill time in ISO format + */ +async function populateWalletSubaccountFill(): Promise { + await seedData(); + await OrderTable.create(defaultOrder); + await OrderTable.create(isolatedMarketOrder); + + // seedData() creates defaultWallet with defaultSubaccount and isolatedSubaccount + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: defaultSubaccount.subaccountNumber }, + [], + {}, + ); + const isolatedSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: isolatedSubaccount.subaccountNumber }, + [], + {}, + ); + + const referenceDt = DateTime.utc().minus({ hours: 1 }); + const eventIds = [ + defaultTendermintEventId, + defaultTendermintEventId2, + defaultTendermintEventId3, + defaultTendermintEventId4, + ]; + let eventIdx = 0; + + // Create 3 fills with 1 min increments for defaultSubaccount + for (let i = 0; i < 3; i++) { + await FillTable.create({ + ...defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ minutes: i }).toISO(), + eventId: eventIds[eventIdx], + price: '1', + size: '1', + }); + eventIdx += 1; + } + // Create 1 fill at referenceDt for isolatedSubaccount + await FillTable.create({ + ...defaultFill, + subaccountId: isolatedSubaccountId[0].id, + createdAt: referenceDt.toISO(), + eventId: eventIds[eventIdx], + price: '10', + size: '10', + }); + + return referenceDt; +} diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts new file mode 100644 index 0000000000..3c61072403 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts @@ -0,0 +1,17 @@ +import * as Knex from 'knex'; + +export async function up(knex: Knex): Promise { + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS "fills_createdat_index" ON "fills" ("createdAt"); + `); +} + +export async function down(knex: Knex): Promise { + await knex.raw(` + DROP INDEX CONCURRENTLY IF EXISTS "fills_createdat_index"; + `); +} + +export const config = { + transaction: false, +}; diff --git a/indexer/packages/postgres/src/stores/wallet-table.ts b/indexer/packages/postgres/src/stores/wallet-table.ts index 8869aa5d36..a74d7ae2f5 100644 --- a/indexer/packages/postgres/src/stores/wallet-table.ts +++ b/indexer/packages/postgres/src/stores/wallet-table.ts @@ -1,6 +1,7 @@ import { PartialModelObject, QueryBuilder } from 'objection'; import { DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexReadReplica } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import WalletModel from '../models/wallet-model'; @@ -102,6 +103,7 @@ export async function upsert( // should only ever be one wallet return wallets[0]; } + export async function findById( address: string, options: Options = DEFAULT_POSTGRES_OPTIONS, @@ -114,3 +116,47 @@ export async function findById( .findById(address) .returning('*'); } + +/** + * Calculates the total volume in a given time window for each address and adds the values to the + * existing totalVolume values. + * + * @param windowStartTs - The start timestamp of the time window (inclusive). + * @param windowEndTs - The end timestamp of the time window (exclusive). + */ +export async function updateTotalVolume( + windowStartTs: string, + windowEndTs: string, +) : Promise { + + await knexReadReplica.getConnection().raw( + ` + WITH fills_total AS ( + -- Step 1: Calculate total volume for each subaccountId + SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" + FROM fills + WHERE "createdAt" >= ? AND "createdAt" < ? + GROUP BY "subaccountId" + ), + subaccount_volume AS ( + -- Step 2: Merge with subaccounts table to get the address + SELECT s."address", f."totalVolume" + FROM fills_total f + JOIN subaccounts s + ON f."subaccountId" = s."id" + ), + address_volume AS ( + -- Step 3: Group by address and sum the totalVolume + SELECT "address", SUM("totalVolume") AS "totalVolume" + FROM subaccount_volume + GROUP BY "address" + ) + -- Step 4: Left join the result with the wallets table and update the total volume + UPDATE wallets + SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" + FROM address_volume av + WHERE wallets."address" = av."address"; + `, + [windowStartTs, windowEndTs], + ); +} diff --git a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts new file mode 100644 index 0000000000..fd4689db11 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts @@ -0,0 +1,150 @@ +import { + dbHelpers, + testConstants, + testMocks, + WalletTable, + SubaccountTable, + PersistentCacheTable, + FillTable, + OrderTable, +} from '@dydxprotocol-indexer/postgres'; +import walletTotalVolumeUpdateTask from '../../src/tasks/update-wallet-total-volume'; +import { DateTime } from 'luxon'; + +describe('update-wallet-total-volume', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + await OrderTable.create(testConstants.defaultOrder); + await OrderTable.create(testConstants.isolatedMarketOrder); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + }); + + it('Succeeds in populating historical totalVolume on first run', async () => { + const referenceDt: DateTime = DateTime.fromISO('2020-01-01T00:00:00Z'); + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ years: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ years: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + await walletTotalVolumeUpdateTask(); + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); + + it('Succeeds in incremental totalVolume updates', async () => { + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // First task run: one new fill + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: DateTime.utc().toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await walletTotalVolumeUpdateTask(); + let wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '1', + })); + + // Second task run: no new fills + await walletTotalVolumeUpdateTask(); + wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '1', + })); + + // Third task run: one new fill + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: DateTime.utc().toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '1', + size: '1', + }); + await walletTotalVolumeUpdateTask(); + wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '2', + })); + }); + + it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => { + await walletTotalVolumeUpdateTask(); + const lastUpdateTime1 = await getTotalVolumeUpdateTime(); + + // Sleep for 1s + await new Promise((resolve) => setTimeout(resolve, 1000)); + + await walletTotalVolumeUpdateTask(); + const lastUpdateTime2 = await getTotalVolumeUpdateTime(); + + expect(lastUpdateTime1).not.toBeUndefined(); + expect(lastUpdateTime2).not.toBeUndefined(); + if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) { + expect(lastUpdateTime2.toMillis()) + .toBeGreaterThan(lastUpdateTime1.plus({ seconds: 1 }).toMillis()); + } + }); +}); + +async function getTotalVolumeUpdateTime(): Promise { + const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); + const lastUpdateTime1 = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + return lastUpdateTime1; +} diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 714a34a3de..c9f2534a69 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -57,6 +57,7 @@ export const configSchema = { LOOPS_ENABLED_LEADERBOARD_PNL_WEEKLY: parseBoolean({ default: false }), LOOPS_ENABLED_LEADERBOARD_PNL_MONTHLY: parseBoolean({ default: false }), LOOPS_ENABLED_LEADERBOARD_PNL_YEARLY: parseBoolean({ default: false }), + LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -125,6 +126,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_LEADERBOARD_PNL_YEARLY: parseInteger({ default: THIRTY_SECONDS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({ + default: FIVE_MINUTES_IN_MILLISECONDS, + }), // Start delay START_DELAY_ENABLED: parseBoolean({ default: true }), diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index d3ad79696c..c36af62b5a 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -27,6 +27,7 @@ import trackLag from './tasks/track-lag'; import uncrossOrderbookTask from './tasks/uncross-orderbook'; import updateComplianceDataTask from './tasks/update-compliance-data'; import updateResearchEnvironmentTask from './tasks/update-research-environment'; +import updateWalletTotalVolumeTask from './tasks/update-wallet-total-volume'; process.on('SIGTERM', () => { logger.info({ @@ -247,6 +248,13 @@ async function start(): Promise { config.LOOPS_INTERVAL_MS_LEADERBOARD_PNL_YEARLY, ); } + if (config.LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME) { + startLoop( + updateWalletTotalVolumeTask, + 'update_wallet_total_volume', + config.LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME, + ); + } logger.info({ at: 'index', diff --git a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts new file mode 100644 index 0000000000..b1f2a7bb88 --- /dev/null +++ b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts @@ -0,0 +1,54 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import { + PersistentCacheTable, + WalletTable, +} from '@dydxprotocol-indexer/postgres'; +import config from '../config'; + +const defaultLastUpdateTime: string = '2000-01-01T00:00:00Z'; +const persistentCacheKey: string = 'totalVolumeUpdateTime'; + +/** + * Update the total volume for each address in the wallet table. + */ +export default async function runTask(): Promise { + try { + const start = Date.now(); + const persistentCacheEntry = await PersistentCacheTable.findById(persistentCacheKey); + + if (!persistentCacheEntry) { + logger.info({ + at: 'update-address-total-volume#runTask', + message: `No previous totalVolumeUpdateTime found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, + }); + } + + const lastUpdateTime = persistentCacheEntry + ? persistentCacheEntry.value + : defaultLastUpdateTime; + const currentTime = new Date().toISOString(); + + // On the first run of this roundtable, we need to calculate the total volume for all historical + // fills. This is a much more demanding task than regular roundtable runs. + // At time of commit, the total number of rows in 'fills' table in imperator mainnet is ~250M. + // This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table. + // This is relatively short and significanlty shorter than roundtable task cadence. Hence, + // special handling for the first run is not required. + await WalletTable.updateTotalVolume(lastUpdateTime, currentTime); + await PersistentCacheTable.upsert({ + key: persistentCacheKey, + value: currentTime, + }); + + stats.timing( + `${config.SERVICE_NAME}.update_wallet_total_volume_timing`, + Date.now() - start, + ); + } catch (error) { + logger.error({ + at: 'update-address-total-volume#runTask', + message: 'Error when updating totalVolume in wallets table', + error, + }); + } +} From 3386cc6ccf740bcb1560ef4130d5aa90ce9cc763 Mon Sep 17 00:00:00 2001 From: Jerry Fan Date: Fri, 6 Sep 2024 15:47:11 -0400 Subject: [PATCH 2/2] pr revision --- .../__tests__/stores/wallet-table.test.ts | 25 ++- .../postgres/src/stores/wallet-table.ts | 17 +- .../tasks/update-wallet-total-volume.test.ts | 163 +++++++++++++----- indexer/services/roundtable/src/config.ts | 2 +- .../src/tasks/update-wallet-total-volume.ts | 34 ++-- 5 files changed, 168 insertions(+), 73 deletions(-) diff --git a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts index 185a079903..4e74c5e55c 100644 --- a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts @@ -19,6 +19,7 @@ import * as FillTable from '../../src/stores/fill-table'; import * as OrderTable from '../../src/stores/order-table'; import * as WalletTable from '../../src/stores/wallet-table'; import * as SubaccountTable from '../../src/stores/subaccount-table'; +import * as PersistentCacheTable from '../../src/stores/persistent-cache-table'; import { seedData } from '../helpers/mock-generators'; describe('Wallet store', () => { @@ -116,7 +117,7 @@ describe('Wallet store', () => { // Update totalVolume for a time window that covers all fills await WalletTable.updateTotalVolume( - firstFillTime.toISO(), + firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive firstFillTime.plus({ hours: 1 }).toISO(), ); let wallet = await WalletTable.findById(defaultWallet.address); @@ -129,15 +130,31 @@ describe('Wallet store', () => { // For convenience, we will reuse the existing fills data. The total volume calculated in this // window should be added to the total volume above. await WalletTable.updateTotalVolume( - firstFillTime.toISO(), - firstFillTime.plus({ minutes: 2 }).toISO(), // windowEntTs is exclusive -> filters out 1 fill + firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount + firstFillTime.plus({ minutes: 2 }).toISO(), ); wallet = await WalletTable.findById(defaultWallet.address); expect(wallet).toEqual(expect.objectContaining({ ...defaultWallet, - totalVolume: '205', // 103 + 102 + totalVolume: '105', // 103 + 2 })); }); + + it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => { + const leftBound = DateTime.utc().minus({ hours: 1 }); + const rightBound = DateTime.utc(); + await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); + + const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); + const lastUpdateTime = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + + expect(lastUpdateTime).not.toBeUndefined(); + if (lastUpdateTime?.toMillis() !== undefined) { + expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis()); + } + }); }); /** diff --git a/indexer/packages/postgres/src/stores/wallet-table.ts b/indexer/packages/postgres/src/stores/wallet-table.ts index a74d7ae2f5..6e96b19dae 100644 --- a/indexer/packages/postgres/src/stores/wallet-table.ts +++ b/indexer/packages/postgres/src/stores/wallet-table.ts @@ -121,8 +121,8 @@ export async function findById( * Calculates the total volume in a given time window for each address and adds the values to the * existing totalVolume values. * - * @param windowStartTs - The start timestamp of the time window (inclusive). - * @param windowEndTs - The end timestamp of the time window (exclusive). + * @param windowStartTs - The start timestamp of the time window (exclusive). + * @param windowEndTs - The end timestamp of the time window (inclusive). */ export async function updateTotalVolume( windowStartTs: string, @@ -131,11 +131,13 @@ export async function updateTotalVolume( await knexReadReplica.getConnection().raw( ` + BEGIN; + WITH fills_total AS ( -- Step 1: Calculate total volume for each subaccountId SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" FROM fills - WHERE "createdAt" >= ? AND "createdAt" < ? + WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}' GROUP BY "subaccountId" ), subaccount_volume AS ( @@ -156,7 +158,14 @@ export async function updateTotalVolume( SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" FROM address_volume av WHERE wallets."address" = av."address"; + + -- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table + INSERT INTO persistent_cache (key, value) + VALUES ('totalVolumeUpdateTime', '${windowEndTs}') + ON CONFLICT (key) + DO UPDATE SET value = EXCLUDED.value; + + COMMIT; `, - [windowStartTs, windowEndTs], ); } diff --git a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts index fd4689db11..edb61608c0 100644 --- a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts @@ -32,55 +32,19 @@ describe('update-wallet-total-volume', () => { await dbHelpers.clearData(); }); - it('Succeeds in populating historical totalVolume on first run', async () => { - const referenceDt: DateTime = DateTime.fromISO('2020-01-01T00:00:00Z'); + it('Successfully updates totalVolume multiple times', async () => { const defaultSubaccountId = await SubaccountTable.findAll( { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, [], {}, ); - - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.toISO(), - eventId: testConstants.defaultTendermintEventId, - price: '1', - size: '1', - }); - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.plus({ years: 1 }).toISO(), - eventId: testConstants.defaultTendermintEventId2, - price: '2', - size: '2', - }); - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.plus({ years: 2 }).toISO(), - eventId: testConstants.defaultTendermintEventId3, - price: '3', - size: '3', + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), }); - await walletTotalVolumeUpdateTask(); - - const wallet = await WalletTable.findById(testConstants.defaultWallet.address); - expect(wallet).toEqual(expect.objectContaining({ - ...testConstants.defaultWallet, - totalVolume: '14', // 1 + 4 + 9 - })); - }); - - it('Succeeds in incremental totalVolume updates', async () => { - const defaultSubaccountId = await SubaccountTable.findAll( - { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, - [], - {}, - ); - // First task run: one new fill await FillTable.create({ ...testConstants.defaultFill, @@ -123,12 +87,16 @@ describe('update-wallet-total-volume', () => { }); it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => { + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), + }); + await walletTotalVolumeUpdateTask(); const lastUpdateTime1 = await getTotalVolumeUpdateTime(); - // Sleep for 1s - await new Promise((resolve) => setTimeout(resolve, 1000)); - await walletTotalVolumeUpdateTask(); const lastUpdateTime2 = await getTotalVolumeUpdateTime(); @@ -136,9 +104,112 @@ describe('update-wallet-total-volume', () => { expect(lastUpdateTime2).not.toBeUndefined(); if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) { expect(lastUpdateTime2.toMillis()) - .toBeGreaterThan(lastUpdateTime1.plus({ seconds: 1 }).toMillis()); + .toBeGreaterThan(lastUpdateTime1.toMillis()); } }); + + it('Successfully backfills from past date', async () => { + const currentDt: DateTime = DateTime.utc(); + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Create 3 fills spanning 2 weeks in the past + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Set persistent cache totalVolumeUpdateTime to 3 weeks ago to emulate backfill from 3 weeks. + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: currentDt.minus({ weeks: 3 }).toISO(), + }); + + let backfillTime = await getTotalVolumeUpdateTime(); + while (backfillTime !== undefined && DateTime.fromISO(backfillTime.toISO()) < currentDt) { + await walletTotalVolumeUpdateTask(); + backfillTime = await getTotalVolumeUpdateTime(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); + + it('Successfully backfills on first run', async () => { + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Leave persistent cache totalVolumeUpdateTime empty and create fills around + // `defaultLastUpdateTime` value to emulate backfilling from very beginning + expect(await getTotalVolumeUpdateTime()).toBeUndefined(); + + const referenceDt = DateTime.fromISO('2020-01-01T00:00:00Z'); + + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 3 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Emulate 10 roundtable runs (this should backfill all the fills) + for (let i = 0; i < 10; i++) { + await walletTotalVolumeUpdateTask(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); }); async function getTotalVolumeUpdateTime(): Promise { diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index c9f2534a69..a0f3a03bb5 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -127,7 +127,7 @@ export const configSchema = { default: THIRTY_SECONDS_IN_MILLISECONDS, }), LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({ - default: FIVE_MINUTES_IN_MILLISECONDS, + default: THIRTY_SECONDS_IN_MILLISECONDS, }), // Start delay diff --git a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts index b1f2a7bb88..5229d068dd 100644 --- a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts +++ b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts @@ -1,11 +1,10 @@ import { logger, stats } from '@dydxprotocol-indexer/base'; -import { - PersistentCacheTable, - WalletTable, -} from '@dydxprotocol-indexer/postgres'; +import { PersistentCacheTable, WalletTable } from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; + import config from '../config'; -const defaultLastUpdateTime: string = '2000-01-01T00:00:00Z'; +const defaultLastUpdateTime: string = '2020-01-01T00:00:00Z'; const persistentCacheKey: string = 'totalVolumeUpdateTime'; /** @@ -23,21 +22,20 @@ export default async function runTask(): Promise { }); } - const lastUpdateTime = persistentCacheEntry - ? persistentCacheEntry.value - : defaultLastUpdateTime; - const currentTime = new Date().toISOString(); - - // On the first run of this roundtable, we need to calculate the total volume for all historical - // fills. This is a much more demanding task than regular roundtable runs. - // At time of commit, the total number of rows in 'fills' table in imperator mainnet is ~250M. - // This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table. - // This is relatively short and significanlty shorter than roundtable task cadence. Hence, - // special handling for the first run is not required. - await WalletTable.updateTotalVolume(lastUpdateTime, currentTime); + const lastUpdateTime = DateTime.fromISO(persistentCacheEntry + ? persistentCacheEntry.value + : defaultLastUpdateTime); + let currentTime = DateTime.utc(); + + // During backfilling, we process one day at a time to reduce roundtable runtime. + if (currentTime > lastUpdateTime.plus({ days: 1 })) { + currentTime = lastUpdateTime.plus({ days: 1 }); + } + + await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), currentTime.toISO()); await PersistentCacheTable.upsert({ key: persistentCacheKey, - value: currentTime, + value: currentTime.toISO(), }); stats.timing(