Skip to content

Commit 752bc6b

Browse files
pranaygpclaude
andcommitted
perf: add events.createBatch() for batch event creation
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 6472eb6 commit 752bc6b

File tree

8 files changed

+408
-115
lines changed

8 files changed

+408
-115
lines changed

.changeset/batch-event-creation.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
"@workflow/core": patch
3+
"@workflow/world": patch
4+
"@workflow/world-local": patch
5+
"@workflow/world-postgres": patch
6+
"@workflow/world-vercel": patch
7+
---
8+
9+
perf: add events.createBatch() for batch event creation
10+
11+
- Add `createBatch()` method to Storage interface for creating multiple events atomically
12+
- Use batch event creation in suspension handler for improved performance
13+
- Use batch event creation for wait_completed events in runtime

packages/core/src/runtime.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,11 +340,14 @@ export function workflowEntrypoint(
340340
correlationId: e.correlationId,
341341
}));
342342

343-
// Create all wait_completed events
344-
for (const waitEvent of waitsToComplete) {
345-
const result = await world.events.create(runId, waitEvent);
346-
// Add the event to the events array so the workflow can see it
347-
events.push(result.event);
343+
// Batch create all wait_completed events
344+
if (waitsToComplete.length > 0) {
345+
const completedResults = await world.events.createBatch(
346+
runId,
347+
waitsToComplete
348+
);
349+
// Add the events to the events array so the workflow can see them
350+
events.push(...completedResults.map((r) => r.event));
348351
}
349352

350353
const result = await runWorkflow(

packages/core/src/runtime/suspension-handler.ts

Lines changed: 98 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export interface SuspensionHandlerResult {
3030
/**
3131
* Handles a workflow suspension by processing all pending operations (hooks, steps, waits).
3232
* Uses an event-sourced architecture where entities (steps, hooks) are created atomically
33-
* with their corresponding events via events.create().
33+
* with their corresponding events via createBatch.
3434
*
3535
* Processing order:
3636
* 1. Hooks are processed first to prevent race conditions with webhook receivers
@@ -72,130 +72,120 @@ export async function handleSuspension({
7272
});
7373

7474
// Process hooks first to prevent race conditions with webhook receivers
75-
// All hook creations run in parallel
7675
if (hookEvents.length > 0) {
77-
await Promise.all(
78-
hookEvents.map(async (hookEvent) => {
79-
try {
80-
await world.events.create(runId, hookEvent);
81-
} catch (err) {
82-
if (WorkflowAPIError.is(err)) {
83-
if (err.status === 409) {
84-
console.warn(`Hook already exists, continuing: ${err.message}`);
85-
} else if (err.status === 410) {
86-
console.warn(
87-
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
88-
);
89-
} else {
90-
throw err;
91-
}
92-
} else {
93-
throw err;
94-
}
76+
try {
77+
await world.events.createBatch(runId, hookEvents);
78+
} catch (err) {
79+
if (WorkflowAPIError.is(err)) {
80+
if (err.status === 409) {
81+
console.warn(`Some hooks already exist, continuing: ${err.message}`);
82+
} else if (err.status === 410) {
83+
console.warn(
84+
`Workflow run "${runId}" has already completed, skipping hooks: ${err.message}`
85+
);
86+
} else {
87+
throw err;
9588
}
96-
})
97-
);
89+
} else {
90+
throw err;
91+
}
92+
}
9893
}
9994

100-
// Build a map of stepId -> step event for steps that need creation
101-
const stepsNeedingCreation = new Set(
102-
stepItems
103-
.filter((queueItem) => !queueItem.hasCreatedEvent)
104-
.map((queueItem) => queueItem.correlationId)
95+
// Build step_created events only for steps that haven't been created yet
96+
// Steps with hasCreatedEvent=true already have their event in the log
97+
const stepsNeedingCreation = stepItems.filter(
98+
(queueItem) => !queueItem.hasCreatedEvent
99+
);
100+
const stepEvents: CreateEventRequest[] = stepsNeedingCreation.map(
101+
(queueItem) => {
102+
const dehydratedInput = dehydrateStepArguments(
103+
{
104+
args: queueItem.args,
105+
closureVars: queueItem.closureVars,
106+
},
107+
suspension.globalThis
108+
);
109+
return {
110+
eventType: 'step_created' as const,
111+
correlationId: queueItem.correlationId,
112+
eventData: {
113+
stepName: queueItem.stepName,
114+
input: dehydratedInput as Serializable,
115+
},
116+
};
117+
}
105118
);
106119

107-
// Process steps and waits in parallel
108-
// Each step: create event (if needed) -> queue message
109-
// Each wait: create event (if needed)
110-
const ops: Promise<void>[] = [];
120+
// Build wait_created events (only for waits that haven't been created yet)
121+
const waitEvents: CreateEventRequest[] = waitItems
122+
.filter((queueItem) => !queueItem.hasCreatedEvent)
123+
.map((queueItem) => ({
124+
eventType: 'wait_created' as const,
125+
correlationId: queueItem.correlationId,
126+
eventData: {
127+
resumeAt: queueItem.resumeAt,
128+
},
129+
}));
111130

112-
// Steps: create event then queue message, all in parallel
113-
for (const queueItem of stepItems) {
114-
ops.push(
115-
(async () => {
116-
// Create step event if not already created
117-
if (stepsNeedingCreation.has(queueItem.correlationId)) {
118-
const dehydratedInput = dehydrateStepArguments(
119-
{
120-
args: queueItem.args,
121-
closureVars: queueItem.closureVars,
122-
},
123-
suspension.globalThis
124-
);
125-
const stepEvent: CreateEventRequest = {
126-
eventType: 'step_created' as const,
127-
correlationId: queueItem.correlationId,
128-
eventData: {
129-
stepName: queueItem.stepName,
130-
input: dehydratedInput as Serializable,
131-
},
132-
};
133-
try {
134-
await world.events.create(runId, stepEvent);
135-
} catch (err) {
136-
if (WorkflowAPIError.is(err) && err.status === 409) {
137-
console.warn(`Step already exists, continuing: ${err.message}`);
138-
} else {
139-
throw err;
140-
}
131+
// Process steps and waits in parallel using batch creation
132+
await Promise.all([
133+
// Create step events (World creates step entities atomically)
134+
// Only for steps that don't already have a step_created event
135+
stepEvents.length > 0
136+
? world.events.createBatch(runId, stepEvents).catch((err) => {
137+
if (WorkflowAPIError.is(err) && err.status === 409) {
138+
console.warn(
139+
`Some steps already exist, continuing: ${err.message}`
140+
);
141+
} else {
142+
throw err;
141143
}
142-
}
143-
144-
// Queue step execution message
145-
await queueMessage(
146-
world,
147-
`__wkf_step_${queueItem.stepName}`,
148-
{
149-
workflowName,
150-
workflowRunId: runId,
151-
workflowStartedAt,
152-
stepId: queueItem.correlationId,
153-
traceCarrier: await serializeTraceCarrier(),
154-
requestedAt: new Date(),
155-
},
156-
{
157-
idempotencyKey: queueItem.correlationId,
144+
})
145+
: Promise.resolve(),
146+
// Create wait events
147+
waitEvents.length > 0
148+
? world.events.createBatch(runId, waitEvents).catch((err) => {
149+
if (WorkflowAPIError.is(err) && err.status === 409) {
150+
console.warn(
151+
`Some waits already exist, continuing: ${err.message}`
152+
);
153+
} else {
154+
throw err;
158155
}
159-
);
160-
})()
161-
);
162-
}
156+
})
157+
: Promise.resolve(),
158+
]);
163159

164-
// Waits: create events in parallel (no queueing needed for waits)
165-
for (const queueItem of waitItems) {
166-
if (!queueItem.hasCreatedEvent) {
167-
ops.push(
168-
(async () => {
169-
const waitEvent: CreateEventRequest = {
170-
eventType: 'wait_created' as const,
171-
correlationId: queueItem.correlationId,
172-
eventData: {
173-
resumeAt: queueItem.resumeAt,
174-
},
175-
};
176-
try {
177-
await world.events.create(runId, waitEvent);
178-
} catch (err) {
179-
if (WorkflowAPIError.is(err) && err.status === 409) {
180-
console.warn(`Wait already exists, continuing: ${err.message}`);
181-
} else {
182-
throw err;
183-
}
184-
}
185-
})()
186-
);
187-
}
188-
}
160+
// Queue step execution messages for ALL pending steps in parallel
161+
// (both newly created and those with existing step_created events)
162+
const queueOps = stepItems.map(async (queueItem) => {
163+
await queueMessage(
164+
world,
165+
`__wkf_step_${queueItem.stepName}`,
166+
{
167+
workflowName,
168+
workflowRunId: runId,
169+
workflowStartedAt,
170+
stepId: queueItem.correlationId,
171+
traceCarrier: await serializeTraceCarrier(),
172+
requestedAt: new Date(),
173+
},
174+
{
175+
idempotencyKey: queueItem.correlationId,
176+
}
177+
);
178+
});
189179

190-
// Wait for all step and wait operations to complete
180+
// Wait for all queue operations to complete
191181
waitUntil(
192-
Promise.all(ops).catch((opErr) => {
182+
Promise.all(queueOps).catch((opErr) => {
193183
const isAbortError =
194184
opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
195185
if (!isAbortError) throw opErr;
196186
})
197187
);
198-
await Promise.all(ops);
188+
await Promise.all(queueOps);
199189

200190
// Calculate minimum timeout from waits
201191
const now = Date.now();

packages/world-local/src/storage.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,16 @@ export function createStorage(basedir: string): Storage {
687687
};
688688
},
689689

690+
async createBatch(runId, data, params): Promise<EventResult[]> {
691+
// createBatch is just a sequential loop over create() to ensure monotonic ULIDs
692+
const results: EventResult[] = [];
693+
for (const eventData of data) {
694+
const result = await this.create(runId, eventData, params);
695+
results.push(result);
696+
}
697+
return results;
698+
},
699+
690700
async list(params) {
691701
const { runId } = params;
692702
const resolveData = params.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;

0 commit comments

Comments
 (0)