Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ab1d227
Persist post-turn provider wakeups and backgrounded task completions
juliusmarminge Jul 3, 2026
e55af4a
Add orchestration v2 audit remediation plan and findings
juliusmarminge Jul 3, 2026
a195a63
Fail Claude runs on is_error results and resume idle-released sessions
juliusmarminge Jul 3, 2026
e13be35
Update audit plan: tick items 1 and 15, add sections 15/16
juliusmarminge Jul 3, 2026
37c59fd
Preserve nested failure causes in projected provider errors
juliusmarminge Jul 3, 2026
cd0178e
Tick audit plan item 2
juliusmarminge Jul 3, 2026
24f3a78
Persist cursor run correlation data on turn failures
juliusmarminge Jul 3, 2026
2ff9384
Tick audit plan item 3
juliusmarminge Jul 3, 2026
7e9eede
Keep output/error on opencode file_search items
juliusmarminge Jul 3, 2026
5ab65ac
Tick audit plan item 12
juliusmarminge Jul 3, 2026
59eaa56
Add claude_text_segments regression fixture
juliusmarminge Jul 3, 2026
6c17b5d
Tick audit plan item 11
juliusmarminge Jul 3, 2026
16ae867
Surface runtime-reconcile cancellations as visible turn items
juliusmarminge Jul 3, 2026
85b00e7
Tick audit plan item 5
juliusmarminge Jul 3, 2026
f326bdc
Write runner.error frames to native provider logs on SDK failures
juliusmarminge Jul 3, 2026
d1e25de
Tick audit plan item 4
juliusmarminge Jul 3, 2026
260b3a8
Route shared codex app-server logs to the owning app thread
juliusmarminge Jul 3, 2026
4f82406
Tick audit plan item 9
juliusmarminge Jul 3, 2026
eb7452d
Coalesce streamed ACP subagent assistant deltas before persistence
juliusmarminge Jul 3, 2026
dd30e28
Tick audit plan item 10
juliusmarminge Jul 3, 2026
b1a0017
Address review findings on wakeup lifecycle and ACP coalescer
juliusmarminge Jul 3, 2026
2a8052c
Park coalesced wakeups as follow-ups instead of dropping them
juliusmarminge Jul 3, 2026
0e0df04
Clear wakeup in-flight marker on drain interruption
juliusmarminge Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
958 changes: 958 additions & 0 deletions .plans/22-orchestration-v2-audit-findings.json

Large diffs are not rendered by default.

628 changes: 628 additions & 0 deletions .plans/22-orchestration-v2-audit-remediation.md

Large diffs are not rendered by default.

