Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions dev-suite/src/agents/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

from pydantic import BaseModel, Field

from ..tools.github_fetch import fetch_refs_as_context_items

logger = logging.getLogger(__name__)

# Default model for the Planner agent — lightweight and cheap.
Expand All @@ -50,6 +52,15 @@
# Session TTL in seconds (30 minutes idle timeout)
SESSION_TTL_SECONDS = 30 * 60

# Issue #193: cap GitHub refs pre-fetched per Planner message. Keeps
# the context injection bounded if the user pastes a long list of refs.
# Passed as `max_refs` to `fetch_refs_as_context_items`. The cap applies
# to each message separately: `_prefetch_github_refs_for_message` appends
# to the session's `github_context` on every turn, so the session total
# can grow beyond this number over a long conversation.
PLANNER_MAX_GITHUB_REFS = 5

# Issue #193: per-ref body char budget for the Planner's pre-fetch.
# Tighter than the Architect's gather_context (2000) because the
# Planner only needs a quick orientation, not full issue bodies.
# Passed as `max_chars` to `fetch_refs_as_context_items`.
PLANNER_GITHUB_REF_MAX_CHARS = 1200


# ---------------------------------------------------------------------------
# Models
Expand Down Expand Up @@ -136,6 +147,10 @@ class TaskSpec(BaseModel):
acceptance_criteria: list[str] = Field(default_factory=list)
constraints: list[str] = Field(default_factory=list)
related_files: list[str] = Field(default_factory=list)
# Issue #193: Planner-side pre-fetched GitHub issue/PR summaries,
# shape matches `gathered_context` entries. Passed through to the
# orchestrator on submit so the Architect doesn't re-fetch.
github_context: list[dict] = Field(default_factory=list)

def to_description(self) -> str:
"""Serialize to a rich description string for the Architect."""
Expand Down Expand Up @@ -549,6 +564,34 @@ def _get_planner_model_name() -> str:
return os.getenv("PLANNER_MODEL", DEFAULT_PLANNER_MODEL)


def _format_github_context_block(github_context: list[dict]) -> str:
    """Render pre-fetched GitHub issue/PR summaries for the system prompt.

    Called from `_build_planner_messages` so the Planner LLM can reference
    issue/PR content that the user mentioned without asking the user to
    paste it.

    Args:
        github_context: gathered_context-shaped dicts. Each usable entry
            has a string ``content`` and, optionally, a ``path`` used as
            the section title (``github://unknown`` when missing/empty).

    Returns:
        A header line followed by one ``--- <path> ---`` section per
        usable entry, separated by blank lines. Returns an empty string
        when the list is empty or no entry has usable content, so callers
        can skip the block with a plain truthiness check.
    """
    if not github_context:
        return ""

    header = (
        "Pre-fetched GitHub references (from the user's message; "
        "use these to orient the task, do not ask the user to paste "
        "them):"
    )

    sections: list[str] = []
    for item in github_context:
        # Malformed entries are skipped rather than raised on: this block
        # is best-effort context and must never break prompt building.
        if not isinstance(item, dict):
            continue
        raw_content = item.get("content")
        if not isinstance(raw_content, str):
            # Covers None as well as non-string payloads, which would
            # otherwise fail on `.strip()`.
            continue
        content = raw_content.strip()
        if not content:
            continue
        path = item.get("path") or "github://unknown"
        sections.append(f"--- {path} ---\n{content}")

    # A header with nothing under it would only confuse the model.
    if not sections:
        return ""
    return "\n\n".join([header, *sections])


def _build_planner_messages(
session: PlannerSession,
) -> list[dict[str, str]]:
Expand All @@ -564,6 +607,10 @@ def _build_planner_messages(

messages: list[dict[str, str]] = [{"role": "system", "content": system_content}]

github_block = _format_github_context_block(session.task_spec.github_context)
if github_block:
messages.append({"role": "system", "content": github_block})

for msg in session.messages:
if msg.role == "system":
continue # System context is in the system prompt
Expand All @@ -573,6 +620,50 @@ def _build_planner_messages(
return messages


async def _prefetch_github_refs_for_message(
    session: PlannerSession,
    user_message: str,
) -> list[dict]:
    """Deterministically pre-fetch GitHub refs mentioned in the user message.

    Appends gathered_context-shaped dicts to
    `session.task_spec.github_context`. Items whose ``path`` is already
    stored on the session, or appears twice in the same batch, are not
    stored again, so a follow-up message that mentions the same issue
    does not duplicate it in the prompt.

    NOTE(review): de-duplication happens after the fetch helper returns;
    the helper is still invoked with the full message on every turn, so
    a repeated ref may still cost a network call unless the helper caches
    internally.

    Args:
        session: Planner session whose task spec receives the new items.
        user_message: Raw user text to scan for issue/PR references.

    Returns:
        The items newly added to the session (possibly empty). Returns an
        empty list without calling the helper when GITHUB_TOKEN is unset.
    """
    token = os.getenv("GITHUB_TOKEN", "")
    if not token:
        return []

    # Paths already attached to this session; extended below so duplicates
    # inside a single batch are dropped as well.
    seen_paths = {
        item.get("path")
        for item in session.task_spec.github_context
        if isinstance(item, dict) and item.get("path")
    }

    items = await fetch_refs_as_context_items(
        user_message,
        default_owner=os.getenv("GITHUB_OWNER", ""),
        default_repo=os.getenv("GITHUB_REPO", ""),
        token=token,
        max_refs=PLANNER_MAX_GITHUB_REFS,
        max_chars=PLANNER_GITHUB_REF_MAX_CHARS,
    )

    new_items: list[dict] = []
    for item in items or []:
        # Same shape guard that is applied to the session's existing items.
        if not isinstance(item, dict):
            continue
        path = item.get("path")
        if not path or path in seen_paths:
            continue
        seen_paths.add(path)
        new_items.append(item)

    if new_items:
        session.task_spec.github_context.extend(new_items)
        logger.info(
            "[PLANNER] Pre-fetched %d GitHub ref(s) for session %s",
            len(new_items), session.session_id,
        )
    return new_items


def _extract_task_spec_updates(response_text: str) -> dict[str, Any]:
"""Extract TaskSpec field updates from the Planner's JSON response.

Expand Down Expand Up @@ -621,6 +712,8 @@ def _apply_spec_updates(task_spec: TaskSpec, updates: dict[str, Any]) -> TaskSpe
continue
if field_name == "workspace":
continue # Never override workspace from LLM output
if field_name == "github_context":
continue # Populated deterministically by pre-fetch, not LLM
if value is None or value == "" or value == []:
continue

Expand Down Expand Up @@ -687,6 +780,15 @@ async def send_planner_message(
PlannerMessage(role="user", content=user_message)
)

# Issue #193: deterministic GitHub pre-fetch. Scans the user's
# message for issue/PR refs and populates task_spec.github_context
# before the LLM sees the turn so the Planner can orient itself
# without consuming tool tokens.
try:
await _prefetch_github_refs_for_message(session, user_message)
except Exception as exc: # noqa: BLE001 - best-effort pre-fetch
logger.debug("[PLANNER] GitHub pre-fetch failed: %s", exc)

# Build messages for LLM
llm_messages = _build_planner_messages(session)

Expand Down
4 changes: 4 additions & 0 deletions dev-suite/src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,10 @@ async def submit_planner_session(
github_repo=body.github_repo if body else None,
github_branch=body.github_branch if body else None,
github_feature_branch=body.github_feature_branch if body else None,
# Issue #193: Planner pre-fetched GitHub summaries flow into
# the orchestrator so `gather_context_node` can reuse them
# instead of refetching.
prefetched_gathered_context=list(session.task_spec.github_context),
)

# Mark session as submitted
Expand Down
4 changes: 4 additions & 0 deletions dev-suite/src/api/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def submit(
github_repo: str | None = None,
github_branch: str | None = None,
github_feature_branch: str | None = None,
prefetched_gathered_context: list[dict] | None = None,
) -> None:
"""Submit a task for background execution."""
if task_id in self._tasks:
Expand All @@ -130,6 +131,7 @@ def submit(
github_repo=github_repo,
github_branch=github_branch,
github_feature_branch=github_feature_branch,
prefetched_gathered_context=prefetched_gathered_context,
)
async_task = asyncio.create_task(coro, name=f"orchestrator-{task_id}")
self._tasks[task_id] = async_task
Expand Down Expand Up @@ -170,6 +172,7 @@ async def _run_task(
github_repo: str | None = None,
github_branch: str | None = None,
github_feature_branch: str | None = None,
prefetched_gathered_context: list[dict] | None = None,
) -> None:
"""Run the orchestrator via astream() and emit SSE events."""
from .state import state_manager
Expand Down Expand Up @@ -272,6 +275,7 @@ async def _run_task(
"github_repo": github_repo,
"github_branch": github_branch,
"github_feature_branch": github_feature_branch,
"prefetched_gathered_context": prefetched_gathered_context or [],
}

stream_config = {
Expand Down
61 changes: 48 additions & 13 deletions dev-suite/src/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
parse_generated_code,
validate_paths_for_workspace,
)
from .tools.github_fetch import fetch_refs_as_context_items
from .tools.github_fetch import extract_github_refs, fetch_issue_or_pr
from .tracing import add_trace_event, create_trace_config

load_dotenv()
Expand Down Expand Up @@ -130,6 +130,7 @@ class GraphState(TypedDict, total=False):
pr_url: str | None
pr_number: int | None
gathered_context: list[dict] | None
prefetched_gathered_context: list[dict] | None
decomposition: TaskDecomposition | None
current_subtask_index: int
completed_subtasks: list[dict]
Expand Down Expand Up @@ -160,6 +161,7 @@ class AgentState(BaseModel):
pr_url: str | None = None
pr_number: int | None = None
gathered_context: list[dict] | None = None
prefetched_gathered_context: list[dict] | None = None
decomposition: TaskDecomposition | None = None
current_subtask_index: int = 0
completed_subtasks: list[dict] = []
Expand Down Expand Up @@ -764,23 +766,56 @@ def _resolve_candidate(raw: str) -> Path | None:
ordered_files, workspace_root, allowed_root=repo_root
)

# Source 4: GitHub issue/PR pre-fetch (issue #193).
# Source 4a: Planner-supplied pre-fetched context (issue #193 PR 2).
# The Planner's conversational pre-graph phase may have already
# fetched GitHub issue/PR summaries for refs the user mentioned.
# We fold those in here so the Architect sees them and we avoid
# a redundant second fetch in Source 4b.
prefetched_paths: set[str] = set()
prefetched_items = state.get("prefetched_gathered_context") or []
for item in prefetched_items:
if not isinstance(item, dict):
continue
path = item.get("path")
if not path or path in prefetched_paths:
continue
prefetched_paths.add(path)
gathered.append(item)
if prefetched_items:
trace.append(
f"gather_context: reused {len(prefetched_paths)} pre-fetched item(s)"
)

# Source 4b: GitHub issue/PR pre-fetch (issue #193).
# Scans the task description for refs like "issue #113",
# "fixes #42", or "owner/repo#99" and fetches their summaries so
# the Architect has the context without needing tools. Best-effort:
# missing token, network errors, and 404s are silently skipped.
github_items = await fetch_refs_as_context_items(
task_description,
default_owner=os.getenv("GITHUB_OWNER", ""),
default_repo=os.getenv("GITHUB_REPO", ""),
token=os.getenv("GITHUB_TOKEN", ""),
max_refs=5,
max_chars=2000,
)
if github_items:
gathered.extend(github_items)
# Refs already covered by prefetched_gathered_context are filtered
# out BEFORE the network call so the Planner's earlier fetch is
# not repeated.
github_token = os.getenv("GITHUB_TOKEN", "")
new_github_items: list[dict] = []
if github_token:
refs = extract_github_refs(
task_description,
default_owner=os.getenv("GITHUB_OWNER", ""),
default_repo=os.getenv("GITHUB_REPO", ""),
max_refs=5,
)
for ref in refs:
if ref.synthetic_path in prefetched_paths:
continue
item = await fetch_issue_or_pr(
ref.owner, ref.repo, ref.number,
token=github_token, max_chars=2000,
)
if item is not None:
new_github_items.append(item)
if new_github_items:
gathered.extend(new_github_items)
trace.append(
f"gather_context: pre-fetched {len(github_items)} GitHub ref(s)"
f"gather_context: pre-fetched {len(new_github_items)} GitHub ref(s)"
)

if not gathered:
Expand Down
87 changes: 87 additions & 0 deletions dev-suite/tests/test_gather_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,93 @@ async def test_github_fetch_failure_degrades_gracefully(
# No exception; empty gathered_context
assert result["gathered_context"] == []

@pytest.mark.asyncio
async def test_prefetched_context_is_reused(self, tmp_path, monkeypatch):
    """Items handed over by the Planner show up in the gathered context."""
    from unittest.mock import AsyncMock, patch

    expected_path = "github://acme/widgets/issues/42"

    # Create a `.git` directory inside tmp_path and make sure no GitHub
    # token is present in the environment.
    (tmp_path / ".git").mkdir()
    monkeypatch.delenv("GITHUB_TOKEN", raising=False)

    planner_item = {
        "path": expected_path,
        "content": "Issue #42: Broken login\n\nState: open",
        "truncated": False,
        "source": "github_issue",
    }
    state: GraphState = {
        "task_description": "Please fix the login bug",
        "workspace_root": str(tmp_path),
        "trace": [],
        "status": WorkflowStatus.PLANNING,
        "prefetched_gathered_context": [planner_item],
    }

    # The task text names no refs, so the stub client is only a safety
    # net against accidental HTTP traffic.
    stub_client = AsyncMock()
    http_patch = patch(
        "src.tools.github_fetch.httpx.AsyncClient", return_value=stub_client
    )
    with http_patch:
        outcome = await gather_context_node(state)

    github_entries = []
    for entry in outcome["gathered_context"]:
        if entry.get("source") == "github_issue":
            github_entries.append(entry)

    # Exactly the Planner-supplied item, and the trace records the reuse.
    assert len(github_entries) == 1
    assert github_entries[0]["path"] == expected_path
    assert any("reused 1 pre-fetched" in line for line in outcome["trace"])

@pytest.mark.asyncio
async def test_prefetched_dedupes_with_node_fetch(
    self, tmp_path, monkeypatch,
):
    """Task mentions an issue already in prefetched — node skips refetch."""
    from unittest.mock import AsyncMock, patch

    (tmp_path / ".git").mkdir()
    monkeypatch.setenv("GITHUB_TOKEN", "t")
    monkeypatch.setenv("GITHUB_OWNER", "acme")
    monkeypatch.setenv("GITHUB_REPO", "widgets")

    prefetched = [
        {
            "path": "github://acme/widgets/issues/42",
            "content": "Issue #42: Broken login",
            "truncated": False,
            "source": "github_issue",
        }
    ]

    # Entering the client raises, as a first line of defence against a
    # refetch.
    mock_client = AsyncMock()
    mock_client.__aenter__.side_effect = AssertionError(
        "no refetch expected for already-prefetched ref"
    )

    state: GraphState = {
        "task_description": "Please address issue #42 (acme/widgets)",
        "workspace_root": str(tmp_path),
        "trace": [],
        "status": WorkflowStatus.PLANNING,
        "prefetched_gathered_context": prefetched,
    }
    with patch(
        "src.tools.github_fetch.httpx.AsyncClient", return_value=mock_client
    ) as client_factory:
        result = await gather_context_node(state)

    # The raising __aenter__ is not sufficient on its own: a best-effort
    # `except Exception` in the fetch path would swallow the
    # AssertionError and the test would pass vacuously. Assert directly
    # that no HTTP client was ever constructed.
    client_factory.assert_not_called()

    gh_items = [
        c for c in result["gathered_context"]
        if c.get("source") == "github_issue"
    ]
    # Only the prefetched item, no duplicate
    assert len(gh_items) == 1
    assert gh_items[0]["path"] == "github://acme/widgets/issues/42"

    # The trace records the reuse and no fresh GitHub fetch.
    assert any("reused 1 pre-fetched" in t for t in result["trace"])
    assert not any("GitHub ref(s)" in t for t in result["trace"])

@pytest.mark.asyncio
async def test_rejects_path_outside_repo_root(self, tmp_path):
(tmp_path / ".git").mkdir()
Expand Down
Loading
Loading