Skip to content

Commit

Permalink
pr revision
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryfan01234 committed Sep 7, 2024
1 parent 7197a1e commit dc2f825
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 97 deletions.
25 changes: 21 additions & 4 deletions indexer/packages/postgres/__tests__/stores/wallet-table.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as FillTable from '../../src/stores/fill-table';
import * as OrderTable from '../../src/stores/order-table';
import * as WalletTable from '../../src/stores/wallet-table';
import * as SubaccountTable from '../../src/stores/subaccount-table';
import * as PersistentCacheTable from '../../src/stores/persistent-cache-table';
import { seedData } from '../helpers/mock-generators';

describe('Wallet store', () => {
Expand Down Expand Up @@ -116,7 +117,7 @@ describe('Wallet store', () => {

// Update totalVolume for a time window that covers all fills
await WalletTable.updateTotalVolume(
firstFillTime.toISO(),
firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive
firstFillTime.plus({ hours: 1 }).toISO(),
);
let wallet = await WalletTable.findById(defaultWallet.address);
Expand All @@ -129,15 +130,31 @@ describe('Wallet store', () => {
// For convenience, we will reuse the existing fills data. The total volume calculated in this
// window should be added to the total volume above.
await WalletTable.updateTotalVolume(
firstFillTime.toISO(),
firstFillTime.plus({ minutes: 2 }).toISO(), // windowEndTs is exclusive -> filters out 1 fill
firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount
firstFillTime.plus({ minutes: 2 }).toISO(),
);
wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '205', // 103 + 102
totalVolume: '105', // 103 + 2
}));
});

it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => {
const leftBound = DateTime.utc().minus({ hours: 1 });
const rightBound = DateTime.utc();
await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO());

const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime');
const lastUpdateTime = persistentCache?.value
? DateTime.fromISO(persistentCache.value)
: undefined;

expect(lastUpdateTime).not.toBeUndefined();
if (lastUpdateTime?.toMillis() !== undefined) {
expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis());
}
});
});

