Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ export default defineConfig([
"afterBlockComment": false,
"beforeLineComment": true,
"afterLineComment": false,
"allowClassStart": true,
"allowBlockStart": true,
"allowObjectStart": true,
"allowArrayStart": true
Expand Down
23 changes: 17 additions & 6 deletions src/AggregateCommandHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { getClassName, Lock, MapAssertable } from './utils/index.ts';
import {
type AggregateEventsQueryParams,
type IAggregate,
type IAggregateConstructor,
type IAggregateFactory,
Expand All @@ -23,10 +24,11 @@ import {
*/
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {

#eventStore: IEventStore;
#logger?: ILogger;
#aggregateFactory: IAggregateFactory<TAggregate, any>;
#handles: string[];
readonly #eventStore: IEventStore;
readonly #logger?: ILogger;
readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;
readonly #handles: Readonly<string[]>;
readonly #restoresFrom?: Readonly<string[]>;

/** Aggregate instances cache for concurrent command handling */
#aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();
Expand All @@ -39,11 +41,13 @@ export class AggregateCommandHandler<TAggregate extends IAggregate> implements I
aggregateType,
aggregateFactory,
handles,
restoresFrom,
logger
}: Pick<IContainer, 'eventStore' | 'logger'> & {
aggregateType?: IAggregateConstructor<TAggregate, any>,
aggregateFactory?: IAggregateFactory<TAggregate, any>,
handles?: string[]
handles?: Readonly<string[]>,
restoresFrom?: Readonly<string[]>
}) {
if (!eventStore)
throw new TypeError('eventStore argument required');
Expand All @@ -57,13 +61,15 @@ export class AggregateCommandHandler<TAggregate extends IAggregate> implements I
const AggregateType = aggregateType;
this.#aggregateFactory = params => new AggregateType(params);
this.#handles = AggregateType.handles;
this.#restoresFrom = AggregateType.restoresFrom;
}
else if (aggregateFactory) {
if (!Array.isArray(handles) || !handles.length)
throw new TypeError('handles argument must be an non-empty Array');

this.#aggregateFactory = aggregateFactory;
this.#handles = handles;
this.#restoresFrom = restoresFrom;
}
else {
throw new TypeError('either aggregateType or aggregateFactory is required');
Expand All @@ -86,9 +92,14 @@ export class AggregateCommandHandler<TAggregate extends IAggregate> implements I
if (!id)
throw new TypeError('id argument required');

const eventsIterable = this.#eventStore.getAggregateEvents(id);
const aggregate = this.#aggregateFactory({ id });

const queryOptions = this.#restoresFrom?.length ?
{ eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
undefined;

const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);

let eventCount = 0;
for await (const event of eventsIterable) {
aggregate.mutate(event);
Expand Down
3 changes: 2 additions & 1 deletion src/CqrsContainerBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ export class CqrsContainerBuilder<TContainerInterface extends IContainer = ICont
container.createInstance(AggregateCommandHandler, {
aggregateFactory: (options: any) =>
container.createInstance(AggregateType, options),
handles: AggregateType.handles
handles: AggregateType.handles,
restoresFrom: AggregateType.restoresFrom
});

return this.registerCommandHandler(commandHandlerFactory);
Expand Down
15 changes: 10 additions & 5 deletions src/EventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
type IEventDispatcher,
type IEventBus,
type IContainer,
type AggregateEventsQueryParams,
isIdentifierProvider,
isIEventBus,
isIEventStorageReader,
Expand Down Expand Up @@ -102,20 +103,24 @@ export class EventStore implements IEventStore {
}

/** Retrieve all events of specific Aggregate */
async* getAggregateEvents(aggregateId: Identifier): IEventStream {
async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
if (!aggregateId)
throw new TypeError('aggregateId argument required');

this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);

const snapshot = this.#snapshotStorage ?
await this.#snapshotStorage.getAggregateSnapshot(aggregateId) :
undefined;
// Get snapshot from snapshot storage if not provided in options
let snapshot = options?.snapshot;
if (!snapshot && this.#snapshotStorage)
snapshot = await this.#snapshotStorage.getAggregateSnapshot(aggregateId);

if (snapshot)
yield snapshot;

const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { snapshot });
const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, {
...options,
snapshot
});

yield* eventsIterable;

Expand Down
18 changes: 15 additions & 3 deletions src/in-memory/InMemoryEventStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import type {
IEventStorageWriter,
Identifier,
IDispatchPipelineProcessor,
DispatchPipelineBatch
DispatchPipelineBatch,
AggregateEventsQueryParams
} from '../interfaces/index.ts';
import { nextCycle } from './utils/index.ts';

Expand Down Expand Up @@ -40,20 +41,31 @@ export class InMemoryEventStorage implements
return events;
}

async* getAggregateEvents(aggregateId: Identifier, options?: { snapshot: IEvent }): IEventStream {
async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
await nextCycle();

const afterVersion = options?.snapshot?.aggregateVersion;
const results = !afterVersion ?
const allAfterSnapshot = !afterVersion ?
this.#events.filter(e => e.aggregateId === aggregateId) :
this.#events.filter(e =>
e.aggregateId === aggregateId &&
e.aggregateVersion !== undefined &&
e.aggregateVersion > afterVersion);

const results = options?.eventTypes === undefined ?
allAfterSnapshot :
allAfterSnapshot.filter(e => options.eventTypes!.includes(e.type));

await nextCycle();

yield* results;

if (options?.tail === 'last' && allAfterSnapshot.length) {
const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1];
const alreadyYieldedTail = results.length && results[results.length - 1] === tailEvent;
if (!alreadyYieldedTail)
yield tailEvent;
}
}

async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
Expand Down
14 changes: 7 additions & 7 deletions src/in-memory/InMemorySnapshotStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type IContainer,
type Identifier,
type IDispatchPipelineProcessor,
type IEvent,
type ISnapshotEvent,
type ILogger,
isSnapshotEvent
} from '../interfaces/index.ts';
Expand All @@ -16,26 +16,26 @@ import * as Event from '../Event.ts';
*/
export class InMemorySnapshotStorage implements IAggregateSnapshotStorage, IDispatchPipelineProcessor {

#snapshots: Map<Identifier, IEvent> = new Map();
#snapshots: Map<Identifier, ISnapshotEvent> = new Map();
#logger: ILogger | undefined;

constructor(c?: Partial<Pick<IContainer, 'logger'>>) {
this.#logger = c?.logger && 'child' in c?.logger ?
c?.logger.child({ service: new.target.name }) :
this.#logger = c?.logger && 'child' in c.logger ?
c.logger.child({ service: new.target.name }) :
c?.logger;
}

/**
* Get latest aggregate snapshot
*/
async getAggregateSnapshot(aggregateId: string): Promise<IEvent | undefined> {
async getAggregateSnapshot(aggregateId: string): Promise<ISnapshotEvent | undefined> {
return this.#snapshots.get(aggregateId);
}

/**
* Save new aggregate snapshot
*/
async saveAggregateSnapshot(snapshotEvent: IEvent) {
async saveAggregateSnapshot(snapshotEvent: ISnapshotEvent) {
if (!snapshotEvent.aggregateId)
throw new TypeError('event.aggregateId is required');

Expand All @@ -47,7 +47,7 @@ export class InMemorySnapshotStorage implements IAggregateSnapshotStorage, IDisp
/**
* Delete aggregate snapshot
*/
deleteAggregateSnapshot<TState>(snapshotEvent: IEvent<TState>): Promise<void> | void {
deleteAggregateSnapshot<TState>(snapshotEvent: ISnapshotEvent<TState>): Promise<void> | void {
if (!snapshotEvent.aggregateId)
throw new TypeError('snapshotEvent.aggregateId argument required');

Expand Down
23 changes: 23 additions & 0 deletions src/interfaces/IAggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export interface IAggregate {

export interface IMutableAggregateState {

/**
* Optional list of event types that are required to restore this state.
*
* Exposed by AbstractAggregate as `restoresFrom` and may be used by the command handler
* to load only the state-relevant events when rehydrating an aggregate.
*/
handles?: Readonly<string[]>;

/**
* Apply a single event to mutate the aggregate's state.
*/
Expand All @@ -58,7 +66,22 @@ export interface IAggregateConstructor<
TAggregate extends IAggregate,
TState extends IMutableAggregateState | object | void
> {

/**
* List of command types handled by the aggregate.
*
* Used to subscribe AggregateCommandHandler to the command bus.
*/
readonly handles: string[];

/**
* Optional list of event types that are required to restore the aggregate state.
*
* If provided, AggregateCommandHandler can request only these events from storage
* (typically together with a `tail: 'last'` marker to restore the version).
*/
readonly restoresFrom?: Readonly<string[]>;

new(options: IAggregateConstructorParams<TState>): TAggregate;
}

Expand Down
8 changes: 4 additions & 4 deletions src/interfaces/IAggregateSnapshotStorage.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { Identifier } from './Identifier.ts';
import type { IEvent } from './IEvent.ts';
import type { ISnapshotEvent } from './ISnapshotEvent.ts';

export interface IAggregateSnapshotStorage {
getAggregateSnapshot<TState>(aggregateId: Identifier):
Promise<IEvent<TState> | undefined> | IEvent<TState> | undefined;
Promise<ISnapshotEvent<TState> | undefined> | ISnapshotEvent<TState> | undefined;

saveAggregateSnapshot<TState>(snapshotEvent: IEvent<TState>): Promise<void> | void;
saveAggregateSnapshot<TState>(snapshotEvent: ISnapshotEvent<TState>): Promise<void> | void;

deleteAggregateSnapshot<TState>(snapshotEvent: IEvent<TState>): Promise<void> | void;
deleteAggregateSnapshot<TState>(snapshotEvent: ISnapshotEvent<TState>): Promise<void> | void;
}
28 changes: 27 additions & 1 deletion src/interfaces/IEventStorageReader.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Identifier } from './Identifier.ts';
import type { IEvent } from './IEvent.ts';
import type { IEventStream } from './IEventStream.ts';
import type { ISnapshotEvent } from './ISnapshotEvent.ts';
import { isObject } from './isObject.ts';

export type EventQueryAfter = {
Expand All @@ -15,6 +16,31 @@ export type EventQueryBefore = {
beforeEvent?: IEvent;
}

export type AggregateEventsQueryParams = {

/**
* Optional snapshot event. If provided, storage should return only events after
* the snapshot's aggregateVersion.
*/
snapshot?: ISnapshotEvent,

/**
* Optional list of event types to return.
*
* IMPORTANT: If you filter eventTypes, make sure you still restore the aggregate
* version correctly (e.g. via `tail: 'last'`), otherwise emitted events may get
* incorrect aggregateVersion values.
*/
eventTypes?: Readonly<string[]>,

/**
* Optionally include the last aggregate event (after snapshot), regardless of type.
* Useful together with `eventTypes` to restore the aggregate version without pulling
* the full stream.
*/
tail?: 'last'
}

export interface IEventStorageReader {

/**
Expand All @@ -25,7 +51,7 @@ export interface IEventStorageReader {
/**
* Retrieves all events (and optionally a snapshot) associated with a specific aggregate.
*/
getAggregateEvents(aggregateId: Identifier, options?: { snapshot?: IEvent }): IEventStream;
getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream;

/**
* Retrieves events associated with a saga, with optional filtering by version or timestamp.
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/ISnapshotEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type IEvent, isEvent } from './IEvent.ts';

export const SNAPSHOT_EVENT_TYPE: 'snapshot' = 'snapshot';

export interface ISnapshotEvent extends IEvent {
export interface ISnapshotEvent<TState = any> extends IEvent<TState> {
type: typeof SNAPSHOT_EVENT_TYPE
}

Expand Down
47 changes: 46 additions & 1 deletion tests/unit/AggregateCommandHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,29 @@ class MyAggregate extends AbstractAggregate<any> {
}
}

class SelectiveRestoreAggregate extends AbstractAggregate<any> {
static get handles() {
return ['do'];
}

static get restoresFrom() {
return ['stateEvent'];
}

constructor({ id }: { id: Identifier }) {
super({
id,
state: {
stateEvent() { }
}
});
}

do() {
this.emit('newEvent');
}
}

class CommandBus {
handlers: any = {};
on(messageType, listener) {
Expand Down Expand Up @@ -129,7 +152,29 @@ describe('AggregateCommandHandler', function () {
assert(getAggregateEventsSpy.calledOnce, 'getAggregateEvents was not called');

const { args } = getAggregateEventsSpy.lastCall;
expect(args).to.have.length(1);
expect(args[0]).to.eql(1);
});

it('can restore from filtered event types while keeping aggregate version via tail event', async () => {
const aggregateId = 'restore-filter-test-id';

await eventStore.dispatch([
{ aggregateId, aggregateVersion: 0, type: 'stateEvent' },
{ aggregateId, aggregateVersion: 1, type: 'irrelevant' },
{ aggregateId, aggregateVersion: 2, type: 'irrelevant' }
] as any);

const handler = new AggregateCommandHandler({ eventStore, aggregateType: SelectiveRestoreAggregate });

const events = await handler.execute({ type: 'do', aggregateId });

assert(getAggregateEventsSpy.called, 'getAggregateEvents was not called');
expect(getAggregateEventsSpy.lastCall.args).to.eql([
aggregateId,
{ eventTypes: ['stateEvent'], tail: 'last' }
]);

expect(events[0]).to.have.property('aggregateVersion', 3);
});

it('passes commands to aggregate.handle(cmd)', async () => {
Expand Down