From a2f2cd74eee9b5c7926d1759aad473a7746fafe9 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Tue, 21 Apr 2026 13:26:22 -0600 Subject: [PATCH] MCP workflow hardening (extracted from OpenClaw PR) Adds and hardens the DeepWork MCP workflow runtime: - New tools: get_active_workflow, validate_step_outputs - Platform-aware workflow invocation / review-guidance text - StateManager/StatusWriter.set_project_root rebind support - RootResolver normalizes OpenClaw plugin-bundle roots to the enclosing workspace - FORMATTERS registry with format_for_openclaw - short_instruction_filename alias files for OpenClaw review spawns - Instruction/test coverage for the above Extracted from the feat/openclaw-support branch (PR #387) so the MCP runtime changes can be reviewed separately from the OpenClaw bundle content. Note: tests/unit/review/test_formatter.py::TestFormatForOpenClaw::test_output_mentions_sessions_spawn and ::test_agent_name_becomes_agent_type were failing on the openclaw branch before extraction (pre-existing NameError: '_task_name' helper missing in format_for_openclaw). Left as-is for reviewer visibility. 
--- doc/mcp_interface.md | 83 ++++++- src/deepwork/jobs/mcp/quality_gate.py | 27 ++- src/deepwork/jobs/mcp/roots.py | 35 ++- src/deepwork/jobs/mcp/schemas.py | 94 ++++++++ src/deepwork/jobs/mcp/server.py | 133 +++++++++-- src/deepwork/jobs/mcp/state.py | 6 + src/deepwork/jobs/mcp/status.py | 7 +- src/deepwork/jobs/mcp/tools.py | 268 +++++++++++++++++++---- src/deepwork/review/formatter.py | 52 +++++ src/deepwork/review/instructions.py | 10 + src/deepwork/review/mcp.py | 6 +- tests/unit/jobs/mcp/test_quality_gate.py | 61 ++++++ tests/unit/jobs/mcp/test_roots.py | 19 ++ tests/unit/jobs/mcp/test_state.py | 9 + tests/unit/jobs/mcp/test_status.py | 10 + tests/unit/jobs/mcp/test_tools.py | 67 ++++++ tests/unit/review/test_formatter.py | 42 +++- tests/unit/review/test_instructions.py | 7 + tests/unit/review/test_mcp.py | 34 +++ 19 files changed, 882 insertions(+), 88 deletions(-) diff --git a/doc/mcp_interface.md b/doc/mcp_interface.md index dace2763..f577d150 100644 --- a/doc/mcp_interface.md +++ b/doc/mcp_interface.md @@ -10,7 +10,7 @@ This document describes the Model Context Protocol (MCP) tools exposed by the De ## Tools -DeepWork exposes eleven MCP tools: +DeepWork exposes thirteen MCP tools: ### 1. `get_workflows` @@ -54,7 +54,64 @@ interface WorkflowInfo { --- -### 2. `start_workflow` +### 2. `get_active_workflow` + +Return the currently active workflow for a session, if one exists. This is useful after compaction, reset, or any host-specific session restore flow. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `session_id` | `string` | Yes | The persistent DeepWork session ID for the current host session. In Claude Code this is `CLAUDE_CODE_SESSION_ID`. | +| `agent_id` | `string \| null` | No | Optional host-specific agent identifier for agent-scoped workflow state. In Claude Code this is `CLAUDE_CODE_AGENT_ID`. 
| + +#### Returns + +```typescript +{ + has_active_workflow: boolean; + stack: StackEntry[]; + active_workflow?: { + job_name: string; + workflow_name: string; + goal: string; + started_at: string; + step_number: number; + total_steps: number; + completed_steps: string[]; + current_step: ActiveStepInfo; + } | null; +} +``` + +--- + +### 3. `validate_step_outputs` + +Validate a planned `finished_step` payload against the active step without advancing the workflow or running quality reviews. Use this as a dry run when you want to catch wrong output names, missing required outputs, bad types, or missing files before calling `finished_step`. + +#### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `outputs` | `Record<string, ArgumentValue>` | Yes | Map of planned step output names to values. Validation uses the active step's declared output contract without advancing the workflow. | +| `session_id` | `string` | Yes | The persistent DeepWork session ID for the current host session. In Claude Code this is `CLAUDE_CODE_SESSION_ID`. | +| `agent_id` | `string \| null` | No | Optional host-specific agent identifier for agent-scoped workflow state. In Claude Code this is `CLAUDE_CODE_AGENT_ID`. | + +#### Returns + +```typescript +{ + valid: boolean; + errors: string[]; + current_step: ActiveStepInfo; + stack: StackEntry[]; +} +``` + +--- + +### 4. `start_workflow` Start a new workflow session. Initializes state tracking and returns the first step's instructions. Supports nested workflows — starting a workflow while one is active pushes onto a stack. @@ -82,7 +139,7 @@ Start a new workflow session. Initializes state tracking and returns the first s --- -### 3. `finished_step` +### 5. `finished_step` Report that you've finished a workflow step. Validates outputs and runs quality reviews (from step definitions and .deepreview rules), then returns the next action. @@ -121,7 +178,7 @@ Report that you've finished a workflow step. 
Validates outputs and runs quality --- -### 4. `abort_workflow` +### 6. `abort_workflow` Abort the current workflow and return to the parent workflow (if nested). Use this when a workflow cannot be completed. @@ -149,7 +206,7 @@ Abort the current workflow and return to the parent workflow (if nested). Use th --- -### 5. `go_to_step` +### 7. `go_to_step` Navigate back to a prior step in the current workflow. Clears all progress from the target step onward, forcing re-execution of subsequent steps to ensure consistency. Use this when earlier outputs need revision or quality issues are discovered in later steps. @@ -181,7 +238,7 @@ Navigate back to a prior step in the current workflow. Clears all progress from --- -### 6. `get_review_instructions` +### 8. `get_review_instructions` Run a review of changed files based on `.deepreview` configuration files and DeepSchema-generated synthetic review rules. Returns a list of review tasks to invoke in parallel. Each task has `description`, `subagent_type`, and `prompt` fields for the Agent tool. @@ -201,7 +258,7 @@ A plain string with one of: --- -### 7. `get_configured_reviews` +### 9. `get_configured_reviews` List all configured review rules from `.deepreview` files and DeepSchema-generated synthetic rules. Returns each rule's name, description, and defining file location. Optionally filters to rules matching specific files. @@ -225,7 +282,7 @@ Array<{ --- -### 8. `mark_review_as_passed` +### 10. `mark_review_as_passed` Mark a review as passed so it won't be re-run while reviewed files remain unchanged. Call this when a review has no findings, when all findings have been fixed, or when remaining findings have been explicitly dismissed by the user. The `review_id` is provided in the instruction file's "After Review" section. @@ -245,7 +302,7 @@ A plain string with either: --- -### 9. `get_named_schemas` +### 11. 
`get_named_schemas` List all named DeepSchemas discovered across all schema sources (project-local, standard, and env var). Returns each schema's name, summary, and matcher patterns. @@ -263,7 +320,7 @@ Array<{ }> ``` -### 10. `register_session_job` +### 12. `register_session_job` Register a transient job definition scoped to the current session. The job is validated against the job schema and stored so that `start_workflow` can discover it. Can be called multiple times to overwrite. @@ -288,7 +345,7 @@ Register a transient job definition scoped to the current session. The job is va On validation failure, returns `{ error: string }` with details about what failed. -### 11. `get_session_job` +### 13. `get_session_job` Retrieve the YAML content of a session-scoped job definition previously registered with `register_session_job`. @@ -375,7 +432,9 @@ The `finished_step` tool returns one of three statuses: | 3. Execute step instructions, create outputs | -4. finished_step(outputs, session_id) +4. validate_step_outputs(outputs, session_id) // optional dry run + | +5. finished_step(outputs, session_id) | +-- status = "needs_work" -> Fix issues, goto 4 +-- status = "next_step" -> Execute new instructions, goto 4 diff --git a/src/deepwork/jobs/mcp/quality_gate.py b/src/deepwork/jobs/mcp/quality_gate.py index ba1de5cd..b2997c9e 100644 --- a/src/deepwork/jobs/mcp/quality_gate.py +++ b/src/deepwork/jobs/mcp/quality_gate.py @@ -22,7 +22,7 @@ ) from deepwork.review.config import ReviewRule, ReviewTask from deepwork.review.discovery import load_all_rules -from deepwork.review.formatter import format_for_claude +from deepwork.review.formatter import FORMATTERS, format_for_claude from deepwork.review.instructions import ( write_instruction_files, ) @@ -461,16 +461,35 @@ def run_quality_gate( return None # 9. 
Format as review instructions - review_output = format_for_claude(task_files, project_root) + formatter = FORMATTERS.get(platform, format_for_claude) + review_output = formatter(task_files, project_root) # 10. Build complete response with guidance - guidance = _build_review_guidance(review_output) + guidance = _build_review_guidance(review_output, platform) return guidance -def _build_review_guidance(review_output: str) -> str: +def _build_review_guidance(review_output: str, platform: str = "claude") -> str: """Build the complete review guidance including /review skill instructions.""" + if platform == "openclaw": + return f"""Quality reviews are required before this step can advance. + +{review_output} + +## How to Run Reviews + +For each review task listed above, launch it as a parallel OpenClaw sub-agent with `sessions_spawn`. + +- Spawn every listed review before waiting for any completion event. +- Use each instruction path exactly as written, relative to the workspace root. Do not rewrite it as an absolute host path. +- Do not set `timeoutSeconds` on these review spawns; let the runtime default apply. If the tool requires a timeout value, use `0`. +- After all spawns are accepted, use `sessions_yield` to wait for completion events before continuing. + +## After Reviews + +For any failing reviews, if you believe the issue is invalid, then you can call `mark_review_as_passed` on it. Otherwise, you should act on any feedback from the review to fix the issues. Once done, call `finished_step` again to see if you will pass now.""" + return f"""Quality reviews are required before this step can advance. {review_output} diff --git a/src/deepwork/jobs/mcp/roots.py b/src/deepwork/jobs/mcp/roots.py index 738aaa44..f08975e0 100644 --- a/src/deepwork/jobs/mcp/roots.py +++ b/src/deepwork/jobs/mcp/roots.py @@ -1,10 +1,16 @@ """MCP root resolution via listRoots client capability. Resolves the project root dynamically by asking the MCP client for its -filesystem roots. 
When ``--path`` is explicitly passed on the CLI the -resolver always returns that path. Otherwise it calls ``ctx.list_roots()`` +filesystem roots. When ``--path`` is explicitly passed on the CLI the +resolver always returns that path. Otherwise it calls ``ctx.list_roots()`` on every tool invocation so it tracks workspace changes (e.g. git worktree switches) without caching stale values. + +For OpenClaw bundle installs, the MCP server can be launched from the plugin +bundle directory itself (for example ``plugins/openclaw``) when the host does +not expose a usable ``listRoots`` capability. In that case we normalize the +bundle directory back to the enclosing workspace root when we can detect +OpenClaw workspace markers. """ from __future__ import annotations @@ -19,6 +25,9 @@ logger = logging.getLogger("deepwork.jobs.mcp") +_OPENCLAW_PLUGIN_MARKER = Path(".codex-plugin") / "plugin.json" +_OPENCLAW_WORKSPACE_MARKER = Path(".openclaw") / "workspace-state.json" + async def resolve_project_root(ctx: Context, fallback: Path) -> Path: """Ask the MCP client for its filesystem root. 
@@ -78,4 +87,24 @@ async def get_root(self, ctx: Context) -> Path: """ if self._explicit: return self._fallback - return await resolve_project_root(ctx, self._fallback) + candidate = await resolve_project_root(ctx, self._fallback) + return _normalize_openclaw_bundle_root(candidate) + + +def _normalize_openclaw_bundle_root(candidate: Path) -> Path: + """Map an OpenClaw plugin bundle path back to the workspace root.""" + + resolved = candidate.resolve() + if not (resolved / _OPENCLAW_PLUGIN_MARKER).exists(): + return resolved + + for ancestor in (resolved, *resolved.parents): + if (ancestor / _OPENCLAW_WORKSPACE_MARKER).exists(): + logger.debug( + "Normalized OpenClaw plugin bundle root %s to workspace root %s", + resolved, + ancestor, + ) + return ancestor + + return resolved diff --git a/src/deepwork/jobs/mcp/schemas.py b/src/deepwork/jobs/mcp/schemas.py index e2bf501f..6058e936 100644 --- a/src/deepwork/jobs/mcp/schemas.py +++ b/src/deepwork/jobs/mcp/schemas.py @@ -144,6 +144,31 @@ class FinishedStepInput(BaseModel): ) +class ValidateStepOutputsInput(BaseModel): + """Input for validate_step_outputs tool.""" + + outputs: dict[str, ArgumentValue] = Field( + description=( + "Map of planned step output names to values. " + "Validation uses the active step's declared output contract without " + "advancing the workflow or running quality reviews." + ) + ) + session_id: str = Field( + description=( + "The persistent DeepWork session ID for the current host session. " + "In Claude Code this is CLAUDE_CODE_SESSION_ID." + ), + ) + agent_id: str | None = Field( + default=None, + description=( + "Optional host-specific agent identifier for agent-scoped workflow state. " + "In Claude Code this is CLAUDE_CODE_AGENT_ID." 
+ ), + ) + + class AbortWorkflowInput(BaseModel): """Input for abort_workflow tool.""" @@ -182,6 +207,24 @@ class GoToStepInput(BaseModel): ) +class GetActiveWorkflowInput(BaseModel): + """Input for get_active_workflow tool.""" + + session_id: str = Field( + description=( + "The persistent DeepWork session ID for the current host session. " + "In Claude Code this is CLAUDE_CODE_SESSION_ID." + ), + ) + agent_id: str | None = Field( + default=None, + description=( + "Optional host-specific agent identifier for agent-scoped workflow state. " + "In Claude Code this is CLAUDE_CODE_AGENT_ID." + ), + ) + + # ============================================================================= # Tool Output Models # NOTE: Changes to these models affect MCP tool return types. @@ -320,6 +363,23 @@ class FinishedStepResponse(BaseModel): ) +class ValidateStepOutputsResponse(BaseModel): + """Response from validate_step_outputs tool.""" + + valid: bool = Field(description="Whether the submitted outputs satisfy the active step contract") + errors: list[str] = Field( + default_factory=list, + description="Validation errors that must be fixed before calling finished_step", + ) + current_step: ActiveStepInfo = Field( + description="The current step, including the declared expected outputs", + ) + stack: list[StackEntry] = Field( + default_factory=list, + description="Current workflow stack after validation", + ) + + class AbortWorkflowResponse(BaseModel): """Response from abort_workflow tool.""" @@ -349,6 +409,40 @@ class GoToStepResponse(BaseModel): ) +class ActiveWorkflowState(BaseModel): + """Current active workflow session details.""" + + job_name: str = Field(description="Name of the active job") + workflow_name: str = Field(description="Name of the active workflow") + goal: str = Field(description="Goal originally supplied when the workflow started") + started_at: str = Field(description="ISO timestamp when the workflow started") + step_number: int = Field(description="1-based index 
of the current step") + total_steps: int = Field(description="Total number of steps in the workflow") + completed_steps: list[str] = Field( + default_factory=list, + description="Step IDs already completed in this workflow session", + ) + current_step: ActiveStepInfo = Field( + description="The active step and its current resolved instructions", + ) + + +class GetActiveWorkflowResponse(BaseModel): + """Response from get_active_workflow tool.""" + + has_active_workflow: bool = Field( + description="Whether the given session currently has an active workflow" + ) + stack: list[StackEntry] = Field( + default_factory=list, + description="Current workflow stack visible to this session/agent", + ) + active_workflow: ActiveWorkflowState | None = Field( + default=None, + description="Details of the active workflow when one exists", + ) + + # ============================================================================= # Session Job Models # NOTE: These models support register_session_job / get_session_job tools. 
diff --git a/src/deepwork/jobs/mcp/server.py b/src/deepwork/jobs/mcp/server.py index 910166b5..d484cf62 100644 --- a/src/deepwork/jobs/mcp/server.py +++ b/src/deepwork/jobs/mcp/server.py @@ -27,10 +27,12 @@ AbortWorkflowInput, ArgumentValue, FinishedStepInput, + GetActiveWorkflowInput, GetSessionJobInput, GoToStepInput, RegisterSessionJobInput, StartWorkflowInput, + ValidateStepOutputsInput, ) from deepwork.jobs.mcp.state import StateManager from deepwork.jobs.mcp.status import StatusWriter @@ -40,6 +42,26 @@ logger = logging.getLogger("deepwork.jobs.mcp") +def _session_id_hint(platform: str) -> str: + """Describe how the host provides the DeepWork session identifier.""" + if platform == "openclaw": + return ( + "the DeepWork session ID from the OpenClaw bootstrap note " + "(use the current OpenClaw sessionId)" + ) + return "CLAUDE_CODE_SESSION_ID from startup context" + + +def _agent_id_hint(platform: str) -> str: + """Describe how the host provides the optional agent identifier.""" + if platform == "openclaw": + return ( + "an optional host-specific agent scope from the OpenClaw bootstrap note " + "(usually leave unset unless you intentionally want isolated workflow state)" + ) + return "CLAUDE_CODE_AGENT_ID from startup context" + + def _ensure_schema_available(project_root: Path) -> None: """Copy job.schema.json to .deepwork/ so agents have a stable reference path.""" from deepwork.jobs.schema import get_schema_path @@ -93,6 +115,7 @@ def create_server( project_root=project_path, state_manager=state_manager, status_writer=status_writer, + platform=platform or "claude", ) # Write initial manifest at startup @@ -103,7 +126,7 @@ def create_server( # Detect issues at startup (used for instructions and tool response warnings) startup_issues = detect_issues(project_path) - instructions = _build_startup_instructions(project_path, startup_issues) + instructions = _build_startup_instructions(project_path, startup_issues, tools.platform) # Create MCP server mcp = 
FastMCP( @@ -111,6 +134,14 @@ def create_server( instructions=instructions, ) + async def _refresh_project_root(ctx: Context) -> Path: + """Resolve and propagate the effective project root for this tool call.""" + root = await root_resolver.get_root(ctx) + if root != tools.project_root: + tools.set_project_root(root) + _ensure_schema_available(root) + return root + # ========================================================================= # Issue detection — append to tool responses when issues exist # ========================================================================= @@ -159,10 +190,68 @@ def _log_tool_call( async def get_workflows(ctx: Context) -> dict[str, Any]: """Get all available workflows.""" _log_tool_call("get_workflows") - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) response = tools.get_workflows() return _append_issues(response.model_dump()) + @mcp.tool( + description=( + "Return the currently active workflow for this session, if one exists. " + "Useful after compaction, reset, or session restore. " + f"Required: session_id ({_session_id_hint(tools.platform)}). " + f"Optional: agent_id ({_agent_id_hint(tools.platform)})." + ) + ) + async def get_active_workflow( + session_id: str, + ctx: Context, + agent_id: str | None = None, + ) -> dict[str, Any]: + """Get the current active workflow state for this session.""" + _log_tool_call( + "get_active_workflow", + {"agent_id": agent_id}, + session_id=session_id, + agent_id=agent_id, + ) + await _refresh_project_root(ctx) + input_data = GetActiveWorkflowInput(session_id=session_id, agent_id=agent_id) + response = tools.get_active_workflow(input_data) + return _append_issues(response.model_dump()) + + @mcp.tool( + description=( + "Validate a planned `finished_step` output payload against the active step without " + "advancing the workflow or running quality reviews. 
" + "Use this as a dry run when you want to catch wrong output names, missing required " + "outputs, bad types, or missing files before calling `finished_step`. " + "Required: outputs (map of step_argument names to values), " + f"session_id ({_session_id_hint(tools.platform)}). " + f"Optional: agent_id ({_agent_id_hint(tools.platform)})." + ) + ) + async def validate_step_outputs( + outputs: dict[str, ArgumentValue], + session_id: str, + ctx: Context, + agent_id: str | None = None, + ) -> dict[str, Any]: + """Validate outputs for the active step without advancing the workflow.""" + _log_tool_call( + "validate_step_outputs", + {"outputs": outputs, "agent_id": agent_id}, + session_id=session_id, + agent_id=agent_id, + ) + await _refresh_project_root(ctx) + input_data = ValidateStepOutputsInput( + outputs=outputs, + session_id=session_id, + agent_id=agent_id, + ) + response = tools.validate_step_outputs(input_data) + return _append_issues(response.model_dump()) + @mcp.tool( description=( "Start a new workflow session. 
" @@ -198,7 +287,7 @@ async def start_workflow( session_id=session_id, agent_id=agent_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = StartWorkflowInput( goal=goal, job_name=job_name, @@ -253,7 +342,7 @@ async def finished_step( session_id=session_id, agent_id=agent_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = FinishedStepInput( outputs=outputs, work_summary=work_summary, @@ -291,7 +380,7 @@ async def abort_workflow( session_id=session_id, agent_id=agent_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = AbortWorkflowInput( explanation=explanation, session_id=session_id, agent_id=agent_id ) @@ -327,7 +416,7 @@ async def go_to_step( session_id=session_id, agent_id=agent_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = GoToStepInput(step_id=step_id, session_id=session_id, agent_id=agent_id) response = await tools.go_to_step(input_data) return _append_issues(response.model_dump()) @@ -359,7 +448,7 @@ async def register_session_job( {"job_name": job_name}, session_id=session_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = RegisterSessionJobInput( job_name=job_name, job_definition_yaml=job_definition_yaml, @@ -391,7 +480,7 @@ async def get_session_job( {"job_name": job_name}, session_id=session_id, ) - tools.project_root = await root_resolver.get_root(ctx) + await _refresh_project_root(ctx) input_data = GetSessionJobInput( job_name=job_name, session_id=session_id, @@ -508,7 +597,10 @@ async def mark_review_as_passed(review_id: str, ctx: Context) -> str: return mcp -_STATIC_INSTRUCTIONS = """\ +def _static_instructions(platform: str) -> str: + session_hint = _session_id_hint(platform) + agent_hint = _agent_id_hint(platform) + return f"""\ # DeepWork Workflow 
Server Multi-step workflows with quality gates. @@ -516,19 +608,24 @@ async def mark_review_as_passed(review_id: str, ctx: Context) -> str: **Session identity**: On Claude Code pass `CLAUDE_CODE_SESSION_ID` as `session_id`. \ On other platforms omit `session_id` in `start_workflow`; the server auto-generates one \ and returns it in `begin_step.session_id` — use that value for all subsequent calls. \ -Sub-agents on Claude Code also pass `agent_id` (CLAUDE_CODE_AGENT_ID). +Sub-agents may also pass `agent_id` ({agent_hint}). Host session hints come from \ +{session_hint}. ## Workflow Lifecycle 1. `get_workflows` — discover available workflows -2. `start_workflow` — begin with goal, job_name, workflow_name (session_id optional — see above) -3. Follow step instructions; use `begin_step.session_id` for all subsequent calls, then call `finished_step` with outputs -4. If `needs_work`: fix issues and retry. If `next_step`: continue. If `workflow_complete`: done. +2. `get_active_workflow` — resume current workflow context after compaction/reset when needed +3. `start_workflow` — begin with goal, job_name, workflow_name (session_id optional — see above) +4. Follow step instructions, then call `validate_step_outputs` or compare against `step_expected_outputs` +5. Call `finished_step` with outputs once the output contract is satisfied +6. If `needs_work`: fix issues and retry. If `next_step`: continue. If `workflow_complete`: done. Workflows nest via stack. Use `abort_workflow` to cancel, `go_to_step` to revisit earlier steps. """ +_STATIC_INSTRUCTIONS = _static_instructions("claude") + _WORKFLOW_HEADER = ( "## Available Workflows\n\n" "This project uses DeepWork. 
If the user wants to do something matching " @@ -544,6 +641,7 @@ async def mark_review_as_passed(review_id: str, ctx: Context) -> str: def _build_startup_instructions( project_root: Path, issues: list[Issue], + platform: str = "claude", ) -> str: """Build MCP server instructions with dynamic content first (survives truncation). @@ -555,20 +653,21 @@ def _build_startup_instructions( "Suggest repairing this immediately to the user.\n\n" + format_issues_for_agent(issues) + "\n\n" - + _STATIC_INSTRUCTIONS + + _static_instructions(platform) ) # No issues — list available workflows jobs, _ = load_all_jobs(project_root) if not jobs: - return _STATIC_INSTRUCTIONS + return _static_instructions(platform) lines: list[str] = [] for job in jobs: wf_names = ", ".join(job.workflows.keys()) lines.append(f"- **{job.name}** ({wf_names}): {job.summary}") - result = _WORKFLOW_HEADER + "\n".join(lines) + "\n\n" + _STATIC_INSTRUCTIONS + static_instructions = _static_instructions(platform) + result = _WORKFLOW_HEADER + "\n".join(lines) + "\n\n" + static_instructions if len(result) <= _MAX_INSTRUCTIONS_SIZE: return result @@ -577,5 +676,5 @@ def _build_startup_instructions( "## Available Workflows\n\n" "This project has DeepWork workflows installed. 
" "Call `get_workflows` to see all available workflows and use them for " - "anything the user requests that seem related.\n\n" + _STATIC_INSTRUCTIONS + "anything the user requests that seem related.\n\n" + static_instructions ) diff --git a/src/deepwork/jobs/mcp/state.py b/src/deepwork/jobs/mcp/state.py index 67a2144f..67ac0b40 100644 --- a/src/deepwork/jobs/mcp/state.py +++ b/src/deepwork/jobs/mcp/state.py @@ -67,6 +67,12 @@ def __init__(self, project_root: Path, platform: str): self.sessions_dir = project_root / ".deepwork" / "tmp" / "sessions" / platform self._lock = asyncio.Lock() + def set_project_root(self, project_root: Path) -> None: + """Rebind persisted state paths to a new project root.""" + resolved = project_root.resolve() + self.project_root = resolved + self.sessions_dir = resolved / ".deepwork" / "tmp" / "sessions" / self.platform + def _state_file(self, session_id: str, agent_id: str | None = None) -> Path: """Get the path to a state file.""" session_dir = self.sessions_dir / f"session-{session_id}" diff --git a/src/deepwork/jobs/mcp/status.py b/src/deepwork/jobs/mcp/status.py index b86e3d63..b9c5e629 100644 --- a/src/deepwork/jobs/mcp/status.py +++ b/src/deepwork/jobs/mcp/status.py @@ -51,7 +51,12 @@ class StatusWriter: """ def __init__(self, project_root: Path): - self.status_dir = project_root / ".deepwork" / "tmp" / "status" / "v1" + self.set_project_root(project_root) + + def set_project_root(self, project_root: Path) -> None: + """Rebind status file paths to a new project root.""" + resolved = project_root.resolve() + self.status_dir = resolved / ".deepwork" / "tmp" / "status" / "v1" self.manifest_path = self.status_dir / "job_manifest.yml" self.sessions_dir = self.status_dir / "sessions" diff --git a/src/deepwork/jobs/mcp/tools.py b/src/deepwork/jobs/mcp/tools.py index 6b0c9d60..dfa3a4bf 100644 --- a/src/deepwork/jobs/mcp/tools.py +++ b/src/deepwork/jobs/mcp/tools.py @@ -13,6 +13,7 @@ from __future__ import annotations import logging 
+import re from pathlib import Path from typing import TYPE_CHECKING from uuid import uuid4 @@ -25,10 +26,13 @@ AbortWorkflowInput, AbortWorkflowResponse, ActiveStepInfo, + ActiveWorkflowState, ArgumentValue, ExpectedOutput, FinishedStepInput, FinishedStepResponse, + GetActiveWorkflowInput, + GetActiveWorkflowResponse, GetSessionJobInput, GetWorkflowsResponse, GoToStepInput, @@ -40,6 +44,8 @@ StartWorkflowResponse, StepInputInfo, StepStatus, + ValidateStepOutputsInput, + ValidateStepOutputsResponse, WorkflowInfo, ) from deepwork.jobs.mcp.state import StateError, StateManager @@ -52,6 +58,8 @@ ) logger = logging.getLogger("deepwork.jobs.mcp") +OPENCLAW_RUNTIME_NOTE = Path(".deepwork/tmp/openclaw/DEEPWORK_OPENCLAW_BOOTSTRAP.md") +OPENCLAW_SESSION_ID_RE = re.compile(r"session_id:\s*(?:`([^`]+)`|([^\s`]+))") if TYPE_CHECKING: from deepwork.jobs.mcp.status import StatusWriter @@ -71,6 +79,7 @@ def __init__( project_root: Path, state_manager: StateManager, status_writer: StatusWriter | None = None, + platform: str = "claude", ): """Initialize workflow tools. 
@@ -83,6 +92,70 @@ def __init__(
         self.state_manager = state_manager
         self.status_writer = status_writer
 
+    def _resolve_openclaw_runtime_session_id(self, session_id: str) -> str:
+        """Prefer the current OpenClaw runtime session hint over stale caller values."""
+        if self.platform != "openclaw":
+            return session_id
+
+        runtime_note = self.project_root / OPENCLAW_RUNTIME_NOTE
+        try:
+            content = runtime_note.read_text(encoding="utf-8")
+        except (OSError, UnicodeDecodeError):  # unreadable or non-UTF-8 note: keep caller id
+            return session_id
+
+        match = OPENCLAW_SESSION_ID_RE.search(content)
+        if not match:
+            return session_id
+
+        runtime_session_id = next((group for group in match.groups() if group), "").strip()
+        if not runtime_session_id or runtime_session_id == session_id:
+            return session_id
+
+        logger.warning(
+            "Overriding stale OpenClaw session_id %s with runtime session_id %s from %s",
+            session_id,
+            runtime_session_id,
+            runtime_note,
+        )
+        return runtime_session_id
+
+    def _start_workflow_tool_name(self) -> str:
+        """Return the host-specific visible start_workflow tool name."""
+        if self.platform == "openclaw":
+            return "deepwork__start_workflow"
+        return "mcp__plugin_deepwork_deepwork__start_workflow"
+
+    def _workflow_invocation_text(self, job_name: str, workflow_name: str, agent: str | None) -> str:
+        """Build host-specific workflow invocation help text."""
+        start_tool = self._start_workflow_tool_name()
+        if self.platform == "openclaw":
+            if agent:
+                return (
+                    "Prefer launching this as a parallel OpenClaw sub-agent with "
+                    "`sessions_spawn`, giving it full context and instructions to call "
+                    f"`{start_tool}` with job_name=\"{job_name}\" and "
+                    f'workflow_name="{workflow_name}". If sub-agents are unavailable, '
+                    "invoke the workflow directly in the current session."
+                )
+            return (
+                f"Call `{start_tool}` with job_name=\"{job_name}\" and "
+                f'workflow_name="{workflow_name}", then follow the step instructions it returns.'
+ ) + + if agent: + return ( + f'Invoke as a Task using subagent_type="{agent}" with a prompt ' + f"giving full context needed and instructions to call " + f"`{start_tool}` " + f'(job_name="{job_name}", workflow_name="{workflow_name}"). ' + f"If you do not have Task as an available tool, invoke the workflow directly." + ) + return ( + f"Call `{start_tool}` with " + f'job_name="{job_name}" and workflow_name="{workflow_name}", ' + f"then follow the step instructions it returns." + ) + @property def platform(self) -> str: """Return the platform from the state manager.""" @@ -91,7 +164,7 @@ def platform(self) -> str: def _resolve_session_id(self, session_id: str | None) -> str: """Resolve session_id: require it on Claude Code, auto-generate otherwise.""" if session_id: - return session_id + return self._resolve_openclaw_runtime_session_id(session_id) if self.platform == "claude": raise ToolError( "session_id is required on Claude Code. " @@ -128,6 +201,14 @@ def _write_manifest(self, jobs: list[JobDefinition] | None = None) -> None: except Exception: logger.warning("Failed to write job manifest", exc_info=True) + def set_project_root(self, project_root: Path) -> None: + """Update project-scoped helpers when the effective root changes.""" + resolved = project_root.resolve() + self.project_root = resolved + self.state_manager.set_project_root(resolved) + if self.status_writer is not None: + self.status_writer.set_project_root(resolved) + def _load_all_jobs(self) -> tuple[list[JobDefinition], list[JobLoadError]]: """Load all job definitions from all configured job folders.""" return load_all_jobs(self.project_root) @@ -136,20 +217,7 @@ def _job_to_info(self, job: JobDefinition) -> JobInfo: """Convert a JobDefinition to JobInfo for response.""" workflows = [] for wf_name, wf in job.workflows.items(): - if wf.agent: - how_to_invoke = ( - f'Invoke as an Agent using subagent_type="{wf.agent}" with a prompt ' - f"giving full context needed and instructions to call " - 
f"`mcp__plugin_deepwork_deepwork__start_workflow` " - f'(job_name="{job.name}", workflow_name="{wf_name}"). ' - f"If you do not have Agent as an available tool, invoke the workflow directly." - ) - else: - how_to_invoke = ( - f"Call `mcp__plugin_deepwork_deepwork__start_workflow` with " - f'job_name="{job.name}" and workflow_name="{wf_name}", ' - f"then follow the step instructions it returns." - ) + how_to_invoke = self._workflow_invocation_text(job.name, wf_name, wf.agent) workflows.append( WorkflowInfo( name=wf_name, @@ -249,8 +317,8 @@ def _validate_outputs( extra = submitted_names - declared_names if extra: raise ToolError( - f"Unknown output names: {', '.join(sorted(extra))}. " - f"Declared outputs: {', '.join(sorted(declared_names))}" + f"Unknown output names for step '{step.name}': {', '.join(sorted(extra))}. " + f"{self._output_contract_summary(step, job)}" ) # Check for missing required output keys @@ -258,8 +326,8 @@ def _validate_outputs( missing = required_names - submitted_names if missing: raise ToolError( - f"Missing required outputs: {', '.join(sorted(missing))}. " - f"All required outputs must be provided." + f"Missing required outputs for step '{step.name}': {', '.join(sorted(missing))}. " + f"{self._output_contract_summary(step, job)}" ) # Validate types and file existence @@ -273,16 +341,24 @@ def _validate_outputs( # the agent (not untrusted external input) and may legitimately # reference paths outside the project root (e.g. worktrees). 
if isinstance(value, str): + if not value.strip(): + raise ToolError(f"Output '{name}': file path cannot be empty or whitespace") full_path = self.project_root / value if not full_path.exists(): raise ToolError(f"Output '{name}': file not found at '{value}'") elif isinstance(value, list): + if not value: + raise ToolError(f"Output '{name}': file path list cannot be empty") for path in value: if not isinstance(path, str): raise ToolError( f"Output '{name}': all paths must be strings, " f"got {type(path).__name__}" ) + if not path.strip(): + raise ToolError( + f"Output '{name}': file paths cannot be empty or whitespace" + ) full_path = self.project_root / path if not full_path.exists(): raise ToolError(f"Output '{name}': file not found at '{path}'") @@ -324,6 +400,48 @@ def _build_expected_outputs( ) return results + def _output_contract_summary(self, step: WorkflowStep, job: JobDefinition) -> str: + """Summarize the declared output contract for a step.""" + expected = self._build_expected_outputs(step, job) + if not expected: + return f"Step '{step.name}' does not declare any outputs." + + parts = [] + for output in expected: + required = "required" if output.required else "optional" + parts.append(f"{output.name} ({output.type}, {required})") + return f"Declared outputs for step '{step.name}': {', '.join(parts)}" + + def _load_active_step_context( + self, + session_id: str, + agent_id: str | None, + ) -> tuple[object, JobDefinition, Workflow, WorkflowStep, dict[str, ArgumentValue]]: + """Resolve the current active session, job, workflow, step, and input values.""" + try: + session = self.state_manager.resolve_session(session_id, agent_id) + except StateError as err: + raise ToolError( + "No active workflow session. Call `get_active_workflow` to inspect the current " + "session or `start_workflow` to begin one." 
+ ) from err + + job = self._get_job(session.job_name, session_id=session_id) + workflow = self._get_workflow(job, session.workflow_name) + current_step = workflow.get_step(session.current_step_id) + if current_step is None: + raise ToolError(f"Current step not found: {session.current_step_id}") + + input_values = self.state_manager.get_step_input_values( + session_id, session.current_step_id, agent_id + ) + if not input_values: + input_values = self._resolve_input_values( + current_step, job, workflow, session_id, agent_id + ) + + return session, job, workflow, current_step, input_values + def _build_step_inputs_info( self, step: WorkflowStep, @@ -450,8 +568,10 @@ def get_workflows(self) -> GetWorkflowsResponse: async def start_workflow(self, input_data: StartWorkflowInput) -> StartWorkflowResponse: """Start a new workflow session.""" + sid = self._resolve_session_id(input_data.session_id) + # Load job and workflow (check session jobs first) - job = self._get_job(input_data.job_name, session_id=input_data.session_id) + job = self._get_job(input_data.job_name, session_id=sid) workflow = self._get_workflow(job, input_data.workflow_name) if not workflow.steps: @@ -459,7 +579,6 @@ async def start_workflow(self, input_data: StartWorkflowInput) -> StartWorkflowR first_step = workflow.steps[0] - sid = self._resolve_session_id(input_data.session_id) aid = input_data.agent_id # Create session (use resolved workflow name in case it was auto-selected) @@ -498,32 +617,20 @@ async def start_workflow(self, input_data: StartWorkflowInput) -> StartWorkflowR async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResponse: """Report step completion and get next instructions.""" - sid = input_data.session_id + sid = self._resolve_openclaw_runtime_session_id(input_data.session_id) aid = input_data.agent_id + session, job, workflow, current_step, input_values = self._load_active_step_context( + sid, aid + ) + current_step_name = current_step.name + try: - session 
= self.state_manager.resolve_session(sid, aid) - except StateError as err: + self._validate_outputs(input_data.outputs, current_step, job) + except ToolError as err: raise ToolError( - "No active workflow session. " - "Provide the session_id from the start_workflow response (begin_step.session_id). " - "If you want to resume a workflow, just start it again and call finished_step " - "with quality_review_override_reason until you get back to your prior step." + f"{err} Run `validate_step_outputs` before `finished_step` if you want a " + "dry run against the active step without advancing the workflow." ) from err - current_step_name = session.current_step_id - - # Load job and workflow (check session jobs first) - job = self._get_job(session.job_name, session_id=sid) - workflow = self._get_workflow(job, session.workflow_name) - current_step = workflow.get_step(current_step_name) - - if current_step is None: - raise ToolError(f"Current step not found: {current_step_name}") - - # Validate outputs against step's declared output refs - self._validate_outputs(input_data.outputs, current_step, job) - - # Get input values from state - input_values = self.state_manager.get_step_input_values(sid, current_step_name, aid) # Run quality gate if not overridden if not input_data.quality_review_override_reason: @@ -535,6 +642,7 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp input_values=input_values, work_summary=input_data.work_summary, project_root=self.project_root, + platform=self.platform, ) if review_feedback: @@ -601,9 +709,79 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp self._write_session_status(sid) return response + def validate_step_outputs( + self, input_data: ValidateStepOutputsInput + ) -> ValidateStepOutputsResponse: + """Validate outputs for the active step without advancing the workflow.""" + sid = self._resolve_openclaw_runtime_session_id(input_data.session_id) + aid = input_data.agent_id 
+ _session, job, workflow, current_step, input_values = self._load_active_step_context( + sid, aid + ) + + errors: list[str] = [] + try: + self._validate_outputs(input_data.outputs, current_step, job) + except ToolError as err: + errors.append(str(err)) + + return ValidateStepOutputsResponse( + valid=not errors, + errors=errors, + current_step=self._build_active_step_info( + sid, current_step, job, workflow, input_values + ), + stack=self.state_manager.get_stack(sid, aid), + ) + + def get_active_workflow( + self, input_data: GetActiveWorkflowInput + ) -> GetActiveWorkflowResponse: + """Return the current active workflow state for this session, if any.""" + sid = self._resolve_openclaw_runtime_session_id(input_data.session_id) + aid = input_data.agent_id + + try: + session, job, workflow, current_step, input_values = self._load_active_step_context( + sid, aid + ) + except ToolError as err: + if "No active workflow session." not in str(err): + raise + return GetActiveWorkflowResponse( + has_active_workflow=False, + stack=self.state_manager.get_stack(sid, aid), + ) + + completed_steps = [ + step.name + for step in workflow.steps + if step.name in session.step_progress + and session.step_progress[step.name].completed_at is not None + ] + + active_workflow = ActiveWorkflowState( + job_name=session.job_name, + workflow_name=session.workflow_name, + goal=session.goal, + started_at=session.started_at, + step_number=session.current_step_index + 1, + total_steps=len(workflow.steps), + completed_steps=completed_steps, + current_step=self._build_active_step_info( + sid, current_step, job, workflow, input_values + ), + ) + + return GetActiveWorkflowResponse( + has_active_workflow=True, + stack=self.state_manager.get_stack(sid, aid), + active_workflow=active_workflow, + ) + async def abort_workflow(self, input_data: AbortWorkflowInput) -> AbortWorkflowResponse: """Abort the current workflow and return to the previous one.""" - sid = input_data.session_id + sid = 
self._resolve_openclaw_runtime_session_id(input_data.session_id) aid = input_data.agent_id aborted_session, new_active = await self.state_manager.abort_workflow( sid, input_data.explanation, agent_id=aid @@ -624,7 +802,7 @@ async def abort_workflow(self, input_data: AbortWorkflowInput) -> AbortWorkflowR async def go_to_step(self, input_data: GoToStepInput) -> GoToStepResponse: """Navigate back to a prior step, clearing progress from that step onward.""" - sid = input_data.session_id + sid = self._resolve_openclaw_runtime_session_id(input_data.session_id) aid = input_data.agent_id session = self.state_manager.resolve_session(sid, aid) diff --git a/src/deepwork/review/formatter.py b/src/deepwork/review/formatter.py index f3776600..ea529fe4 100644 --- a/src/deepwork/review/formatter.py +++ b/src/deepwork/review/formatter.py @@ -10,6 +10,15 @@ from pathlib import Path from deepwork.review.config import ReviewTask +from deepwork.review.instructions import short_instruction_filename + + +def _relative_instruction_path(file_path: Path, project_root: Path) -> Path: + """Resolve instruction file path relative to the project when possible.""" + try: + return file_path.relative_to(project_root) + except ValueError: + return file_path def _git_common_dir(project_root: Path) -> Path | None: @@ -115,6 +124,49 @@ def format_for_claude( return "\n".join(lines) +def format_for_openclaw( + task_files: list[tuple[ReviewTask, Path]], + project_root: Path, +) -> str: + """Format review tasks as OpenClaw `sessions_spawn` work items.""" + if not task_files: + return "No review tasks to execute." + + lines: list[str] = [] + lines.append("Spawn the following review sub-agents in parallel with `sessions_spawn`.") + lines.append( + "IMPORTANT: Do not complete these reviews in the main session unless you cannot " + "spawn sub-agents. Spawn all of them before you wait for completions. 
Use each "
+        "instruction path exactly as written, relative to the workspace, and do not "
+        "rewrite it as an absolute host path. Omit `timeoutSeconds` on review spawns so "
+        "the runtime default applies; if a timeout value is required, use `0`.\n"
+    )
+
+    for idx, (task, file_path) in enumerate(task_files, start=1):
+        alias_path = file_path.with_name(short_instruction_filename(file_path.stem))
+        rel_path = _relative_instruction_path(alias_path, project_root)
+        name = f"{task.rule_name} review of {', '.join(task.files_to_review)}"
+        description = _task_description(task)
+
+        lines.append(f"{idx}. label: {name}")
+        lines.append(f" description: {description}")
+        lines.append(
+            " task: Read the review instructions at "
+            f"`{rel_path}` and execute that review. Return only findings and concrete fixes."
+        )
+        if task.agent_name:
+            lines.append(f" agent_type: {task.agent_name}")
+        lines.append("")
+
+    return "\n".join(lines)
+
+
+FORMATTERS = {
+    "claude": format_for_claude,
+    "openclaw": format_for_openclaw,
+}
+
+
 def _task_description(task: ReviewTask) -> str:
     """Generate a short description for a review task.
 
