diff --git a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts index d5c01982ce..4e74c5e55c 100644 --- a/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts +++ b/indexer/packages/postgres/__tests__/stores/wallet-table.test.ts @@ -1,7 +1,26 @@ import { WalletFromDatabase } from '../../src/types'; import { clearData, migrate, teardown } from '../../src/helpers/db-helpers'; -import { defaultWallet2, defaultWallet3 } from '../helpers/constants'; +import { DateTime } from 'luxon'; +import { + defaultFill, + defaultOrder, + defaultSubaccount, + defaultTendermintEventId, + defaultTendermintEventId2, + defaultTendermintEventId3, + defaultTendermintEventId4, + defaultWallet, + defaultWallet2, + defaultWallet3, + isolatedMarketOrder, + isolatedSubaccount, +} from '../helpers/constants'; +import * as FillTable from '../../src/stores/fill-table'; +import * as OrderTable from '../../src/stores/order-table'; import * as WalletTable from '../../src/stores/wallet-table'; +import * as SubaccountTable from '../../src/stores/subaccount-table'; +import * as PersistentCacheTable from '../../src/stores/persistent-cache-table'; +import { seedData } from '../helpers/mock-generators'; describe('Wallet store', () => { beforeAll(async () => { @@ -92,4 +111,106 @@ describe('Wallet store', () => { expect(wallets.length).toEqual(1); expect(wallets[0]).toEqual(expect.objectContaining(defaultWallet3)); }); + + it('Successfully updates totalVolume for time window multiple times', async () => { + const firstFillTime = await populateWalletSubaccountFill(); + + // Update totalVolume for a time window that covers all fills + await WalletTable.updateTotalVolume( + firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive + firstFillTime.plus({ hours: 1 }).toISO(), + ); + let wallet = await WalletTable.findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '103', + })); + + // Update totalVolume for a time window that excludes some fills + // For convenience, we will reuse the existing fills data. The total volume calculated in this + // window should be added to the total volume above. + await WalletTable.updateTotalVolume( + firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount + firstFillTime.plus({ minutes: 2 }).toISO(), + ); + wallet = await WalletTable.findById(defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...defaultWallet, + totalVolume: '105', // 103 + 2 + })); + }); + + it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => { + const leftBound = DateTime.utc().minus({ hours: 1 }); + const rightBound = DateTime.utc(); + await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO()); + + const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); + const lastUpdateTime = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + + expect(lastUpdateTime).not.toBeUndefined(); + if (lastUpdateTime?.toMillis() !== undefined) { + expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis()); + } + }); }); + +/** + * Helper function to add entries into wallet, subaccount, fill tables. + * Create a wallet with 2 subaccounts; one subaccount has 3 fills and the other has 1 fill. + * The fills are at t=0,1,2 and t=1 for the subaccounts respectively. + * This setup allows us to test that the totalVolume is correctly calculated for a time window. + * @returns first fill time in ISO format + */ +async function populateWalletSubaccountFill(): Promise { + await seedData(); + await OrderTable.create(defaultOrder); + await OrderTable.create(isolatedMarketOrder); + + // seedData() creates defaultWallet with defaultSubaccount and isolatedSubaccount + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: defaultSubaccount.subaccountNumber }, + [], + {}, + ); + const isolatedSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: isolatedSubaccount.subaccountNumber }, + [], + {}, + ); + + const referenceDt = DateTime.utc().minus({ hours: 1 }); + const eventIds = [ + defaultTendermintEventId, + defaultTendermintEventId2, + defaultTendermintEventId3, + defaultTendermintEventId4, + ]; + let eventIdx = 0; + + // Create 3 fills with 1 min increments for defaultSubaccount + for (let i = 0; i < 3; i++) { + await FillTable.create({ + ...defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ minutes: i }).toISO(), + eventId: eventIds[eventIdx], + price: '1', + size: '1', + }); + eventIdx += 1; + } + // Create 1 fill at referenceDt for isolatedSubaccount + await FillTable.create({ + ...defaultFill, + subaccountId: isolatedSubaccountId[0].id, + createdAt: referenceDt.toISO(), + eventId: eventIds[eventIdx], + price: '10', + size: '10', + }); + + return referenceDt; +} diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts new file mode 100644 index 0000000000..3c61072403 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts @@ -0,0 +1,17 @@ +import * as Knex from 'knex'; + +export async function up(knex: Knex): Promise { + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS "fills_createdat_index" ON "fills" ("createdAt"); + `); +} + +export async function down(knex: Knex): Promise { + await knex.raw(` + DROP INDEX CONCURRENTLY IF EXISTS "fills_createdat_index"; + `); +} + +export const config = { + transaction: false, +}; diff --git a/indexer/packages/postgres/src/stores/wallet-table.ts b/indexer/packages/postgres/src/stores/wallet-table.ts index 8869aa5d36..6e96b19dae 100644 --- a/indexer/packages/postgres/src/stores/wallet-table.ts +++ b/indexer/packages/postgres/src/stores/wallet-table.ts @@ -1,6 +1,7 @@ import { PartialModelObject, QueryBuilder } from 'objection'; import { DEFAULT_POSTGRES_OPTIONS } from '../constants'; +import { knexReadReplica } from '../helpers/knex'; import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers'; import Transaction from '../helpers/transaction'; import WalletModel from '../models/wallet-model'; @@ -102,6 +103,7 @@ export async function upsert( // should only ever be one wallet return wallets[0]; } + export async function findById( address: string, options: Options = DEFAULT_POSTGRES_OPTIONS, @@ -114,3 +116,56 @@ export async function findById( .findById(address) .returning('*'); } + +/** + * Calculates the total volume in a given time window for each address and adds the values to the + * existing totalVolume values. + * + * @param windowStartTs - The start timestamp of the time window (exclusive). + * @param windowEndTs - The end timestamp of the time window (inclusive). + */ +export async function updateTotalVolume( + windowStartTs: string, + windowEndTs: string, +) : Promise { + + await knexReadReplica.getConnection().raw( + ` + BEGIN; + + WITH fills_total AS ( + -- Step 1: Calculate total volume for each subaccountId + SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" + FROM fills + WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}' + GROUP BY "subaccountId" + ), + subaccount_volume AS ( + -- Step 2: Merge with subaccounts table to get the address + SELECT s."address", f."totalVolume" + FROM fills_total f + JOIN subaccounts s + ON f."subaccountId" = s."id" + ), + address_volume AS ( + -- Step 3: Group by address and sum the totalVolume + SELECT "address", SUM("totalVolume") AS "totalVolume" + FROM subaccount_volume + GROUP BY "address" + ) + -- Step 4: Left join the result with the wallets table and update the total volume + UPDATE wallets + SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" + FROM address_volume av + WHERE wallets."address" = av."address"; + + -- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table + INSERT INTO persistent_cache (key, value) + VALUES ('totalVolumeUpdateTime', '${windowEndTs}') + ON CONFLICT (key) + DO UPDATE SET value = EXCLUDED.value; + + COMMIT; + `, + ); +} diff --git a/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts new file mode 100644 index 0000000000..edb61608c0 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts @@ -0,0 +1,221 @@ +import { + dbHelpers, + testConstants, + testMocks, + WalletTable, + SubaccountTable, + PersistentCacheTable, + FillTable, + OrderTable, +} from '@dydxprotocol-indexer/postgres'; +import walletTotalVolumeUpdateTask from '../../src/tasks/update-wallet-total-volume'; +import { DateTime } from 'luxon'; + +describe('update-wallet-total-volume', () => { + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + await OrderTable.create(testConstants.defaultOrder); + await OrderTable.create(testConstants.isolatedMarketOrder); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + }); + + it('Successfully updates totalVolume multiple times', async () => { + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), + }); + + // First task run: one new fill + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: DateTime.utc().toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await walletTotalVolumeUpdateTask(); + let wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '1', + })); + + // Second task run: no new fills + await walletTotalVolumeUpdateTask(); + wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '1', + })); + + // Third task run: one new fill + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: DateTime.utc().toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '1', + size: '1', + }); + await walletTotalVolumeUpdateTask(); + wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '2', + })); + }); + + it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => { + // Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt + // to backfill + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: DateTime.utc().toISO(), + }); + + await walletTotalVolumeUpdateTask(); + const lastUpdateTime1 = await getTotalVolumeUpdateTime(); + + await walletTotalVolumeUpdateTask(); + const lastUpdateTime2 = await getTotalVolumeUpdateTime(); + + expect(lastUpdateTime1).not.toBeUndefined(); + expect(lastUpdateTime2).not.toBeUndefined(); + if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) { + expect(lastUpdateTime2.toMillis()) + .toBeGreaterThan(lastUpdateTime1.toMillis()); + } + }); + + it('Successfully backfills from past date', async () => { + const currentDt: DateTime = DateTime.utc(); + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Create 3 fills spanning 2 weeks in the past + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: currentDt.minus({ weeks: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Set persistent cache totalVolumeUpdateTime to 3 weeks ago to emulate backfill from 3 weeks. + await PersistentCacheTable.create({ + key: 'totalVolumeUpdateTime', + value: currentDt.minus({ weeks: 3 }).toISO(), + }); + + let backfillTime = await getTotalVolumeUpdateTime(); + while (backfillTime !== undefined && DateTime.fromISO(backfillTime.toISO()) < currentDt) { + await walletTotalVolumeUpdateTask(); + backfillTime = await getTotalVolumeUpdateTime(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); + + it('Successfully backfills on first run', async () => { + const defaultSubaccountId = await SubaccountTable.findAll( + { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber }, + [], + {}, + ); + + // Leave persistent cache totalVolumeUpdateTime empty and create fills around + // `defaultLastUpdateTime` value to emulate backfilling from very beginning + expect(await getTotalVolumeUpdateTime()).toBeUndefined(); + + const referenceDt = DateTime.fromISO('2020-01-01T00:00:00Z'); + + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 1 }).toISO(), + eventId: testConstants.defaultTendermintEventId, + price: '1', + size: '1', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 2 }).toISO(), + eventId: testConstants.defaultTendermintEventId2, + price: '2', + size: '2', + }); + await FillTable.create({ + ...testConstants.defaultFill, + subaccountId: defaultSubaccountId[0].id, + createdAt: referenceDt.plus({ days: 3 }).toISO(), + eventId: testConstants.defaultTendermintEventId3, + price: '3', + size: '3', + }); + + // Emulate 10 roundtable runs (this should backfill all the fills) + for (let i = 0; i < 10; i++) { + await walletTotalVolumeUpdateTask(); + } + + const wallet = await WalletTable.findById(testConstants.defaultWallet.address); + expect(wallet).toEqual(expect.objectContaining({ + ...testConstants.defaultWallet, + totalVolume: '14', // 1 + 4 + 9 + })); + }); +}); + +async function getTotalVolumeUpdateTime(): Promise { + const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime'); + const lastUpdateTime1 = persistentCache?.value + ? DateTime.fromISO(persistentCache.value) + : undefined; + return lastUpdateTime1; +} diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 714a34a3de..a0f3a03bb5 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -57,6 +57,7 @@ export const configSchema = { LOOPS_ENABLED_LEADERBOARD_PNL_WEEKLY: parseBoolean({ default: false }), LOOPS_ENABLED_LEADERBOARD_PNL_MONTHLY: parseBoolean({ default: false }), LOOPS_ENABLED_LEADERBOARD_PNL_YEARLY: parseBoolean({ default: false }), + LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -125,6 +126,9 @@ export const configSchema = { LOOPS_INTERVAL_MS_LEADERBOARD_PNL_YEARLY: parseInteger({ default: THIRTY_SECONDS_IN_MILLISECONDS, }), + LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({ + default: THIRTY_SECONDS_IN_MILLISECONDS, + }), // Start delay START_DELAY_ENABLED: parseBoolean({ default: true }), diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index d3ad79696c..c36af62b5a 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -27,6 +27,7 @@ import trackLag from './tasks/track-lag'; import uncrossOrderbookTask from './tasks/uncross-orderbook'; import updateComplianceDataTask from './tasks/update-compliance-data'; import updateResearchEnvironmentTask from './tasks/update-research-environment'; +import updateWalletTotalVolumeTask from './tasks/update-wallet-total-volume'; process.on('SIGTERM', () => { logger.info({ @@ -247,6 +248,13 @@ async function start(): Promise { config.LOOPS_INTERVAL_MS_LEADERBOARD_PNL_YEARLY, ); } + if (config.LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME) { + startLoop( + updateWalletTotalVolumeTask, + 'update_wallet_total_volume', + config.LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME, + ); + } logger.info({ at: 'index', diff --git a/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts new file mode 100644 index 0000000000..5229d068dd --- /dev/null +++ b/indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts @@ -0,0 +1,52 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import { PersistentCacheTable, WalletTable } from '@dydxprotocol-indexer/postgres'; +import { DateTime } from 'luxon'; + +import config from '../config'; + +const defaultLastUpdateTime: string = '2020-01-01T00:00:00Z'; +const persistentCacheKey: string = 'totalVolumeUpdateTime'; + +/** + * Update the total volume for each address in the wallet table. + */ +export default async function runTask(): Promise { + try { + const start = Date.now(); + const persistentCacheEntry = await PersistentCacheTable.findById(persistentCacheKey); + + if (!persistentCacheEntry) { + logger.info({ + at: 'update-address-total-volume#runTask', + message: `No previous totalVolumeUpdateTime found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, + }); + } + + const lastUpdateTime = DateTime.fromISO(persistentCacheEntry + ? persistentCacheEntry.value + : defaultLastUpdateTime); + let currentTime = DateTime.utc(); + + // During backfilling, we process one day at a time to reduce roundtable runtime. + if (currentTime > lastUpdateTime.plus({ days: 1 })) { + currentTime = lastUpdateTime.plus({ days: 1 }); + } + + await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), currentTime.toISO()); + await PersistentCacheTable.upsert({ + key: persistentCacheKey, + value: currentTime.toISO(), + }); + + stats.timing( + `${config.SERVICE_NAME}.update_wallet_total_volume_timing`, + Date.now() - start, + ); + } catch (error) { + logger.error({ + at: 'update-address-total-volume#runTask', + message: 'Error when updating totalVolume in wallets table', + error, + }); + } +}