Skip to content

Commit 0acf959

Browse files
pranaygpclaude
andcommitted
perf: implement event-sourced architecture with batch event creation
Part 1: Event-sourced architecture - Add run lifecycle events (run_created, run_started, run_completed, run_failed) - Update world implementations to create/update entities from events - Entities (runs, steps, hooks) are now materializations of the event log Part 2: Batch event creation - Add events.createBatch() method to World interface - Implement in world-vercel, world-local, world-postgres - Update runtime to use batch event creation for wait_completed events 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent ea2a67e commit 0acf959

File tree

22 files changed

+1927
-1241
lines changed

22 files changed

+1927
-1241
lines changed

.changeset/brave-dots-bake.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
"@workflow/core": patch
3+
"@workflow/world": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world-vercel": patch
6+
"@workflow/world-postgres": patch
7+
---
8+
9+
perf: add events.createBatch() for batch event creation
10+
11+
- Add `events.createBatch()` method to World interface for creating multiple events atomically
12+
- Implement batch event creation in world-vercel (parallel API calls)
13+
- Implement batch event creation in world-local (sequential for ULID ordering)
14+
- Implement batch event creation in world-postgres (single batch INSERT)
15+
- Update runtime to use batch event creation for wait_completed events
16+
- Optimize wait completion from O(n²) to O(n) with Set-based lookup
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@workflow/core": patch
3+
"@workflow/world": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world-postgres": patch
6+
---
7+
8+
perf: implement event-sourced architecture for runs, steps, and hooks
9+
10+
- Add run lifecycle events (run_created, run_started, run_completed, run_failed, run_cancelled, run_paused, run_resumed)
11+
- Update world-local and world-postgres to create/update entities from events via createBatch
12+
- Entities (runs, steps, hooks) are now materializations of the event log
13+
- This makes the system faster, easier to reason about, and resilient to data inconsistencies

packages/cli/src/lib/inspect/run.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ export const startRun = async (
6868
};
6969

7070
export const cancelRun = async (world: World, runId: string) => {
71-
await world.runs.cancel(runId);
71+
await world.events.create(runId, { eventType: 'run_cancelled' });
7272
logger.log(chalk.green(`Cancel signal sent to run ${runId}`));
7373
};

packages/core/src/events-consumer.test.ts

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ describe('EventsConsumer', () => {
7373
await waitForNextTick();
7474

7575
expect(callback).toHaveBeenCalledWith(event);
76+
// Without auto-advance, callback is only called once
7677
expect(callback).toHaveBeenCalledTimes(1);
7778
});
7879
});
@@ -87,6 +88,7 @@ describe('EventsConsumer', () => {
8788
await waitForNextTick();
8889

8990
expect(callback).toHaveBeenCalledWith(event);
91+
// Without auto-advance, callback is only called once
9092
expect(callback).toHaveBeenCalledTimes(1);
9193
});
9294

@@ -109,23 +111,27 @@ describe('EventsConsumer', () => {
109111
consumer.subscribe(callback);
110112
await waitForNextTick();
111113

114+
// callback finishes at event1, index advances to 1
115+
// Without auto-advance, event2 is NOT processed
112116
expect(consumer.eventIndex).toBe(1);
113117
expect(consumer.callbacks).toHaveLength(0);
114118
});
115119

