From 3b444e1f2ea28bfbfae24bfc6e759e1ee5163139 Mon Sep 17 00:00:00 2001 From: Raju Ahmed Date: Thu, 29 May 2025 20:46:46 +0600 Subject: [PATCH] serialize event count in store function call if multiple events are processed concurrently, all event process request might read the initial store size and write to the store, potentially exceeding the store size limit. Serializing the store size read should fix this. Once the size is loaded in memory, further event process read should just read the in memory value --- lib/event_processor/batch_event_processor.ts | 22 +++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/event_processor/batch_event_processor.ts b/lib/event_processor/batch_event_processor.ts index 48ce32927..b573ca6aa 100644 --- a/lib/event_processor/batch_event_processor.ts +++ b/lib/event_processor/batch_event_processor.ts @@ -74,6 +74,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { private batchSize: number; private eventStore?: Store; private eventCountInStore: Maybe = undefined; + private eventCountWaitPromise: Promise = Promise.resolve(); private maxEventsInStore: number = MAX_EVENTS_IN_STORE; private dispatchRepeater: Repeater; private failedEventRepeater?: Repeater; @@ -264,15 +265,22 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } } - private async findEventCountInStore(): Promise { + private async readEventCountInStore(store: Store): Promise { + try { + const keys = await store.getKeys(); + this.eventCountInStore = keys.length; + } catch (e) { + this.logger?.error(e); + } + } + + private async findEventCountInStore(): Promise { if (this.eventStore && this.eventCountInStore === undefined) { - try { - const keys = await this.eventStore.getKeys(); - this.eventCountInStore = keys.length; - } catch (e) { - this.logger?.error(e); - } + const store = this.eventStore; + this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store)); + return this.eventCountWaitPromise; } + return Promise.resolve(); } private async storeEvent(eventWithId: EventWithId): Promise {