From 12ec9f881dbcf18b84d0e2aa832a53720c16493e Mon Sep 17 00:00:00 2001 From: zy0n Date: Wed, 14 Jan 2026 15:57:02 -0500 Subject: [PATCH] feat: implement BatchedPollingEventSubscriber for optimized event polling --- .../batched-polling-event-subscriber.ts | 121 ++++++++++++++++++ src/provider/polling-json-rpc-provider.ts | 47 ++++++- 2 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 src/provider/batched-polling-event-subscriber.ts diff --git a/src/provider/batched-polling-event-subscriber.ts b/src/provider/batched-polling-event-subscriber.ts new file mode 100644 index 00000000..28406c97 --- /dev/null +++ b/src/provider/batched-polling-event-subscriber.ts @@ -0,0 +1,121 @@ +import { EventFilter, Subscriber, AbstractProvider, Log } from 'ethers'; +import { isDefined } from '../utils/is-defined'; + +function copyFilter(obj: EventFilter): EventFilter & { fromBlock?: number; toBlock?: number; } { + return JSON.parse(JSON.stringify(obj)); +} +/** + * A batched event subscriber that polls once per interval instead of per-block. + * This dramatically reduces eth_getLogs calls. + * + * reasoning: ethers PollingEventSubscriber fires on every "block" event, + * this subscriber uses a timer-based approach matching the pollingInterval. + */ +export class BatchedPollingEventSubscriber implements Subscriber { + #provider: AbstractProvider; + + #filter: EventFilter; + + #running: boolean; + + #blockNumber: number; + + #pollTimer: ReturnType | null; + + #pollingInterval: number; + + constructor(provider: AbstractProvider, filter: EventFilter, pollingInterval: number) { + this.#provider = provider; + this.#filter = copyFilter(filter); + this.#running = false; + this.#blockNumber = -2; + this.#pollTimer = null; + this.#pollingInterval = pollingInterval; + } + + async #poll(): Promise { + if (!this.#running) { + return; + } + + try { + const blockNumber = await this.#provider.getBlockNumber(); + + if (this.#blockNumber === -2) { + this.#blockNumber = blockNumber; + this.#schedulePoll(); + return; + } + + if (blockNumber <= this.#blockNumber) { + this.#schedulePoll(); + return; + } + + const filter = copyFilter(this.#filter); + filter.fromBlock = this.#blockNumber + 1; + filter.toBlock = blockNumber; + + const logs: Log[] = await this.#provider.getLogs(filter); + + for (const log of logs) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#provider.emit(this.#filter, log); + } + + if (logs.length > 0) { + this.#blockNumber = logs[logs.length - 1].blockNumber; + } else { + this.#blockNumber = blockNumber; + } + } catch (error) { + // eslint-disable-next-line no-console + console.error('BatchedPollingEventSubscriber poll error:', error); + } + + this.#schedulePoll(); + } + + #schedulePoll(): void { + if (!this.#running) { + return; + } + this.#pollTimer = setTimeout(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#poll(); + }, this.#pollingInterval); + } + + start(): void { + if (this.#running) { + return; + } + this.#running = true; + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#poll(); + } + + stop(): void { + if (!this.#running) { + return; + } + this.#running = false; + + if (this.#pollTimer != null) { + clearTimeout(this.#pollTimer); + this.#pollTimer = null; + } + } + + pause(dropWhilePaused?: boolean): void { + this.stop(); + if (isDefined(dropWhilePaused) && dropWhilePaused) { + this.#blockNumber = -2; + } + } + + resume(): void { + this.start(); + } +} diff --git a/src/provider/polling-json-rpc-provider.ts b/src/provider/polling-json-rpc-provider.ts index 055b7f5d..c78661c0 100644 --- a/src/provider/polling-json-rpc-provider.ts +++ b/src/provider/polling-json-rpc-provider.ts @@ -1,10 +1,25 @@ -import { JsonRpcProvider, JsonRpcApiProviderOptions, Network } from 'ethers'; +import { + JsonRpcProvider, + JsonRpcApiProviderOptions, + Network, + Subscriber, +} from 'ethers'; +import { BatchedPollingEventSubscriber } from './batched-polling-event-subscriber'; /** * Uses a setting in JsonRpcProvider to poll for events, * rather than using sparsely-implemented eth_filter events. + * + * Overrides _getSubscriber to use BatchedPollingEventSubscriber for events, + * which polls once per pollingInterval instead of per-block. */ export class PollingJsonRpcProvider extends JsonRpcProvider { + readonly isPollingProvider: boolean = true; + + #eventPollingInterval: number; + + #isPaused: boolean = false; + constructor(url: string, chainId: number, pollingInterval = 10000, maxLogsPerBatch = 100) { const network = Network.from(chainId); const options: JsonRpcApiProviderOptions = { @@ -14,7 +29,35 @@ export class PollingJsonRpcProvider extends JsonRpcProvider { }; super(url, network, options); this.pollingInterval = pollingInterval; + this.#eventPollingInterval = pollingInterval; } - readonly isPollingProvider: boolean = true; + get paused(): boolean { + return this.#isPaused; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any, no-underscore-dangle + _getSubscriber(sub: any): Subscriber { + if (sub.type === 'event') { + return new BatchedPollingEventSubscriber(this, sub.filter, this.#eventPollingInterval); + } + // eslint-disable-next-line no-underscore-dangle + return super._getSubscriber(sub); + } + + pause(): void { + this.#isPaused = true; + // eslint-disable-next-line no-underscore-dangle + this._forEachSubscriber((sub) => { + sub.pause(false); + }); + } + + resume(): void { + this.#isPaused = false; + // eslint-disable-next-line no-underscore-dangle + this._forEachSubscriber((sub) => { + sub.resume(); + }); + } }