Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
appendName,
emptyLocation,
isLocationPrefix,
isLoopIterationMarker,
locationToKey,
registerName,
} from "./location.js";
Expand Down Expand Up @@ -498,6 +499,52 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
this.visitedKeys.add(key);
}

/**
* Mark every surviving history entry belonging to already-completed
* iterations of a resuming loop as visited.
*
* A loop that resumes from saved state starts at its persisted iteration
* and never replays earlier iterations, so their history entries are never
* re-visited on this run. When the loop is nested inside another loop (or
* any branch that calls validateComplete), the enclosing branch validates
* its entire subtree and would otherwise reject those unvisited entries as
* a history divergence. Marking them visited here keeps the enclosing
* validation consistent with the loop's intentional skip. Entries for the
* resumed iteration and later are left alone so they are still validated as
* the loop replays forward.
*/
private markCompletedLoopIterationsVisited(
loopLocation: Location,
resumedIteration: number,
): void {
if (resumedIteration <= 0) {
return;
}

const loopSegment = loopLocation[loopLocation.length - 1];
if (typeof loopSegment !== "number") {
throw new Error("Expected loop location to end with a name index");
}

for (const [key, entry] of this.storage.history.entries) {
if (!isLocationPrefix(loopLocation, entry.location)) {
continue;
}

const iterationSegment = entry.location[loopLocation.length];
if (
!iterationSegment ||
!isLoopIterationMarker(iterationSegment) ||
iterationSegment.loop !== loopSegment ||
iterationSegment.iteration >= resumedIteration
) {
continue;
}

this.markVisited(key);
}
}

/**
* Check if a name has already been used at the current location in this execution.
* Throws HistoryDivergedError if duplicate detected.
Expand Down Expand Up @@ -1194,6 +1241,7 @@ export class WorkflowContextImpl implements WorkflowContextInterface {
entry = existing;
state = loopData.state as S;
iteration = loopData.iteration;
this.markCompletedLoopIterationsVisited(location, iteration);
if (rollbackMode) {
rollbackOutput = loopData.output as T | undefined;
rollbackIterationRan = rollbackOutput !== undefined;
Expand Down
107 changes: 107 additions & 0 deletions rivetkit-typescript/packages/workflow-engine/tests/loops.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,113 @@ for (const mode of modes) {
expect(result.output).toEqual(["a", "b", "c"]);
});

it("should resume an inner loop that suspends mid-iteration inside a parent loop", async () => {
// An inner loop whose body suspends (queue.next with no message)
// advances past iteration 0, persists its iteration, and resumes on
// the next run. The inner loop resume path starts at the saved
// iteration and never re-visits iteration 0, so the enclosing
// parent-loop branch's validateComplete() must still treat the
// already-completed inner iterations as accounted for.
const ticks: number[] = [];

const workflow = async (ctx: WorkflowContextInterface) => {
return await ctx.loop({
name: "outer",
state: { done: false },
run: async (outerCtx, outerState) => {
if (outerState.done) {
return Loop.break(ticks.length);
}

await outerCtx.loop({
name: "inner",
state: { count: 0 },
run: async (innerCtx, innerState) => {
if (innerState.count >= 2) {
return Loop.break(undefined);
}

const message = await innerCtx.queue.next<{
n: number;
}>("tick", { names: ["tick"] });

await innerCtx.step(
`record-${innerState.count}`,
async () => {
ticks.push(message.body.n);
},
);

return Loop.continue({
count: innerState.count + 1,
});
},
});

return Loop.break(ticks.length);
},
});
};

if (mode === "yield") {
await driver.messageDriver.addMessage({
id: "tick-1",
name: "tick",
data: { n: 1 },
sentAt: Date.now(),
});

// First run: inner iteration 0 consumes tick-1 and continues;
// inner iteration 1 finds no message and suspends, persisting
// the inner loop at iteration 1.
const firstRun = await runWorkflow(
"wf-1",
workflow,
undefined,
driver,
{ mode },
).result;

expect(firstRun.state).toBe("sleeping");
expect(ticks).toEqual([1]);

await driver.messageDriver.addMessage({
id: "tick-2",
name: "tick",
data: { n: 2 },
sentAt: Date.now(),
});

// Second run: replay resumes the inner loop at iteration 1
// (iteration 0 is never re-visited), then the parent branch is
// validated against the full subtree.
const secondRun = await runWorkflow(
"wf-1",
workflow,
undefined,
driver,
{ mode },
).result;

expect(secondRun.state).toBe("completed");
expect(secondRun.output).toBe(2);
expect(ticks).toEqual([1, 2]);
return;
}

const handle = runWorkflow("wf-1", workflow, undefined, driver, {
mode,
});

await handle.message("tick", { n: 1 });
await handle.message("tick", { n: 2 });

const result = await handle.result;
expect(result.state).toBe("completed");
expect(result.output).toBe(2);
expect(ticks).toEqual([1, 2]);
});

it("should resume nested joins across parent loop iterations", async () => {
const processed: string[] = [];

Expand Down
Loading