diff --git a/src/deepwork/review/instructions.py b/src/deepwork/review/instructions.py index 76e3feca..59e2e490 100644 --- a/src/deepwork/review/instructions.py +++ b/src/deepwork/review/instructions.py @@ -199,6 +199,8 @@ def write_instruction_files( passed_marker = child.with_suffix(".passed") if not passed_marker.exists(): child.unlink() + elif child.suffix == ".txt": + child.unlink() instructions_dir.mkdir(parents=True, exist_ok=True) # Run all precompute commands in parallel before building files @@ -229,13 +231,21 @@ def write_instruction_files( ) content = build_instruction_file(task, review_id, precomputed_info, project_root) file_path = instructions_dir / f"{review_id}.md" + alias_path = instructions_dir / short_instruction_filename(review_id) safe_write(file_path, content) + safe_write(alias_path, content) results.append((task, file_path)) return results +def short_instruction_filename(review_id: str) -> str: + """Build a short deterministic filename for OpenClaw review prompts.""" + digest = hashlib.sha256(review_id.encode("utf-8")).hexdigest()[:10] + return f"r-{digest}.txt" + + def build_instruction_file( task: ReviewTask, review_id: str = "", diff --git a/src/deepwork/review/mcp.py b/src/deepwork/review/mcp.py index 5277f354..bd234a89 100644 --- a/src/deepwork/review/mcp.py +++ b/src/deepwork/review/mcp.py @@ -9,7 +9,7 @@ from deepwork.deepschema.review_bridge import generate_review_rules as gen_schema_rules from deepwork.review.config import ReviewRule from deepwork.review.discovery import DiscoveryError, load_all_rules -from deepwork.review.formatter import format_for_claude +from deepwork.review.formatter import FORMATTERS, format_for_claude from deepwork.review.instructions import ( INSTRUCTIONS_DIR, compute_review_id, @@ -23,10 +23,6 @@ match_rule, ) -FORMATTERS = { - "claude": format_for_claude, -} - SUPPORTED_PLATFORMS = set(FORMATTERS.keys()) diff --git a/tests/unit/jobs/mcp/test_quality_gate.py b/tests/unit/jobs/mcp/test_quality_gate.py index 
32a38540..5cd43f10 100644 --- a/tests/unit/jobs/mcp/test_quality_gate.py +++ b/tests/unit/jobs/mcp/test_quality_gate.py @@ -816,6 +816,67 @@ def test_dynamic_rules_match_all_outputs_regardless_of_git(self, tmp_path: Path) mock_match.assert_called_once() assert result is not None + def test_uses_openclaw_formatter_when_requested(self, tmp_path: Path) -> None: + """OpenClaw quality-gate output uses the OpenClaw formatter, not Claude's.""" + review = ReviewBlock(strategy="individual", instructions="Check quality") + arg = StepArgument(name="report", description="Report", type="file_path") + output_ref = StepOutputRef(argument_name="report", required=True, review=review) + step = WorkflowStep(name="write", outputs={"report": output_ref}) + job, workflow = _make_job(tmp_path, [arg], step) + + report_file = tmp_path / "report.md" + report_file.write_text("Report content") + + mock_task = ReviewTask( + rule_name="step_write_output_report", + files_to_review=["report.md"], + instructions="Check quality", + agent_name=None, + ) + instruction_path = tmp_path / ".deepwork" / "tmp" / "review_instruction.md" + instruction_path.parent.mkdir(parents=True, exist_ok=True) + instruction_path.write_text("Review instruction content") + + def openclaw_formatter(*_args: object, **_kwargs: object) -> str: + return "openclaw formatted output" + + def claude_formatter(*_args: object, **_kwargs: object) -> str: + return "claude formatted output" + + with ( + patch("deepwork.jobs.mcp.quality_gate.load_all_rules", return_value=([], [])), + patch( + "deepwork.jobs.mcp.quality_gate.match_files_to_rules", + return_value=[mock_task], + ), + patch( + "deepwork.jobs.mcp.quality_gate.write_instruction_files", + return_value=[(mock_task, instruction_path)], + ), + patch.dict( + "deepwork.jobs.mcp.quality_gate.FORMATTERS", + {"claude": claude_formatter, "openclaw": openclaw_formatter}, + clear=False, + ), + ): + result = run_quality_gate( + step=step, + job=job, + workflow=workflow, + 
outputs={"report": "report.md"}, + input_values={}, + work_summary=None, + project_root=tmp_path, + platform="openclaw", + ) + + assert result is not None + assert "openclaw formatted output" in result + assert "claude formatted output" not in result + assert "sessions_spawn" in result + assert "sessions_yield" in result + assert "Do not set `timeoutSeconds`" in result + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-004.5.4). # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_deepreview_skipped_when_get_changed_files_fails(self, tmp_path: Path) -> None: diff --git a/tests/unit/jobs/mcp/test_roots.py b/tests/unit/jobs/mcp/test_roots.py index c013c795..a36e16b7 100644 --- a/tests/unit/jobs/mcp/test_roots.py +++ b/tests/unit/jobs/mcp/test_roots.py @@ -157,3 +157,22 @@ async def test_dynamic_falls_back_on_error() -> None: ctx = _make_ctx(RuntimeError("disconnected")) result = await resolver.get_root(ctx) assert result == FALLBACK + + +@pytest.mark.asyncio +async def test_dynamic_normalizes_openclaw_plugin_bundle_to_workspace_root( + tmp_path: Path, +) -> None: + workspace = tmp_path / "workspace" + plugin_dir = workspace / "plugins" / "openclaw" + (workspace / ".openclaw").mkdir(parents=True) + (workspace / ".openclaw" / "workspace-state.json").write_text("{}") + (plugin_dir / ".codex-plugin").mkdir(parents=True) + (plugin_dir / ".codex-plugin" / "plugin.json").write_text("{}") + + resolver = RootResolver(fallback_root=plugin_dir, explicit=False) + ctx = _make_ctx(RuntimeError("listRoots unavailable")) + + result = await resolver.get_root(ctx) + + assert result == workspace.resolve() diff --git a/tests/unit/jobs/mcp/test_state.py b/tests/unit/jobs/mcp/test_state.py index 227c8f1d..175b6a65 100644 --- a/tests/unit/jobs/mcp/test_state.py +++ b/tests/unit/jobs/mcp/test_state.py @@ -48,6 +48,15 @@ def test_init(self, state_manager: StateManager, project_root: Path) -> None: ) assert state_manager.get_stack_depth(SESSION_ID) == 0 + def 
test_set_project_root_updates_session_paths(self, state_manager: StateManager) -> None: + new_root = Path("/tmp/other-root") + resolved = new_root.resolve() + + state_manager.set_project_root(new_root) + + assert state_manager.project_root == resolved + assert state_manager.sessions_dir == resolved / ".deepwork" / "tmp" / "sessions" / "test" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-003.3.5, JOBS-REQ-003.3.8, JOBS-REQ-003.3.9). # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_create_session(self, state_manager: StateManager) -> None: diff --git a/tests/unit/jobs/mcp/test_status.py b/tests/unit/jobs/mcp/test_status.py index c6c70f28..2d957525 100644 --- a/tests/unit/jobs/mcp/test_status.py +++ b/tests/unit/jobs/mcp/test_status.py @@ -107,6 +107,16 @@ def test_status_dir_uses_v1_path(self, status_writer: StatusWriter, project_root assert status_writer.manifest_path == status_writer.status_dir / "job_manifest.yml" assert status_writer.sessions_dir == status_writer.status_dir / "sessions" + def test_set_project_root_updates_paths(self, status_writer: StatusWriter) -> None: + new_root = Path("/tmp/other-root") + resolved = new_root.resolve() + + status_writer.set_project_root(new_root) + + assert status_writer.status_dir == resolved / ".deepwork" / "tmp" / "status" / "v1" + assert status_writer.manifest_path == status_writer.status_dir / "job_manifest.yml" + assert status_writer.sessions_dir == status_writer.status_dir / "sessions" + class TestWriteManifest: # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.1.2). 
diff --git a/tests/unit/jobs/mcp/test_tools.py b/tests/unit/jobs/mcp/test_tools.py index 4b86ff85..7b9e551b 100644 --- a/tests/unit/jobs/mcp/test_tools.py +++ b/tests/unit/jobs/mcp/test_tools.py @@ -14,6 +14,7 @@ AbortWorkflowInput, FinishedStepInput, FinishedStepResponse, + GetActiveWorkflowInput, GoToStepInput, StartWorkflowInput, StartWorkflowResponse, @@ -328,6 +329,61 @@ async def test_job_not_found(self, tools: WorkflowTools) -> None: with pytest.raises(ToolError, match="not found"): await tools.start_workflow(inp) + @pytest.mark.asyncio + async def test_openclaw_runtime_session_hint_overrides_stale_session_id( + self, project_root: Path + ) -> None: + runtime_session_id = "openclaw-live-session" + runtime_note = ( + project_root + / ".deepwork" + / "tmp" + / "openclaw" + / "DEEPWORK_OPENCLAW_BOOTSTRAP.md" + ) + runtime_note.parent.mkdir(parents=True, exist_ok=True) + runtime_note.write_text( + "# DeepWork OpenClaw Runtime\n\n- session_id: `openclaw-live-session`\n", + encoding="utf-8", + ) + + state_manager = StateManager(project_root, platform="openclaw") + tools = WorkflowTools(project_root, state_manager, platform="openclaw") + + response = await tools.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id="stale-session-id", + ) + ) + + assert response.begin_step.session_id == runtime_session_id + assert ( + project_root + / ".deepwork" + / "tmp" + / "sessions" + / "openclaw" + / f"session-{runtime_session_id}" + / "state.json" + ).exists() + assert not ( + project_root + / ".deepwork" + / "tmp" + / "sessions" + / "openclaw" + / "session-stale-session-id" + / "state.json" + ).exists() + + active = tools.get_active_workflow(GetActiveWorkflowInput(session_id="stale-session-id")) + assert active.has_active_workflow is True + assert active.active_workflow is not None + assert active.active_workflow.current_step.session_id == runtime_session_id + @pytest.mark.asyncio # THIS TEST VALIDATES A HARD 
REQUIREMENT (JOBS-REQ-001.3.3, JOBS-REQ-001.3.8). # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES @@ -407,6 +463,17 @@ async def test_file_not_found(self, tools: WorkflowTools) -> None: override="skip", ) + @pytest.mark.asyncio + async def test_empty_file_path_rejected(self, tools: WorkflowTools) -> None: + await _start_main_workflow(tools) + + with pytest.raises(ToolError, match="file path cannot be empty or whitespace"): + await _finish_step( + tools, + outputs={"output1": ""}, + override="skip", + ) + @pytest.mark.asyncio # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-001.4.13). # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES diff --git a/tests/unit/review/test_formatter.py b/tests/unit/review/test_formatter.py index 7b4b8a28..431e1e04 100644 --- a/tests/unit/review/test_formatter.py +++ b/tests/unit/review/test_formatter.py @@ -4,7 +4,8 @@ from unittest.mock import patch from deepwork.review.config import ReviewTask -from deepwork.review.formatter import _resolve_file_ref_root, format_for_claude +from deepwork.review.formatter import _resolve_file_ref_root, format_for_claude, format_for_openclaw +from deepwork.review.instructions import short_instruction_filename def _make_task( @@ -240,3 +241,42 @@ def test_normal_repo_prompt_path_relative_to_project_root(self, tmp_path: Path) result = format_for_claude([(task, file_path)], tmp_path) assert 'prompt: "@.deepwork/tmp/review_instructions/review_123.md"' in result + + +class TestFormatForOpenClaw: + """Tests for format_for_openclaw.""" + + def test_empty_tasks_returns_no_tasks_message(self, tmp_path: Path) -> None: + result = format_for_openclaw([], tmp_path) + assert "No review tasks" in result + + def test_output_mentions_sessions_spawn(self, tmp_path: Path) -> None: + task = _make_task(rule_name="py_review", files=["src/app.py"]) + file_path = ( + tmp_path + / ".deepwork" + / "tmp" + / "review_instructions" + / "py_review--src-app.py--123456789abc.md" + ) + 
file_path.parent.mkdir(parents=True) + file_path.write_text("content") + + result = format_for_openclaw([(task, file_path)], tmp_path) + + assert "sessions_spawn" in result + assert "label: py_review review of src/app.py" in result + assert "` .deepwork" not in result + alias_name = short_instruction_filename(file_path.stem) + assert f"`.deepwork/tmp/review_instructions/{alias_name}`" in result + assert "workspace" in result + assert "absolute host path" in result + assert "timeoutSeconds" in result + + def test_agent_name_becomes_agent_type(self, tmp_path: Path) -> None: + task = _make_task(agent_name="security-expert") + file_path = tmp_path / "instructions.md" + + result = format_for_openclaw([(task, file_path)], tmp_path) + + assert "agent_type: security-expert" in result diff --git a/tests/unit/review/test_instructions.py b/tests/unit/review/test_instructions.py index 696d0203..eb718705 100644 --- a/tests/unit/review/test_instructions.py +++ b/tests/unit/review/test_instructions.py @@ -19,6 +19,7 @@ _sanitize_for_id, build_instruction_file, compute_review_id, + short_instruction_filename, write_instruction_files, ) @@ -537,6 +538,12 @@ def test_pass_caching_works_for_inline_content( results = write_instruction_files([task], tmp_path) assert results == [] + def test_short_instruction_filename_is_stable(self, tmp_path: Path) -> None: + review_id = "rule--src-app.py--abc123def456" + assert short_instruction_filename(review_id) == short_instruction_filename(review_id) + assert short_instruction_filename(review_id).startswith("r-") + assert short_instruction_filename(review_id).endswith(".txt") + class TestPrecomputedContext: """Tests for precomputed info command execution — validates REVIEW-REQ-001.9, REVIEW-REQ-005.7.""" diff --git a/tests/unit/review/test_mcp.py b/tests/unit/review/test_mcp.py index a863e071..52777371 100644 --- a/tests/unit/review/test_mcp.py +++ b/tests/unit/review/test_mcp.py @@ -143,6 +143,30 @@ def 
test_full_pipeline_returns_formatted_output( assert "Invoke the following" in result mock_write.assert_called_once() + @patch("deepwork.review.mcp.write_instruction_files") + @patch("deepwork.review.mcp.match_files_to_rules") + @patch("deepwork.review.mcp.get_changed_files") + @patch("deepwork.review.mcp.load_all_rules") + def test_openclaw_pipeline_returns_openclaw_output( + self, mock_load: Any, mock_diff: Any, mock_match: Any, mock_write: Any, tmp_path: Path + ) -> None: + rule = _make_rule(tmp_path) + task = ReviewTask( + rule_name="test_rule", + files_to_review=["app.py"], + instructions="Review it.", + agent_name=None, + ) + mock_load.return_value = ([rule], []) + mock_diff.return_value = ["app.py"] + mock_match.return_value = [task] + mock_write.return_value = [(task, tmp_path / "instr.md")] + + result = run_review(tmp_path, "openclaw") + + assert "sessions_spawn" in result + assert "label: test_rule review of app.py" in result + def test_unsupported_platform_raises_review_tool_error(self, tmp_path: Path) -> None: # THIS TEST VALIDATES A HARD REQUIREMENT (REVIEW-REQ-006.1.3). # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES @@ -313,6 +337,16 @@ def test_get_configured_reviews_tool_is_registered(self, tmp_path: Path) -> None ) assert "get_configured_reviews" in _get_tool_names(server) + def test_get_active_workflow_tool_is_registered(self, tmp_path: Path) -> None: + """get_active_workflow is registered on the MCP server.""" + from deepwork.jobs.mcp.server import create_server + + server = create_server( + project_root=tmp_path, + enable_quality_gate=False, + ) + assert "get_active_workflow" in _get_tool_names(server) + @pytest.mark.usefixtures("without_standard_schemas") class TestGetConfiguredReviews: