From 3a74da6807a0250bff0e05ae57f922922d8847be Mon Sep 17 00:00:00 2001 From: Stanislav Natalenko Date: Tue, 3 Feb 2026 18:42:15 +0000 Subject: [PATCH] Feat: Support selective restore event loading --- eslint.config.mjs | 1 + src/AggregateCommandHandler.ts | 23 +++++++--- src/CqrsContainerBuilder.ts | 3 +- src/EventStore.ts | 15 ++++--- src/in-memory/InMemoryEventStorage.ts | 18 ++++++-- src/in-memory/InMemorySnapshotStorage.ts | 14 +++--- src/interfaces/IAggregate.ts | 23 ++++++++++ src/interfaces/IAggregateSnapshotStorage.ts | 8 ++-- src/interfaces/IEventStorageReader.ts | 28 +++++++++++- src/interfaces/ISnapshotEvent.ts | 2 +- tests/unit/AggregateCommandHandler.test.ts | 47 ++++++++++++++++++++- 11 files changed, 153 insertions(+), 29 deletions(-) diff --git a/eslint.config.mjs b/eslint.config.mjs index 4e2dec9..fa6a9b6 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -513,6 +513,7 @@ export default defineConfig([ "afterBlockComment": false, "beforeLineComment": true, "afterLineComment": false, + "allowClassStart": true, "allowBlockStart": true, "allowObjectStart": true, "allowArrayStart": true diff --git a/src/AggregateCommandHandler.ts b/src/AggregateCommandHandler.ts index c8d86c8..c999434 100644 --- a/src/AggregateCommandHandler.ts +++ b/src/AggregateCommandHandler.ts @@ -1,5 +1,6 @@ import { getClassName, Lock, MapAssertable } from './utils/index.ts'; import { + type AggregateEventsQueryParams, type IAggregate, type IAggregateConstructor, type IAggregateFactory, @@ -23,10 +24,11 @@ import { */ export class AggregateCommandHandler implements ICommandHandler { - #eventStore: IEventStore; - #logger?: ILogger; - #aggregateFactory: IAggregateFactory; - #handles: string[]; + readonly #eventStore: IEventStore; + readonly #logger?: ILogger; + readonly #aggregateFactory: IAggregateFactory; + readonly #handles: Readonly; + readonly #restoresFrom?: Readonly; /** Aggregate instances cache for concurrent command handling */ #aggregatesCache: MapAssertable> = new MapAssertable(); @@ -39,11 +41,13 @@ export class AggregateCommandHandler implements I aggregateType, aggregateFactory, handles, + restoresFrom, logger }: Pick & { aggregateType?: IAggregateConstructor, aggregateFactory?: IAggregateFactory, - handles?: string[] + handles?: Readonly, + restoresFrom?: Readonly }) { if (!eventStore) throw new TypeError('eventStore argument required'); @@ -57,6 +61,7 @@ export class AggregateCommandHandler implements I const AggregateType = aggregateType; this.#aggregateFactory = params => new AggregateType(params); this.#handles = AggregateType.handles; + this.#restoresFrom = AggregateType.restoresFrom; } else if (aggregateFactory) { if (!Array.isArray(handles) || !handles.length) @@ -64,6 +69,7 @@ export class AggregateCommandHandler implements I this.#aggregateFactory = aggregateFactory; this.#handles = handles; + this.#restoresFrom = restoresFrom; } else { throw new TypeError('either aggregateType or aggregateFactory is required'); @@ -86,9 +92,14 @@ export class AggregateCommandHandler implements I if (!id) throw new TypeError('id argument required'); - const eventsIterable = this.#eventStore.getAggregateEvents(id); const aggregate = this.#aggregateFactory({ id }); + const queryOptions = this.#restoresFrom?.length ? + { eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams : + undefined; + + const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions); + let eventCount = 0; for await (const event of eventsIterable) { aggregate.mutate(event); diff --git a/src/CqrsContainerBuilder.ts b/src/CqrsContainerBuilder.ts index 158ba88..3156db5 100644 --- a/src/CqrsContainerBuilder.ts +++ b/src/CqrsContainerBuilder.ts @@ -95,7 +95,8 @@ export class CqrsContainerBuilder container.createInstance(AggregateType, options), - handles: AggregateType.handles + handles: AggregateType.handles, + restoresFrom: AggregateType.restoresFrom }); return this.registerCommandHandler(commandHandlerFactory); diff --git a/src/EventStore.ts b/src/EventStore.ts index c6d7cf3..4040f18 100644 --- a/src/EventStore.ts +++ b/src/EventStore.ts @@ -15,6 +15,7 @@ import { type IEventDispatcher, type IEventBus, type IContainer, + type AggregateEventsQueryParams, isIdentifierProvider, isIEventBus, isIEventStorageReader, @@ -102,20 +103,24 @@ export class EventStore implements IEventStore { } /** Retrieve all events of specific Aggregate */ - async* getAggregateEvents(aggregateId: Identifier): IEventStream { + async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream { if (!aggregateId) throw new TypeError('aggregateId argument required'); this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`); - const snapshot = this.#snapshotStorage ? - await this.#snapshotStorage.getAggregateSnapshot(aggregateId) : - undefined; + // Get snapshot from snapshot storage if not provided in options + let snapshot = options?.snapshot; + if (!snapshot && this.#snapshotStorage) + snapshot = await this.#snapshotStorage.getAggregateSnapshot(aggregateId); if (snapshot) yield snapshot; - const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { snapshot }); + const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { + ...options, + snapshot + }); yield* eventsIterable; diff --git a/src/in-memory/InMemoryEventStorage.ts b/src/in-memory/InMemoryEventStorage.ts index d3071d1..dcfb630 100644 --- a/src/in-memory/InMemoryEventStorage.ts +++ b/src/in-memory/InMemoryEventStorage.ts @@ -8,7 +8,8 @@ import type { IEventStorageWriter, Identifier, IDispatchPipelineProcessor, - DispatchPipelineBatch + DispatchPipelineBatch, + AggregateEventsQueryParams } from '../interfaces/index.ts'; import { nextCycle } from './utils/index.ts'; @@ -40,20 +41,31 @@ export class InMemoryEventStorage implements return events; } - async* getAggregateEvents(aggregateId: Identifier, options?: { snapshot: IEvent }): IEventStream { + async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream { await nextCycle(); const afterVersion = options?.snapshot?.aggregateVersion; - const results = !afterVersion ? + const allAfterSnapshot = !afterVersion ? this.#events.filter(e => e.aggregateId === aggregateId) : this.#events.filter(e => e.aggregateId === aggregateId && e.aggregateVersion !== undefined && e.aggregateVersion > afterVersion); + const results = options?.eventTypes === undefined ? + allAfterSnapshot : + allAfterSnapshot.filter(e => options.eventTypes!.includes(e.type)); + await nextCycle(); yield* results; + + if (options?.tail === 'last' && allAfterSnapshot.length) { + const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1]; + const alreadyYieldedTail = results.length && results[results.length - 1] === tailEvent; + if (!alreadyYieldedTail) + yield tailEvent; + } } async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream { diff --git a/src/in-memory/InMemorySnapshotStorage.ts b/src/in-memory/InMemorySnapshotStorage.ts index 51ebaaa..3422568 100644 --- a/src/in-memory/InMemorySnapshotStorage.ts +++ b/src/in-memory/InMemorySnapshotStorage.ts @@ -4,7 +4,7 @@ import { type IContainer, type Identifier, type IDispatchPipelineProcessor, - type IEvent, + type ISnapshotEvent, type ILogger, isSnapshotEvent } from '../interfaces/index.ts'; @@ -16,26 +16,26 @@ import * as Event from '../Event.ts'; */ export class InMemorySnapshotStorage implements IAggregateSnapshotStorage, IDispatchPipelineProcessor { - #snapshots: Map = new Map(); + #snapshots: Map = new Map(); #logger: ILogger | undefined; constructor(c?: Partial>) { - this.#logger = c?.logger && 'child' in c?.logger ? - c?.logger.child({ service: new.target.name }) : + this.#logger = c?.logger && 'child' in c.logger ? + c.logger.child({ service: new.target.name }) : c?.logger; } /** * Get latest aggregate snapshot */ - async getAggregateSnapshot(aggregateId: string): Promise { + async getAggregateSnapshot(aggregateId: string): Promise { return this.#snapshots.get(aggregateId); } /** * Save new aggregate snapshot */ - async saveAggregateSnapshot(snapshotEvent: IEvent) { + async saveAggregateSnapshot(snapshotEvent: ISnapshotEvent) { if (!snapshotEvent.aggregateId) throw new TypeError('event.aggregateId is required'); @@ -47,7 +47,7 @@ export class InMemorySnapshotStorage implements IAggregateSnapshotStorage, IDisp /** * Delete aggregate snapshot */ - deleteAggregateSnapshot(snapshotEvent: IEvent): Promise | void { + deleteAggregateSnapshot(snapshotEvent: ISnapshotEvent): Promise | void { if (!snapshotEvent.aggregateId) throw new TypeError('snapshotEvent.aggregateId argument required'); diff --git a/src/interfaces/IAggregate.ts b/src/interfaces/IAggregate.ts index 528ee9c..8db27fd 100644 --- a/src/interfaces/IAggregate.ts +++ b/src/interfaces/IAggregate.ts @@ -33,6 +33,14 @@ export interface IAggregate { export interface IMutableAggregateState { + /** + * Optional list of event types that are required to restore this state. + * + * Exposed by AbstractAggregate as `restoresFrom` and may be used by the command handler + * to load only the state-relevant events when rehydrating an aggregate. + */ + handles?: Readonly; + /** * Apply a single event to mutate the aggregate's state. */ @@ -58,7 +66,22 @@ export interface IAggregateConstructor< TAggregate extends IAggregate, TState extends IMutableAggregateState | object | void > { + + /** + * List of command types handled by the aggregate. + * + * Used to subscribe AggregateCommandHandler to the command bus. + */ readonly handles: string[]; + + /** + * Optional list of event types that are required to restore the aggregate state. + * + * If provided, AggregateCommandHandler can request only these events from storage + * (typically together with a `tail: 'last'` marker to restore the version). + */ + readonly restoresFrom?: Readonly; + new(options: IAggregateConstructorParams): TAggregate; } diff --git a/src/interfaces/IAggregateSnapshotStorage.ts b/src/interfaces/IAggregateSnapshotStorage.ts index 9dae771..1e64157 100644 --- a/src/interfaces/IAggregateSnapshotStorage.ts +++ b/src/interfaces/IAggregateSnapshotStorage.ts @@ -1,11 +1,11 @@ import type { Identifier } from './Identifier.ts'; -import type { IEvent } from './IEvent.ts'; +import type { ISnapshotEvent } from './ISnapshotEvent.ts'; export interface IAggregateSnapshotStorage { getAggregateSnapshot(aggregateId: Identifier): - Promise | undefined> | IEvent | undefined; + Promise | undefined> | ISnapshotEvent | undefined; - saveAggregateSnapshot(snapshotEvent: IEvent): Promise | void; + saveAggregateSnapshot(snapshotEvent: ISnapshotEvent): Promise | void; - deleteAggregateSnapshot(snapshotEvent: IEvent): Promise | void; + deleteAggregateSnapshot(snapshotEvent: ISnapshotEvent): Promise | void; } diff --git a/src/interfaces/IEventStorageReader.ts b/src/interfaces/IEventStorageReader.ts index 9329ae7..c3ee0e1 100644 --- a/src/interfaces/IEventStorageReader.ts +++ b/src/interfaces/IEventStorageReader.ts @@ -1,6 +1,7 @@ import type { Identifier } from './Identifier.ts'; import type { IEvent } from './IEvent.ts'; import type { IEventStream } from './IEventStream.ts'; +import type { ISnapshotEvent } from './ISnapshotEvent.ts'; import { isObject } from './isObject.ts'; export type EventQueryAfter = { @@ -15,6 +16,31 @@ export type EventQueryBefore = { beforeEvent?: IEvent; } +export type AggregateEventsQueryParams = { + + /** + * Optional snapshot event. If provided, storage should return only events after + * the snapshot's aggregateVersion. + */ + snapshot?: ISnapshotEvent, + + /** + * Optional list of event types to return. + * + * IMPORTANT: If you filter eventTypes, make sure you still restore the aggregate + * version correctly (e.g. via `tail: 'last'`), otherwise emitted events may get + * incorrect aggregateVersion values. + */ + eventTypes?: Readonly, + + /** + * Optionally include the last aggregate event (after snapshot), regardless of type. + * Useful together with `eventTypes` to restore the aggregate version without pulling + * the full stream. + */ + tail?: 'last' +} + export interface IEventStorageReader { /** @@ -25,7 +51,7 @@ export interface IEventStorageReader { /** * Retrieves all events (and optionally a snapshot) associated with a specific aggregate. */ - getAggregateEvents(aggregateId: Identifier, options?: { snapshot?: IEvent }): IEventStream; + getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream; /** * Retrieves events associated with a saga, with optional filtering by version or timestamp. diff --git a/src/interfaces/ISnapshotEvent.ts b/src/interfaces/ISnapshotEvent.ts index 8ca2afe..8d51721 100644 --- a/src/interfaces/ISnapshotEvent.ts +++ b/src/interfaces/ISnapshotEvent.ts @@ -2,7 +2,7 @@ import { type IEvent, isEvent } from './IEvent.ts'; export const SNAPSHOT_EVENT_TYPE: 'snapshot' = 'snapshot'; -export interface ISnapshotEvent extends IEvent { +export interface ISnapshotEvent extends IEvent { type: typeof SNAPSHOT_EVENT_TYPE } diff --git a/tests/unit/AggregateCommandHandler.test.ts b/tests/unit/AggregateCommandHandler.test.ts index 319247a..37d9200 100644 --- a/tests/unit/AggregateCommandHandler.test.ts +++ b/tests/unit/AggregateCommandHandler.test.ts @@ -36,6 +36,29 @@ class MyAggregate extends AbstractAggregate { } } +class SelectiveRestoreAggregate extends AbstractAggregate { + static get handles() { + return ['do']; + } + + static get restoresFrom() { + return ['stateEvent']; + } + + constructor({ id }: { id: Identifier }) { + super({ + id, + state: { + stateEvent() { } + } + }); + } + + do() { + this.emit('newEvent'); + } +} + class CommandBus { handlers: any = {}; on(messageType, listener) { @@ -129,7 +152,29 @@ describe('AggregateCommandHandler', function () { assert(getAggregateEventsSpy.calledOnce, 'getAggregateEvents was not called'); const { args } = getAggregateEventsSpy.lastCall; - expect(args).to.have.length(1); + expect(args[0]).to.eql(1); + }); + + it('can restore from filtered event types while keeping aggregate version via tail event', async () => { + const aggregateId = 'restore-filter-test-id'; + + await eventStore.dispatch([ + { aggregateId, aggregateVersion: 0, type: 'stateEvent' }, + { aggregateId, aggregateVersion: 1, type: 'irrelevant' }, + { aggregateId, aggregateVersion: 2, type: 'irrelevant' } + ] as any); + + const handler = new AggregateCommandHandler({ eventStore, aggregateType: SelectiveRestoreAggregate }); + + const events = await handler.execute({ type: 'do', aggregateId }); + + assert(getAggregateEventsSpy.called, 'getAggregateEvents was not called'); + expect(getAggregateEventsSpy.lastCall.args).to.eql([ + aggregateId, + { eventTypes: ['stateEvent'], tail: 'last' } + ]); + + expect(events[0]).to.have.property('aggregateVersion', 3); }); it('passes commands to aggregate.handle(cmd)', async () => {