diff --git a/.changeset/healthy-spoons-explode.md b/.changeset/healthy-spoons-explode.md new file mode 100644 index 000000000..5fccea03f --- /dev/null +++ b/.changeset/healthy-spoons-explode.md @@ -0,0 +1,5 @@ +--- +"ponder": patch +--- + +Improved reorg handling resilience. diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index d9ab770eb..57dca2488 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -33,6 +33,7 @@ import { intervalUnion } from "@/utils/interval.js"; import { never } from "@/utils/never.js"; import { type RequestQueue, createRequestQueue } from "@/utils/requestQueue.js"; import { startClock } from "@/utils/timer.js"; +import { type Queue, createQueue } from "@ponder/common"; import { type Address, type Hash, @@ -236,6 +237,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { syncProgress: SyncProgress; historicalSync: HistoricalSync; realtimeSync: RealtimeSync; + realtimeQueue: Queue; unfinalizedBlocks: Omit< Extract, "type" @@ -290,13 +292,22 @@ export const createSync = async (args: CreateSyncParameters): Promise => { network, onFatalError: args.onFatalError, }); + + const realtimeQueue = createQueue({ + initialStart: true, + browser: false, + concurrency: 1, + worker: async (event: RealtimeSyncEvent) => + onRealtimeSyncEvent({ event, network }), + }); + const realtimeSync = createRealtimeSync({ common: args.common, sources, requestQueue, network, onEvent: (event) => - onRealtimeSyncEvent({ event, network }).catch((error) => { + realtimeQueue.add(event).catch((error) => { args.common.logger.error({ service: "sync", msg: `Fatal error: Unable to process ${event.type} event`, @@ -343,6 +354,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { syncProgress, historicalSync, realtimeSync, + realtimeQueue, unfinalizedBlocks: [], }); status[network.name] = { block: null, ready: false }; @@ -717,11 +729,6 @@ export const createSync = async (args: CreateSyncParameters): Promise => { syncProgress.finalized = event.block; const checkpoint = getOmnichainCheckpoint("finalized")!; - // Raise event to parent function (runtime) - if (checkpoint > prev) { - args.onRealtimeEvent({ type: "finalize", checkpoint }); - } - if ( getChainCheckpoint({ syncProgress, network, tag: "finalized" })! > getOmnichainCheckpoint("current")! @@ -813,6 +820,11 @@ export const createSync = async (args: CreateSyncParameters): Promise => { }); } + // Raise event to parent function (runtime) + if (checkpoint > prev) { + args.onRealtimeEvent({ type: "finalize", checkpoint }); + } + /** * The realtime service can be killed if `endBlock` is * defined has become finalized. @@ -973,8 +985,12 @@ export const createSync = async (args: CreateSyncParameters): Promise => { isKilled = true; const promises: Promise[] = []; for (const network of args.networks) { - const { historicalSync, realtimeSync } = perNetworkSync.get(network)!; + const { historicalSync, realtimeSync, realtimeQueue } = + perNetworkSync.get(network)!; historicalSync.kill(); + realtimeQueue.pause(); + realtimeQueue.clear(); + promises.push(realtimeQueue.onIdle()); promises.push(realtimeSync.kill()); } await Promise.all(promises);