116-
it('should not increment event index when callback returns false', async () => {
120+
it('should NOT auto-advance when all callbacks return NotConsumed', async () => {
117121
const event = createMockEvent();
118122
const consumer = new EventsConsumer([event]);
119123
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);
120124

121125
consumer.subscribe(callback);
122126
await waitForNextTick();
127+
await waitForNextTick(); // Extra tick to confirm no auto-advance
123128

129+
// Without auto-advance, eventIndex stays at 0
124130
expect(consumer.eventIndex).toBe(0);
125131
expect(consumer.callbacks).toContain(callback);
126132
});
127133

128-
it('should process multiple callbacks until one returns true', async () => {
134+
it('should process multiple callbacks until one returns Consumed or Finished', async () => {
129135
const event = createMockEvent();
130136
const consumer = new EventsConsumer([event]);
131137
const callback1 = vi
@@ -140,15 +146,17 @@ describe('EventsConsumer', () => {
140146
consumer.subscribe(callback2);
141147
consumer.subscribe(callback3);
142148
await waitForNextTick();
149+
await waitForNextTick(); // For next event processing
143150

144151
expect(callback1).toHaveBeenCalledWith(event);
145152
expect(callback2).toHaveBeenCalledWith(event);
153+
// callback3 sees the next event (null since we only have one event)
146154
expect(callback3).toHaveBeenCalledWith(null);
147155
expect(consumer.eventIndex).toBe(1);
148156
expect(consumer.callbacks).toEqual([callback1, callback3]);
149157
});
150158

151-
it('should process all callbacks when none return true', async () => {
159+
it('should NOT advance when all callbacks return NotConsumed', async () => {
152160
const event = createMockEvent();
153161
const consumer = new EventsConsumer([event]);
154162
const callback1 = vi
@@ -169,6 +177,7 @@ describe('EventsConsumer', () => {
169177
expect(callback1).toHaveBeenCalledWith(event);
170178
expect(callback2).toHaveBeenCalledWith(event);
171179
expect(callback3).toHaveBeenCalledWith(event);
180+
// Without auto-advance, eventIndex stays at 0
172181
expect(consumer.eventIndex).toBe(0);
173182
expect(consumer.callbacks).toEqual([callback1, callback2, callback3]);
174183
});
@@ -211,7 +220,7 @@ describe('EventsConsumer', () => {
211220
expect(callback2).toHaveBeenCalledWith(null);
212221
});
213222

214-
it('should handle complex event processing scenario', async () => {
223+
it('should handle complex event processing with multiple consumers', async () => {
215224
const events = [
216225
createMockEvent({ id: 'event-1', event_type: 'type-a' }),
217226
createMockEvent({ id: 'event-2', event_type: 'type-b' }),
@@ -241,13 +250,14 @@ describe('EventsConsumer', () => {
241250
consumer.subscribe(typeBCallback);
242251
await waitForNextTick();
243252
await waitForNextTick(); // Wait for recursive processing
244-
await waitForNextTick(); // Wait for final processing
245253

246-
// typeACallback processes event-1 and gets removed, so it won't process event-3
254+
// typeACallback processes event-1 and gets removed
247255
expect(typeACallback).toHaveBeenCalledTimes(1); // Called for event-1 only
256+
// typeBCallback processes event-2 and gets removed
248257
expect(typeBCallback).toHaveBeenCalledTimes(1); // Called for event-2
249-
expect(consumer.eventIndex).toBe(2); // Only 2 events processed (event-3 remains)
250-
expect(consumer.callbacks).toHaveLength(0); // Both callbacks removed after consuming their events
258+
// eventIndex is at 2 (after event-1 and event-2 were consumed)
259+
expect(consumer.eventIndex).toBe(2);
260+
expect(consumer.callbacks).toHaveLength(0);
251261
});
252262
});
253263

@@ -297,8 +307,9 @@ describe('EventsConsumer', () => {
297307
consumer.subscribe(callback3);
298308
await waitForNextTick();
299309

300-
// callback2 should be removed when it returns true
310+
// callback2 should be removed when it returns Finished
301311
expect(consumer.callbacks).toEqual([callback1, callback3]);
312+
// callback3 is called with the next event (null after event-1)
302313
expect(callback3).toHaveBeenCalledWith(null);
303314
});
304315

@@ -314,25 +325,6 @@ describe('EventsConsumer', () => {
314325
expect(consumer.eventIndex).toBe(1);
315326
});
316327

317-
it('should handle multiple subscriptions happening in sequence', async () => {
318-
const event1 = createMockEvent({ id: 'event-1' });
319-
const event2 = createMockEvent({ id: 'event-2' });
320-
const consumer = new EventsConsumer([event1, event2]);
321-
322-
const callback1 = vi.fn().mockReturnValue(EventConsumerResult.Finished);
323-
const callback2 = vi.fn().mockReturnValue(EventConsumerResult.Finished);
324-
325-
consumer.subscribe(callback1);
326-
await waitForNextTick();
327-
328-
consumer.subscribe(callback2);
329-
await waitForNextTick();
330-
331-
expect(callback1).toHaveBeenCalledWith(event1);
332-
expect(callback2).toHaveBeenCalledWith(event2);
333-
expect(consumer.eventIndex).toBe(2);
334-
});
335-
336328
it('should handle empty events array gracefully', async () => {
337329
const consumer = new EventsConsumer([]);
338330
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);
@@ -343,5 +335,49 @@ describe('EventsConsumer', () => {
343335
expect(callback).toHaveBeenCalledWith(null);
344336
expect(consumer.eventIndex).toBe(0);
345337
});
338+
339+
it('should process events in order with proper consumers', async () => {
340+
// This test simulates the workflow scenario:
341+
// - run_created consumer consumes it
342+
// - step consumer gets step_created, step_completed
343+
const events = [
344+
createMockEvent({ id: 'run-created', event_type: 'run_created' }),
345+
createMockEvent({ id: 'step-created', event_type: 'step_created' }),
346+
createMockEvent({ id: 'step-completed', event_type: 'step_completed' }),
347+
];
348+
const consumer = new EventsConsumer(events);
349+
350+
// Run lifecycle consumer - consumes run_created
351+
const runConsumer = vi.fn().mockImplementation((event: Event | null) => {
352+
if (event?.event_type === 'run_created') {
353+
return EventConsumerResult.Consumed;
354+
}
355+
return EventConsumerResult.NotConsumed;
356+
});
357+
358+
// Step consumer - consumes step_created, finishes on step_completed
359+
const stepConsumer = vi.fn().mockImplementation((event: Event | null) => {
360+
if (event?.event_type === 'step_created') {
361+
return EventConsumerResult.Consumed;
362+
}
363+
if (event?.event_type === 'step_completed') {
364+
return EventConsumerResult.Finished;
365+
}
366+
return EventConsumerResult.NotConsumed;
367+
});
368+
369+
consumer.subscribe(runConsumer);
370+
consumer.subscribe(stepConsumer);
371+
await waitForNextTick();
372+
await waitForNextTick();
373+
await waitForNextTick();
374+
375+
// runConsumer consumes run_created
376+
expect(runConsumer).toHaveBeenCalledWith(events[0]);
377+
// stepConsumer consumes step_created, then finishes on step_completed
378+
expect(stepConsumer).toHaveBeenCalledWith(events[1]);
379+
expect(stepConsumer).toHaveBeenCalledWith(events[2]);
380+
expect(consumer.eventIndex).toBe(3);
381+
});
346382
});
347383
});

packages/core/src/events-consumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,5 +78,10 @@ export class EventsConsumer {
7878
return;
7979
}
8080
}
81+
82+
// If we reach here, all callbacks returned NotConsumed.
83+
// We do NOT auto-advance - every event must have a consumer.
84+
// With proper consumers for run_created/run_started/step_created,
85+
// this should not cause events to get stuck.
8186
};
8287
}

packages/core/src/global.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export interface StepInvocationQueueItem {
66
stepName: string;
77
args: Serializable[];
88
closureVars?: Record<string, Serializable>;
9+
hasCreatedEvent?: boolean;
910
}
1011

1112
export interface HookInvocationQueueItem {

packages/core/src/runtime.ts

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ export class Run<TResult> {
104104
* Cancels the workflow run.
105105
*/
106106
async cancel(): Promise<void> {
107-
await this.world.runs.cancel(this.runId);
107+
await this.world.events.create(this.runId, {
108+
eventType: 'run_cancelled',
109+
});
108110
}
109111

110112
/**
@@ -272,10 +274,14 @@ export function workflowEntrypoint(
272274
let workflowRun = await world.runs.get(runId);
273275

274276
if (workflowRun.status === 'pending') {
275-
workflowRun = await world.runs.update(runId, {
276-
// This sets the `startedAt` timestamp at the database level
277-
status: 'running',
277+
// Transition run to 'running' via event (event-sourced architecture)
278+
const result = await world.events.create(runId, {
279+
eventType: 'run_started',
278280
});
281+
// Use the run entity from the event response (no extra get call needed)
282+
if (result.run) {
283+
workflowRun = result.run;
284+
}
279285
}
280286

281287
// At this point, the workflow is "running" and `startedAt` should
@@ -310,27 +316,38 @@ export function workflowEntrypoint(
310316
// Load all events into memory before running
311317
const events = await getAllWorkflowRunEvents(workflowRun.runId);
312318

313-
// Check for any elapsed waits and create wait_completed events
319+
// Check for any elapsed waits and batch create wait_completed events
314320
const now = Date.now();
315-
for (const event of events) {
316-
if (event.eventType === 'wait_created') {
317-
const resumeAt = event.eventData.resumeAt as Date;
318-
const hasCompleted = events.some(
319-
(e) =>
320-
e.eventType === 'wait_completed' &&
321-
e.correlationId === event.correlationId
322-
);
323321

324-
// If wait has elapsed and hasn't been completed yet
325-
if (!hasCompleted && now >= resumeAt.getTime()) {
326-
const completedEvent = await world.events.create(runId, {
327-
eventType: 'wait_completed',
328-
correlationId: event.correlationId,
329-
});
330-
// Add the event to the events array so the workflow can see it
331-
events.push(completedEvent);
332-
}
333-
}
322+
// Pre-compute completed correlation IDs for O(n) lookup instead of O(n²)
323+
const completedWaitIds = new Set(
324+
events
325+
.filter((e) => e.eventType === 'wait_completed')
326+
.map((e) => e.correlationId)
327+
);
328+
329+
// Collect all waits that need completion
330+
const waitsToComplete = events
331+
.filter(
332+
(e): e is typeof e & { correlationId: string } =>
333+
e.eventType === 'wait_created' &&
334+
e.correlationId !== undefined &&
335+
!completedWaitIds.has(e.correlationId) &&
336+
now >= (e.eventData.resumeAt as Date).getTime()
337+
)
338+
.map((e) => ({
339+
eventType: 'wait_completed' as const,
340+
correlationId: e.correlationId,
341+
}));
342+
343+
// Batch create all wait_completed events
344+
if (waitsToComplete.length > 0) {
345+
const completedResults = await world.events.createBatch(
346+
runId,
347+
waitsToComplete
348+
);
349+
// Add the events to the events array so the workflow can see them
350+
events.push(...completedResults.map((r) => r.event));
334351
}
335352

336353
const result = await runWorkflow(
@@ -339,10 +356,12 @@ export function workflowEntrypoint(
339356
events
340357
);
341358

342-
// Update the workflow run with the result
343-
await world.runs.update(runId, {
344-
status: 'completed',
345-
output: result as Serializable,
359+
// Complete the workflow run via event (event-sourced architecture)
360+
await world.events.create(runId, {
361+
eventType: 'run_completed',
362+
eventData: {
363+
output: result as Serializable,
364+
},
346365
});
347366

348367
span?.setAttributes({
@@ -393,11 +412,14 @@ export function workflowEntrypoint(
393412
console.error(
394413
`${errorName} while running "${runId}" workflow:\n\n${errorStack}`
395414
);
396-
await world.runs.update(runId, {
397-
status: 'failed',
398-
error: {
399-
message: errorMessage,
400-
stack: errorStack,
415+
// Fail the workflow run via event (event-sourced architecture)
416+
await world.events.create(runId, {
417+
eventType: 'run_failed',
418+
eventData: {
419+
error: {
420+
message: errorMessage,
421+
stack: errorStack,
422+
},
401423
// TODO: include error codes when we define them
402424
},
403425
});

0 commit comments

Comments
 (0)