diff --git a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts index 185a079903d..4e74c5e55c9 100644 --- a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts @@ -19,6 +19,7 @@ import * as FillTable from '../../src/stores/fill-table'; import * as OrderTable from '../../src/stores/order-table'; import * as WalletTable from '../../src/stores/wallet-table'; import * as SubaccountTable from '../../src/stores/subaccount-table'; +import * as PersistentCacheTable from '../../src/stores/persistent-cache-table'; import { seedData } from '../helpers/mock-generators'; describe('Wallet store', () => { @@ -116,7 +117,7 @@ describe('Wallet store', () => { // Update totalVolume for a time window that covers all fills await WalletTable.updateTotalVolume( - firstFillTime.toISO(), + firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive firstFillTime.plus({ hours: 1 }).toISO(), ); let wallet = await WalletTable.findById(defaultWallet.address); @@ -129,15 +130,31 @@ describe('Wallet store', () => { // For convenience, we will reuse the existing fills data. The total volume calculated in this // window should be added to the total volume above. await WalletTable.updateTotalVolume( - firstFillTime.toISO(), - firstFillTime.plus({ minutes: 2 }).toISO(), // windowEntTs is exclusive -> filters out 1 fill + firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount + firstFillTime.plus({ minutes: 2 }).toISO(), ); wallet = await WalletTable.findById(defaultWallet.address); expect(wallet).toEqual(expect.objectContaining({ ...defaultWallet, - totalVolume: '205', // 103 + 102 + totalVolume: '105', // 103 + 2 })); }); + + it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => { + const leftBound = DateTime.utc().minus({ hours: 1 }); + const rightBound = DateTime.utc(); + await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); + + const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); + const lastUpdateTime = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + + expect(lastUpdateTime).not.toBeUndefined(); + if (lastUpdateTime?.toMillis() !== undefined) { + expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis()); + } + }); }); /** diff --git a/indexer/packages/postgres/src/stores/wallet-table.ts b/indexer/packages/postgres/src/stores/wallet-table.ts index a74d7ae2f57..440c0ff99d5 100644 --- a/indexer/packages/postgres/src/stores/wallet-table.ts +++ b/indexer/packages/postgres/src/stores/wallet-table.ts @@ -121,8 +121,8 @@ export async function findById( * Calculates the total volume in a given time window for each address and adds the values to the * existing totalVolume values. * - * @param windowStartTs - The start timestamp of the time window (inclusive). - * @param windowEndTs - The end timestamp of the time window (exclusive). + * @param windowStartTs - The start timestamp of the time window (exclusive). + * @param windowEndTs - The end timestamp of the time window (inclusive). */ export async function updateTotalVolume( windowStartTs: string, @@ -131,32 +131,41 @@ export async function updateTotalVolume( await knexReadReplica.getConnection().raw( ` - WITH fills_total AS ( - -- Step 1: Calculate total volume for each subaccountId - SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" - FROM fills - WHERE "createdAt" >= ? AND "createdAt" < ? - GROUP BY "subaccountId" - ), - subaccount_volume AS ( - -- Step 2: Merge with subaccounts table to get the address - SELECT s."address", f."totalVolume" - FROM fills_total f - JOIN subaccounts s - ON f."subaccountId" = s."id" - ), - address_volume AS ( - -- Step 3: Group by address and sum the totalVolume - SELECT "address", SUM("totalVolume") AS "totalVolume" - FROM subaccount_volume - GROUP BY "address" - ) - -- Step 4: Left join the result with the wallets table and update the total volume - UPDATE wallets - SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" - FROM address_volume av - WHERE wallets."address" = av."address"; + BEGIN; + + WITH fills_total AS ( + -- Step 1: Calculate total volume for each subaccountId + SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" + FROM fills + WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}' + GROUP BY "subaccountId" + ), + subaccount_volume AS ( + -- Step 2: Merge with subaccounts table to get the address + SELECT s."address", f."totalVolume" + FROM fills_total f + JOIN subaccounts s + ON f."subaccountId" = s."id" + ), + address_volume AS ( + -- Step 3: Group by address and sum the totalVolume + SELECT "address", SUM("totalVolume") AS "totalVolume" + FROM subaccount_volume + GROUP BY "address" + ) + -- Step 4: Left join the result with the wallets table and update the total volume + UPDATE wallets + SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" + FROM address_volume av + WHERE wallets."address" = av."address"; + + -- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table + INSERT INTO persistent_cache (key, value) + VALUES ('totalVolumeUpdateTime', '${windowEndTs}') + ON CONFLICT (key) + DO UPDATE SET value = EXCLUDED.value; + + COMMIT; `, - [windowStartTs, windowEndTs], ); } diff --git a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts index fd4689db119..edb61608c05 100644 --- a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts +++ b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts @@ -32,55 +32,19 @@ describe('update-wallet-total-volume', () => { await dbHelpers.clearData(); }); - it('Succeeds in populating historical totalVolume on first run', async () => { - const referenceDt: DateTime = DateTime.fromISO('2020-01-01T00:00:00Z'); + it('Successfully updates totalVolume multiple times', async () => { const defaultSubaccountId = await SubaccountTable.findAll( { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, [], {}, ); - - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.toISO(), - eventId: testConstants.defaultTendermintEventId, - price: '1', - size: '1', - }); - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.plus({ years: 1 }).toISO(), - eventId: testConstants.defaultTendermintEventId2, - price: '2', - size: '2', - }); - await FillTable.create({ - ...testConstants.defaultFill, - subaccountId: defaultSubaccountId[0].id, - createdAt: referenceDt.plus({ years: 2 }).toISO(), - eventId: testConstants.defaultTendermintEventId3, - price: '3', - size: '3', + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), }); - await walletTotalVolumeUpdateTask(); - - const wallet = await WalletTable.findById(testConstants.defaultWallet.address); - expect(wallet).toEqual(expect.objectContaining({ - ...testConstants.defaultWallet, - totalVolume: '14', // 1 + 4 + 9 - })); - }); - - it('Succeeds in incremental totalVolume updates', async () => { - const defaultSubaccountId = await SubaccountTable.findAll( - { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, - [], - {}, - ); - // First task run: one new fill await FillTable.create({ ...testConstants.defaultFill, @@ -123,12 +87,16 @@ describe('update-wallet-total-volume', () => { }); it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => { + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), + }); + await walletTotalVolumeUpdateTask(); const lastUpdateTime1 = await getTotalVolumeUpdateTime(); - // Sleep for 1s - await new Promise((resolve) => setTimeout(resolve, 1000)); - await walletTotalVolumeUpdateTask(); const lastUpdateTime2 = await getTotalVolumeUpdateTime(); @@ -136,9 +104,112 @@ describe('update-wallet-total-volume', () => { expect(lastUpdateTime2).not.toBeUndefined(); if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) { expect(lastUpdateTime2.toMillis()) - .toBeGreaterThan(lastUpdateTime1.plus({ seconds: 1 }).toMillis()); + .toBeGreaterThan(lastUpdateTime1.toMillis()); } }); + + it('Successfully backfills from past date', async () => { + const currentDt: DateTime = DateTime.utc(); + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Create 3 fills spanning 2 weeks in the past + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Set persistent cache totalVolumeUpdateTime to 3 weeks ago to emulate backfill from 3 weeks. + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: currentDt.minus({ weeks: 3 }).toISO(), + }); + + let backfillTime = await getTotalVolumeUpdateTime(); + while (backfillTime !== undefined && DateTime.fromISO(backfillTime.toISO()) < currentDt) { + await walletTotalVolumeUpdateTask(); + backfillTime = await getTotalVolumeUpdateTime(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); + + it('Successfully backfills on first run', async () => { + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Leave persistent cache totalVolumeUpdateTime empty and create fills around + // `defaultLastUpdateTime` value to emulate backfilling from very beginning + expect(await getTotalVolumeUpdateTime()).toBeUndefined(); + + const referenceDt = DateTime.fromISO('2020-01-01T00:00:00Z'); + + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 3 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Emulate 10 roundtable runs (this should backfill all the fills) + for (let i = 0; i < 10; i++) { + await walletTotalVolumeUpdateTask(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); }); async function getTotalVolumeUpdateTime(): Promise { diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index c9f2534a691..a0f3a03bb5c 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -127,7 +127,7 @@ export const configSchema = { default: THIRTY_SECONDS_IN_MILLISECONDS, }), LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({ - default: FIVE_MINUTES_IN_MILLISECONDS, + default: THIRTY_SECONDS_IN_MILLISECONDS, }), // Start delay diff --git a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts index b1f2a7bb88b..5229d068ddc 100644 --- a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts +++ b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts @@ -1,11 +1,10 @@ import { logger, stats } from '@dydxprotocol-indexer/base'; -import { - PersistentCacheTable, - WalletTable, -} from '@dydxprotocol-indexer/postgres'; +import { PersistentCacheTable, WalletTable } from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; + import config from '../config'; -const defaultLastUpdateTime: string = '2000-01-01T00:00:00Z'; +const defaultLastUpdateTime: string = '2020-01-01T00:00:00Z'; const persistentCacheKey: string = 'totalVolumeUpdateTime'; /** @@ -23,21 +22,20 @@ export default async function runTask(): Promise { }); } - const lastUpdateTime = persistentCacheEntry - ? persistentCacheEntry.value - : defaultLastUpdateTime; - const currentTime = new Date().toISOString(); - - // On the first run of this roundtable, we need to calculate the total volume for all historical - // fills. This is a much more demanding task than regular roundtable runs. - // At time of commit, the total number of rows in 'fills' table in imperator mainnet is ~250M. - // This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table. - // This is relatively short and significanlty shorter than roundtable task cadence. Hence, - // special handling for the first run is not required. - await WalletTable.updateTotalVolume(lastUpdateTime, currentTime); + const lastUpdateTime = DateTime.fromISO(persistentCacheEntry + ? persistentCacheEntry.value + : defaultLastUpdateTime); + let currentTime = DateTime.utc(); + + // During backfilling, we process one day at a time to reduce roundtable runtime. + if (currentTime > lastUpdateTime.plus({ days: 1 })) { + currentTime = lastUpdateTime.plus({ days: 1 }); + } + + await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), currentTime.toISO()); await PersistentCacheTable.upsert({ key: persistentCacheKey, - value: currentTime, + value: currentTime.toISO(), }); stats.timing(