Skip to content

Commit 6472eb6

Browse files
pranaygpclaude
andcommitted
fix: fix postgres tests and parallelize suspension handler
- Fix world-postgres tests to use EventResult.event instead of direct access - Fix test expectations for event counts (include run_created from createRun) - Parallelize hook creation, step creation+queueing, and wait creation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3c27e56 commit 6472eb6

File tree

2 files changed

+169
-154
lines changed

2 files changed

+169
-154
lines changed

packages/core/src/runtime/suspension-handler.ts

Lines changed: 112 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -72,119 +72,130 @@ export async function handleSuspension({
7272
});
7373

7474
// Process hooks first to prevent race conditions with webhook receivers
75-
for (const hookEvent of hookEvents) {
76-
try {
77-
await world.events.create(runId, hookEvent);
78-
} catch (err) {
79-
if (WorkflowAPIError.is(err)) {
80-
if (err.status === 409) {
81-
console.warn(`Hook already exists, continuing: ${err.message}`);
82-
} else if (err.status === 410) {
83-
console.warn(
84-
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
85-
);
86-
} else {
87-
throw err;
75+
// All hook creations run in parallel
76+
if (hookEvents.length > 0) {
77+
await Promise.all(
78+
hookEvents.map(async (hookEvent) => {
79+
try {
80+
await world.events.create(runId, hookEvent);
81+
} catch (err) {
82+
if (WorkflowAPIError.is(err)) {
83+
if (err.status === 409) {
84+
console.warn(`Hook already exists, continuing: ${err.message}`);
85+
} else if (err.status === 410) {
86+
console.warn(
87+
`Workflow run "${runId}" has already completed, skipping hook: ${err.message}`
88+
);
89+
} else {
90+
throw err;
91+
}
92+
} else {
93+
throw err;
94+
}
8895
}
89-
} else {
90-
throw err;
91-
}
92-
}
96+
})
97+
);
9398
}
9499

95-
// Build step_created events only for steps that haven't been created yet
96-
// Steps with hasCreatedEvent=true already have their event in the log
97-
const stepsNeedingCreation = stepItems.filter(
98-
(queueItem) => !queueItem.hasCreatedEvent
99-
);
100-
const stepEvents: CreateEventRequest[] = stepsNeedingCreation.map(
101-
(queueItem) => {
102-
const dehydratedInput = dehydrateStepArguments(
103-
{
104-
args: queueItem.args,
105-
closureVars: queueItem.closureVars,
106-
},
107-
suspension.globalThis
108-
);
109-
return {
110-
eventType: 'step_created' as const,
111-
correlationId: queueItem.correlationId,
112-
eventData: {
113-
stepName: queueItem.stepName,
114-
input: dehydratedInput as Serializable,
115-
},
116-
};
117-
}
100+
// Build a map of stepId -> step event for steps that need creation
101+
const stepsNeedingCreation = new Set(
102+
stepItems
103+
.filter((queueItem) => !queueItem.hasCreatedEvent)
104+
.map((queueItem) => queueItem.correlationId)
118105
);
119106

120-
// Build wait_created events (only for waits that haven't been created yet)
121-
const waitEvents: CreateEventRequest[] = waitItems
122-
.filter((queueItem) => !queueItem.hasCreatedEvent)
123-
.map((queueItem) => ({
124-
eventType: 'wait_created' as const,
125-
correlationId: queueItem.correlationId,
126-
eventData: {
127-
resumeAt: queueItem.resumeAt,
128-
},
129-
}));
130-
131-
// Process steps and waits
107+
// Process steps and waits in parallel
108+
// Each step: create event (if needed) -> queue message
109+
// Each wait: create event (if needed)
132110
const ops: Promise<void>[] = [];
133111

134-
// Create step events (World creates step entities atomically)
135-
// Only for steps that don't already have a step_created event
136-
for (const stepEvent of stepEvents) {
137-
try {
138-
await world.events.create(runId, stepEvent);
139-
} catch (err) {
140-
if (WorkflowAPIError.is(err) && err.status === 409) {
141-
console.warn(`Step already exists, continuing: ${err.message}`);
142-
} else {
143-
throw err;
144-
}
145-
}
112+
// Steps: create event then queue message, all in parallel
113+
for (const queueItem of stepItems) {
114+
ops.push(
115+
(async () => {
116+
// Create step event if not already created
117+
if (stepsNeedingCreation.has(queueItem.correlationId)) {
118+
const dehydratedInput = dehydrateStepArguments(
119+
{
120+
args: queueItem.args,
121+
closureVars: queueItem.closureVars,
122+
},
123+
suspension.globalThis
124+
);
125+
const stepEvent: CreateEventRequest = {
126+
eventType: 'step_created' as const,
127+
correlationId: queueItem.correlationId,
128+
eventData: {
129+
stepName: queueItem.stepName,
130+
input: dehydratedInput as Serializable,
131+
},
132+
};
133+
try {
134+
await world.events.create(runId, stepEvent);
135+
} catch (err) {
136+
if (WorkflowAPIError.is(err) && err.status === 409) {
137+
console.warn(`Step already exists, continuing: ${err.message}`);
138+
} else {
139+
throw err;
140+
}
141+
}
142+
}
143+
144+
// Queue step execution message
145+
await queueMessage(
146+
world,
147+
`__wkf_step_${queueItem.stepName}`,
148+
{
149+
workflowName,
150+
workflowRunId: runId,
151+
workflowStartedAt,
152+
stepId: queueItem.correlationId,
153+
traceCarrier: await serializeTraceCarrier(),
154+
requestedAt: new Date(),
155+
},
156+
{
157+
idempotencyKey: queueItem.correlationId,
158+
}
159+
);
160+
})()
161+
);
146162
}
147163

148-
// Create wait events
149-
for (const waitEvent of waitEvents) {
150-
try {
151-
await world.events.create(runId, waitEvent);
152-
} catch (err) {
153-
if (WorkflowAPIError.is(err) && err.status === 409) {
154-
console.warn(`Wait already exists, continuing: ${err.message}`);
155-
} else {
156-
throw err;
157-
}
164+
// Waits: create events in parallel (no queueing needed for waits)
165+
for (const queueItem of waitItems) {
166+
if (!queueItem.hasCreatedEvent) {
167+
ops.push(
168+
(async () => {
169+
const waitEvent: CreateEventRequest = {
170+
eventType: 'wait_created' as const,
171+
correlationId: queueItem.correlationId,
172+
eventData: {
173+
resumeAt: queueItem.resumeAt,
174+
},
175+
};
176+
try {
177+
await world.events.create(runId, waitEvent);
178+
} catch (err) {
179+
if (WorkflowAPIError.is(err) && err.status === 409) {
180+
console.warn(`Wait already exists, continuing: ${err.message}`);
181+
} else {
182+
throw err;
183+
}
184+
}
185+
})()
186+
);
158187
}
159188
}
160189

161-
// Queue step execution messages for ALL pending steps
162-
// (both newly created and those with existing step_created events)
163-
for (const queueItem of stepItems) {
164-
waitUntil(
165-
Promise.all(ops).catch((opErr) => {
166-
const isAbortError =
167-
opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
168-
if (!isAbortError) throw opErr;
169-
})
170-
);
171-
172-
await queueMessage(
173-
world,
174-
`__wkf_step_${queueItem.stepName}`,
175-
{
176-
workflowName,
177-
workflowRunId: runId,
178-
workflowStartedAt,
179-
stepId: queueItem.correlationId,
180-
traceCarrier: await serializeTraceCarrier(),
181-
requestedAt: new Date(),
182-
},
183-
{
184-
idempotencyKey: queueItem.correlationId,
185-
}
186-
);
187-
}
190+
// Wait for all step and wait operations to complete
191+
waitUntil(
192+
Promise.all(ops).catch((opErr) => {
193+
const isAbortError =
194+
opErr?.name === 'AbortError' || opErr?.name === 'ResponseAborted';
195+
if (!isAbortError) throw opErr;
196+
})
197+
);
198+
await Promise.all(ops);
188199

189200
// Calculate minimum timeout from waits
190201
const now = Date.now();

0 commit comments

Comments
 (0)