Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/batch-event-creation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
---

perf: add events.createBatch() for batch event creation

- Add `createBatch()` method to Storage interface for creating multiple events atomically
- Use batch event creation in suspension handler for improved performance
- Use batch event creation for wait_completed events in runtime
13 changes: 8 additions & 5 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,14 @@ export function workflowEntrypoint(
correlationId: e.correlationId,
}));

// Create all wait_completed events
for (const waitEvent of waitsToComplete) {
const result = await world.events.create(runId, waitEvent);
// Add the event to the events array so the workflow can see it
events.push(result.event);
// Batch create all wait_completed events
if (waitsToComplete.length > 0) {
const completedResults = await world.events.createBatch(
runId,
waitsToComplete
);
// Add the events to the events array so the workflow can see them
events.push(...completedResults.map((r) => r.event));
}

const result = await runWorkflow(
Expand Down
206 changes: 98 additions & 108 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export interface SuspensionHandlerResult {
/**
* Handles a workflow suspension by processing all pending operations (hooks, steps, waits).
* Uses an event-sourced architecture where entities (steps, hooks) are created atomically
* with their corresponding events via events.create().
* with their corresponding events via createBatch.
*
* Processing order:
* 1. Hooks are processed first to prevent race conditions with webhook receivers
Expand Down Expand Up @@ -72,130 +72,120 @@ export async function handleSuspension({
});

// Process hooks first to prevent race conditions with webhook receivers
// All hook creations run in parallel
if (hookEvents.length > 0) {
await Promise.all(
hookEvents.map(async (hookEvent) => {
try {
await world.events.create(runId, hookEvent);
} catch (err) {
if (WorkflowAPIError.is(err)) {
if (err.status === 409) {
console.warn(`Hook already exists, continuing: ${err.message}`);
} else if (err.status === 410) {
console.warn(
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
);
} else {
throw err;
}
} else {
throw err;
}
try {
await world.events.createBatch(runId, hookEvents);
} catch (err) {
if (WorkflowAPIError.is(err)) {
if (err.status === 409) {
console.warn(`Some hooks already exist, continuing: ${err.message}`);
} else if (err.status === 410) {
console.warn(
`Workflow run "${runId}" has already completed, skipping hooks: ${err.message}`
);
} else {
throw err;
}
})
);
} else {
throw err;
}
}
}

// Build a map of stepId -> step event for steps that need creation
const stepsNeedingCreation = new Set(
stepItems
.filter((queueItem) => !queueItem.hasCreatedEvent)
.map((queueItem) => queueItem.correlationId)
// Build step_created events only for steps that haven't been created yet
// Steps with hasCreatedEvent=true already have their event in the log
const stepsNeedingCreation = stepItems.filter(
(queueItem) => !queueItem.hasCreatedEvent
);
const stepEvents: CreateEventRequest[] = stepsNeedingCreation.map(
(queueItem) => {
const dehydratedInput = dehydrateStepArguments(
{
args: queueItem.args,
closureVars: queueItem.closureVars,
},
suspension.globalThis
);
return {
eventType: 'step_created' as const,
correlationId: queueItem.correlationId,
eventData: {
stepName: queueItem.stepName,
input: dehydratedInput as Serializable,
},
};
}
);

// Process steps and waits in parallel
// Each step: create event (if needed) -> queue message
// Each wait: create event (if needed)
const ops: Promise<void>[] = [];
// Build wait_created events (only for waits that haven't been created yet)
const waitEvents: CreateEventRequest[] = waitItems
.filter((queueItem) => !queueItem.hasCreatedEvent)
.map((queueItem) => ({
eventType: 'wait_created' as const,
correlationId: queueItem.correlationId,
eventData: {
resumeAt: queueItem.resumeAt,
},
}));

// Steps: create event then queue message, all in parallel
for (const queueItem of stepItems) {
ops.push(
(async () => {
// Create step event if not already created
if (stepsNeedingCreation.has(queueItem.correlationId)) {
const dehydratedInput = dehydrateStepArguments(
{
args: queueItem.args,
closureVars: queueItem.closureVars,
},
suspension.globalThis
);
const stepEvent: CreateEventRequest = {
eventType: 'step_created' as const,
correlationId: queueItem.correlationId,
eventData: {
stepName: queueItem.stepName,
input: dehydratedInput as Serializable,
},
};
try {
await world.events.create(runId, stepEvent);
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(`Step already exists, continuing: ${err.message}`);
} else {
throw err;
}
// Process steps and waits in parallel using batch creation
await Promise.all([
// Create step events (World creates step entities atomically)
// Only for steps that don't already have a step_created event
stepEvents.length > 0
? world.events.createBatch(runId, stepEvents).catch((err) => {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(
`Some steps already exist, continuing: ${err.message}`
);
} else {
throw err;
}
}

// Queue step execution message
await queueMessage(
world,
`__wkf_step_${queueItem.stepName}`,
{
workflowName,
workflowRunId: runId,
workflowStartedAt,
stepId: queueItem.correlationId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
idempotencyKey: queueItem.correlationId,
})
: Promise.resolve(),
// Create wait events
waitEvents.length > 0
? world.events.createBatch(runId, waitEvents).catch((err) => {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(
`Some waits already exist, continuing: ${err.message}`
);
} else {
throw err;
}
);
})()
);
}
})
: Promise.resolve(),
]);

// Waits: create events in parallel (no queueing needed for waits)
for (const queueItem of waitItems) {
if (!queueItem.hasCreatedEvent) {
ops.push(
(async () => {
const waitEvent: CreateEventRequest = {
eventType: 'wait_created' as const,
correlationId: queueItem.correlationId,
eventData: {
resumeAt: queueItem.resumeAt,
},
};
try {
await world.events.create(runId, waitEvent);
} catch (err) {
if (WorkflowAPIError.is(err) && err.status === 409) {
console.warn(`Wait already exists, continuing: ${err.message}`);
} else {
throw err;
}
}
})()
);
}
}
// Queue step execution messages for ALL pending steps in parallel
// (both newly created and those with existing step_created events)
const queueOps = stepItems.map(async (queueItem) => {
await queueMessage(
world,
`__wkf_step_${queueItem.stepName}`,
{
workflowName,
workflowRunId: runId,
workflowStartedAt,
stepId: queueItem.correlationId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
idempotencyKey: queueItem.correlationId,
}
);
});

// Wait for all step and wait operations to complete
// Wait for all queue operations to complete
waitUntil(
Promise.all(ops).catch((opErr) => {
Promise.all(queueOps).catch((opErr) => {
const isAbortError =
opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
if (!isAbortError) throw opErr;
})
);
await Promise.all(ops);
await Promise.all(queueOps);

// Calculate minimum timeout from waits
const now = Date.now();
Expand Down
10 changes: 10 additions & 0 deletions packages/world-local/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,16 @@ export function createStorage(basedir: string): Storage {
};
},

async createBatch(runId, data, params): Promise<EventResult[]> {
// createBatch is just a sequential loop over create() to ensure monotonic ULIDs
const results: EventResult[] = [];
for (const eventData of data) {
const result = await this.create(runId, eventData, params);
results.push(result);
}
return results;
},

async list(params) {
const { runId } = params;
const resolveData = params.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;
Expand Down
Loading
Loading