-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[OTE-755] Add update wallet total volume roundtable task #2222
Conversation
WalkthroughThe changes enhance the wallet management system by introducing new test cases for wallet volume calculations, a migration for indexing fills, and an asynchronous function to update total volume based on transaction data. New configuration options allow for control over update intervals, and a task has been implemented to run periodically, ensuring accurate wallet volume information. The test suite validates the functionality of the wallet's total volume calculations under various scenarios. These modifications collectively improve the functionality and performance of the wallet system. Changes
Sequence Diagram(s)sequenceDiagram
participant Config
participant Task
participant WalletTable
participant Cache
Config->>Task: Check if updates are enabled
alt Updates Enabled
Task->>Cache: Retrieve last update time
Cache-->>Task: Last update time
Task->>WalletTable: Update total volume
WalletTable-->>Task: Volume updated
Task->>Cache: Update current time
Cache-->>Task: Current time updated
end
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (7)
- indexer/packages/postgres/tests/stores/wallet-table.test.ts (2 hunks)
- indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts (1 hunks)
- indexer/packages/postgres/src/stores/wallet-table.ts (3 hunks)
- indexer/services/roundtable/tests/tasks/update-wallet-total-volume.test.ts (1 hunks)
- indexer/services/roundtable/src/config.ts (2 hunks)
- indexer/services/roundtable/src/index.ts (2 hunks)
- indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts (1 hunks)
Additional comments not posted (9)
indexer/packages/postgres/src/db/migrations/migration_files/20240906134410_add_fills_created_at_index.ts (1)
3-7
: Well-implemented migration script for index management.The migration script correctly uses
CREATE INDEX CONCURRENTLY
andDROP INDEX CONCURRENTLY
to manage the index on thecreatedAt
column of thefills
table. This approach is beneficial for live databases as it minimizes locking during index creation and removal. Additionally, the configurationtransaction: false
is appropriately set, acknowledging that these operations cannot be performed within a transaction.Also applies to: 9-13, 15-17
indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts (2)
1-33
: Setup and Imports ApprovedThe setup and imports are appropriately structured for testing the
update-wallet-total-volume
task. The use of database helpers and mock data seeding is correctly implemented to ensure a clean testing environment.
35-75
: Test Case Review: Historical Total Volume CalculationThe test case is well-structured and effectively simulates the initial population of historical totalVolume. However, ensure that the calculation of
totalVolume
as '14' is correct based on the provided fills. It might be beneficial to add a comment explaining how '14' is derived from the fills for clarity.Consider adding a comment or a breakdown of the calculation within the test to aid future maintainability.
indexer/services/roundtable/src/config.ts (2)
60-60
: Configuration option for enabling wallet volume update loopThe addition of
LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME
is consistent with the existing configuration pattern and usesparseBoolean
to ensure type safety. The default value oftrue
suggests that this feature is enabled by default, which seems appropriate if the feature is ready for use and has been tested.
129-131
: Configuration option for setting the interval of wallet volume update loopThe addition of
LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME
with a default ofFIVE_MINUTES_IN_MILLISECONDS
is well-aligned with the application's needs for periodic updates. UsingparseInteger
ensures that the value is correctly handled as an integer, and the default value is set appropriately for a frequent but not overly aggressive update interval.indexer/packages/postgres/__tests__/stores/wallet-table.test.ts (2)
114-140
: Comprehensive test case fortotalVolume
updates.This test case effectively checks the
totalVolume
updates for different time windows, ensuring the logic for volume aggregation functions as intended. It's crucial to also verify edge cases where time windows might overlap or no fills exist within the specified window.Consider adding tests for edge cases to ensure robustness.
143-199
: Well-implemented helper function for test setup.The
populateWalletSubaccountFill
function is well-documented and effectively sets up the necessary test data. Ensure that the data setup does not introduce any side effects or inconsistencies, particularly when used in multiple test cases.Verify the data setup in various scenarios to ensure consistency and isolation between tests.
indexer/services/roundtable/src/index.ts (2)
30-30
: Import statement for new task added.The import of
updateWalletTotalVolumeTask
is crucial for the new functionality. Ensure that the implementation of this task is thoroughly tested and reviewed.Verify the implementation and testing of the
updateWalletTotalVolumeTask
module to ensure it meets the required functionality.
251-257
: Conditional check for looping mechanism added.The conditional check for initiating the
updateWalletTotalVolumeTask
loop is correctly implemented. Ensure that the configuration settings (LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME
) are well-documented and easily configurable.Verify that the configuration settings are documented and test the behavior when the configuration is toggled.
/** | ||
* Update the total volume for each address in the wallet table. | ||
*/ | ||
export default async function runTask(): Promise<void> { | ||
try { | ||
const start = Date.now(); | ||
const persistentCacheEntry = await PersistentCacheTable.findById(persistentCacheKey); | ||
|
||
if (!persistentCacheEntry) { | ||
logger.info({ | ||
at: 'update-address-total-volume#runTask', | ||
message: `No previous totalVolumeUpdateTime found in persistent cache table. Will use default value: ${defaultLastUpdateTime}`, | ||
}); | ||
} | ||
|
||
const lastUpdateTime = persistentCacheEntry | ||
? persistentCacheEntry.value | ||
: defaultLastUpdateTime; | ||
const currentTime = new Date().toISOString(); | ||
|
||
// On the first run of this roundtable, we need to calculate the total volume for all historical | ||
// fills. This is a much more demanding task than regular roundtable runs. | ||
// At time of commit, the total number of rows in 'fills' table in imperator mainnet is ~250M. | ||
// This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table. | ||
// This is relatively short and significanlty shorter than roundtable task cadence. Hence, | ||
// special handling for the first run is not required. | ||
await WalletTable.updateTotalVolume(lastUpdateTime, currentTime); | ||
await PersistentCacheTable.upsert({ | ||
key: persistentCacheKey, | ||
value: currentTime, | ||
}); | ||
|
||
stats.timing( | ||
`${config.SERVICE_NAME}.update_wallet_total_volume_timing`, | ||
Date.now() - start, | ||
); | ||
} catch (error) { | ||
logger.error({ | ||
at: 'update-address-total-volume#runTask', | ||
message: 'Error when updating totalVolume in wallets table', | ||
error, | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Robust implementation of the wallet volume update task.
The task is well-implemented with thorough error handling and logging. The use of a persistent cache to track the last update time is a smart choice that facilitates efficient data processing. The detailed logging at various stages of the task provides valuable insights into its operation, which is beneficial for monitoring and debugging.
However, it would be beneficial to add more detailed comments or documentation explaining the task's overall impact on the system, especially considering its potential to affect large volumes of data.
Would you like me to help draft additional documentation or comments for this task?
/** | ||
* Calculates the total volume in a given time window for each address and adds the values to the | ||
* existing totalVolume values. | ||
* | ||
* @param windowStartTs - The start timestamp of the time window (inclusive). | ||
* @param windowEndTs - The end timestamp of the time window (exclusive). | ||
*/ | ||
export async function updateTotalVolume( | ||
windowStartTs: string, | ||
windowEndTs: string, | ||
) : Promise<void> { | ||
|
||
await knexReadReplica.getConnection().raw( | ||
` | ||
WITH fills_total AS ( | ||
-- Step 1: Calculate total volume for each subaccountId | ||
SELECT "subaccountId", SUM("price" * "size") AS "totalVolume" | ||
FROM fills | ||
WHERE "createdAt" >= ? AND "createdAt" < ? | ||
GROUP BY "subaccountId" | ||
), | ||
subaccount_volume AS ( | ||
-- Step 2: Merge with subaccounts table to get the address | ||
SELECT s."address", f."totalVolume" | ||
FROM fills_total f | ||
JOIN subaccounts s | ||
ON f."subaccountId" = s."id" | ||
), | ||
address_volume AS ( | ||
-- Step 3: Group by address and sum the totalVolume | ||
SELECT "address", SUM("totalVolume") AS "totalVolume" | ||
FROM subaccount_volume | ||
GROUP BY "address" | ||
) | ||
-- Step 4: Left join the result with the wallets table and update the total volume | ||
UPDATE wallets | ||
SET "totalVolume" = COALESCE(wallets."totalVolume", 0) + av."totalVolume" | ||
FROM address_volume av | ||
WHERE wallets."address" = av."address"; | ||
`, | ||
[windowStartTs, windowEndTs], | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Efficient and secure implementation of the updateTotalVolume
function.
The function efficiently calculates and updates the total volume for wallets using SQL queries with Common Table Expressions (CTEs). The use of parameterized queries enhances security by preventing SQL injection. The function is well-integrated into the existing module and correctly handles asynchronous operations.
However, adding error handling within the function could improve its robustness by managing potential database errors more gracefully.
Consider adding try-catch blocks around the database operations to handle and log potential errors effectively. Here's a suggested modification:
+ try {
await knexReadReplica.getConnection().raw(
...
);
+ } catch (error) {
+ logger.error({
+ message: 'Failed to update total volume',
+ error,
+ });
+ throw error; // Re-throw the error if you need to propagate it
+ }
Committable suggestion was skipped due to low confidence.
indexer/services/roundtable/__tests__/tasks/update-wallet-total-volume.test.ts
Outdated
Show resolved
Hide resolved
it('Successfully updates totalVolumeUpdateTime in persistent cache table', async () => { | ||
await walletTotalVolumeUpdateTask(); | ||
const lastUpdateTime1 = await getTotalVolumeUpdateTime(); | ||
|
||
// Sleep for 1s | ||
await new Promise((resolve) => setTimeout(resolve, 1000)); | ||
|
||
await walletTotalVolumeUpdateTask(); | ||
const lastUpdateTime2 = await getTotalVolumeUpdateTime(); | ||
|
||
expect(lastUpdateTime1).not.toBeUndefined(); | ||
expect(lastUpdateTime2).not.toBeUndefined(); | ||
if (lastUpdateTime1?.toMillis() !== undefined && lastUpdateTime2?.toMillis() !== undefined) { | ||
expect(lastUpdateTime2.toMillis()) | ||
.toBeGreaterThan(lastUpdateTime1.plus({ seconds: 1 }).toMillis()); | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test Case Review: Update of Total Volume Update Time
The test case correctly verifies that the totalVolumeUpdateTime
is updated in the persistent cache table. Using setTimeout
to simulate time passing is effective but consider alternatives that could avoid potential flakiness in automated testing environments, such as mocking the time.
Explore alternatives to using setTimeout
for simulating time in tests, such as using time-mocking libraries, to enhance test reliability.
// This can be processed in ~1min with the introduction of 'createdAt' index in 'fills' table. | ||
// This is relatively short and significanlty shorter than roundtable task cadence. Hence, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want every task including the first one to be ideally under a couple seconds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mind adding logic to batch the historical run, but wondering why we can't have a ~1min run for the first run?
export async function up(knex: Knex): Promise<void> { | ||
await knex.raw(` | ||
CREATE INDEX CONCURRENTLY IF NOT EXISTS "fills_createdat_index" ON "fills" ("createdAt"); | ||
`); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an index on subaccountId too on the fills table. Can optimize on group bys and joins
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that was added in a previous PR (https://github.com/dydxprotocol/v4-chain/pull/2174/files)
await walletTotalVolumeUpdateTask(); | ||
const lastUpdateTime1 = await getTotalVolumeUpdateTime(); | ||
|
||
// Sleep for 1s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why sleep here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To create a significant inverval between runs in case time difference is less than smallest precision in postgres which is microseconds. I dont think our code is that fast, so I can remove if preferred.
await WalletTable.updateTotalVolume(lastUpdateTime, currentTime); | ||
await PersistentCacheTable.upsert({ | ||
key: persistentCacheKey, | ||
value: currentTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be useful to have a transaction here? and rollback if any errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
U are right, i will make it so that updateTotalVolume and peristent cache is part of one transaction
77bf33f
to
dc2f825
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- indexer/packages/postgres/tests/stores/wallet-table.test.ts (2 hunks)
- indexer/packages/postgres/src/stores/wallet-table.ts (3 hunks)
- indexer/services/roundtable/tests/tasks/update-wallet-total-volume.test.ts (1 hunks)
- indexer/services/roundtable/src/config.ts (2 hunks)
- indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts (1 hunks)
Files skipped from review as they are similar to previous changes (5)
- indexer/packages/postgres/tests/stores/wallet-table.test.ts
- indexer/packages/postgres/src/stores/wallet-table.ts
- indexer/services/roundtable/tests/tasks/update-wallet-total-volume.test.ts
- indexer/services/roundtable/src/config.ts
- indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts
dc2f825
to
3386cc6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- indexer/packages/postgres/tests/stores/wallet-table.test.ts (2 hunks)
- indexer/packages/postgres/src/stores/wallet-table.ts (3 hunks)
- indexer/services/roundtable/tests/tasks/update-wallet-total-volume.test.ts (1 hunks)
- indexer/services/roundtable/src/config.ts (2 hunks)
- indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts (1 hunks)
Files skipped from review as they are similar to previous changes (4)
- indexer/packages/postgres/tests/stores/wallet-table.test.ts
- indexer/packages/postgres/src/stores/wallet-table.ts
- indexer/services/roundtable/tests/tasks/update-wallet-total-volume.test.ts
- indexer/services/roundtable/src/tasks/update-wallet-total-volume.ts
Additional comments not posted (2)
indexer/services/roundtable/src/config.ts (2)
60-60
: Configuration option added for wallet volume update loop.The addition of
LOOPS_ENABLED_UPDATE_WALLET_TOTAL_VOLUME
with a default value oftrue
is correctly implemented usingparseBoolean
. This allows for enabling or disabling the wallet volume update loop.Consider adding documentation in the configuration file or the project's wiki to explain the purpose and usage of this new configuration option.
129-131
: Configuration option added for wallet volume update interval.The addition of
LOOPS_INTERVAL_MS_UPDATE_WALLET_TOTAL_VOLUME
with a default value ofTHIRTY_SECONDS_IN_MILLISECONDS
is correctly implemented usingparseInteger
. This setting controls the frequency of the wallet volume update task.Consider adding documentation in the configuration file or the project's wiki to explain the purpose and usage of this new configuration option.
Changelist
Add new index on createdAt for fills table
Add new roundtable task to update wallet total volume
Test Plan
local unit tests
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Chores