diff --git a/rivetkit-typescript/packages/workflow-engine/src/context.ts b/rivetkit-typescript/packages/workflow-engine/src/context.ts index 96cee97213..3e79389a42 100644 --- a/rivetkit-typescript/packages/workflow-engine/src/context.ts +++ b/rivetkit-typescript/packages/workflow-engine/src/context.ts @@ -27,6 +27,7 @@ import { appendName, emptyLocation, isLocationPrefix, + isLoopIterationMarker, locationToKey, registerName, } from "./location.js"; @@ -498,6 +499,52 @@ export class WorkflowContextImpl implements WorkflowContextInterface { this.visitedKeys.add(key); } + /** + * Mark every surviving history entry belonging to already-completed + * iterations of a resuming loop as visited. + * + * A loop that resumes from saved state starts at its persisted iteration + * and never replays earlier iterations, so their history entries are never + * re-visited on this run. When the loop is nested inside another loop (or + * any branch that calls validateComplete), the enclosing branch validates + * its entire subtree and would otherwise reject those unvisited entries as + * a history divergence. Marking them visited here keeps the enclosing + * validation consistent with the loop's intentional skip. Entries for the + * resumed iteration and later are left alone so they are still validated as + * the loop replays forward. + */ + private markCompletedLoopIterationsVisited( + loopLocation: Location, + resumedIteration: number, + ): void { + if (resumedIteration <= 0) { + return; + } + + const loopSegment = loopLocation[loopLocation.length - 1]; + if (typeof loopSegment !== "number") { + throw new Error("Expected loop location to end with a name index"); + } + + for (const [key, entry] of this.storage.history.entries) { + if (!isLocationPrefix(loopLocation, entry.location)) { + continue; + } + + const iterationSegment = entry.location[loopLocation.length]; + if ( + !iterationSegment || + !isLoopIterationMarker(iterationSegment) || + iterationSegment.loop !== loopSegment || + iterationSegment.iteration >= resumedIteration + ) { + continue; + } + + this.markVisited(key); + } + } + /** * Check if a name has already been used at the current location in this execution. * Throws HistoryDivergedError if duplicate detected. @@ -1194,6 +1241,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface { entry = existing; state = loopData.state as S; iteration = loopData.iteration; + this.markCompletedLoopIterationsVisited(location, iteration); if (rollbackMode) { rollbackOutput = loopData.output as T | undefined; rollbackIterationRan = rollbackOutput !== undefined; diff --git a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts index 2a18dbebf2..e488c46ecb 100644 --- a/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts +++ b/rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts @@ -232,6 +232,113 @@ for (const mode of modes) { expect(result.output).toEqual(["a", "b", "c"]); }); + it("should resume an inner loop that suspends mid-iteration inside a parent loop", async () => { + // An inner loop whose body suspends (queue.next with no message) + // advances past iteration 0, persists its iteration, and resumes on + // the next run. The inner loop resume path starts at the saved + // iteration and never re-visits iteration 0, so the enclosing + // parent-loop branch's validateComplete() must still treat the + // already-completed inner iterations as accounted for. + const ticks: number[] = []; + + const workflow = async (ctx: WorkflowContextInterface) => { + return await ctx.loop({ + name: "outer", + state: { done: false }, + run: async (outerCtx, outerState) => { + if (outerState.done) { + return Loop.break(ticks.length); + } + + await outerCtx.loop({ + name: "inner", + state: { count: 0 }, + run: async (innerCtx, innerState) => { + if (innerState.count >= 2) { + return Loop.break(undefined); + } + + const message = await innerCtx.queue.next<{ + n: number; + }>("tick", { names: ["tick"] }); + + await innerCtx.step( + `record-${innerState.count}`, + async () => { + ticks.push(message.body.n); + }, + ); + + return Loop.continue({ + count: innerState.count + 1, + }); + }, + }); + + return Loop.break(ticks.length); + }, + }); + }; + + if (mode === "yield") { + await driver.messageDriver.addMessage({ + id: "tick-1", + name: "tick", + data: { n: 1 }, + sentAt: Date.now(), + }); + + // First run: inner iteration 0 consumes tick-1 and continues; + // inner iteration 1 finds no message and suspends, persisting + // the inner loop at iteration 1. + const firstRun = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + + expect(firstRun.state).toBe("sleeping"); + expect(ticks).toEqual([1]); + + await driver.messageDriver.addMessage({ + id: "tick-2", + name: "tick", + data: { n: 2 }, + sentAt: Date.now(), + }); + + // Second run: replay resumes the inner loop at iteration 1 + // (iteration 0 is never re-visited), then the parent branch is + // validated against the full subtree. + const secondRun = await runWorkflow( + "wf-1", + workflow, + undefined, + driver, + { mode }, + ).result; + + expect(secondRun.state).toBe("completed"); + expect(secondRun.output).toBe(2); + expect(ticks).toEqual([1, 2]); + return; + } + + const handle = runWorkflow("wf-1", workflow, undefined, driver, { + mode, + }); + + await handle.message("tick", { n: 1 }); + await handle.message("tick", { n: 2 }); + + const result = await handle.result; + expect(result.state).toBe("completed"); + expect(result.output).toBe(2); + expect(ticks).toEqual([1, 2]); + }); + it("should resume nested joins across parent loop iterations", async () => { const processed: string[] = [];