diff --git a/.changeset/batch-event-creation.md b/.changeset/batch-event-creation.md
new file mode 100644
index 000000000..2fa9ba564
--- /dev/null
+++ b/.changeset/batch-event-creation.md
@@ -0,0 +1,13 @@
+---
+"@workflow/core": patch
+"@workflow/world": patch
+"@workflow/world-local": patch
+"@workflow/world-postgres": patch
+"@workflow/world-vercel": patch
+---
+
+perf: add events.createBatch() for batch event creation
+
+- Add `createBatch()` method to Storage interface for creating multiple events atomically
+- Use batch event creation in suspension handler for improved performance
+- Use batch event creation for wait_completed events in runtime
diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts
index ae02b408d..2f63e9793 100644
--- a/packages/core/src/runtime.ts
+++ b/packages/core/src/runtime.ts
@@ -340,11 +340,14 @@ export function workflowEntrypoint(
       correlationId: e.correlationId,
     }));
 
-    // Create all wait_completed events
-    for (const waitEvent of waitsToComplete) {
-      const result = await world.events.create(runId, waitEvent);
-      // Add the event to the events array so the workflow can see it
-      events.push(result.event);
+    // Batch create all wait_completed events
+    if (waitsToComplete.length > 0) {
+      const completedResults = await world.events.createBatch(
+        runId,
+        waitsToComplete
+      );
+      // Add the events to the events array so the workflow can see them
+      events.push(...completedResults.map((r) => r.event));
     }
 
     const result = await runWorkflow(
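For reference, a minimal consumer-side sketch of the new batch API (illustrative, not part of this diff). It assumes the `Storage` interface extended at the end of this patch; the correlation IDs and delays are made up. It shows the contract the runtime change above relies on: `createBatch()` takes an array of `CreateEventRequest` objects and resolves to one `EventResult` per request, in input order.

// Hedged sketch: Storage comes from @workflow/world, as in the packages below;
// the wait_created payload shape mirrors the suspension handler in this PR.
import type { Storage } from '@workflow/world';

async function createTwoWaits(events: Storage['events'], runId: string) {
  const results = await events.createBatch(runId, [
    {
      eventType: 'wait_created' as const,
      correlationId: 'wait_example_1', // illustrative correlation ID
      eventData: { resumeAt: new Date(Date.now() + 60_000) },
    },
    {
      eventType: 'wait_created' as const,
      correlationId: 'wait_example_2', // illustrative correlation ID
      eventData: { resumeAt: new Date(Date.now() + 120_000) },
    },
  ]);
  // One EventResult per request, in input order (all three World implementations preserve it).
  return results.map((r) => r.event);
}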
diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts
index 493909c07..6e16514fd 100644
--- a/packages/core/src/runtime/suspension-handler.ts
+++ b/packages/core/src/runtime/suspension-handler.ts
@@ -30,7 +30,7 @@ export interface SuspensionHandlerResult {
 /**
  * Handles a workflow suspension by processing all pending operations (hooks, steps, waits).
  * Uses an event-sourced architecture where entities (steps, hooks) are created atomically
- * with their corresponding events via events.create().
+ * with their corresponding events via events.createBatch().
  *
  * Processing order:
  * 1. Hooks are processed first to prevent race conditions with webhook receivers
@@ -72,130 +72,120 @@ export async function handleSuspension({
   });
 
   // Process hooks first to prevent race conditions with webhook receivers
-  // All hook creations run in parallel
   if (hookEvents.length > 0) {
-    await Promise.all(
-      hookEvents.map(async (hookEvent) => {
-        try {
-          await world.events.create(runId, hookEvent);
-        } catch (err) {
-          if (WorkflowAPIError.is(err)) {
-            if (err.status === 409) {
-              console.warn(`Hook already exists, continuing: ${err.message}`);
-            } else if (err.status === 410) {
-              console.warn(
-                `Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
-              );
-            } else {
-              throw err;
-            }
-          } else {
-            throw err;
-          }
+    try {
+      await world.events.createBatch(runId, hookEvents);
+    } catch (err) {
+      if (WorkflowAPIError.is(err)) {
+        if (err.status === 409) {
+          console.warn(`Some hooks already exist, continuing: ${err.message}`);
+        } else if (err.status === 410) {
+          console.warn(
+            `Workflow run "${runId}" has already completed, skipping hooks: ${err.message}`
+          );
+        } else {
+          throw err;
        }
-      })
-    );
+      } else {
+        throw err;
+      }
+    }
   }
 
-  // Build a map of stepId -> step event for steps that need creation
-  const stepsNeedingCreation = new Set(
-    stepItems
-      .filter((queueItem) => !queueItem.hasCreatedEvent)
-      .map((queueItem) => queueItem.correlationId)
+  // Build step_created events only for steps that haven't been created yet
+  // Steps with hasCreatedEvent=true already have their event in the log
+  const stepsNeedingCreation = stepItems.filter(
+    (queueItem) => !queueItem.hasCreatedEvent
+  );
+  const stepEvents: CreateEventRequest[] = stepsNeedingCreation.map(
+    (queueItem) => {
+      const dehydratedInput = dehydrateStepArguments(
+        {
+          args: queueItem.args,
+          closureVars: queueItem.closureVars,
+        },
+        suspension.globalThis
+      );
+      return {
+        eventType: 'step_created' as const,
+        correlationId: queueItem.correlationId,
+        eventData: {
+          stepName: queueItem.stepName,
+          input: dehydratedInput as Serializable,
+        },
+      };
+    }
   );
 
-  // Process steps and waits in parallel
-  // Each step: create event (if needed) -> queue message
-  // Each wait: create event (if needed)
-  const ops: Promise<void>[] = [];
+  // Build wait_created events (only for waits that haven't been created yet)
+  const waitEvents: CreateEventRequest[] = waitItems
+    .filter((queueItem) => !queueItem.hasCreatedEvent)
+    .map((queueItem) => ({
+      eventType: 'wait_created' as const,
+      correlationId: queueItem.correlationId,
+      eventData: {
+        resumeAt: queueItem.resumeAt,
+      },
+    }));
 
-  // Steps: create event then queue message, all in parallel
-  for (const queueItem of stepItems) {
-    ops.push(
-      (async () => {
-        // Create step event if not already created
-        if (stepsNeedingCreation.has(queueItem.correlationId)) {
-          const dehydratedInput = dehydrateStepArguments(
-            {
-              args: queueItem.args,
-              closureVars: queueItem.closureVars,
-            },
-            suspension.globalThis
-          );
-          const stepEvent: CreateEventRequest = {
-            eventType: 'step_created' as const,
-            correlationId: queueItem.correlationId,
-            eventData: {
-              stepName: queueItem.stepName,
-              input: dehydratedInput as Serializable,
-            },
-          };
-          try {
-            await world.events.create(runId, stepEvent);
-          } catch (err) {
-            if (WorkflowAPIError.is(err) && err.status === 409) {
-              console.warn(`Step already exists, continuing: ${err.message}`);
-            } else {
-              throw err;
-            }
+  // Process steps and waits in parallel using batch creation
+  await Promise.all([
+    // Create step events (World creates step entities atomically)
+    // Only for steps that don't already have a step_created event
+    stepEvents.length > 0
+      ? world.events.createBatch(runId, stepEvents).catch((err) => {
+          if (WorkflowAPIError.is(err) && err.status === 409) {
+            console.warn(
+              `Some steps already exist, continuing: ${err.message}`
+            );
+          } else {
+            throw err;
          }
-        }
-
-        // Queue step execution message
-        await queueMessage(
-          world,
-          `__wkf_step_${queueItem.stepName}`,
-          {
-            workflowName,
-            workflowRunId: runId,
-            workflowStartedAt,
-            stepId: queueItem.correlationId,
-            traceCarrier: await serializeTraceCarrier(),
-            requestedAt: new Date(),
-          },
-          {
-            idempotencyKey: queueItem.correlationId,
+        })
+      : Promise.resolve(),
+    // Create wait events
+    waitEvents.length > 0
+      ? world.events.createBatch(runId, waitEvents).catch((err) => {
+          if (WorkflowAPIError.is(err) && err.status === 409) {
+            console.warn(
+              `Some waits already exist, continuing: ${err.message}`
+            );
+          } else {
+            throw err;
          }
-        );
-      })()
-    );
-  }
+        })
+      : Promise.resolve(),
+  ]);
 
-  // Waits: create events in parallel (no queueing needed for waits)
-  for (const queueItem of waitItems) {
-    if (!queueItem.hasCreatedEvent) {
-      ops.push(
-        (async () => {
-          const waitEvent: CreateEventRequest = {
-            eventType: 'wait_created' as const,
-            correlationId: queueItem.correlationId,
-            eventData: {
-              resumeAt: queueItem.resumeAt,
-            },
-          };
-          try {
-            await world.events.create(runId, waitEvent);
-          } catch (err) {
-            if (WorkflowAPIError.is(err) && err.status === 409) {
-              console.warn(`Wait already exists, continuing: ${err.message}`);
-            } else {
-              throw err;
-            }
-          }
-        })()
-      );
-    }
-  }
+  // Queue step execution messages for ALL pending steps in parallel
+  // (both newly created and those with existing step_created events)
+  const queueOps = stepItems.map(async (queueItem) => {
+    await queueMessage(
+      world,
+      `__wkf_step_${queueItem.stepName}`,
+      {
+        workflowName,
+        workflowRunId: runId,
+        workflowStartedAt,
+        stepId: queueItem.correlationId,
+        traceCarrier: await serializeTraceCarrier(),
+        requestedAt: new Date(),
+      },
+      {
+        idempotencyKey: queueItem.correlationId,
+      }
+    );
+  });
 
-  // Wait for all step and wait operations to complete
+  // Wait for all queue operations to complete
   waitUntil(
-    Promise.all(ops).catch((opErr) => {
+    Promise.all(queueOps).catch((opErr) => {
       const isAbortError =
         opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
       if (!isAbortError) throw opErr;
     })
   );
-  await Promise.all(ops);
+  await Promise.all(queueOps);
 
   // Calculate minimum timeout from waits
   const now = Date.now();
diff --git a/packages/world-local/src/storage.ts b/packages/world-local/src/storage.ts
index be8daea45..7a3edcb38 100644
--- a/packages/world-local/src/storage.ts
+++ b/packages/world-local/src/storage.ts
@@ -691,6 +691,16 @@ export function createStorage(basedir: string): Storage {
       };
     },
 
+    async createBatch(runId, data, params): Promise<EventResult[]> {
+      // createBatch is just a sequential loop over create() to ensure monotonic ULIDs
+      const results: EventResult[] = [];
+      for (const eventData of data) {
+        const result = await this.create(runId, eventData, params);
+        results.push(result);
+      }
+      return results;
+    },
+
     async list(params) {
       const { runId } = params;
       const resolveData = params.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;
diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts
index 5477908fa..c752f1602 100644
--- a/packages/world-postgres/src/storage.ts
+++ b/packages/world-postgres/src/storage.ts
@@ -489,6 +489,228 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
       const resolveData = params?.resolveData ?? 'all';
       return { event: filterEventData(parsed, resolveData), run, step, hook };
     },
+    async createBatch(runId, data, params): Promise<EventResult[]> {
+      if (data.length === 0) {
+        return [];
+      }
+
+      const resolveData = params?.resolveData ?? 'all';
+      const now = new Date();
+
+      // For run_created events, generate runId server-side if not provided
+      const hasRunCreated = data.some((e) => e.eventType === 'run_created');
+      let effectiveRunId = runId;
+      if (hasRunCreated && (!runId || runId === '')) {
+        effectiveRunId = `wrun_${ulid()}`;
+      }
+
+      // Separate events by type for entity creation
+      const runCreatedEvents = data.filter(
+        (e) => e.eventType === 'run_created'
+      );
+      const runStartedEvents = data.filter(
+        (e) => e.eventType === 'run_started'
+      );
+      const runCompletedEvents = data.filter(
+        (e) => e.eventType === 'run_completed'
+      );
+      const runFailedEvents = data.filter((e) => e.eventType === 'run_failed');
+      const runCancelledEvents = data.filter(
+        (e) => e.eventType === 'run_cancelled'
+      );
+      const runPausedEvents = data.filter((e) => e.eventType === 'run_paused');
+      const runResumedEvents = data.filter(
+        (e) => e.eventType === 'run_resumed'
+      );
+      const stepCreatedEvents = data.filter(
+        (e) => e.eventType === 'step_created'
+      );
+      const hookCreatedEvents = data.filter(
+        (e) => e.eventType === 'hook_created'
+      );
+
+      // Create run entities atomically with events
+      if (runCreatedEvents.length > 0) {
+        const runsToInsert = runCreatedEvents.map((eventData) => {
+          const runData = (eventData as any).eventData as {
+            deploymentId: string;
+            workflowName: string;
+            input: any[];
+            executionContext?: Record;
+          };
+          return {
+            runId: effectiveRunId,
+            deploymentId: runData.deploymentId,
+            workflowName: runData.workflowName,
+            input: runData.input as SerializedContent,
+            executionContext: runData.executionContext as
+              | SerializedContent
+              | undefined,
+            status: 'pending' as const,
+          };
+        });
+        await drizzle
+          .insert(Schema.runs)
+          .values(runsToInsert)
+          .onConflictDoNothing();
+      }
+
+      // Update run status for run_started events
+      if (runStartedEvents.length > 0) {
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'running',
+            startedAt: now,
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Update run status for run_completed events
+      if (runCompletedEvents.length > 0) {
+        const completedData = (runCompletedEvents[0] as any).eventData as {
+          output?: any;
+        };
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'completed',
+            output: completedData.output as SerializedContent | undefined,
+            completedAt: now,
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Update run status for run_failed events
+      if (runFailedEvents.length > 0) {
+        const failedData = (runFailedEvents[0] as any).eventData as {
+          error: any;
+          errorCode?: string;
+        };
+        const errorMessage =
+          typeof failedData.error === 'string'
+            ? failedData.error
+            : (failedData.error?.message ?? 'Unknown error');
+        // Store structured error as JSON for deserializeRunError to parse
+        const errorJson = JSON.stringify({
+          message: errorMessage,
+          stack: failedData.error?.stack,
+          code: failedData.errorCode,
+        });
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'failed',
+            error: errorJson,
+            completedAt: now,
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Update run status for run_cancelled events
+      if (runCancelledEvents.length > 0) {
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'cancelled',
+            completedAt: now,
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Update run status for run_paused events
+      if (runPausedEvents.length > 0) {
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'paused',
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Update run status for run_resumed events
+      if (runResumedEvents.length > 0) {
+        await drizzle
+          .update(Schema.runs)
+          .set({
+            status: 'running',
+            updatedAt: now,
+          })
+          .where(eq(Schema.runs.runId, effectiveRunId));
+      }
+
+      // Create step entities atomically with events
+      if (stepCreatedEvents.length > 0) {
+        const stepsToInsert = stepCreatedEvents.map((eventData) => {
+          const stepData = (eventData as any).eventData as {
+            stepName: string;
+            input: any;
+          };
+          return {
+            runId: effectiveRunId,
+            stepId: eventData.correlationId!,
+            stepName: stepData.stepName,
+            input: stepData.input as SerializedContent,
+            status: 'pending' as const,
+            attempt: 0,
+          };
+        });
+        await drizzle
+          .insert(Schema.steps)
+          .values(stepsToInsert)
+          .onConflictDoNothing();
+      }
+
+      // Create hook entities atomically with events
+      if (hookCreatedEvents.length > 0) {
+        const hooksToInsert = hookCreatedEvents.map((eventData) => {
+          const hookData = (eventData as any).eventData as {
+            token: string;
+            metadata?: any;
+          };
+          return {
+            runId: effectiveRunId,
+            hookId: eventData.correlationId!,
+            token: hookData.token,
+            metadata: hookData.metadata as SerializedContent,
+            ownerId: '', // TODO: get from context
+            projectId: '', // TODO: get from context
+            environment: '', // TODO: get from context
+          };
+        });
+        await drizzle
+          .insert(Schema.hooks)
+          .values(hooksToInsert)
+          .onConflictDoNothing();
+      }
+
+      // Insert all events in a single batch query
+      const eventsToInsert = data.map((eventData) => ({
+        runId: effectiveRunId,
+        eventId: `wevt_${ulid()}`,
+        correlationId: eventData.correlationId,
+        eventType: eventData.eventType,
+        eventData: 'eventData' in eventData ? eventData.eventData : undefined,
+      }));
+
+      const values = await drizzle
+        .insert(events)
+        .values(eventsToInsert)
+        .returning({ eventId: events.eventId, createdAt: events.createdAt });
+
+      // Combine input data with returned values
+      // TODO: Return actual entity data from the database after entity creation is moved here
+      return data.map((eventData, i) => {
+        const result = { ...eventData, ...values[i], runId: effectiveRunId };
+        const parsed = EventSchema.parse(result);
+        return { event: filterEventData(parsed, resolveData) };
+      });
+    },
     async list(params: ListEventsParams): Promise> {
       const limit = params?.pagination?.limit ?? 100;
       const sortOrder = params.pagination?.sortOrder || 'asc';
diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts
index adc49953e..6ffcb2641 100644
--- a/packages/world-vercel/src/events.ts
+++ b/packages/world-vercel/src/events.ts
@@ -126,3 +126,37 @@ export async function createWorkflowRunEvent(
     event: filterEventData(result.event, resolveData),
   };
 }
+
+export async function createWorkflowRunEventBatch(
+  id: string,
+  data: CreateEventRequest[],
+  params?: CreateEventParams,
+  config?: APIConfig
+): Promise<EventResult[]> {
+  if (data.length === 0) {
+    return [];
+  }
+
+  const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION;
+
+  // TODO: Use a dedicated batch endpoint when available on the server
+  // For now, create events in parallel for improved performance
+  const results = await Promise.all(
+    data.map((eventData) =>
+      makeRequest({
+        endpoint: `/v1/runs/${id}/events`,
+        options: {
+          method: 'POST',
+          body: JSON.stringify(eventData, dateToStringReplacer),
+        },
+        config,
+        schema: EventResultSchema,
+      })
+    )
+  );
+
+  return results.map((result: any) => ({
+    ...result,
+    event: filterEventData(result.event, resolveData),
+  }));
+}
diff --git a/packages/world-vercel/src/storage.ts b/packages/world-vercel/src/storage.ts
index da874bbf7..9d728d444 100644
--- a/packages/world-vercel/src/storage.ts
+++ b/packages/world-vercel/src/storage.ts
@@ -1,5 +1,9 @@
 import type { Storage } from '@workflow/world';
-import { createWorkflowRunEvent, getWorkflowRunEvents } from './events.js';
+import {
+  createWorkflowRunEvent,
+  createWorkflowRunEventBatch,
+  getWorkflowRunEvents,
+} from './events.js';
 import { getHook, getHookByToken, listHooks } from './hooks.js';
 import { getWorkflowRun, listWorkflowRuns } from './runs.js';
 import { getStep, listWorkflowRunSteps } from './steps.js';
@@ -19,6 +23,8 @@ export function createStorage(config?: APIConfig): Storage {
     events: {
       create: (runId, data, params) =>
         createWorkflowRunEvent(runId, data, params, config),
+      createBatch: (runId, data, params) =>
+        createWorkflowRunEventBatch(runId, data, params, config),
       list: (params) => getWorkflowRunEvents(params, config),
       listByCorrelationId: (params) => getWorkflowRunEvents(params, config),
     },
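The interface change that follows makes `createBatch` a required method on `Storage['events']`, so custom World implementations have to supply one. Below is a minimal, hedged sketch of a delegating fallback for a World without a native batch primitive, mirroring the sequential loop world-local uses above; the wrapper function and its name are illustrative only, not part of this PR.

// Hedged sketch (illustrative, not part of this diff): derive createBatch from create().
import type { Storage } from '@workflow/world';

type Events = Storage['events'];

// Wrap an events implementation that predates createBatch; sequential on purpose so
// events keep the same ordering that repeated create() calls would have produced.
export function withCreateBatch(events: Omit<Events, 'createBatch'>): Events {
  return {
    ...events,
    async createBatch(runId, data, params) {
      const results = [];
      for (const eventData of data) {
        results.push(await events.create(runId, eventData, params));
      }
      return results;
    },
  };
}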
diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts
index a84d5c205..964e9b80f 100644
--- a/packages/world/src/interfaces.ts
+++ b/packages/world/src/interfaces.ts
@@ -38,7 +38,7 @@ export interface Streamer {
 /**
  * Storage interface for workflow data.
  *
- * All entity mutations (runs, steps, hooks) MUST go through events.create().
+ * All entity mutations (runs, steps, hooks) MUST go through events.create() or events.createBatch().
 * The World implementation atomically creates the entity when processing the corresponding event.
 *
 * Entity methods are read-only:
@@ -97,7 +97,21 @@ export interface Storage {
     data: CreateEventRequest,
     params?: CreateEventParams
   ): Promise<EventResult>;
-
+  /**
+   * Create multiple events in a single atomic operation.
+   * This is more efficient than calling create() multiple times
+   * and ensures all events are created atomically.
+   *
+   * @param runId - The workflow run ID
+   * @param data - Array of events to create
+   * @param params - Optional parameters for event creation
+   * @returns Promise resolving to the created events and affected entities
+   */
+  createBatch(
+    runId: string,
+    data: CreateEventRequest[],
+    params?: CreateEventParams
+  ): Promise<EventResult[]>;
   list(params: ListEventsParams): Promise>;
   listByCorrelationId(
     params: ListEventsByCorrelationIdParams