82 changes: 71 additions & 11 deletions apps/server/src/orchestration-v2/Adapters/AcpAdapterV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { modelSelectionsEqual } from "@t3tools/shared/model";
import * as Cause from "effect/Cause";
import * as DateTime from "effect/DateTime";
import * as Deferred from "effect/Deferred";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as FileSystem from "effect/FileSystem";
import * as Option from "effect/Option";
Expand Down Expand Up @@ -84,6 +85,13 @@ import {

export const ACP_PROTOCOL = "acp.ndjson-jsonrpc" as const;

/**
* Window for coalescing streamed subagent assistant deltas into one persisted
* snapshot. Matches the codex agent-message coalescer cadence; the final text
* is always flushed on task completion regardless of this interval.
*/
const SUBAGENT_STREAM_FLUSH_INTERVAL_MS = 100;

export interface AcpAdapterV2RuntimeInput {
readonly cwd: string;
readonly mcpServers: ReadonlyArray<EffectAcpSchema.McpServer>;
Expand Down Expand Up @@ -576,6 +584,12 @@ interface ActiveAcpSubagent {
childSessionId: string | null;
assistantText: string;
nextChildOrdinal: number;
// Streaming-emit throttle: ACP streams the subagent result per token, so a
// full-row event pair per chunk amplified one 6KB result into ~2700 stored
// events (audit plan #10). We coalesce intermediate emits and always flush
// the final text.
streamFlushScheduled: boolean;
streamPendingText: boolean;
}

type PendingRuntimeRequest = {
Expand Down Expand Up @@ -842,12 +856,10 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV
yield* emitTextSegment(context, kind, false);
});

const emitSubagentAssistant = Effect.fnUntraced(function* (
const emitSubagentAssistantSnapshot = Effect.fnUntraced(function* (
subagent: ActiveAcpSubagent,
text: string,
) {
if (text.length === 0) return;
subagent.assistantText += text;
if (subagent.assistantText.length === 0) return;
const now = yield* DateTime.now;
const nativeItemId = `${subagent.task.nativeTaskRef?.nativeId ?? subagent.task.id}:result`;
const artifacts = makeSubagentConversationArtifacts({
Expand All @@ -871,13 +883,48 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV
});
});

// Streaming append: accumulate and emit at most once per flush window.
const streamSubagentAssistant = Effect.fnUntraced(function* (
subagent: ActiveAcpSubagent,
text: string,
) {
if (text.length === 0) return;
subagent.assistantText += text;
subagent.streamPendingText = true;
if (subagent.streamFlushScheduled) return;
subagent.streamFlushScheduled = true;
yield* Effect.sleep(Duration.millis(SUBAGENT_STREAM_FLUSH_INTERVAL_MS)).pipe(
Effect.andThen(
Effect.suspend(() => {
subagent.streamFlushScheduled = false;
if (!subagent.streamPendingText) return Effect.void;
subagent.streamPendingText = false;
return emitSubagentAssistantSnapshot(subagent);
}),
),
Effect.forkIn(sessionScope),
);
});

// Terminal/one-shot emit: always persists the final text immediately.
const flushSubagentAssistant = Effect.fnUntraced(function* (
subagent: ActiveAcpSubagent,
finalText?: string,
) {
if (finalText !== undefined && finalText.length > 0) {
subagent.assistantText += finalText;
}
subagent.streamPendingText = false;
yield* emitSubagentAssistantSnapshot(subagent);
Comment thread
juliusmarminge marked this conversation as resolved.
Comment thread
juliusmarminge marked this conversation as resolved.
});

const projectSubagentNotification = Effect.fnUntraced(function* (
subagent: ActiveAcpSubagent,
notification: EffectAcpSchema.SessionNotification,
) {
const update = notification.update;
if (update.sessionUpdate === "agent_message_chunk" && update.content.type === "text") {
yield* emitSubagentAssistant(subagent, update.content.text);
yield* streamSubagentAssistant(subagent, update.content.text);
}
});

Expand Down Expand Up @@ -952,6 +999,8 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV
childSessionId: null,
assistantText: "",
nextChildOrdinal: 101,
streamFlushScheduled: false,
streamPendingText: false,
};
subagent.task = task;
context.subagents.set(update.nativeTaskId, subagent);
Expand Down Expand Up @@ -1044,12 +1093,15 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV
);
}

if (
taskStatus !== "running" &&
subagent.assistantText.length === 0 &&
update.result !== null
) {
yield* emitSubagentAssistant(subagent, update.result);
if (taskStatus !== "running") {
// Terminal: flush the final text immediately (adopting the
// one-shot result when nothing streamed) instead of leaving the
// last throttled snapshot possibly unemitted.
if (subagent.assistantText.length === 0 && update.result !== null) {
yield* flushSubagentAssistant(subagent, update.result);
} else if (subagent.streamPendingText) {
yield* flushSubagentAssistant(subagent);
}
Comment thread
juliusmarminge marked this conversation as resolved.
}
const result = subagent.assistantText || update.result;
subagent.task = {
Expand Down Expand Up @@ -2002,6 +2054,14 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV
if (context.finalized) return;
context.finalized = true;
yield* closeTextStreams(context);
// Persist any throttled subagent text now: after the turn ends the
// run fiber that routes child-thread events may be gone, so waiting
// for the coalescer's timer flush could drop the tail of a stream.
for (const subagent of context.subagents.values()) {
if (subagent.streamPendingText) {
yield* flushSubagentAssistant(subagent);
}
}
const now = yield* DateTime.now;
const turn = providerTurnPayload(context, status, now);
yield* Ref.update(providerTurns, (current) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ function isClaudeSdkReplayMessage(frame: unknown): frame is SDKMessage {
type === "user" ||
type === "result" ||
type === "system" ||
// Partial-assistant stream events are ignored by the adapter's message
// pipeline but announce provider-initiated turn wakeups, matching the
// recorded transcripts where message_start precedes the first complete
// assistant message by several seconds.
type === "stream_event" ||
type === "rate_limit_event"
);
}
Expand Down
Loading
Loading