/**
Expand Down
65 changes: 37 additions & 28 deletions indexer/packages/postgres/src/stores/wallet-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ export async function findById(
* Calculates the total volume in a given time window for each address and adds the values to the
* existing totalVolume values.
*
* @param windowStartTs - The start timestamp of the time window (inclusive).
* @param windowEndTs - The end timestamp of the time window (exclusive).
* @param windowStartTs - The start timestamp of the time window (exclusive).
* @param windowEndTs - The end timestamp of the time window (inclusive).
*/
export async function updateTotalVolume(
windowStartTs: string,
Expand All @@ -131,32 +131,41 @@ export async function updateTotalVolume(

await knexReadReplica.getConnection().raw(
`
WITH fills_total AS (
-- Step 1: Calculate total volume for each subaccountId
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume"
FROM fills
WHERE "createdAt" >= ? AND "createdAt" < ?
GROUP BY "subaccountId"
),
subaccount_volume AS (
-- Step 2: Merge with subaccounts table to get the address
SELECT s."address", f."totalVolume"
FROM fills_total f
JOIN subaccounts s
ON f."subaccountId" = s."id"
),
address_volume AS (
-- Step 3: Group by address and sum the totalVolume
SELECT "address", SUM("totalVolume") AS "totalVolume"
FROM subaccount_volume
GROUP BY "address"
)
-- Step 4: Left join the result with the wallets table and update the total volume
UPDATE wallets
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume"
FROM address_volume av
WHERE wallets."address" = av."address";
BEGIN;
WITH fills_total AS (
-- Step 1: Calculate total volume for each subaccountId
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume"
FROM fills
WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}'
GROUP BY "subaccountId"
),
subaccount_volume AS (
-- Step 2: Merge with subaccounts table to get the address
SELECT s."address", f."totalVolume"
FROM fills_total f
JOIN subaccounts s
ON f."subaccountId" = s."id"
),
address_volume AS (
-- Step 3: Group by address and sum the totalVolume
SELECT "address", SUM("totalVolume") AS "totalVolume"
FROM subaccount_volume
GROUP BY "address"
)
-- Step 4: Left join the result with the wallets table and update the total volume
UPDATE wallets
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume"
FROM address_volume av
WHERE wallets."address" = av."address";
-- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table
INSERT INTO persistent_cache (key, value)
VALUES ('totalVolumeUpdateTime', '${windowEndTs}')
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;
COMMIT;
`,
[windowStartTs, windowEndTs],
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,55 +32,19 @@ describe('update-wallet-total-volume', () => {
await dbHelpers.clearData();
});

it('Succeeds in populating historical totalVolume on first run', async () => {
const referenceDt: DateTime = DateTime.fromISO('2020-01-01T00:00:00Z');
it('Successfully updates totalVolume multiple times', async () => {
const defaultSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: testConstants.defaultSubaccount.subaccountNumber },
[],
{},
);

await FillTable.create({
...testConstants.defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.toISO(),
eventId: testConstants.defaultTendermintEventId,
price: '1',
size: '1',
});
await FillTable.create({
...testConstants.defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.plus({ years: 1 }).toISO(),
eventId: testConstants.defaultTendermintEventId2,
price: '2',
size: '2',
});
await FillTable.create({
...testConstants.defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.plus({ years: 2 }).toISO(),
eventId: testConstants.defaultTendermintEventId3,
price: '3',
size: '3',
// Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt
// to backfill
await PersistentCacheTable.create({
key: 'totalVolumeUpdateTime',
value: DateTime.utc().toISO(),
});

await walletTotalVolumeUpdateTask();

const wallet = await WalletTable.findById(testConstants.defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...testConstants.defaultWallet,
totalVolume: '14', // 1 + 4 + 9
}));
});

it('Succeeds in incremental totalVolume updates', async () => {
const defaultSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: testConstants.defaultSubaccount.subaccountNumber },
[],
{},
);

// First task run: one new fill
await FillTable.create({
...testConstants.defaultFill,
Expand Down Expand Up @@ -123,22 +87,129 @@ describe('update-wallet-total-volume', () => {
});

it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => {
// Set persistent cache totalVolumeUpdateTime so walletTotalVolumeUpdateTask() does not attempt
// to backfill
await PersistentCacheTable.create({
key: 'totalVolumeUpdateTime',
value: DateTime.utc().toISO(),
});

await walletTotalVolumeUpdateTask();
const lastUpdateTime1 = await getTotalVolumeUpdateTime();

// Sleep for 1s
await new Promise((resolve) => setTimeout(resolve, 1000));

await walletTotalVolumeUpdateTask();
const lastUpdateTime2 = await getTotalVolumeUpdateTime();

expect(lastUpdateTime1).not.toBeUndefined();
expect(lastUpdateTime2).not.toBeUndefined();
if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) {
expect(lastUpdateTime2.toMillis())
.toBeGreaterThan(lastUpdateTime1.plus({ seconds: 1 }).toMillis());
.toBeGreaterThan(lastUpdateTime1.toMillis());
}
});

it('Successfully backfills from past date', async () => {
  const currentDt: DateTime = DateTime.utc();
  const defaultSubaccountId = await SubaccountTable.findAll(
    { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber },
    [],
    {},
  );

  // Create 3 fills spanning 2 weeks in the past.
  await FillTable.create({
    ...testConstants.defaultFill,
    subaccountId: defaultSubaccountId[0].id,
    createdAt: currentDt.toISO(),
    eventId: testConstants.defaultTendermintEventId,
    price: '1',
    size: '1',
  });
  await FillTable.create({
    ...testConstants.defaultFill,
    subaccountId: defaultSubaccountId[0].id,
    createdAt: currentDt.minus({ weeks: 1 }).toISO(),
    eventId: testConstants.defaultTendermintEventId2,
    price: '2',
    size: '2',
  });
  await FillTable.create({
    ...testConstants.defaultFill,
    subaccountId: defaultSubaccountId[0].id,
    createdAt: currentDt.minus({ weeks: 2 }).toISO(),
    eventId: testConstants.defaultTendermintEventId3,
    price: '3',
    size: '3',
  });

  // Set persistent cache totalVolumeUpdateTime to 3 weeks ago to emulate backfill from 3 weeks.
  await PersistentCacheTable.create({
    key: 'totalVolumeUpdateTime',
    value: currentDt.minus({ weeks: 3 }).toISO(),
  });

  // Run the task until the cached window end catches up to now. Luxon DateTimes compare
  // chronologically with `<`, so the previous fromISO(toISO()) round-trip was redundant.
  // Cap iterations so a task that stops advancing the cache fails the test instead of
  // hanging the suite in an infinite loop.
  const maxRuns = 30;
  let backfillTime = await getTotalVolumeUpdateTime();
  for (let run = 0; backfillTime !== undefined && backfillTime < currentDt; run++) {
    expect(run).toBeLessThan(maxRuns);
    await walletTotalVolumeUpdateTask();
    backfillTime = await getTotalVolumeUpdateTime();
  }

  const wallet = await WalletTable.findById(testConstants.defaultWallet.address);
  expect(wallet).toEqual(expect.objectContaining({
    ...testConstants.defaultWallet,
    totalVolume: '14', // 1 + 4 + 9
  }));
});

it('Successfully backfills on first run', async () => {
  const defaultSubaccountId = await SubaccountTable.findAll(
    { subaccountNumber: testConstants.defaultSubaccount.subaccountNumber },
    [],
    {},
  );

  // Persistent cache is intentionally left empty: the task should fall back to
  // `defaultLastUpdateTime` and backfill from the very beginning.
  expect(await getTotalVolumeUpdateTime()).toBeUndefined();

  const referenceDt = DateTime.fromISO('2020-01-01T00:00:00Z');

  // Three fills on consecutive days after the reference time; price === size so each
  // contributes price² to the total volume.
  const fillSpecs = [
    { days: 1, eventId: testConstants.defaultTendermintEventId, amount: '1' },
    { days: 2, eventId: testConstants.defaultTendermintEventId2, amount: '2' },
    { days: 3, eventId: testConstants.defaultTendermintEventId3, amount: '3' },
  ];
  for (const spec of fillSpecs) {
    await FillTable.create({
      ...testConstants.defaultFill,
      subaccountId: defaultSubaccountId[0].id,
      createdAt: referenceDt.plus({ days: spec.days }).toISO(),
      eventId: spec.eventId,
      price: spec.amount,
      size: spec.amount,
    });
  }

  // Emulate 10 roundtable runs (this should backfill all the fills).
  for (let run = 0; run < 10; run++) {
    await walletTotalVolumeUpdateTask();
  }

  const wallet = await WalletTable.findById(testConstants.defaultWallet.address);
  expect(wallet).toEqual(expect.objectContaining({
    ...testConstants.defaultWallet,
    totalVolume: '14', // 1 + 4 + 9
  }));
});
});

async function getTotalVolumeUpdateTime(): Promise<DateTime | undefined> {
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export const configSchema = {
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME: parseInteger({
default: FIVE_MINUTES_IN_MILLISECONDS,
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),

// Start delay
Expand Down
34 changes: 16 additions & 18 deletions indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
PersistentCacheTable,
WalletTable,
} from '@dydxprotocol-indexer/postgres';
import { PersistentCacheTable, WalletTable } from '@dydxprotocol-indexer/postgres';
import { DateTime } from 'luxon';

import config from '../config';

const defaultLastUpdateTime: string = '2000-01-01T00:00:00Z';
const defaultLastUpdateTime: string = '2020-01-01T00:00:00Z';
const persistentCacheKey: string = 'totalVolumeUpdateTime';

/**
Expand All @@ -23,21 +22,20 @@ export default async function runTask(): Promise<void> {
});
}

const lastUpdateTime = persistentCacheEntry
? persistentCacheEntry.value
: defaultLastUpdateTime;
const currentTime = new Date().toISOString();

// On the first run of this roundtable, we need to calculate the total volume for all historical
// fills. This is a much more demanding task than regular roundtable runs.
// At time of commit, the total number of rows in 'fills' table in imperator mainnet is ~250M.
// This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table.
// This is relatively short and significantly shorter than roundtable task cadence. Hence,
// special handling for the first run is not required.
await WalletTable.updateTotalVolume(lastUpdateTime, currentTime);
const lastUpdateTime = DateTime.fromISO(persistentCacheEntry
? persistentCacheEntry.value
: defaultLastUpdateTime);
let currentTime = DateTime.utc();

// During backfilling, we process one day at a time to reduce roundtable runtime.
if (currentTime > lastUpdateTime.plus({ days: 1 })) {
currentTime = lastUpdateTime.plus({ days: 1 });
}

await WalletTable.updateTotalVolume(lastUpdateTime.toISO(), currentTime.toISO());
await PersistentCacheTable.upsert({
key: persistentCacheKey,
value: currentTime,
value: currentTime.toISO(),
});

stats.timing(
Expand Down

0 comments on commit dc2f825

Please sign in to comment.