diff --git a/.changeset/shaggy-oranges-lie.md b/.changeset/shaggy-oranges-lie.md new file mode 100644 index 000000000..251e2dd20 --- /dev/null +++ b/.changeset/shaggy-oranges-lie.md @@ -0,0 +1,5 @@ +--- +"ponder": patch +--- + +Fixed a bug where events between the historical backfill and live indexing were skipped. This does not affect the rpc cache. diff --git a/packages/core/src/sync-store/index.ts b/packages/core/src/sync-store/index.ts index 7485e6704..46e956c96 100644 --- a/packages/core/src/sync-store/index.ts +++ b/packages/core/src/sync-store/index.ts @@ -96,7 +96,7 @@ export type SyncStore = { filters: Filter[]; from: string; to: string; - limit: number; + limit?: number; }): Promise<{ events: RawEvent[]; cursor: string }>; insertRpcRequestResult(args: { request: string; @@ -836,7 +836,7 @@ export const createSyncStore = ({ .where("event.checkpoint", "<=", to) .orderBy("event.checkpoint", "asc") .orderBy("event.filterIndex", "asc") - .limit(limit) + .$if(limit !== undefined, (qb) => qb.limit(limit!)) .execute(); }, ); diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index b184ca1f1..0fff83a39 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -235,12 +235,16 @@ export const createSync = async (args: CreateSyncParameters): Promise => { syncProgress: SyncProgress; historicalSync: HistoricalSync; realtimeSync: RealtimeSync; - unfinalizedBlocks: (Omit< + unfinalizedBlocks: Omit< Extract, "type" - > & { events: RawEvent[] })[]; + >[]; } >(); + /** Events that have been executed but not finalized. */ + let executedEvents: RawEvent[] = []; + /** Events that have not been executed yet. */ + let pendingEvents: RawEvent[] = []; const status: Status = {}; let isKilled = false; // Realtime events across all chains that can't be passed to the parent function @@ -634,38 +638,32 @@ export const createSync = async (args: CreateSyncParameters): Promise => { hexToNumber(syncProgress.current.number), ); - const blockWithEventData = event; - - const events = buildEvents({ + const newEvents = buildEvents({ sources: args.sources, chainId: network.chainId, - blockWithEventData, + blockWithEventData: event, finalizedChildAddresses: realtimeSync.finalizedChildAddresses, unfinalizedChildAddresses: realtimeSync.unfinalizedChildAddresses, }); - unfinalizedBlocks.push({ ...blockWithEventData, events }); + unfinalizedBlocks.push(event); + pendingEvents.push(...newEvents); if (to > from) { for (const network of args.networks) { updateRealtimeStatus({ checkpoint: to, network }); } - const pendingEvents: RawEvent[] = []; + // Move events from pending to executed - for (const { unfinalizedBlocks } of perNetworkSync.values()) { - for (const { events } of unfinalizedBlocks) { - for (const event of events) { - if (event.checkpoint > from && event.checkpoint <= to) { - pendingEvents.push(event); - } - } - } - } + const events = pendingEvents + .filter((event) => event.checkpoint < to) + .sort((a, b) => (a.checkpoint < b.checkpoint ? -1 : 1)); - const events = pendingEvents.sort((a, b) => - a.checkpoint < b.checkpoint ? -1 : 1, + pendingEvents = pendingEvents.filter( + ({ checkpoint }) => checkpoint > to, ); + executedEvents.push(...events); args .onRealtimeEvent({ @@ -714,10 +712,10 @@ export const createSync = async (args: CreateSyncParameters): Promise => { service: "sync", msg: `Finalized block for '${network.name}' has surpassed overall indexing checkpoint`, }); - // exit early because we need to keep `unfinalizedBlocks.events` - return; } + // Remove all finalized data + const finalizedBlocks = unfinalizedBlocks.filter( ({ block }) => hexToNumber(block.number) <= hexToNumber(event.block.number), @@ -729,6 +727,10 @@ export const createSync = async (args: CreateSyncParameters): Promise => { hexToNumber(block.number) > hexToNumber(event.block.number), ); + executedEvents = executedEvents.filter( + (e) => e.checkpoint > checkpoint, + ); + // Add finalized blocks, logs, transactions, receipts, and traces to the sync-store. await Promise.all([ @@ -819,6 +821,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { */ case "reorg": { syncProgress.current = event.block; + // Note: this checkpoint is <= the previous checkpoint const checkpoint = getOmnichainCheckpoint("current")!; // Update "ponder_sync_block" metric @@ -827,15 +830,36 @@ export const createSync = async (args: CreateSyncParameters): Promise => { hexToNumber(syncProgress.current.number), ); + // Remove all reorged data + perNetworkSync.get(network)!.unfinalizedBlocks = unfinalizedBlocks.filter( ({ block }) => hexToNumber(block.number) <= hexToNumber(event.block.number), ); + const isReorgedEvent = ({ chainId, block }: RawEvent) => + chainId === network.chainId && + Number(block.number) > hexToNumber(event.block.number); + + pendingEvents = pendingEvents.filter( + (e) => isReorgedEvent(e) === false, + ); + executedEvents = executedEvents.filter( + (e) => isReorgedEvent(e) === false, + ); + + // Move events from executed to pending + + const events = executedEvents.filter((e) => e.checkpoint > checkpoint); + executedEvents = executedEvents.filter( + (e) => e.checkpoint < checkpoint, + ); + pendingEvents.push(...events); + await args.syncStore.pruneRpcRequestResult({ - blocks: event.reorgedBlocks, chainId: network.chainId, + blocks: event.reorgedBlocks, }); // Raise event to parent function (runtime) @@ -854,12 +878,39 @@ export const createSync = async (args: CreateSyncParameters): Promise => { for (const network of args.networks) { const { syncProgress, realtimeSync } = perNetworkSync.get(network)!; + const filters = args.sources + .filter(({ filter }) => filter.chainId === network.chainId) + .map(({ filter }) => filter); + status[network.name]!.block = { number: hexToNumber(syncProgress.current!.number), timestamp: hexToNumber(syncProgress.current!.timestamp), }; status[network.name]!.ready = true; + // Fetch any events between the omnichain finalized checkpoint and the single-chain + // finalized checkpoint and add them to pendingEvents. These events are synced during + // the historical phase, but must be indexed in the realtime phase because events + // synced in realtime on other chains might be ordered before them. + const from = getOmnichainCheckpoint("finalized")!; + + const finalized = getChainCheckpoint({ + syncProgress, + network, + tag: "finalized", + })!; + const end = getChainCheckpoint({ + syncProgress, + network, + tag: "end", + })!; + const to = min(finalized, end); + + if (to > from) { + const events = await args.syncStore.getEvents({ filters, from, to }); + pendingEvents.push(...events.events); + } + if (isSyncEnd(syncProgress)) { args.common.metrics.ponder_sync_is_complete.set( { network: network.name }, @@ -873,12 +924,8 @@ export const createSync = async (args: CreateSyncParameters): Promise => { const initialChildAddresses = new Map>(); - for (const { filter } of args.sources) { - if ( - filter.chainId === network.chainId && - "address" in filter && - isAddressFactory(filter.address) - ) { + for (const filter of filters) { + if ("address" in filter && isAddressFactory(filter.address)) { const addresses = await args.syncStore.getChildAddresses({ filter: filter.address, });