diff --git a/src/provider/batched-polling-event-subscriber.ts b/src/provider/batched-polling-event-subscriber.ts index 28406c97..12b871bb 100644 --- a/src/provider/batched-polling-event-subscriber.ts +++ b/src/provider/batched-polling-event-subscriber.ts @@ -1,9 +1,10 @@ import { EventFilter, Subscriber, AbstractProvider, Log } from 'ethers'; import { isDefined } from '../utils/is-defined'; -function copyFilter(obj: EventFilter): EventFilter & { fromBlock?: number; toBlock?: number; } { - return JSON.parse(JSON.stringify(obj)); -} +type RangeFilter = EventFilter & { fromBlock: number; toBlock: number; }; + +const DEFAULT_MAX_BATCH_BLOCKS = 500; + /** * A batched event subscriber that polls once per interval instead of per-block. * This dramatically reduces eth_getLogs calls. @@ -24,13 +25,22 @@ export class BatchedPollingEventSubscriber implements Subscriber { #pollingInterval: number; + #maxBatchBlocks: number; + + #polling: boolean; + + #errorBackoff: number = 0; + constructor(provider: AbstractProvider, filter: EventFilter, pollingInterval: number) { + if (pollingInterval <= 0) throw new Error('pollingInterval must be positive'); this.#provider = provider; - this.#filter = copyFilter(filter); + this.#filter = filter; this.#running = false; this.#blockNumber = -2; this.#pollTimer = null; this.#pollingInterval = pollingInterval; + this.#maxBatchBlocks = DEFAULT_MAX_BATCH_BLOCKS; + this.#polling = false; } async #poll(): Promise { @@ -38,39 +48,61 @@ export class BatchedPollingEventSubscriber implements Subscriber { return; } + if (this.#polling) { + this.#schedulePoll(); + return; + } + + this.#polling = true; + try { - const blockNumber = await this.#provider.getBlockNumber(); + const head = await this.#provider.getBlockNumber(); if (this.#blockNumber === -2) { - this.#blockNumber = blockNumber; + this.#blockNumber = head; this.#schedulePoll(); return; } - if (blockNumber <= this.#blockNumber) { + if (head <= this.#blockNumber) { this.#schedulePoll(); return; } - const filter = copyFilter(this.#filter); - filter.fromBlock = this.#blockNumber + 1; - filter.toBlock = blockNumber; + let currentFrom = this.#blockNumber + 1; - const logs: Log[] = await this.#provider.getLogs(filter); + while (currentFrom <= head) { + if (!this.#running) { + break; + } - for (const log of logs) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.#provider.emit(this.#filter, log); - } + const currentTo = Math.min(currentFrom + this.#maxBatchBlocks - 1, head); + + const rangeFilter: RangeFilter = { + ...this.#filter, + fromBlock: currentFrom, + toBlock: currentTo, + }; - if (logs.length > 0) { - this.#blockNumber = logs[logs.length - 1].blockNumber; - } else { - this.#blockNumber = blockNumber; + // eslint-disable-next-line no-await-in-loop + const logs: Log[] = await this.#provider.getLogs(rangeFilter); + + for (const log of logs) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#provider.emit(this.#filter, log); + } + + this.#blockNumber = currentTo; + currentFrom = currentTo + 1; } + + this.#errorBackoff = 0; } catch (error) { - // eslint-disable-next-line no-console - console.error('BatchedPollingEventSubscriber poll error:', error); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#provider.emit('error', error); + this.#errorBackoff = Math.min((this.#errorBackoff || this.#pollingInterval) * 2, 30000); + } finally { + this.#polling = false; } this.#schedulePoll(); @@ -80,10 +112,19 @@ export class BatchedPollingEventSubscriber implements Subscriber { if (!this.#running) { return; } - this.#pollTimer = setTimeout(() => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.#poll(); - }, this.#pollingInterval); + + if (this.#pollTimer != null) { + clearTimeout(this.#pollTimer); + this.#pollTimer = null; + } + + this.#pollTimer = setTimeout( + () => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#poll(); + }, + this.#errorBackoff || this.#pollingInterval, + ); } start(): void { diff --git a/src/provider/polling-json-rpc-provider.ts b/src/provider/polling-json-rpc-provider.ts index c78661c0..f5433959 100644 --- a/src/provider/polling-json-rpc-provider.ts +++ b/src/provider/polling-json-rpc-provider.ts @@ -39,7 +39,11 @@ export class PollingJsonRpcProvider extends JsonRpcProvider { // eslint-disable-next-line @typescript-eslint/no-explicit-any, no-underscore-dangle _getSubscriber(sub: any): Subscriber { if (sub.type === 'event') { - return new BatchedPollingEventSubscriber(this, sub.filter, this.#eventPollingInterval); + const newSub = new BatchedPollingEventSubscriber(this, sub.filter, this.#eventPollingInterval); + if (this.#isPaused) { + newSub.pause(false); + } + return newSub; } // eslint-disable-next-line no-underscore-dangle return super._getSubscriber(sub);