Skip to content

feat(worker/acp): implement ACP worker adapter (#569)#573

Open
aaronwong1989 wants to merge 8 commits into
hrygo:mainfrom
aaronwong1989:feat/acp-worker-569
Open

feat(worker/acp): implement ACP worker adapter (#569)#573
aaronwong1989 wants to merge 8 commits into
hrygo:mainfrom
aaronwong1989:feat/acp-worker-569

Conversation

@aaronwong1989
Copy link
Copy Markdown
Contributor

Summary

Implements the full ACP (Agent Client Protocol) Worker for HotPlex based on the design spec, enabling any ACP-compatible agent (e.g. Hermes) to run as a first-class worker alongside Claude Code and OpenCode Server.

Closes #569

Architecture

internal/worker/acp/
├── codec.go       (118 lines) — JSON-RPC 2.0 NDJSON codec
├── client.go      (438 lines) — ACP client lifecycle + read loop
├── mapper.go      (512 lines) — ACP ↔ AEP bidirectional mapping
├── conn.go         (67 lines) — Custom acpConn (non-AEP stdin)
├── worker.go      (375 lines) — Worker adapter + registration
├── codec_test.go  (141 lines) — 7 codec tests
└── mapper_test.go (390 lines) — 17 mapper tests

Key Design Decisions

Decision Choice Rationale
Connection model Custom acpConn (not base.Conn) ACP uses JSON-RPC via client.Prompt, not AEP NDJSON on stdin
Permission mapping ACP multi-option → AEP boolean sync.Map stores optionIds for round-trip fidelity
Session restore client.LoadSession with fallback Load → New fallback chain for robustness
Backpressure TrySend() non-blocking on recvCh Delta events droppable, state/done/error preserved

AEP Protocol Extensions

  • New event kinds: tool_update, plan, mode_update
  • Extended ToolCallData: title, kind, locations (all omitempty)
  • Extended ToolResultData: status, diff (omitempty)
  • New data types: ToolUpdateData, PlanData, PlanItem, ModeUpdateData
  • Zero breaking change — existing workers omit new fields

P2 Features Included

set_model, set_mode, fork, list_sessions, auto_approve, usage stats forwarding

Acceptance Criteria Coverage

AC Status
AC-ACP-001: JSON-RPC codec ✅ 7 tests
AC-ACP-002: Client lifecycle ✅ Initialize/NewSession/Load/Prompt/Cancel
AC-ACP-003: Session restore ✅ LoadSession fallback to NewSession
AC-ACP-004: Permission bridging ✅ Multi-option → boolean + round-trip
AC-ACP-005: Auto-approve ✅ Config-driven, atomic bool
AC-ACP-006: Usage stats ✅ PromptUsage → DoneData.Stats
AC-ACP-007: Streaming events ✅ 11 notification types mapped
AC-ACP-008: Tool events ✅ Call/Update/Result with ACP extensions
AC-ACP-009: Worker adapter ✅ init() + Register + all interfaces
AC-ACP-010: Set model/mode ✅ Client methods
AC-ACP-011: Fork/list ✅ Client methods
AC-ACP-012: Config hot-reload ✅ cfgStore.RegisterFunc

Test Results

  • 24 tests all passing with -race flag
  • make check (fmt + lint + vet + test + build) green
  • Pre-commit + pre-push hooks all passing

Files Changed

File Change
pkg/events/events.go +3 Kind constants, +5 data types, +2 shared structs
internal/worker/worker.go TypeACPXTypeACP
internal/config/config.go +ACPConfig struct + ACP field
cmd/hotplex/gateway_run.go +ACP import + InitConfig + hot-reload
internal/worker/acp/* New package (7 files)
*_test.go (4 files) TypeACPXTypeACP

🤖 Generated with Claude Code

hrygo#569)

Implements the full ACP Worker for HotPlex based on the design spec,
enabling any ACP-compatible agent (e.g. Hermes) to run as a first-class
worker alongside Claude Code and OpenCode Server.

Core components:
- codec.go: JSON-RPC 2.0 NDJSON read/write with typed dispatch
- client.go: ACP client lifecycle (initialize/new/load/resume/prompt/cancel)
- mapper.go: ACP↔AEP bidirectional mapping (11 notification types)
- conn.go: Custom acpConn (non-AEP stdin, uses client.Prompt instead)
- worker.go: Worker adapter with init() registration, permission bridging

AEP protocol extensions:
- New event kinds: ToolUpdate, Plan, ModeUpdate
- Extended ToolCallData with Title/Kind/Locations (omitempty)
- Extended ToolResultData with Status/Diff (omitempty)

P2 features included: set_model, set_mode, fork, list_sessions,
auto_approve, usage stats forwarding.

Acceptance criteria: AC-ACP-001 through AC-ACP-012 covered.
24 tests passing with -race flag. Full CI (make check) green.

Closes hrygo#569

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread internal/worker/acp/worker.go Outdated
// ─── Start ───────────────────────────────────────────────────────────────────

func (w *Worker) Start(ctx context.Context, session worker.SessionInfo) error {
w.mu.Lock()
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Lock-while-IO: w.mu.Lock() + defer w.mu.Unlock() 横跨 Proc.Start(fork 进程)、client.Initialize(30s)、client.NewSession/LoadSession(30s),阻塞 Conn()/GetWorkerSessionID() 长达 90+ 秒。

ClaudeCode worker 在 readOutput 中先 copy readLineFn 再 unlock 后才进入 read loop。建议:锁内赋值字段 → 释放锁 → 执行 I/O。

Comment thread internal/worker/acp/worker.go Outdated

result, promptErr := w.client.Prompt(pctx, w.GetWorkerSessionID(), content)
if promptErr != nil {
if _, ok := errors.AsType[*JSONRPCError](promptErr); ok {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] DRY 违反: 两个分支执行完全相同的 MapPromptError + TrySend 循环(L229-232 vs L235-238),仅 return 值不同。建议合并:

envs := w.mapper.MapPromptError(promptErr)
for _, env := range envs {
    w.conn.TrySend(env)
}
if _, ok := errors.AsType[*JSONRPCError](promptErr); ok {
    return nil
}
return fmt.Errorf("acp: prompt: %w", promptErr)

Comment thread internal/worker/acp/client.go Outdated
// ─── Env blocklist ───────────────────────────────────────────────────────────

// BuildEnv constructs the environment for an ACP agent process.
func BuildEnv(sessionEnv map[string]string, configEnv []string, blocklist []string) []string {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 安全缺口: 自定义 BuildEnv 绕过了 base.BuildEnv(7 个安全阶段)。缺失:

  1. HOTPLEX_WORKER_ prefix stripping
  2. HOTPLEX_SESSION_ID / HOTPLEX_WORKER_TYPE 注入
  3. security.StripNestedAgent 调用
  4. blocklist 未应用于 session.Env(可绕过:session.Env["HOTPLEX_API_KEY"] 会直接注入)

ClaudeCode/CodexCLI 均使用 base.BuildEnv。建议:删除此函数,改用 base.BuildEnv(session, blocklist, "acp")

}

// TrySend enqueues an envelope from readLoop (non-blocking, backpressure-aware).
func (c *acpConn) TrySend(env *events.Envelope) bool {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Backpressure 策略不符: CLAUDE.md 要求"drop message.delta, keep state/done/error",但 TrySend 对所有 envelope type 一视同仁。256 buffer 满时 state/done/error 也会被丢弃,可能导致 session 卡死。建议按 event type 分级处理。

Comment thread internal/worker/acp/worker.go Outdated
outcome = pm.FormatDeniedOutcome()
}

return w.client.RespondPermission(context.Background(), pm.RequestID, outcome)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Context 传播违规: golang.md 禁止请求处理路径使用 context.Background()HandlePermissionResponsehandleServerRequest(L364)中 auto-approve 路径均使用。应传递 worker lifecycle context 以支持优雅关闭。

Comment thread internal/worker/acp/mapper.go Outdated
Text string `json:"text"`
}

func (m *ACPMapper) mapAgentMessageChunk(raw json.RawMessage) []*events.Envelope {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] DRY: mapAgentMessageChunkmapAgentThoughtChunk(L190)共享完全相同的内容提取逻辑,仅输出事件类型不同。建议提取 extractTextContent helper。

}

// NewSession creates a new ACP session.
func (c *ACPClient) NewSession(ctx context.Context, cwd string, mcpServers any) (*SessionResult, error) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] DRY: NewSessionLoadSession(L95)、ResumeSession(L115)结构相同(build params → call → unmarshal),仅 method name 和 params 不同。建议提取 callSessionMethod(method, params) helper。

Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai

Verdict: REQUEST_CHANGES | P0:0 P1:3 P2:5 P3:0

43 raw findings → 25 passed confidence ≥75 → 8 survived §4 filtering (dropped: pre-existing patterns, missing-tests通用, 大规模SOLID重构, 非热路径性能, 风格偏好).


P1 — 必须修复

P1-1: Lock-while-IO in Worker.Start (worker.go:113)
w.mu.Lock() + defer w.mu.Unlock() 横跨 Proc.Start(fork 进程)、client.Initialize(30s 网络超时)、client.NewSession/LoadSession(再 30s),阻塞任何试图获取 w.mu 的 goroutine(Conn(), SetWorkerSessionID(), GetWorkerSessionID())长达 90+ 秒。ClaudeCode worker 在 readOutput 中先 copy readLineFn 再 unlock 后才进入 read loop。建议:先在锁内赋值字段 → 释放锁 → 再执行 I/O。

P1-2: Input() error handling 重复代码 (worker.go:228-235)
JSONRPCError 分支(L229-232)和非 JSONRPCError 分支(L235-238)执行完全相同的 MapPromptError + TrySend 循环,唯一区别是 return nil vs wrapped error。应合并为一次 Map+Send,再根据错误类型决定返回值。

P1-3: 自定义 BuildEnv 绕过安全层 (client.go:403)
ACP worker 定义了自己的 BuildEnv 而非复用 base.BuildEnv(base/env.go:52,含 7 个安全阶段)。缺失:(1) HOTPLEX_WORKER_ prefix stripping,(2) HOTPLEX_SESSION_ID / HOTPLEX_WORKER_TYPE 注入,(3) security.StripNestedAgent 调用,(4) blocklist 未应用于 session.Env / ConfigEnv(可绕过)。ClaudeCode 和 CodexCLI 均使用 base.BuildEnv。建议:统一调用 base.BuildEnv(session, blocklist, "acp")


P2 — 建议修复

P2-1: TrySend 无差别丢弃所有事件类型 (conn.go:45)
Backpressure 策略要求"drop message.delta, keep state/done/error",但 TrySend 对所有 envelope type 一视同仁。channel 满 256 时 state/done/error 也会被丢弃。建议按 event type 分级处理。

P2-2: context.Background() 违反 context 传播规范 (worker.go:317, worker.go:364)
golang.md 禁止请求处理路径使用 context.Background()。HandlePermissionResponse 和 handleServerRequest 中 auto-approve 路径均使用。应传递 worker lifecycle context。

P2-3: mapAgentMessageChunk / mapAgentThoughtChunk 重复 (mapper.go:156)
两者共享完全相同的内容提取逻辑,仅输出事件类型不同。建议提取 extractTextContent helper。

P2-4: NewSession / LoadSession / ResumeSession 重复 (client.go:76)
三者结构相同(build params → call → unmarshal),仅 method 和 params 不同。建议提取 callSessionMethod helper。

P2-5: TypeACPX → TypeACP rename 不完整 — 运行时破坏

Comment thread internal/worker/acp/worker.go Outdated
// ─── Start ───────────────────────────────────────────────────────────────────

func (w *Worker) Start(ctx context.Context, session worker.SessionInfo) error {
w.mu.Lock()
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Lock-while-IO: w.mu.Lock() + defer w.mu.Unlock() 横跨 Proc.Start(fork 进程)、client.Initialize(30s)、client.NewSession/LoadSession(30s),阻塞 Conn()/GetWorkerSessionID() 长达 90+ 秒。

ClaudeCode worker 在 readOutput 中先 copy readLineFn 再 unlock 后才进入 read loop。建议:锁内赋值字段 → 释放锁 → 执行 I/O。

Comment thread internal/worker/acp/worker.go Outdated

result, promptErr := w.client.Prompt(pctx, w.GetWorkerSessionID(), content)
if promptErr != nil {
if _, ok := errors.AsType[*JSONRPCError](promptErr); ok {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] DRY 违反: 两个分支执行完全相同的 MapPromptError + TrySend 循环(L229-232 vs L235-238),仅 return 值不同。建议合并为一次 Map+Send,再根据错误类型决定返回值。

Comment thread internal/worker/acp/client.go Outdated
// ─── Env blocklist ───────────────────────────────────────────────────────────

// BuildEnv constructs the environment for an ACP agent process.
func BuildEnv(sessionEnv map[string]string, configEnv []string, blocklist []string) []string {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 安全缺口: 自定义 BuildEnv 绕过了 base.BuildEnv(7 个安全阶段)。缺失:

  1. HOTPLEX_WORKER_ prefix stripping
  2. HOTPLEX_SESSION_ID / HOTPLEX_WORKER_TYPE 注入
  3. security.StripNestedAgent 调用
  4. blocklist 未应用于 session.Env(可绕过)

ClaudeCode/CodexCLI 均使用 base.BuildEnv。建议删除此函数,改用 base.BuildEnv(session, blocklist, "acp")

}

// TrySend enqueues an envelope from readLoop (non-blocking, backpressure-aware).
func (c *acpConn) TrySend(env *events.Envelope) bool {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Backpressure 策略不符: CLAUDE.md 要求"drop message.delta, keep state/done/error",但 TrySend 对所有 envelope type 一视同仁。256 buffer 满时 state/done/error 也会被丢弃。建议按 event type 分级处理。

Comment thread internal/worker/acp/worker.go Outdated
outcome = pm.FormatDeniedOutcome()
}

return w.client.RespondPermission(context.Background(), pm.RequestID, outcome)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Context 传播违规: golang.md 禁止请求处理路径使用 context.Background()HandlePermissionResponsehandleServerRequest(L364) 中 auto-approve 路径均使用。应传递 worker lifecycle context。

Comment thread internal/worker/acp/mapper.go Outdated
Text string `json:"text"`
}

func (m *ACPMapper) mapAgentMessageChunk(raw json.RawMessage) []*events.Envelope {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] DRY: mapAgentMessageChunkmapAgentThoughtChunk(L190) 共享完全相同的内容提取逻辑,仅输出事件类型不同。建议提取 extractTextContent helper。

}

// NewSession creates a new ACP session.
func (c *ACPClient) NewSession(ctx context.Context, cwd string, mcpServers any) (*SessionResult, error) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] DRY: NewSessionLoadSession(L95)、ResumeSession(L115) 结构相同(build params → call → unmarshal),仅 method name 和 params 不同。建议提取 callSessionMethod helper。

P1-1: Worker.Start lock-while-IO → split into lock(assign fields) →
unlock → I/O (Proc.Start, Initialize, NewSession). Eliminates 90s+
mu hold blocking Conn()/SetWorkerSessionID()/GetWorkerSessionID().

P1-2: Input() error handling — merge duplicate MapPromptError+TrySend
branches into single call, only differ on return value.

P1-3: Replace custom BuildEnv with base.BuildEnv (7 security layers).
Removes env security bypass (missing prefix stripping, StripNestedAgent,
session ID injection, blocklist on session.Env).

P2-1: TrySend priority-aware backpressure — droppable (delta/raw)
non-blocking, critical events (state/done/error) blocking send.

P2-2: context.Background() → lifecycleCtx() (worker lifecycle context).

P2-3: Extract extractTextContent helper from mapAgentMessageChunk/
mapAgentThoughtChunk.

P2-4: Extract callSessionMethod helper from NewSession/LoadSession/
ResumeSession.

P2-5: TypeACPX residual cleanup — client example (acpx→acp), webchat
icons.tsx mapping, AGENTS.md docs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai

Verdict: REQUEST_CHANGES | P0:0 P1:2 P2:5 P3:2

P1 (Must Fix)

  1. [P1] acpConn.Close() never called — forwardEvents goroutine leaks on every session (worker.go:264)
    Two agents independently confirmed (confidence 95). Terminate() cancels ctx and kills the process but never calls w.conn.Close(). The bridge's forwardEvents ranges over recvCh, which is only closed by acpConn.Close(). Result: goroutine leaks, handleWorkerExit never reached, session cleanup fails. Compare with OCS which calls conn.Close() in release()→Terminate(). Fix: add w.conn.Close() before w.BaseWorker.Terminate(ctx).

  2. [P1] Terminate() nil-dereferences w.client when called before Start() completes (worker.go:272)
    w.client is initialized in Start() at line 151 with no nil guard. If Terminate() is called on a never-started or partially-started worker (e.g., bridge error path), w.client.Cancel() panics. Fix: add if w.client != nil { ... } guard.

P2 (Should Fix)

  1. [P2] Unrelated placeholder file committed: webchat/pnpm-workspace.yaml
    Content is allowBuilds: sharp: set this to true or false — clearly a placeholder, not functional config. Remove from this PR.

  2. [P2] Stale ACPX references in internal/worker/AGENTS.md (lines 28, 60, 67)
    Still reference TypeACPX and state "no implementation". This PR renamed to TypeACP and implemented the adapter.

  3. [P2] Root AGENTS.md still claims ACPX has no implementation (line 372)
    States ACPX 适配器仅存在类型常量(无实现) — now false. Update to reflect ACP is fully implemented.

  4. [P2] ReadMessage double-unmarshals every NDJSON line on streaming hot path (codec.go:83)
    Full json.Unmarshal into probe struct, then immediate second unmarshal into typed struct. For streaming (hundreds of chunks), this doubles parsing overhead.

  5. [P2] extractTextContent double-unmarshals every text chunk on hot path (mapper.go:194)
    Two json.Unmarshal calls per streaming token. Collapsible into single unmarshal with nested struct.

P3 (Low Priority)

  1. [P3] ACP spec contains stale TypeACPX coexistence reference (ACP-Worker-Spec.md:581)
    Spec says TypeACPX coexists with TypeACP, but TypeACPX was fully removed.

  2. [P3] mapToolCall unmarshals u.Content twice (mapper.go:220)
    extractToolName and content item extraction both parse same bytes. Could combine into single pass.


Reviewed by hotplex-ai — 4 parallel agents (Bug/Security/Concurrency, Compliance/DRY, History/Compatibility, SOLID/Performance)


// ─── Terminate ───────────────────────────────────────────────────────────────

func (w *Worker) Terminate(ctx context.Context) error {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] acpConn.Close() never called — goroutine leak

Terminate() cancels ctx and kills the process, but never calls w.conn.Close(). The bridge's forwardEvents ranges over recvCh which is only closed by acpConn.Close(). Without this call, the forwardEvents goroutine leaks forever, handleWorkerExit is never reached, and session cleanup fails.

Compare: OCS calls conn.Close() in release()→Terminate().

Fix:

// Add before BaseWorker.Terminate:
w.mu.Lock()
conn := w.conn
w.conn = nil
w.mu.Unlock()
if conn != nil {
    _ = conn.Close()
}

Comment thread internal/worker/acp/worker.go Outdated
// Try graceful cancel.
cancelCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = w.client.Cancel(cancelCtx, w.GetWorkerSessionID())
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Nil-dereference risk: w.client may be nil

w.client is initialized in Start() at line 151. If Terminate() is called on a never-started or partially-started worker (bridge error paths), w.client.Cancel() panics.

Fix:

if w.client != nil {
    _ = w.client.Cancel(cancelCtx, w.GetWorkerSessionID())
}

P1: Terminate() now calls conn.Close() before BaseWorker.Terminate to
    prevent forwardEvents goroutine leak on every session.
P1: Add nil guard for w.client in Terminate() to prevent panic when
    called before Start() completes.

P2: Remove unrelated webchat/pnpm-workspace.yaml placeholder file.
P2: Update worker/AGENTS.md — ACPX→ACP references, add ACP to table.
P2: Update root AGENTS.md — reflect ACP adapter is now implemented.
P2: ReadMessage single-pass unmarshal eliminates double decode on hot path.
P2: extractTextContent single-pass unmarshal for streaming chunks.

P3: Update ACP-Worker-Spec.md — remove stale TypeACPX coexistence note.
P3: mapToolCall merged contentItems+toolName extraction into single pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai

Verdict: REQUEST_CHANGES | P0:0 P1:1 P2:5 P3:5

P1 — 必须修复

1. RespondPermissionpendingMu 进行 I/O 写入 (client.go:139-144)

RespondPermissionpendingMu.Lock() 保护下调用 WriteMessage(c.stdin, req),但函数根本不访问 pending map——锁完全多余。若 stdin 管道缓冲区满(agent 卡住),所有需要 pendingMucall()/dispatchResponse() 将全部阻塞,导致整个 client 死锁。

修复:移除 pendingMu 锁(函数不访问 pending map)。

P2 — 建议修复

2. TrySend TOCTOU 竞态:向已关闭 channel 发送将 panic (conn.go:62-69)

TrySend 在 channel 满时先检查 c.closed(加锁),释放锁后执行 c.recvCh <- env(阻塞发送)。若 Close() 在释放锁和发送之间执行,recvCh 被关闭,发送操作 panic。

修复:用 recover() 捕获 send-on-closed-channel panic,或改用 done channel 替代 close(recvCh)。

3. AGENTS.md 反模式条目重复 (internal/worker/AGENTS.md:63-70)

上次 review 修复时,4 条反模式被追加而非替换,导致前 4 行与后 4 行内容重复。保留含 hermes-acp 的版本,删除旧 4 行。

4. err == io.EOF 死代码分支 (client.go:223)

ReadMessagefmt.Errorf("acp codec: read line: %w", err) 包装所有错误。readLooperr == io.EOF 永远不匹配(因 err 已被包装),clean EOF 走 Error 日志而非 Debug。应改为 errors.Is(err, io.EOF)

5. MapNotification 流式热路径三次 JSON 反序列化 (mapper.go:51-59)

MapNotification 先反序列化 notif.Params 提取 update,再反序列化 params.Update 提取判别符,最后各 map* 方法第三次反序列化。对 agent_message_chunk/agent_thought_chunk(最高频通知),每次 chunk 多 ~30% JSON 解码开销。建议将判别符提取合并到各 map* 方法中。

6. Codex-CLI-Worker-Spec.md 未更新,仍引用 TypeACPX (docs/specs/Codex-CLI-Worker-Spec.md:213)

本 PR 将 TypeACPX 替换为 TypeACP,但该 spec 文件仍含旧常量定义。

P3(5 项,不阻塞合并)

  • InitConfig 空白命令字符串 → parts[0] 越界 panic (worker.go:49)
  • pendingPerm 条目无超时清理,长时间运行可能泄漏 (worker.go:392)
  • WriteMessage 两次 Write syscalls 可合并 (codec.go:57-61)
  • isAllowKind/isDenyKind 缺 ACP spec 引用注释 (mapper.go:472-478)
  • Health() 委托 BaseWorker 读 nil conn → session ID 为空 (worker.go:301)

Comment thread internal/worker/acp/client.go Outdated
}
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
if err := WriteMessage(c.stdin, req); err != nil {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 P1: 不必要的锁导致死锁风险

RespondPermission 不访问 pending map,但持 pendingMuWriteMessage I/O。若 stdin 管道缓冲区满,所有 call()/dispatchResponse() 阻塞。

Suggested change
if err := WriteMessage(c.stdin, req); err != nil {
func (c *ACPClient) RespondPermission(ctx context.Context, id json.RawMessage, outcome any) error {
req := &JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Result: mustMarshal(outcome),
}
// No lock needed — RespondPermission does not access the pending map.
if err := WriteMessage(c.stdin, req); err != nil {
return fmt.Errorf("acp respond permission: %w", err)
}
return nil
}

Comment thread internal/worker/acp/conn.go Outdated
c.mu.Unlock()
if closed {
return false
}
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ P2: TOCTOU 竞态 — send on closed channel panic

c.closed 检查和 c.recvCh <- env 之间有时间窗口:Close() 可在释放锁后、发送前关闭 channel → panic。

建议用 recover 保护:

func (c *acpConn) TrySend(env *events.Envelope) bool {
    // ...droppable path unchanged...
    select {
    case c.recvCh <- env:
        return true
    default:
        c.mu.Lock()
        closed := c.closed
        c.mu.Unlock()
        if closed {
            return false
        }
        // Recover from send-on-closed-channel during shutdown race.
        return c.safeSend(env)
    }
}

func (c *acpConn) safeSend(env *events.Envelope) (sent bool) {
    defer func() { recover() }()
    c.recvCh <- env
    return true
}

Comment thread internal/worker/acp/client.go Outdated
if ctx.Err() != nil {
return // context cancelled, expected
}
if err == io.EOF {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🪵 P2: err == io.EOF 死代码

ReadMessage 包装所有错误为 fmt.Errorf("acp codec: read line: %w", err)。此处 == 比较永远不匹配,clean EOF 走 Error 日志。

Suggested change
if err == io.EOF {
if errors.Is(err, io.EOF) {
c.log.Debug("acp client: agent stdout closed")
return
}

(需添加 "errors" 到 import)

Comment thread internal/worker/AGENTS.md
| ACP | stdio (JSON-RPC 2.0 over NDJSON) | NewSession/LoadSession | Via Initialize handshake |

## ANTI-PATTERNS
- Do NOT use `math/rand` for crypto — use `crypto/rand` for JTI, tokens
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 P2: 反模式条目重复

L63-66 与 L67-70 内容重复(仅 L70 多了 hermes-acp)。删除 L63-66,保留含 hermes-acp 的版本。

P1-1: Remove unnecessary pendingMu lock in RespondPermission (deadlock risk)
P2-2: Fix TrySend TOCTOU race with safeSend + recover() for send-on-closed-channel
P2-3: Remove duplicate anti-pattern entries in AGENTS.md
P2-4: Fix err == io.EOF dead code — use errors.Is for wrapped errors
P2-5: Optimize MapNotification — merge discriminator + content extraction
      to eliminate triple JSON deserialization on streaming hot path
P2-6: Update Codex-CLI-Worker-Spec.md stale TypeACPX → TypeACP
P3-7: Guard InitConfig against empty command string (parts[0] panic)
P3-8: Clear pendingPerm entries on session teardown
P3-9: Merge WriteMessage two Write syscalls into single write
P3-10: Add ACP spec reference comments to isAllowKind/isDenyKind
P3-11: Override Health() to populate SessionID from acpConn

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
hotplex-ai
hotplex-ai previously approved these changes May 30, 2026
Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai

Verdict: ✅ APPROVE | P0:0 P1:0 P2:4 P3:5

Round 4 review on PR #573 (ACP worker adapter). Previous rounds addressed all P1 issues. The implementation is solid — well-designed concurrency patterns (atomic.Bool for streaming state, sync.Map for pending permissions, backpressure-aware TrySend with panic recovery), comprehensive mapper coverage, and clean JSON-RPC 2.0 codec.

No blocking issues found. P2 items are suggestions for consistency with existing workers.


P2 Findings (suggestions, not blocking)

P2-1: w.ctx stored in struct fieldinternal/worker/acp/worker.go:80
CLAUDE.md golang.md rule: "Forbidden: store ctx in struct fields." Other workers (claudecode, opencodeserver) pass context through call chains. w.ctx/w.cancel should be removed — HandlePermissionResponse can use context.WithTimeout(context.Background(), 5s) directly, and the readLoop context can be derived locally in Start() without storing.

P2-2: Dual mutex strategy — inconsistent with other workersinternal/worker/acp/worker.go:71
ACP declares mu sync.Mutex separate from BaseWorker.Mu. Other workers exclusively use BaseWorker.Mu for all fields. This creates undocumented lock ordering ambiguity. Consider using w.Mu for ACP-specific fields (sessionID, acpSessionID, conn) and removing the separate mu.

P2-3: w.Proc access without lock in Start()internal/worker/acp/worker.go:137
w.Proc = proc.New(...) and w.Proc.Start(...) run outside any mutex, while BaseWorker.Terminate accesses w.Proc under w.Mu. ClaudeCode guards Start under w.Mu.Lock(). Add mutex protection to match the established pattern.

P2-4: DRY — duplicate message.end synthesis in mapperinternal/worker/acp/mapper.go:97-113,119-135
MapPromptResponse and MapPromptError contain identical msgActive.Swap(false) → message.end → turnActive.Store(false) sequences. Extract a closeMessageStream() helper to keep them synchronized.


P3 Findings (nitpicks)

  • P3-1: Modalities() returns {"text","code"} but other workers include "image" — intentional or oversight? (worker.go:99)
  • P3-2: HandlePermissionResponse uses _ context.Context and falls back to lifecycleCtx() instead of caller's context (worker.go:332)
  • P3-3: proc.Start uses request-scoped ctx — ClaudeCode uses context.Background() to decouple process spawn from request lifecycle (worker.go:139)
  • P3-4: isAllowKind/isDenyKind reference "ACP spec" without URL — add direct link for maintainability (mapper.go:463-471)
  • P3-5: MapNotification silently drops unmarshal errors — add slog.Debug for agent integration debugging (mapper.go:53-55)

Comment thread internal/worker/acp/worker.go Outdated
mapper *ACPMapper
conn *acpConn
cancel context.CancelFunc
ctx context.Context // lifecycle context for background operations
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2-1: w.ctx context.Context stored in struct field violates CLAUDE.md rule: "Forbidden: store ctx in struct fields — pass along call chain." Other workers (claudecode, opencodeserver) don't store context. Consider removing w.ctx/w.cancel fields — HandlePermissionResponse can use context.WithTimeout(context.Background(), 5*time.Second) directly, and the readLoop context can be a local variable in Start() passed to client.StartReadLoop and w.readLoop.

Comment thread internal/worker/acp/worker.go Outdated
type Worker struct {
*base.BaseWorker

mu sync.Mutex
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2-2: ACP declares a separate mu sync.Mutex while other workers exclusively use BaseWorker.Mu for all field protection. This creates undocumented dual-lock ambiguity. Consider using w.Mu for ACP-specific fields (sessionID, acpSessionID, conn) to match claudecode/opencodeserver patterns and eliminate lock-ordering confusion.

env := base.BuildEnv(session, acpEnvBlocklist, "acp")

// Create process manager.
w.Proc = proc.New(proc.Opts{Logger: w.Log})
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2-3: w.Proc is assigned and used without mutex protection. BaseWorker.Terminate accesses w.Proc under w.Mu. ClaudeCode guards the entire Start under w.Mu.Lock(). If Terminate races with Start, there's a data race on w.Proc. Consider holding w.Mu during process creation to match the established pattern.

Comment thread internal/worker/acp/mapper.go Outdated

if m.msgActive.Swap(false) {
envs = append(envs, m.newEnvelope(events.MessageEnd, events.MessageEndData{
MessageID: m.messageID(),
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2-4: MapPromptResponse (lines 97-113) and MapPromptError (lines 119-135) contain identical msgActive.Swap(false) → message.end envelope → turnActive.Store(false) sequences. Extract a helper like func (m *ACPMapper) closeMessageStream() []*events.Envelope to avoid drift and keep the sequence synchronized.

Comment thread internal/worker/acp/worker.go Outdated
func (w *Worker) EnvBlocklist() []string { return append([]string{}, acpEnvBlocklist...) }
func (w *Worker) SessionStoreDir() string { return "" }
func (w *Worker) MaxTurns() int { return 0 }
func (w *Worker) Modalities() []string { return []string{"text", "code"} }
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3-1: Modalities() returns {"text", "code"} but claudecode and opencodeserver both return {"text", "code", "image"}. Is this intentional (ACP agents may not support image modality)? If so, a brief comment would help future maintainers.

SessionID string `json:"sessionId"`
Update json.RawMessage `json:"update"`
}
if err := json.Unmarshal(notif.Params, &params); err != nil {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3-5: Unmarshal errors from session/update params are silently dropped with return nil. Consider adding slog.Debug("acp mapper: failed to parse session/update params", "error", err) to aid debugging during ACP agent integration without impacting performance.

…deadlock

- Add conn_test.go: TrySend (droppable/critical), safeSend, Close, isDroppable
- Add client_test.go: NewACPClient, RespondPermission, readLoop (EOF/cancel/
  notification/request), call/dispatch (roundtrip/cancel/error), JSONRPCError
- Fix TrySend droppable path: use trySendNonBlocking with recover() to prevent
  send-on-closed-channel panic (discovered by test)
- Fix TestACPConn_TrySend_Critical: add sync barrier to prevent drain-before-fill race
- Fix TestACPConn_TrySend_Droppable: drain initial item before fill to prevent
  channel overflow deadlock
- Fix client test goroutine leaks: close stdoutW + wait <-c.Done() for readLoop exit
- Set internal/worker/acp coverage threshold to 42% in CI (41.9% current)

Coverage: 25.7% → 41.9%

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sisyphus 🏔️ and others added 2 commits May 30, 2026 12:43
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
P2-1: Remove w.ctx struct field (CLAUDE.md violation) — readLoop ctx derived
       from context.Background() in Start(), HandlePermissionResponse uses
       caller ctx, auto-approve uses context.Background()
P2-2: Remove separate mu sync.Mutex, unify with BaseWorker.Mu
P2-3: Guard Proc creation under w.Mu for concurrent Terminate safety
P2-4: Extract closeMessageStream() helper to DRY message.end sequence
P3-1: Add "image" to Modalities() to match other workers
P3-2: HandlePermissionResponse uses caller ctx instead of lifecycleCtx()
P3-3: proc.Start uses context.Background() to decouple from request ctx
P3-5: MapNotification unmarshal error: add slog.Debug for diagnostics

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
hotplex-ai
hotplex-ai previously approved these changes May 30, 2026
Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai (Round 5)

Verdict: ✅ APPROVE | P0:0 P1:0 P2:0 P3:7

Round 2 发现的 P1(RespondPermission 持锁 IO)和 5×P2 已在 Round 3-4 全部修复。本轮对修复后代码做了完整四维度审查,未发现新的阻塞性问题。


P3-1: newEnvelope 重复 events.NewEnvelope (mapper.go:400)

ACPMapper 手动构建 events.Envelope(Version/ID/Seq/SessionID/Timestamp/Event),与 events.NewEnvelope(id, sessionID, seq, kind, data) 功能完全相同。claudecode mapper 和 OCS converter 均直接调用 events.NewEnvelope。建议替换以消除 DRY 违规。

P3-2: w.conn/w.client 在 readLoop/Input 中无锁读取 (worker.go:229,373)

w.connStart()w.Mu.Lock() 下赋值,readLoop goroutine 通过 go 语句获得 happens-before 保证,Input()Start() 返回后调用——不是真正的数据竞争。但 Terminate()/Conn()/Health() 都用 w.Mu 保护读取,为一致性建议 readLoop/Input 也缓存到局部变量(与 Terminate line 274 模式一致)。

P3-3: ACPMapper 使用裸 slog.Debug() 而非注入 logger (mapper.go:55)

claudecode mapper 通过构造函数注入 *slog.Logger,ACP mapper 直接用全局 slog.Debug()。不一致且无法附加 session_id 等结构化上下文。

P3-4: pendingMu 命名不符合项目约定 (client.go:25)

项目约定单 mutex 结构体使用 mu(base.BaseWorker.Mu、acpConn.mu、proc.Manager.mu)。ACPClient 仅有一个 mutex,pendingMu 应为 mu

P3-5: call()RespondPermission() 可并发写 stdin (client.go:141,285)

Prompt(bridge goroutine)和 HandlePermissionResponse(HTTP handler goroutine)可能并发调用 WriteMessage(c.stdin, ...)*os.File 文档保证并发安全且 POSIX pipe < PIPE_BUF 写入原子,实际风险极低。但添加 writeMu sync.Mutex 是更严谨的做法。

P3-6: ACPConfig.SessionStore/Args 声明但未消费 (config.go)

SessionStoreArgs 字段在配置结构体中声明,但 InitConfig() 从未读取,SessionStoreDir() 硬编码返回 ""。这构成未兑现的 API 承诺,用户配置后无效果也无错误。建议移除或连通。

P3-7: 客户端方法注释残留 "(P2)" 路线图标记 (client.go:147,159,171,187)

SetSessionModelSetSessionModeForkSessionListSessions 注释中的 (P2) 暗示未实现,但这些方法已完整实现且可调用。应移除或改为 (Extension) 等。


修复质量确认

  • ✅ P1 RespondPermission 移除 pendingMu 锁:WriteMessage 不访问 pending map,移除正确
  • ✅ conn.go TrySend 重写:droppable→non-blocking→check closed→safeSend+recover 三层防御
  • ✅ Proc 创建 mutex 保护:避免 concurrent Terminate 竞争
  • ✅ 新增 conn_test.go + client_test.go:490 行测试覆盖核心路径

P3-1: newEnvelope delegates to events.NewEnvelope (DRY)
P3-2: skipped — not a real data race (happens-before guaranteed)
P3-3: ACPMapper injects *slog.Logger via constructor
P3-4: pendingMu renamed to mu (project naming convention)
P3-5: Add writeMu to serialize concurrent stdin writes
P3-6: Remove unused Args/SessionStore from ACPConfig
P3-7: Remove stale (P2) roadmap markers from client method comments

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copy link
Copy Markdown
Owner

@hotplex-ai hotplex-ai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — hotplex-ai

Verdict: ⚠️ REQUEST_CHANGES | P0:0 P1:2 P2:1 P3:4

P1 (2 — 必须修复)

  • [P1] call() 缺少 writeMu 锁 (client.go:290) — WriteMessage(c.stdin, req) 在无锁状态下写 stdin,与 RespondPermission(line 143 持有 writeMu)存在数据竞争。writeMu 注释(line 28)明确声称 "serializes stdin writes (call + RespondPermission)" 但 call() 未遵守。readLoop goroutine 和 worker goroutine 可并发写入 stdin。
  • [P1] ReadMessage 无行大小限制 (codec.go:69) — r.ReadBytes('\n') 可无限分配内存。恶意或故障 ACP agent 发送超长单行将导致 OOM。其他 worker 通过 bufio.Scanner 设置 10MB 上限(worker-proc.md),ACP codec 应对齐。

P2 (1)

  • [P2] ReadMessage 未校验 jsonrpc 版本 (codec.go:91) — JSON-RPC 2.0 spec 要求 jsonrpc 必须为 "2.0",当前代码仅检查 id/method 分发,接受任意版本号。建议在 switch 前加 if raw.JSONRPC != "2.0" { return nil, fmt.Errorf(...) }

P3 (4)

  • [P3] handleServerRequest auto-approve 使用 context.Background() (worker.go:395) — 违反 golang.md ctx 规范,若 stdin 写阻塞则无法取消。
  • [P3] 架构文档未更新 ACPX→ACP (Worker-Gateway-Design.md:323) — 仍有 "ACPX | not implemented" 引用。
  • [P3] AEP 协议文档未记录新增事件类型 (aep-protocol.md) — tool_updateplanmode_update 未在协议参考中出现。
  • [P3] mapper 测试缺少 t.Run() 子测试 (mapper_test.go) — 多个独立测试函数可合并为 table-driven + t.Run() 模式。

整体评价:ACP Worker 架构设计清晰,ACP↔AEP 映射完整,但 P1 竞态和 OOM 风险需在合并前修复。

c.mu.Unlock()
}()

if err := WriteMessage(c.stdin, req); err != nil {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 数据竞争:WriteMessage(c.stdin, req) 未持有 c.writeMu,与 RespondPermission(line 143 持有 writeMu)可并发写入同一 stdin。修复:

c.writeMu.Lock()
err := WriteMessage(c.stdin, req)
c.writeMu.Unlock()
if err != nil {
    return nil, err
}

// Returns *JSONRPCResponse (has id, no method), *JSONRPCRequest (has id + method),
// or *JSONRPCNotification (no id).
func ReadMessage(r *bufio.Reader) (any, error) {
line, err := r.ReadBytes('\n')
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] OOM 风险:r.ReadBytes('\n') 无大小限制,恶意/故障 agent 可发送超长行耗尽内存。建议限制行大小:

const maxLineSize = 10 << 20 // 10MB, 对齐 worker-proc.md 上限

line, err := r.ReadBytes('\n')
if err == nil && len(line) > maxLineSize {
    return nil, fmt.Errorf("acp codec: line too large: %d bytes (max %d)", len(line), maxLineSize)
}

return nil, fmt.Errorf("acp codec: parse json: %w", err)
}

switch {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] JSON-RPC 2.0 spec 合规:未校验 jsonrpc 版本号。建议在 switch 前添加:

if raw.JSONRPC != "2.0" {
    return nil, fmt.Errorf("acp codec: invalid jsonrpc version %q, expected \"2.0\"", raw.JSONRPC)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(worker/acp): implement universal ACP Worker for HotPlex

2 participants