Skip to content

Commit 853963d

Browse files
pranaygpclaude
andcommitted
perf: add events.createBatch() for batch event creation
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3c27e56 commit 853963d

File tree

8 files changed

+343
-39
lines changed

8 files changed

+343
-39
lines changed

.changeset/batch-event-creation.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@workflow/core": patch
3+
"@workflow/world": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world-postgres": patch
6+
"@workflow/world-vercel": patch
7+
---
8+
9+
perf: add events.createBatch() for batch event creation
10+
11+
- Add `createBatch()` method to Storage interface for creating multiple events atomically
12+
- Use batch event creation in suspension handler for improved performance
13+
- Use batch event creation for wait_completed events in runtime

packages/core/src/runtime.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,11 +340,14 @@ export function workflowEntrypoint(
340340
correlationId: e.correlationId,
341341
}));
342342

343-
// Create all wait_completed events
344-
for (const waitEvent of waitsToComplete) {
345-
const result = await world.events.create(runId, waitEvent);
346-
// Add the event to the events array so the workflow can see it
347-
events.push(result.event);
343+
// Batch create all wait_completed events
344+
if (waitsToComplete.length > 0) {
345+
const completedResults = await world.events.createBatch(
346+
runId,
347+
waitsToComplete
348+
);
349+
// Add the events to the events array so the workflow can see them
350+
events.push(...completedResults.map((r) => r.event));
348351
}
349352

350353
const result = await runWorkflow(

packages/core/src/runtime/suspension-handler.ts

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export interface SuspensionHandlerResult {
3030
/**
3131
* Handles a workflow suspension by processing all pending operations (hooks, steps, waits).
3232
* Uses an event-sourced architecture where entities (steps, hooks) are created atomically
33-
* with their corresponding events via events.create().
33+
* with their corresponding events via createBatch.
3434
*
3535
* Processing order:
3636
* 1. Hooks are processed first to prevent race conditions with webhook receivers
@@ -72,16 +72,16 @@ export async function handleSuspension({
7272
});
7373

7474
// Process hooks first to prevent race conditions with webhook receivers
75-
for (const hookEvent of hookEvents) {
75+
if (hookEvents.length > 0) {
7676
try {
77-
await world.events.create(runId, hookEvent);
77+
await world.events.createBatch(runId, hookEvents);
7878
} catch (err) {
7979
if (WorkflowAPIError.is(err)) {
8080
if (err.status === 409) {
81-
console.warn(`Hook already exists, continuing: ${err.message}`);
81+
console.warn(`Some hooks already exist, continuing: ${err.message}`);
8282
} else if (err.status === 410) {
8383
console.warn(
84-
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
84+
`Workflow run "${runId}" has already completed, skipping hooks: ${err.message}`
8585
);
8686
} else {
8787
throw err;
@@ -128,35 +128,36 @@ export async function handleSuspension({
128128
},
129129
}));
130130

131-
// Process steps and waits
131+
// Process steps and waits in parallel
132132
const ops: Promise<void>[] = [];
133133

134-
// Create step events (World creates step entities atomically)
135-
// Only for steps that don't already have a step_created event
136-
for (const stepEvent of stepEvents) {
137-
try {
138-
await world.events.create(runId, stepEvent);
139-
} catch (err) {
140-
if (WorkflowAPIError.is(err) && err.status === 409) {
141-
console.warn(`Step already exists, continuing: ${err.message}`);
142-
} else {
143-
throw err;
144-
}
145-
}
146-
}
147-
148-
// Create wait events
149-
for (const waitEvent of waitEvents) {
150-
try {
151-
await world.events.create(runId, waitEvent);
152-
} catch (err) {
153-
if (WorkflowAPIError.is(err) && err.status === 409) {
154-
console.warn(`Wait already exists, continuing: ${err.message}`);
155-
} else {
156-
throw err;
157-
}
158-
}
159-
}
134+
await Promise.all([
135+
// Create step events (World creates step entities atomically)
136+
// Only for steps that don't already have a step_created event
137+
stepEvents.length > 0
138+
? world.events.createBatch(runId, stepEvents).catch((err) => {
139+
if (WorkflowAPIError.is(err) && err.status === 409) {
140+
console.warn(
141+
`Some steps already exist, continuing: ${err.message}`
142+
);
143+
} else {
144+
throw err;
145+
}
146+
})
147+
: Promise.resolve(),
148+
// Create wait events
149+
waitEvents.length > 0
150+
? world.events.createBatch(runId, waitEvents).catch((err) => {
151+
if (WorkflowAPIError.is(err) && err.status === 409) {
152+
console.warn(
153+
`Some waits already exist, continuing: ${err.message}`
154+
);
155+
} else {
156+
throw err;
157+
}
158+
})
159+
: Promise.resolve(),
160+
]);
160161

161162
// Queue step execution messages for ALL pending steps
162163
// (both newly created and those with existing step_created events)

packages/world-local/src/storage.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,16 @@ export function createStorage(basedir: string): Storage {
687687
};
688688
},
689689

690+
async createBatch(runId, data, params): Promise<EventResult[]> {
691+
// createBatch is just a sequential loop over create() to ensure monotonic ULIDs
692+
const results: EventResult[] = [];
693+
for (const eventData of data) {
694+
const result = await this.create(runId, eventData, params);
695+
results.push(result);
696+
}
697+
return results;
698+
},
699+
690700
async list(params) {
691701
const { runId } = params;
692702
const resolveData = params.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;

packages/world-postgres/src/storage.ts

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,228 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
448448
const resolveData = params?.resolveData ?? 'all';
449449
return { event: filterEventData(parsed, resolveData), run, step, hook };
450450
},
451+
async createBatch(runId, data, params): Promise<EventResult[]> {
452+
if (data.length === 0) {
453+
return [];
454+
}
455+
456+
const resolveData = params?.resolveData ?? 'all';
457+
const now = new Date();
458+
459+
// For run_created events, generate runId server-side if not provided
460+
const hasRunCreated = data.some((e) => e.eventType === 'run_created');
461+
let effectiveRunId = runId;
462+
if (hasRunCreated && (!runId || runId === '')) {
463+
effectiveRunId = `wrun_${ulid()}`;
464+
}
465+
466+
// Separate events by type for entity creation
467+
const runCreatedEvents = data.filter(
468+
(e) => e.eventType === 'run_created'
469+
);
470+
const runStartedEvents = data.filter(
471+
(e) => e.eventType === 'run_started'
472+
);
473+
const runCompletedEvents = data.filter(
474+
(e) => e.eventType === 'run_completed'
475+
);
476+
const runFailedEvents = data.filter((e) => e.eventType === 'run_failed');
477+
const runCancelledEvents = data.filter(
478+
(e) => e.eventType === 'run_cancelled'
479+
);
480+
const runPausedEvents = data.filter((e) => e.eventType === 'run_paused');
481+
const runResumedEvents = data.filter(
482+
(e) => e.eventType === 'run_resumed'
483+
);
484+
const stepCreatedEvents = data.filter(
485+
(e) => e.eventType === 'step_created'
486+
);
487+
const hookCreatedEvents = data.filter(
488+
(e) => e.eventType === 'hook_created'
489+
);
490+
491+
// Create run entities atomically with events
492+
if (runCreatedEvents.length > 0) {
493+
const runsToInsert = runCreatedEvents.map((eventData) => {
494+
const runData = (eventData as any).eventData as {
495+
deploymentId: string;
496+
workflowName: string;
497+
input: any[];
498+
executionContext?: Record<string, any>;
499+
};
500+
return {
501+
runId: effectiveRunId,
502+
deploymentId: runData.deploymentId,
503+
workflowName: runData.workflowName,
504+
input: runData.input as SerializedContent,
505+
executionContext: runData.executionContext as
506+
| SerializedContent
507+
| undefined,
508+
status: 'pending' as const,
509+
};
510+
});
511+
await drizzle
512+
.insert(Schema.runs)
513+
.values(runsToInsert)
514+
.onConflictDoNothing();
515+
}
516+
517+
// Update run status for run_started events
518+
if (runStartedEvents.length > 0) {
519+
await drizzle
520+
.update(Schema.runs)
521+
.set({
522+
status: 'running',
523+
startedAt: now,
524+
updatedAt: now,
525+
})
526+
.where(eq(Schema.runs.runId, effectiveRunId));
527+
}
528+
529+
// Update run status for run_completed events
530+
if (runCompletedEvents.length > 0) {
531+
const completedData = (runCompletedEvents[0] as any).eventData as {
532+
output?: any;
533+
};
534+
await drizzle
535+
.update(Schema.runs)
536+
.set({
537+
status: 'completed',
538+
output: completedData.output as SerializedContent | undefined,
539+
completedAt: now,
540+
updatedAt: now,
541+
})
542+
.where(eq(Schema.runs.runId, effectiveRunId));
543+
}
544+
545+
// Update run status for run_failed events
546+
if (runFailedEvents.length > 0) {
547+
const failedData = (runFailedEvents[0] as any).eventData as {
548+
error: any;
549+
errorCode?: string;
550+
};
551+
const errorMessage =
552+
typeof failedData.error === 'string'
553+
? failedData.error
554+
: (failedData.error?.message ?? 'Unknown error');
555+
// Store structured error as JSON for deserializeRunError to parse
556+
const errorJson = JSON.stringify({
557+
message: errorMessage,
558+
stack: failedData.error?.stack,
559+
code: failedData.errorCode,
560+
});
561+
await drizzle
562+
.update(Schema.runs)
563+
.set({
564+
status: 'failed',
565+
error: errorJson,
566+
completedAt: now,
567+
updatedAt: now,
568+
})
569+
.where(eq(Schema.runs.runId, effectiveRunId));
570+
}
571+
572+
// Update run status for run_cancelled events
573+
if (runCancelledEvents.length > 0) {
574+
await drizzle
575+
.update(Schema.runs)
576+
.set({
577+
status: 'cancelled',
578+
completedAt: now,
579+
updatedAt: now,
580+
})
581+
.where(eq(Schema.runs.runId, effectiveRunId));
582+
}
583+
584+
// Update run status for run_paused events
585+
if (runPausedEvents.length > 0) {
586+
await drizzle
587+
.update(Schema.runs)
588+
.set({
589+
status: 'paused',
590+
updatedAt: now,
591+
})
592+
.where(eq(Schema.runs.runId, effectiveRunId));
593+
}
594+
595+
// Update run status for run_resumed events
596+
if (runResumedEvents.length > 0) {
597+
await drizzle
598+
.update(Schema.runs)
599+
.set({
600+
status: 'running',
601+
updatedAt: now,
602+
})
603+
.where(eq(Schema.runs.runId, effectiveRunId));
604+
}
605+
606+
// Create step entities atomically with events
607+
if (stepCreatedEvents.length > 0) {
608+
const stepsToInsert = stepCreatedEvents.map((eventData) => {
609+
const stepData = (eventData as any).eventData as {
610+
stepName: string;
611+
input: any;
612+
};
613+
return {
614+
runId: effectiveRunId,
615+
stepId: eventData.correlationId!,
616+
stepName: stepData.stepName,
617+
input: stepData.input as SerializedContent,
618+
status: 'pending' as const,
619+
attempt: 0,
620+
};
621+
});
622+
await drizzle
623+
.insert(Schema.steps)
624+
.values(stepsToInsert)
625+
.onConflictDoNothing();
626+
}
627+
628+
// Create hook entities atomically with events
629+
if (hookCreatedEvents.length > 0) {
630+
const hooksToInsert = hookCreatedEvents.map((eventData) => {
631+
const hookData = (eventData as any).eventData as {
632+
token: string;
633+
metadata?: any;
634+
};
635+
return {
636+
runId: effectiveRunId,
637+
hookId: eventData.correlationId!,
638+
token: hookData.token,
639+
metadata: hookData.metadata as SerializedContent,
640+
ownerId: '', // TODO: get from context
641+
projectId: '', // TODO: get from context
642+
environment: '', // TODO: get from context
643+
};
644+
});
645+
await drizzle
646+
.insert(Schema.hooks)
647+
.values(hooksToInsert)
648+
.onConflictDoNothing();
649+
}
650+
651+
// Insert all events in a single batch query
652+
const eventsToInsert = data.map((eventData) => ({
653+
runId: effectiveRunId,
654+
eventId: `wevt_${ulid()}`,
655+
correlationId: eventData.correlationId,
656+
eventType: eventData.eventType,
657+
eventData: 'eventData' in eventData ? eventData.eventData : undefined,
658+
}));
659+
660+
const values = await drizzle
661+
.insert(events)
662+
.values(eventsToInsert)
663+
.returning({ eventId: events.eventId, createdAt: events.createdAt });
664+
665+
// Combine input data with returned values
666+
// TODO: Return actual entity data from the database after entity creation is moved here
667+
return data.map((eventData, i) => {
668+
const result = { ...eventData, ...values[i], runId: effectiveRunId };
669+
const parsed = EventSchema.parse(result);
670+
return { event: filterEventData(parsed, resolveData) };
671+
});
672+
},
451673
async list(params: ListEventsParams): Promise<PaginatedResponse<Event>> {
452674
const limit = params?.pagination?.limit ?? 100;
453675
const sortOrder = params.pagination?.sortOrder || 'asc';

0 commit comments

Comments
 (0)