Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTE-755] Add update wallet total volume roundtable task #2222

Merged
merged 2 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 122 additions & 1 deletion indexer/packages/postgres/__tests__/stores/wallet-table.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
import { WalletFromDatabase } from '../../src/types';
import { clearData, migrate, teardown } from '../../src/helpers/db-helpers';
import { defaultWallet2, defaultWallet3 } from '../helpers/constants';
import { DateTime } from 'luxon';
import {
defaultFill,
defaultOrder,
defaultSubaccount,
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTendermintEventId4,
defaultWallet,
defaultWallet2,
defaultWallet3,
isolatedMarketOrder,
isolatedSubaccount,
} from '../helpers/constants';
import * as FillTable from '../../src/stores/fill-table';
import * as OrderTable from '../../src/stores/order-table';
import * as WalletTable from '../../src/stores/wallet-table';
import * as SubaccountTable from '../../src/stores/subaccount-table';
import * as PersistentCacheTable from '../../src/stores/persistent-cache-table';
import { seedData } from '../helpers/mock-generators';

describe('Wallet store', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -92,4 +111,106 @@ describe('Wallet store', () => {
expect(wallets.length).toEqual(1);
expect(wallets[0]).toEqual(expect.objectContaining(defaultWallet3));
});

it('Successfully updates totalVolume for time window multiple times', async () => {
const firstFillTime = await populateWalletSubaccountFill();

// Update totalVolume for a time window that covers all fills
await WalletTable.updateTotalVolume(
firstFillTime.minus({ hours: 1 }).toISO(), // need to minus because left bound is exclusive
firstFillTime.plus({ hours: 1 }).toISO(),
);
let wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '103',
}));

// Update totalVolume for a time window that excludes some fills
// For convenience, we will reuse the existing fills data. The total volume calculated in this
// window should be added to the total volume above.
await WalletTable.updateTotalVolume(
firstFillTime.toISO(), // exclusive -> filters out first fill from each subaccount
firstFillTime.plus({ minutes: 2 }).toISO(),
);
wallet = await WalletTable.findById(defaultWallet.address);
expect(wallet).toEqual(expect.objectContaining({
...defaultWallet,
totalVolume: '105', // 103 + 2
}));
});

it('Successfully updates totalVolumeUpdateTime in persistent cache', async () => {
const leftBound = DateTime.utc().minus({ hours: 1 });
const rightBound = DateTime.utc();
await WalletTable.updateTotalVolume(leftBound.toISO(), rightBound.toISO());

const persistentCache = await PersistentCacheTable.findById('totalVolumeUpdateTime');
const lastUpdateTime = persistentCache?.value
? DateTime.fromISO(persistentCache.value)
: undefined;

expect(lastUpdateTime).not.toBeUndefined();
if (lastUpdateTime?.toMillis() !== undefined) {
expect(lastUpdateTime.toMillis()).toEqual(rightBound.toMillis());
}
});
});

/**
* Helper function to add entries into wallet, subaccount, fill tables.
* Create a wallet with 2 subaccounts; one subaccount has 3 fills and the other has 1 fill.
* The fills are at t=0,1,2 and t=1 for the subaccounts respectively.
* This setup allows us to test that the totalVolume is correctly calculated for a time window.
* @returns first fill time in ISO format
*/
async function populateWalletSubaccountFill(): Promise<DateTime> {
await seedData();
await OrderTable.create(defaultOrder);
await OrderTable.create(isolatedMarketOrder);

// seedData() creates defaultWallet with defaultSubaccount and isolatedSubaccount
const defaultSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: defaultSubaccount.subaccountNumber },
[],
{},
);
const isolatedSubaccountId = await SubaccountTable.findAll(
{ subaccountNumber: isolatedSubaccount.subaccountNumber },
[],
{},
);

const referenceDt = DateTime.utc().minus({ hours: 1 });
const eventIds = [
defaultTendermintEventId,
defaultTendermintEventId2,
defaultTendermintEventId3,
defaultTendermintEventId4,
];
let eventIdx = 0;

// Create 3 fills with 1 min increments for defaultSubaccount
for (let i = 0; i < 3; i++) {
await FillTable.create({
...defaultFill,
subaccountId: defaultSubaccountId[0].id,
createdAt: referenceDt.plus({ minutes: i }).toISO(),
eventId: eventIds[eventIdx],
price: '1',
size: '1',
});
eventIdx += 1;
}
// Create 1 fill at referenceDt for isolatedSubaccount
await FillTable.create({
...defaultFill,
subaccountId: isolatedSubaccountId[0].id,
createdAt: referenceDt.toISO(),
eventId: eventIds[eventIdx],
price: '10',
size: '10',
});

return referenceDt;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as Knex from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS "fills_createdat_index" ON "fills" ("createdAt");
`);
}
Comment on lines +3 to +7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an index on subaccountId too on the fills table. Can optimize on group bys and joins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


export async function down(knex: Knex): Promise<void> {
await knex.raw(`
DROP INDEX CONCURRENTLY IF EXISTS "fills_createdat_index";
`);
}

export const config = {
transaction: false,
};
55 changes: 55 additions & 0 deletions indexer/packages/postgres/src/stores/wallet-table.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { PartialModelObject, QueryBuilder } from 'objection';

import { DEFAULT_POSTGRES_OPTIONS } from '../constants';
import { knexReadReplica } from '../helpers/knex';
import { setupBaseQuery, verifyAllRequiredFields } from '../helpers/stores-helpers';
import Transaction from '../helpers/transaction';
import WalletModel from '../models/wallet-model';
Expand Down Expand Up @@ -102,6 +103,7 @@ export async function upsert(
// should only ever be one wallet
return wallets[0];
}

export async function findById(
address: string,
options: Options = DEFAULT_POSTGRES_OPTIONS,
Expand All @@ -114,3 +116,56 @@ export async function findById(
.findById(address)
.returning('*');
}

/**
* Calculates the total volume in a given time window for each address and adds the values to the
* existing totalVolume values.
*
* @param windowStartTs - The start timestamp of the time window (exclusive).
* @param windowEndTs - The end timestamp of the time window (inclusive).
*/
export async function updateTotalVolume(
windowStartTs: string,
windowEndTs: string,
) : Promise<void> {

await knexReadReplica.getConnection().raw(
`
BEGIN;

WITH fills_total AS (
-- Step 1: Calculate total volume for each subaccountId
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume"
FROM fills
WHERE "createdAt" > '${windowStartTs}' AND "createdAt" <= '${windowEndTs}'
GROUP BY "subaccountId"
),
subaccount_volume AS (
-- Step 2: Merge with subaccounts table to get the address
SELECT s."address", f."totalVolume"
FROM fills_total f
JOIN subaccounts s
ON f."subaccountId" = s."id"
),
address_volume AS (
-- Step 3: Group by address and sum the totalVolume
SELECT "address", SUM("totalVolume") AS "totalVolume"
FROM subaccount_volume
GROUP BY "address"
)
-- Step 4: Left join the result with the wallets table and update the total volume
UPDATE wallets
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume"
FROM address_volume av
WHERE wallets."address" = av."address";

-- Step 5: Upsert new totalVolumeUpdateTime to persistent_cache table
INSERT INTO persistent_cache (key, value)
VALUES ('totalVolumeUpdateTime', '${windowEndTs}')
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;

COMMIT;
`,
);
}
Loading
Loading