Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions apps/mobile/src/lib/threadActivity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,70 @@ function resolvePendingUserInputAnswer(
return normalizeDraftAnswer(draft?.selectedOptionLabel);
}

/**
* Task ids owned by a workflow run. Mirrors the desktop derivation in
* apps/web/src/workflow-logic.ts (collectWorkflowTaskIds) — mobile has no
* workflow card yet, so all rows for those tasks are suppressed rather than
* rendered as per-tick noise.
*/
function collectWorkflowTaskIds(
activities: ReadonlyArray<OrchestrationThreadActivity>,
): Set<string> {
const taskIds = new Set<string>();
for (const activity of activities) {
const payload = activity.payload as Record<string, unknown> | null | undefined;
const taskId =
payload && typeof payload === "object" && typeof payload["taskId"] === "string"
? payload["taskId"]
: undefined;
if (!taskId) continue;
const workflowName =
payload && typeof payload === "object" ? payload["workflowName"] : undefined;
const taskType = payload && typeof payload === "object" ? payload["taskType"] : undefined;
if (
activity.kind === "task.workflow-updated" ||
activity.kind === "task.workflow-meta" ||
(activity.kind === "task.started" &&
(taskType === "local_workflow" || typeof workflowName === "string"))
) {
taskIds.add(taskId);
}
}
return taskIds;
}

function activityBelongsToWorkflow(
activity: OrchestrationThreadActivity,
workflowTaskIds: ReadonlySet<string>,
): boolean {
const payload = activity.payload as Record<string, unknown> | null | undefined;
const taskId = payload && typeof payload === "object" ? payload["taskId"] : undefined;
return typeof taskId === "string" && workflowTaskIds.has(taskId);
}

