Skip to content

Commit

Permalink
fix: auto-reinitialize filters
Browse files Browse the repository at this point in the history
  • Loading branch information
jaxernst committed Aug 1, 2023
1 parent 008e2ab commit fcfd307
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 25 deletions.
36 changes: 21 additions & 15 deletions src/actions/public/watchContractEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { setBalance } from '../test/setBalance.js'
import { stopImpersonatingAccount } from '../test/stopImpersonatingAccount.js'
import { writeContract } from '../wallet/writeContract.js'

import { InvalidInputRpcError, RpcRequestError } from '../../index.js'
import * as createContractEventFilter from './createContractEventFilter.js'
import * as getBlockNumber from './getBlockNumber.js'
import * as getFilterChanges from './getFilterChanges.js'
Expand Down Expand Up @@ -597,32 +598,37 @@ describe('errors', () => {
{ retry: 3 },
)

test('reinitializes filter after the consecutive error reset threshold is reached', async () => {
const CONSECUTIVE_ERROR_THRESHOLD = 3

// Fail 3 times to trigger a filter reinitalization.
vi.spyOn(getFilterChanges, 'getFilterChanges')
.mockRejectedValueOnce(new Error('first fail'))
.mockRejectedValueOnce(new Error('second fail'))
.mockRejectedValueOnce(new Error('third fail'))

const eventFilterCreationSpy = vi.spyOn(
test('re-initializes the filter if the active filter uninstalls', async () => {
const filterCreator = vi.spyOn(
createContractEventFilter,
'createContractEventFilter',
)

const unwatch = watchContractEvent(publicClient, {
...usdcContractConfig,
consecutiveErrorResetThreshold: CONSECUTIVE_ERROR_THRESHOLD,
pollingInterval: 200,
onLogs: () => null,
onError: () => null,
pollingInterval: 200,
})

await wait(1000)
await wait(250)
expect(filterCreator).toBeCalledTimes(1)

vi.spyOn(getFilterChanges, 'getFilterChanges').mockRejectedValueOnce(
new InvalidInputRpcError(
new RpcRequestError({
body: { foo: 'bar' },
url: 'url',
error: {
code: -32000,
message: 'message',
},
}),
),
)

// Check that event filter creation function was called twice
expect(eventFilterCreationSpy).toBeCalledTimes(2)
await wait(500)
expect(filterCreator).toBeCalledTimes(2)
unwatch()
})
})
16 changes: 6 additions & 10 deletions src/actions/public/watchContractEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { observe } from '../../utils/observe.js'
import { poll } from '../../utils/poll.js'
import { stringify } from '../../utils/stringify.js'

import { InvalidInputRpcError } from '../../index.js'
import {
type CreateContractEventFilterParameters,
createContractEventFilter,
Expand Down Expand Up @@ -56,8 +57,6 @@ export type WatchContractEventParameters<
onLogs: WatchContractEventOnLogsFn<TAbi, TEventName, TStrict>
/** Polling frequency (in ms). Defaults to Client's pollingInterval config. */
pollingInterval?: number
/** The number of consecutive filter change errors that must occur before resetting the filter. */
consecutiveErrorResetThreshold?: number
/**
* Whether or not the logs must match the indexed/non-indexed arguments on `event`.
* @default false
Expand Down Expand Up @@ -113,7 +112,6 @@ export function watchContractEvent<
onError,
onLogs,
pollingInterval = client.pollingInterval,
consecutiveErrorResetThreshold = 5,
strict: strict_,
}: WatchContractEventParameters<TAbi, TEventName, TStrict>,
): WatchContractEventReturnType {
Expand All @@ -132,11 +130,12 @@ export function watchContractEvent<
let previousBlockNumber: bigint
let filter: Filter<'event', TAbi, TEventName> | undefined
let initialized = false
let consecutiveErrors = 0

const unwatch = poll(
async () => {
console.log('poll')
if (!initialized) {
console.log('inialize')
try {
filter = (await createContractEventFilter(client, {
abi,
Expand Down Expand Up @@ -185,16 +184,13 @@ export function watchContractEvent<
previousBlockNumber = blockNumber
}

consecutiveErrors = 0

if (logs.length === 0) return
if (batch) emit.onLogs(logs as any)
else logs.forEach((log) => emit.onLogs([log] as any))
} catch (err) {
consecutiveErrors++
if (consecutiveErrors >= consecutiveErrorResetThreshold) {
initialized = false
}
// If a filter has been set and gets uninstalled, providers will throw an InvalidInput error.
// Reinitalize the filter when this occurs
if (filter && err instanceof InvalidInputRpcError) initialized = false
emit.onError?.(err as Error)
}
},
Expand Down
32 changes: 32 additions & 0 deletions src/actions/public/watchEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { setBalance } from '../test/setBalance.js'
import { stopImpersonatingAccount } from '../test/stopImpersonatingAccount.js'
import { writeContract } from '../wallet/writeContract.js'

import { InvalidInputRpcError, RpcRequestError } from '../../index.js'
import * as createEventFilter from './createEventFilter.js'
import * as getBlockNumber from './getBlockNumber.js'
import * as getFilterChanges from './getFilterChanges.js'
Expand Down Expand Up @@ -385,4 +386,35 @@ describe('errors', () => {
},
{ retry: 3 },
)

test('re-initializes the filter if the active filter uninstalls', async () => {
const filterCreator = vi.spyOn(createEventFilter, 'createEventFilter')

const unwatch = watchEvent(publicClient, {
...usdcContractConfig,
onLogs: () => null,
onError: () => null,
pollingInterval: 200,
})

await wait(250)
expect(filterCreator).toBeCalledTimes(1)

vi.spyOn(getFilterChanges, 'getFilterChanges').mockRejectedValueOnce(
new InvalidInputRpcError(
new RpcRequestError({
body: { foo: 'bar' },
url: 'url',
error: {
code: -32000,
message: 'message',
},
}),
),
)

await wait(500)
expect(filterCreator).toBeCalledTimes(2)
unwatch()
})
})
4 changes: 4 additions & 0 deletions src/actions/public/watchEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { observe } from '../../utils/observe.js'
import { poll } from '../../utils/poll.js'
import { stringify } from '../../utils/stringify.js'

import { InvalidInputRpcError } from '../../errors/rpc.js'
import {
type CreateEventFilterParameters,
createEventFilter,
Expand Down Expand Up @@ -187,6 +188,9 @@ export function watchEvent<
if (batch) emit.onLogs(logs as any)
else logs.forEach((log) => emit.onLogs([log] as any))
} catch (err) {
// If a filter has been set and gets uninstalled, providers will throw an InvalidInput error.
// Reinitalize the filter when this occurs
if (filter && err instanceof InvalidInputRpcError) initialized = false
emit.onError?.(err as Error)
}
},
Expand Down

0 comments on commit fcfd307

Please sign in to comment.