function deriveWorkLogEntries(
activities: ReadonlyArray<OrchestrationThreadActivity>,
): DerivedWorkLogEntry[] {
const ordered = Arr.sort(activities, activityOrder);
const workflowTaskIds = collectWorkflowTaskIds(activities);
const entries: DerivedWorkLogEntry[] = [];
for (const activity of ordered) {
if (activity.kind === "tool.started") continue;
if (activity.kind === "task.started") continue;
// Workflow snapshot/meta activities back the desktop workflow card; on
// mobile they would render as ever-mutating raw rows, so skip them —
// along with the per-tick progress rows the workflow owns. task.completed
// stays: with no workflow card here it is mobile's only signal that a
// workflow finished, failed, or was stopped.
if (activity.kind === "task.workflow-updated") continue;
if (activity.kind === "task.workflow-meta") continue;
Comment thread
cursor[bot] marked this conversation as resolved.
if (
activity.kind === "task.progress" &&
workflowTaskIds.size > 0 &&
activityBelongsToWorkflow(activity, workflowTaskIds)
) {
continue;
}
Comment thread
cursor[bot] marked this conversation as resolved.
if (activity.kind === "context-window.updated") continue;
if (activity.summary === "Checkpoint captured") continue;
if (isPlanBoundaryToolActivity(activity)) continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ function createProviderServiceHarness(
startSession: () => unsupported(),
sendTurn: () => unsupported(),
interruptTurn: () => unsupported(),
stopTask: () => unsupported(),
respondToRequest: () => unsupported(),
respondToUserInput: () => unsupported(),
stopSession: () => unsupported(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ describe("ProviderCommandReactor", () => {
}
}),
);
const stopTask = vi.fn<ProviderServiceShape["stopTask"]>(() => Effect.void);
const renameBranch = vi.fn((input: unknown) =>
Effect.succeed({
branch:
Expand Down Expand Up @@ -301,6 +302,7 @@ describe("ProviderCommandReactor", () => {
respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"],
respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"],
stopSession: stopSession as ProviderServiceShape["stopSession"],
stopTask: stopTask as ProviderServiceShape["stopTask"],
listSessions: () => Effect.succeed(runtimeSessions),
getCapabilities: (_provider) =>
Effect.succeed({
Expand Down Expand Up @@ -417,6 +419,7 @@ describe("ProviderCommandReactor", () => {
respondToRequest,
respondToUserInput,
stopSession,
stopTask,
renameBranch,
refreshStatus,
generateBranchName,
Expand Down Expand Up @@ -2094,4 +2097,80 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.providerInstanceId).toBe(ProviderInstanceId.make("codex_work"));
expect(thread?.session?.activeTurnId).toBeNull();
});

effectIt.effect(
"reacts to thread.task.stop by stopping the background task on the active session",
() =>
Effect.gen(function* () {
const harness = yield* Effect.promise(() => createHarness());
const now = "2026-01-01T00:00:00.000Z";

yield* harness.engine.dispatch({
type: "thread.session.set",
commandId: CommandId.make("cmd-session-set-for-task-stop"),
threadId: ThreadId.make("thread-1"),
session: {
threadId: ThreadId.make("thread-1"),
status: "running",
providerName: "claudeAgent",
runtimeMode: "approval-required",
activeTurnId: asTurnId("turn-1"),
lastError: null,
updatedAt: now,
},
createdAt: now,
});

yield* harness.engine.dispatch({
type: "thread.task.stop",
commandId: CommandId.make("cmd-task-stop"),
threadId: ThreadId.make("thread-1"),
taskId: "task-9",
createdAt: now,
});

yield* Effect.promise(() => waitFor(() => harness.stopTask.mock.calls.length === 1));
expect(harness.stopTask.mock.calls[0]?.[0]).toEqual({
threadId: "thread-1",
taskId: "task-9",
});
}),
);

effectIt.effect("appends a task-stop failure activity when no active session is bound", () =>
Effect.gen(function* () {
const harness = yield* Effect.promise(() => createHarness());
const now = "2026-01-01T00:00:00.000Z";

yield* harness.engine.dispatch({
type: "thread.task.stop",
commandId: CommandId.make("cmd-task-stop-no-session"),
threadId: ThreadId.make("thread-1"),
taskId: "task-9",
createdAt: now,
});

yield* Effect.promise(() =>
waitFor(async () => {
const readModel = await harness.readModel();
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
return (
thread?.activities.some((activity) => activity.kind === "provider.task.stop.failed") ??
false
);
}),
);

expect(harness.stopTask).not.toHaveBeenCalled();
const readModel = yield* Effect.promise(() => harness.readModel());
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
expect(
thread?.activities.find((activity) => activity.kind === "provider.task.stop.failed"),
).toMatchObject({
payload: {
detail: expect.stringContaining("No active provider session"),
},
});
}),
);
});
48 changes: 46 additions & 2 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type ProviderIntentEvent = Extract<
| "thread.turn-interrupt-requested"
| "thread.approval-response-requested"
| "thread.user-input-response-requested"
| "thread.session-stop-requested";
| "thread.session-stop-requested"
| "thread.task-stop-requested";
}
>;

Expand Down Expand Up @@ -219,6 +220,7 @@ const make = Effect.gen(function* () {
readonly kind:
| "provider.turn.start.failed"
| "provider.turn.interrupt.failed"
| "provider.task.stop.failed"
| "provider.approval.respond.failed"
| "provider.user-input.respond.failed"
| "provider.session.stop.failed";
Expand Down Expand Up @@ -1002,6 +1004,44 @@ const make = Effect.gen(function* () {
});
});

const processTaskStopRequested = Effect.fn("processTaskStopRequested")(function* (
event: Extract<ProviderIntentEvent, { type: "thread.task-stop-requested" }>,
) {
const thread = yield* resolveThread(event.payload.threadId);
if (!thread) {
return;
}
const hasSession = thread.session && thread.session.status !== "stopped";
if (!hasSession) {
return yield* appendProviderFailureActivity({
threadId: event.payload.threadId,
kind: "provider.task.stop.failed",
summary: "Background task stop failed",
detail: "No active provider session is bound to this thread.",
turnId: null,
createdAt: event.payload.createdAt,
});
}

yield* providerService
.stopTask({
threadId: event.payload.threadId,
taskId: event.payload.taskId,
})
.pipe(
Effect.catchCause((cause) =>
appendProviderFailureActivity({
threadId: event.payload.threadId,
kind: "provider.task.stop.failed",
summary: "Background task stop failed",
detail: Cause.pretty(cause),
turnId: null,
createdAt: event.payload.createdAt,
}),
),
);
});

const processDomainEvent = Effect.fn("processDomainEvent")(function* (
event: ProviderIntentEvent,
) {
Expand Down Expand Up @@ -1042,6 +1082,9 @@ const make = Effect.gen(function* () {
case "thread.session-stop-requested":
yield* processSessionStopRequested(event);
return;
case "thread.task-stop-requested":
yield* processTaskStopRequested(event);
return;
}
});

Expand All @@ -1068,7 +1111,8 @@ const make = Effect.gen(function* () {
event.type === "thread.turn-interrupt-requested" ||
event.type === "thread.approval-response-requested" ||
event.type === "thread.user-input-response-requested" ||
event.type === "thread.session-stop-requested"
event.type === "thread.session-stop-requested" ||
event.type === "thread.task-stop-requested"
) {
return yield* worker.enqueue(event);
}
Expand Down
Loading
Loading