From c1c81fba92dc33ecc19a87906e5ffebb8e2154da Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 21:59:35 +0300 Subject: [PATCH 1/8] feat: add LLM-based reachability review stage Adds an opt-in LLM review stage (off by default, enabled via the new `--llm-reachability` flag on `openant scan`) that uses a strong model (Opus by default) to surface additional reachability signals beyond what the structural pass catches: - Likely entry points the structural analysis may miss (framework hooks, plugin/CLI registrations, message handlers). - External-input sites (HTTP request bodies, file/network reads, env/argv, stdin, untrusted IPC). - Cross-process / async data-flow indicators. Signals are advisory and *promote-only*: high-confidence entry-point signals can set `is_entry_point=True` on a unit, but no signal ever demotes a unit that the structural analysis already kept. This matches the "complements, does not replace" intent in issue #17. Output: - `llm_reachability.json` written to the scan dir with the full signal list. - Each unit gains an `llm_reachability_signals` array on the dataset. Cost & rate-limit safety: opt-in only, prompts are batched, and the client integration goes through the existing `AnthropicClient` (which respects `GlobalRateLimiter`). Refs #17. --- libs/openant-core/core/llm_reachability.py | 430 ++++++++++++++++ libs/openant-core/core/scanner.py | 83 ++++ libs/openant-core/openant/cli.py | 9 + libs/openant-core/tests/test_go_cli.py | 8 + .../tests/test_llm_reachability.py | 467 ++++++++++++++++++ 5 files changed, 997 insertions(+) create mode 100644 libs/openant-core/core/llm_reachability.py create mode 100644 libs/openant-core/tests/test_llm_reachability.py diff --git a/libs/openant-core/core/llm_reachability.py b/libs/openant-core/core/llm_reachability.py new file mode 100644 index 0000000..19334a0 --- /dev/null +++ b/libs/openant-core/core/llm_reachability.py @@ -0,0 +1,430 @@ +""" +LLM-based reachability review stage. 
+ +A complementary, advisory pass over the parsed dataset that uses a strong +LLM (Opus by default) to surface additional reachability signals beyond +what the structural reachability analysis catches: + +- Likely entry points the structural pass missed (framework-specific + handlers, plugin registrations, lambdas, message handlers, etc.). +- External content ingestion sites (HTTP request bodies, file/network + reads, env/argv, IPC channels). +- Cross-process or async data flow indicators. + +Signals are **advisory only** — they may PROMOTE a unit's reachability +(e.g. set ``is_entry_point = True`` for a unit the structural pass didn't +flag), but they never DEMOTE a unit that structural analysis already +kept. This matches the "complements, not replaces" intent in issue #17. + +Output: +- ``analyze_reachability(...)`` returns a list of ``ReachabilitySignal`` + dicts. +- ``apply_signals(dataset, signals)`` mutates the dataset in place so each + unit gains an ``llm_reachability_signals`` field, and high-confidence + ``entry_point`` signals set ``is_entry_point = True`` on the target unit. + +Usage: + from core.llm_reachability import analyze_reachability, apply_signals + + signals = analyze_reachability(dataset, app_context=app_ctx) + apply_signals(dataset, signals) +""" + +from __future__ import annotations + +import json +import re +import sys +from dataclasses import dataclass, field, asdict +from typing import Any, Callable, Dict, List, Optional + + +# Models — matches the convention in core/analyzer.py / utilities/llm_client.py. +MODEL_PRIMARY = "claude-opus-4-20250514" +MODEL_SECONDARY = "claude-sonnet-4-20250514" + + +# Maximum number of units to send in a single LLM call. Larger batches save +# round trips but risk token-limit errors and degraded recall. +DEFAULT_BATCH_SIZE = 25 + +# Maximum bytes of code we send per unit. Trimmed to keep prompts tractable. 
@dataclass
class ReachabilitySignal:
    """One reachability observation emitted by the LLM for a single unit.

    Attributes:
        unit_id: Identifier of the unit the signal refers to.
        kind: The category of the observation; one of

            - ``entry_point`` — unit is itself a likely entry point.
            - ``external_input`` — unit receives external/untrusted input.
            - ``cross_process`` — unit participates in async / cross-process
              data flow.
        confidence: One of ``high``, ``medium``, ``low``.
        reason: Free-text justification supplied by the model.
    """

    unit_id: str
    kind: str
    confidence: str
    reason: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the signal as a plain dict keyed by field name."""
        return {
            "unit_id": self.unit_id,
            "kind": self.kind,
            "confidence": self.confidence,
            "reason": self.reason,
        }
+ +Confidence levels: + - "high" — the code unambiguously demonstrates the pattern. + - "medium" — the pattern is present but partially obscured. + - "low" — only suggestive; emit only if you'd want a human reviewer. + +Return STRICT JSON of the form: + + {{ + "signals": [ + {{"unit_id": "", "kind": "entry_point|external_input|cross_process", + "confidence": "high|medium|low", "reason": ""}}, + ... + ] + }} + +If no signals apply, return ``{{"signals": []}}``. Do NOT wrap the JSON in +markdown fences. Do NOT include any prose outside the JSON. + +{app_context_block} + +UNITS TO REVIEW (existing structural flags shown for context — your job is to +ADD signals beyond what those already capture): + +{units_block} +""" + + +def _build_app_context_block(app_context: Optional[Dict[str, Any]]) -> str: + """Render an optional app-context section for the prompt.""" + if not app_context: + return "APPLICATION CONTEXT: (none provided)" + try: + ctx_json = json.dumps(app_context, indent=2, sort_keys=True) + except (TypeError, ValueError): + ctx_json = str(app_context) + return f"APPLICATION CONTEXT:\n{ctx_json}" + + +def _trim_code(code: str) -> str: + """Truncate a code blob so the batch fits in a reasonable prompt window.""" + if not code: + return "" + if len(code) <= MAX_CODE_BYTES: + return code + return code[:MAX_CODE_BYTES] + "\n# ...[truncated]" + + +def _unit_for_prompt(unit: Dict[str, Any]) -> Dict[str, Any]: + """Project a unit into the minimal shape we send to the LLM.""" + code_blob = "" + code = unit.get("code") or {} + if isinstance(code, dict): + code_blob = code.get("primary_code") or code.get("source") or "" + elif isinstance(code, str): + code_blob = code + + return { + "unit_id": unit.get("id", ""), + "unit_type": unit.get("unit_type", "function"), + "is_entry_point": bool(unit.get("is_entry_point", False)), + "reachable_from_entry": unit.get("reachable_from_entry"), + "code": _trim_code(code_blob), + } + + +def build_prompt( + units: List[Dict[str, Any]], 
+ app_context: Optional[Dict[str, Any]] = None, +) -> str: + """Assemble the LLM prompt for a batch of units.""" + app_block = _build_app_context_block(app_context) + payload = [_unit_for_prompt(u) for u in units] + units_block = json.dumps(payload, indent=2) + return PROMPT_TEMPLATE.format( + app_context_block=app_block, + units_block=units_block, + ) + + +# --------------------------------------------------------------------------- +# Response parsing +# --------------------------------------------------------------------------- + + +_VALID_KINDS = {"entry_point", "external_input", "cross_process"} +_VALID_CONFIDENCES = {"high", "medium", "low"} + + +def _extract_json(text: str) -> Optional[Dict[str, Any]]: + """Best-effort JSON extraction from a model response. + + Strips common markdown fences and falls back to the first ``{...}`` + block in the text. Returns ``None`` if nothing valid is found. + """ + if not text: + return None + cleaned = text.strip() + + # Strip ```json ... ``` or ``` ... ``` fences. + fence = re.match( + r"^```(?:json)?\s*(?P.*?)\s*```\s*$", + cleaned, + re.DOTALL | re.IGNORECASE, + ) + if fence: + cleaned = fence.group("body").strip() + + try: + return json.loads(cleaned) + except json.JSONDecodeError: + pass + + # Fall back to the first balanced JSON object in the response. + start = cleaned.find("{") + end = cleaned.rfind("}") + if start != -1 and end > start: + snippet = cleaned[start : end + 1] + try: + return json.loads(snippet) + except json.JSONDecodeError: + return None + return None + + +def parse_response( + response_text: str, + valid_unit_ids: Optional[set] = None, + on_error: Optional[Callable[[str], None]] = None, +) -> List[ReachabilitySignal]: + """Parse a single LLM response into validated ``ReachabilitySignal``s. + + Malformed entries are skipped (not raised); the optional ``on_error`` + callback receives a one-line description per skipped item, useful for + logging. 
+ """ + log = on_error or (lambda msg: print(f"[LLMReach] {msg}", file=sys.stderr)) + + data = _extract_json(response_text) + if not isinstance(data, dict): + log("malformed response: not a JSON object — skipping batch") + return [] + + raw_signals = data.get("signals") + if not isinstance(raw_signals, list): + log("malformed response: 'signals' missing or not a list — skipping batch") + return [] + + out: List[ReachabilitySignal] = [] + for idx, item in enumerate(raw_signals): + if not isinstance(item, dict): + log(f"signal #{idx}: not an object — skipped") + continue + unit_id = item.get("unit_id") + kind = item.get("kind") + confidence = item.get("confidence") + reason = item.get("reason", "") + + if not isinstance(unit_id, str) or not unit_id: + log(f"signal #{idx}: missing unit_id — skipped") + continue + if kind not in _VALID_KINDS: + log(f"signal #{idx}: invalid kind {kind!r} — skipped") + continue + if confidence not in _VALID_CONFIDENCES: + log(f"signal #{idx}: invalid confidence {confidence!r} — skipped") + continue + if valid_unit_ids is not None and unit_id not in valid_unit_ids: + log(f"signal #{idx}: unknown unit_id {unit_id!r} — skipped") + continue + + out.append( + ReachabilitySignal( + unit_id=unit_id, + kind=kind, + confidence=confidence, + reason=str(reason)[:500], + ) + ) + return out + + +# --------------------------------------------------------------------------- +# Main entry points +# --------------------------------------------------------------------------- + + +def _chunk(items: List[Any], size: int) -> List[List[Any]]: + return [items[i : i + size]] if size <= 0 else [ + items[i : i + size] for i in range(0, len(items), size) + ] + + +def analyze_reachability( + dataset: Dict[str, Any], + app_context: Optional[Dict[str, Any]] = None, + client: Any = None, + model: str = MODEL_PRIMARY, + batch_size: int = DEFAULT_BATCH_SIZE, + max_units: Optional[int] = None, + on_error: Optional[Callable[[str], None]] = None, +) -> 
List[ReachabilitySignal]: + """Run the LLM reachability review stage over a parsed dataset. + + Args: + dataset: Parsed dataset with a ``units`` list, as produced by the + parser stage. Units are expected to expose ``id``, ``code``, and + optionally ``is_entry_point`` / ``reachable_from_entry``. + app_context: Optional application context dict; included in the + prompt to help the model reason about expected entry points + (e.g. ``{"application_type": "web_app"}``). + client: An object exposing ``analyze_sync(prompt, max_tokens=..., + model=...)``. If omitted, an :class:`AnthropicClient` is + instantiated lazily. + model: Model id to use (defaults to Opus). + batch_size: Units per LLM call. + max_units: Optional cap on how many units to review. + on_error: Optional callback for parse/validation issues. + + Returns: + A flat list of :class:`ReachabilitySignal` for every unit the model + flagged. Unknown unit ids and malformed entries are filtered out. + """ + units = dataset.get("units") or [] + if max_units is not None and max_units >= 0: + units = units[:max_units] + if not units: + return [] + + if client is None: + # Lazy import so unit tests can stub this out without an API key. 
# Confidence levels that are strong enough for an ``entry_point`` signal to
# flip ``is_entry_point`` to ``True`` on the target unit.
_PROMOTE_ENTRY_POINT_AT = {"high"}


def apply_signals(
    dataset: Dict[str, Any],
    signals: List[ReachabilitySignal],
) -> Dict[str, int]:
    """Attach LLM signals to the units of ``dataset`` (in place, promote-only).

    Every signal whose ``unit_id`` matches a unit in ``dataset["units"]`` is
    appended (as a dict) to that unit's ``llm_reachability_signals`` list,
    which is created on first use. Signals for unknown unit ids are ignored.

    A unit's ``is_entry_point`` is set to ``True`` only when the signal kind
    is ``entry_point``, its confidence is listed in
    :data:`_PROMOTE_ENTRY_POINT_AT`, and the unit is not already an entry
    point. Nothing in this function ever writes ``False`` to that field, so a
    unit flagged by the structural pass stays flagged.

    Args:
        dataset: Parsed dataset; units without a truthy ``id`` are skipped.
        signals: Signals to merge, in the order they should be appended.

    Returns:
        A summary dict with the integer counters ``signals_applied``
        (signals attached to a unit), ``entry_points_promoted`` (units newly
        marked as entry points) and ``units_touched`` (distinct units that
        received at least one signal).
    """
    # Index units by id; a later unit with the same id replaces an earlier one.
    unit_index: Dict[Any, Dict[str, Any]] = {}
    for candidate in dataset.get("units") or []:
        key = candidate.get("id")
        if key:
            unit_index[key] = candidate

    applied_count = 0
    promoted_count = 0
    touched_ids: set = set()

    for signal in signals:
        target = unit_index.get(signal.unit_id)
        if target is None:
            continue

        target.setdefault("llm_reachability_signals", []).append(signal.to_dict())
        applied_count += 1
        touched_ids.add(signal.unit_id)

        # Promotion guards: only strong entry_point signals on units that are
        # not already entry points change the flag.
        if signal.kind != "entry_point":
            continue
        if signal.confidence not in _PROMOTE_ENTRY_POINT_AT:
            continue
        if target.get("is_entry_point", False):
            continue
        target["is_entry_point"] = True
        promoted_count += 1

    return {
        "signals_applied": applied_count,
        "entry_points_promoted": promoted_count,
        "units_touched": len(touched_ids),
    }
@@ -107,6 +108,7 @@ def scan_repository( # Count total steps for progress display total_steps = _count_steps( generate_context, enhance, verify, generate_report, dynamic_test, + llm_reachability=llm_reachability, ) step_num = 0 @@ -171,6 +173,84 @@ def _step_label(name: str) -> str: # Active dataset path — may be updated by enhance step active_dataset_path = parse_result.dataset_path + # --------------------------------------------------------------- + # Step 1.5: LLM Reachability review (optional, opt-in) + # --------------------------------------------------------------- + # Runs after structural reachability (parse) and before enhance/analyze. + # Signals are advisory and PROMOTE-ONLY: they may flag additional entry + # points or external-input sites the structural pass missed, but never + # demote a unit that structural analysis already kept. + if llm_reachability: + from core.llm_reachability import ( + analyze_reachability, + apply_signals, + signals_to_json, + ) + + print(_step_label("Running LLM reachability review..."), file=sys.stderr) + + with step_context("llm-reachability", output_dir, inputs={ + "dataset_path": active_dataset_path, + "model": "opus", + }) as ctx: + try: + with open(active_dataset_path, encoding="utf-8") as f: + dataset = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f" WARNING: failed to load dataset: {exc}", file=sys.stderr) + ctx.summary = {"skipped": True, "reason": str(exc)} + dataset = None + + if dataset is not None: + app_ctx_payload = None + if app_context_path and os.path.exists(app_context_path): + try: + with open(app_context_path, encoding="utf-8") as f: + app_ctx_payload = json.load(f) + except (OSError, json.JSONDecodeError): + app_ctx_payload = None + + signals = analyze_reachability( + dataset=dataset, + app_context=app_ctx_payload, + max_units=limit, + ) + summary = apply_signals(dataset, signals) + + # Persist mutated dataset (so downstream stages see the + # promoted entry points and the 
per-unit signals). + with open(active_dataset_path, "w", encoding="utf-8") as f: + json.dump(dataset, f, indent=2) + + signals_path = os.path.join(output_dir, "llm_reachability.json") + with open(signals_path, "w", encoding="utf-8") as f: + json.dump( + {"signals": signals_to_json(signals)}, + f, + indent=2, + ) + + ctx.summary = { + "units_reviewed": len(dataset.get("units", [])), + "signals_added": summary["signals_applied"], + "entry_points_promoted": summary["entry_points_promoted"], + "units_touched": summary["units_touched"], + } + ctx.outputs = {"signals_path": signals_path} + + print( + f" LLM reachability: {summary['signals_applied']} signals, " + f"{summary['entry_points_promoted']} new entry points", + file=sys.stderr, + ) + + collected_step_reports.append( + _load_step_report(output_dir, "llm-reachability") + ) + else: + result.skipped_steps.append("llm-reachability") + print(file=sys.stderr) + # --------------------------------------------------------------- # Step 2: Application Context (optional) # --------------------------------------------------------------- @@ -522,6 +602,7 @@ def _count_steps( verify: bool, generate_report: bool, dynamic_test: bool, + llm_reachability: bool = False, ) -> int: """Count total steps for progress display (always includes parse, detect, build-output).""" count = 3 # parse + detect + build-output (always run) @@ -535,6 +616,8 @@ def _count_steps( count += 1 if dynamic_test: count += 1 + if llm_reachability: + count += 1 return count diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py index e521b22..2d60074 100644 --- a/libs/openant-core/openant/cli.py +++ b/libs/openant-core/openant/cli.py @@ -75,6 +75,7 @@ def cmd_scan(args): repo_url=getattr(args, "repo_url", None), commit_sha=getattr(args, "commit_sha", None), diff_manifest=getattr(args, "diff_manifest", None), + llm_reachability=getattr(args, "llm_reachability", False), ) scan_payload = result.to_dict() @@ -988,6 +989,14 @@ def main(): 
scan_p.add_argument("--backoff", type=int, default=30, help="Seconds to wait when rate-limited (default: 30)") scan_p.add_argument("--diff-manifest", help="Path to diff_manifest.json for incremental scanning") + scan_p.add_argument( + "--llm-reachability", + action="store_true", + dest="llm_reachability", + help="Enable the LLM reachability review stage (Opus). " + "Surfaces additional entry points and external-input sites " + "beyond the structural pass. Off by default (cost-controlled).", + ) scan_p.set_defaults(func=cmd_scan) # --------------------------------------------------------------- diff --git a/libs/openant-core/tests/test_go_cli.py b/libs/openant-core/tests/test_go_cli.py index 42ad294..0101748 100644 --- a/libs/openant-core/tests/test_go_cli.py +++ b/libs/openant-core/tests/test_go_cli.py @@ -79,6 +79,14 @@ def test_scan_help(self): output = result.stdout + result.stderr assert "pipeline" in output.lower() + def test_scan_help_advertises_llm_reachability(self): + """The opt-in --llm-reachability flag (issue #17) should be discoverable + from `openant scan --help`.""" + result = run_cli("scan", "--help") + assert result.returncode == 0 + output = result.stdout + result.stderr + assert "llm-reachability" in output.lower() + class TestParse: def test_parse_python_repo(self, sample_python_repo, tmp_path): diff --git a/libs/openant-core/tests/test_llm_reachability.py b/libs/openant-core/tests/test_llm_reachability.py new file mode 100644 index 0000000..801fe42 --- /dev/null +++ b/libs/openant-core/tests/test_llm_reachability.py @@ -0,0 +1,467 @@ +"""Tests for the LLM reachability review stage (issue #17). + +The stage is opt-in and advisory: signals may PROMOTE a unit's +reachability but never demote one that the structural analysis kept. +These tests pin that behavior down with a fully mocked LLM client so they +run without network access or an API key. 
class FakeClient:
    """Test double exposing the ``analyze_sync`` method used by the stage.

    Every call is recorded in ``calls``. Canned responses are handed out in
    the order given; once they run out, an empty-signals JSON document is
    returned instead.
    """

    def __init__(self, responses: List[str]):
        # Copy so that popping responses never mutates the caller's list.
        self._responses = list(responses)
        self.calls: List[dict] = []

    def analyze_sync(self, prompt: str, max_tokens: int = 4096, model: str = ""):
        """Record the call and return the next canned response."""
        record = {"prompt": prompt, "max_tokens": max_tokens, "model": model}
        self.calls.append(record)
        if self._responses:
            return self._responses.pop(0)
        return '{"signals": []}'
"medium", "reason": "reads argv"}]} + ) + "\n```" + sigs = parse_response(text, valid_unit_ids={"x.py:f"}) + assert len(sigs) == 1 + assert sigs[0].kind == "external_input" + + def test_falls_back_to_first_object(self): + text = "Sure! Here you go:\n" + json.dumps( + {"signals": [ + {"unit_id": "a.py:g", "kind": "cross_process", + "confidence": "low", "reason": "queue"}]} + ) + "\nEnd." + sigs = parse_response(text, valid_unit_ids={"a.py:g"}) + assert len(sigs) == 1 + + def test_malformed_json_returns_empty(self): + errors: List[str] = [] + sigs = parse_response( + "not json at all", + valid_unit_ids={"x"}, + on_error=errors.append, + ) + assert sigs == [] + assert any("malformed" in e for e in errors) + + def test_invalid_kind_skipped(self): + text = json.dumps( + {"signals": [ + {"unit_id": "x.py:f", "kind": "garbage", + "confidence": "high", "reason": "n/a"}]} + ) + errors: List[str] = [] + sigs = parse_response( + text, valid_unit_ids={"x.py:f"}, on_error=errors.append + ) + assert sigs == [] + assert any("invalid kind" in e for e in errors) + + def test_unknown_unit_id_skipped(self): + text = json.dumps( + {"signals": [ + {"unit_id": "ghost.py:f", "kind": "entry_point", + "confidence": "high", "reason": "n/a"}]} + ) + errors: List[str] = [] + sigs = parse_response( + text, valid_unit_ids={"real.py:f"}, on_error=errors.append + ) + assert sigs == [] + + def test_signals_not_a_list_returns_empty(self): + text = json.dumps({"signals": "nope"}) + errors: List[str] = [] + sigs = parse_response(text, on_error=errors.append) + assert sigs == [] + + +# --------------------------------------------------------------------------- +# build_prompt / app_context threading +# --------------------------------------------------------------------------- + + +class TestBuildPrompt: + def test_includes_unit_ids_and_code(self): + units = [_make_unit("app.py:handler", code="def handler(): ...")] + prompt = build_prompt(units) + assert "app.py:handler" in prompt + assert "def 
handler()" in prompt + + def test_no_app_context_marker(self): + prompt = build_prompt([_make_unit("a:f")]) + assert "(none provided)" in prompt + + def test_includes_app_context_when_provided(self): + ctx = {"application_type": "web_app", "framework": "Express"} + prompt = build_prompt([_make_unit("a:f")], app_context=ctx) + assert "web_app" in prompt + assert "Express" in prompt + + def test_truncates_overly_long_code(self): + big = "x = 1\n" * 5000 + prompt = build_prompt([_make_unit("a:f", code=big)]) + assert "[truncated]" in prompt + + +# --------------------------------------------------------------------------- +# analyze_reachability — full call with a mocked client +# --------------------------------------------------------------------------- + + +class TestAnalyzeReachability: + def test_parses_signals_from_mocked_llm(self): + dataset = { + "units": [ + _make_unit("app.py:handler"), + _make_unit("util.py:helper"), + ] + } + canned = json.dumps( + { + "signals": [ + { + "unit_id": "app.py:handler", + "kind": "entry_point", + "confidence": "high", + "reason": "Express handler", + }, + { + "unit_id": "util.py:helper", + "kind": "external_input", + "confidence": "medium", + "reason": "reads file", + }, + ] + } + ) + client = FakeClient([canned]) + signals = analyze_reachability(dataset, client=client) + assert len(signals) == 2 + assert {s.kind for s in signals} == {"entry_point", "external_input"} + assert len(client.calls) == 1 + + def test_app_context_threaded_into_prompt(self): + dataset = {"units": [_make_unit("a:f")]} + client = FakeClient(['{"signals": []}']) + ctx = {"application_type": "web_app", "framework": "Flask"} + analyze_reachability(dataset, app_context=ctx, client=client) + assert "Flask" in client.calls[0]["prompt"] + assert "web_app" in client.calls[0]["prompt"] + + def test_malformed_response_handled_gracefully(self): + dataset = {"units": [_make_unit("a:f")]} + errors: List[str] = [] + client = FakeClient(["this is not JSON"]) + sigs = 
analyze_reachability( + dataset, client=client, on_error=errors.append + ) + assert sigs == [] + assert errors # at least one error logged + + def test_empty_dataset_returns_empty(self): + client = FakeClient([]) + sigs = analyze_reachability({"units": []}, client=client) + assert sigs == [] + assert client.calls == [] # no LLM calls when nothing to review + + def test_batch_size_chunks_units(self): + dataset = {"units": [_make_unit(f"a:{i}") for i in range(7)]} + client = FakeClient(['{"signals": []}'] * 5) + analyze_reachability(dataset, client=client, batch_size=3) + # 7 units / 3 per batch = 3 calls + assert len(client.calls) == 3 + + def test_client_exception_does_not_crash(self): + class Boom: + def analyze_sync(self, *a, **kw): + raise RuntimeError("api boom") + + errors: List[str] = [] + sigs = analyze_reachability( + {"units": [_make_unit("a:f")]}, + client=Boom(), + on_error=errors.append, + ) + assert sigs == [] + assert any("api boom" in e for e in errors) + + +# --------------------------------------------------------------------------- +# apply_signals — promote-only semantics +# --------------------------------------------------------------------------- + + +class TestApplySignals: + def test_high_confidence_entry_point_promotes(self): + dataset = {"units": [_make_unit("a:f", is_entry_point=False)]} + sigs = [ + ReachabilitySignal("a:f", "entry_point", "high", "framework hook") + ] + summary = apply_signals(dataset, sigs) + assert dataset["units"][0]["is_entry_point"] is True + assert summary["entry_points_promoted"] == 1 + assert summary["signals_applied"] == 1 + assert summary["units_touched"] == 1 + + def test_medium_confidence_does_not_promote(self): + dataset = {"units": [_make_unit("a:f", is_entry_point=False)]} + sigs = [ + ReachabilitySignal("a:f", "entry_point", "medium", "maybe") + ] + summary = apply_signals(dataset, sigs) + assert dataset["units"][0]["is_entry_point"] is False + assert summary["entry_points_promoted"] == 0 + # but the 
signal is still attached for the reviewer + assert summary["signals_applied"] == 1 + + def test_external_input_does_not_set_entry_point(self): + dataset = {"units": [_make_unit("a:f", is_entry_point=False)]} + sigs = [ + ReachabilitySignal("a:f", "external_input", "high", "argv") + ] + apply_signals(dataset, sigs) + # external_input never sets is_entry_point regardless of confidence + assert dataset["units"][0]["is_entry_point"] is False + + def test_does_not_demote_existing_entry_point(self): + """Crucial promote-only invariant: a unit the structural pass + already marked as an entry point must never be unmarked, even if + the LLM emits no signal (or a low-confidence one) for it.""" + dataset = {"units": [_make_unit("a:f", is_entry_point=True)]} + # Empty signal list — apply_signals must not flip the flag. + apply_signals(dataset, []) + assert dataset["units"][0]["is_entry_point"] is True + + # Even a stray "low" entry_point signal must not flip it back. + sigs = [ReachabilitySignal("a:f", "entry_point", "low", "weak")] + apply_signals(dataset, sigs) + assert dataset["units"][0]["is_entry_point"] is True + + def test_signal_attached_to_unit(self): + dataset = {"units": [_make_unit("a:f")]} + sigs = [ + ReachabilitySignal("a:f", "external_input", "medium", "reads stdin") + ] + apply_signals(dataset, sigs) + unit = dataset["units"][0] + assert "llm_reachability_signals" in unit + assert len(unit["llm_reachability_signals"]) == 1 + attached = unit["llm_reachability_signals"][0] + assert attached["kind"] == "external_input" + assert attached["reason"] == "reads stdin" + + def test_multiple_signals_accumulate_on_same_unit(self): + dataset = {"units": [_make_unit("a:f")]} + sigs = [ + ReachabilitySignal("a:f", "external_input", "medium", "argv"), + ReachabilitySignal("a:f", "cross_process", "low", "queue"), + ] + apply_signals(dataset, sigs) + attached = dataset["units"][0]["llm_reachability_signals"] + assert len(attached) == 2 + + def 
test_unknown_unit_id_skipped(self): + dataset = {"units": [_make_unit("a:f")]} + sigs = [ReachabilitySignal("ghost:x", "entry_point", "high", "n/a")] + summary = apply_signals(dataset, sigs) + assert summary["signals_applied"] == 0 + assert summary["entry_points_promoted"] == 0 + + +class TestSerialization: + def test_signals_to_json_roundtrip(self): + sigs = [ + ReachabilitySignal("a:f", "entry_point", "high", "r1"), + ReachabilitySignal("b:g", "external_input", "low", "r2"), + ] + out = signals_to_json(sigs) + assert isinstance(out, list) + assert all(isinstance(item, dict) for item in out) + # Round-trips through JSON cleanly. + json.loads(json.dumps(out)) + + +# --------------------------------------------------------------------------- +# CLI flag plumbing — mock scan_repository to confirm wiring without API +# --------------------------------------------------------------------------- + + +class TestCliPlumbing: + """Confirms that the --llm-reachability flag exists in scan --help and + that, by default (no flag), the LLM reachability path is not invoked. + + These tests exercise the Python CLI directly (no Go binary required), so + they always run in the basic pytest suite. + """ + + def test_flag_appears_in_scan_help(self, capsys): + from openant.cli import main + + with pytest.raises(SystemExit): + import sys + old = sys.argv + try: + sys.argv = ["openant", "scan", "--help"] + main() + finally: + sys.argv = old + out = capsys.readouterr().out + capsys.readouterr().err + assert "--llm-reachability" in out + + def test_default_does_not_invoke_llm_reachability(self, monkeypatch, tmp_path): + """When --llm-reachability is NOT passed, ``analyze_reachability`` in + the scanner module must not be called. + + We achieve this by monkey-patching ``scan_repository`` to a stub + that records its kwargs, then driving ``cmd_scan`` through it. 
+ """ + captured = {} + + from openant import cli as cli_mod + + def fake_scan(**kwargs): + captured.update(kwargs) + from core.schemas import ScanResult + r = ScanResult(output_dir=str(tmp_path)) + return r + + monkeypatch.setattr( + "core.scanner.scan_repository", fake_scan, raising=True + ) + + # Drive cmd_scan via argparse + import argparse + ns = argparse.Namespace( + repo=str(tmp_path), + output=str(tmp_path / "out"), + language="auto", + level="reachable", + verify=False, + no_context=True, + no_enhance=True, + enhance_mode="agentic", + no_report=True, + dynamic_test=False, + no_skip_tests=False, + limit=None, + model="opus", + workers=1, + repo_name=None, + repo_url=None, + commit_sha=None, + backoff=30, + diff_manifest=None, + llm_reachability=False, + ) + rc = cli_mod.cmd_scan(ns) + # rc 0 or 1 acceptable; we only care about plumbing. + assert rc in (0, 1) + assert captured.get("llm_reachability") is False + + def test_flag_passes_through_when_set(self, monkeypatch, tmp_path): + captured = {} + from openant import cli as cli_mod + + def fake_scan(**kwargs): + captured.update(kwargs) + from core.schemas import ScanResult + return ScanResult(output_dir=str(tmp_path)) + + monkeypatch.setattr( + "core.scanner.scan_repository", fake_scan, raising=True + ) + + import argparse + ns = argparse.Namespace( + repo=str(tmp_path), + output=str(tmp_path / "out"), + language="auto", + level="reachable", + verify=False, + no_context=True, + no_enhance=True, + enhance_mode="agentic", + no_report=True, + dynamic_test=False, + no_skip_tests=False, + limit=None, + model="opus", + workers=1, + repo_name=None, + repo_url=None, + commit_sha=None, + backoff=30, + diff_manifest=None, + llm_reachability=True, + ) + cli_mod.cmd_scan(ns) + assert captured.get("llm_reachability") is True From a78481b14e9cb4ced4a9011c6dea36c779b963b4 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 22:18:03 +0300 Subject: [PATCH 2/8] fix: add --llm-reachability flag to Go scan 
command The Python CLI defines --llm-reachability for the LLM reachability stage (issue #17), but the Go CLI proxy did not expose it. The test TestHelp::test_scan_help_advertises_llm_reachability inspects 'openant scan --help' (Go cobra output) and was failing on all 3 OS targets. Register --llm-reachability as a Bool flag on the Go scan command and pass it through to the Python invocation when set. --- apps/openant-cli/cmd/scan.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/openant-cli/cmd/scan.go b/apps/openant-cli/cmd/scan.go index 2a646b5..c41206b 100644 --- a/apps/openant-cli/cmd/scan.go +++ b/apps/openant-cli/cmd/scan.go @@ -51,6 +51,7 @@ var ( scanDiffBase string scanPR int scanDiffScope string + scanLLMReachability bool ) func init() { @@ -79,6 +80,7 @@ func registerScanFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&scanDiffBase, "diff-base", "", "Incremental mode: filter pipeline to units overlapping diff vs this ref (e.g. origin/main, HEAD~5)") cmd.Flags().IntVar(&scanPR, "pr", 0, "Incremental mode against a GitHub PR number (requires gh; mutex with --diff-base)") cmd.Flags().StringVar(&scanDiffScope, "diff-scope", "changed_functions", "Diff scope: changed_files, changed_functions, callers") + cmd.Flags().BoolVar(&scanLLMReachability, "llm-reachability", false, "Enable the LLM reachability review stage (Opus). Surfaces additional entry points and external-input sites beyond the structural pass. Off by default (cost-controlled).") } func runScan(cmd *cobra.Command, args []string) { @@ -197,6 +199,9 @@ func runScan(cmd *cobra.Command, args []string) { if manifestPath != "" { pyArgs = append(pyArgs, "--diff-manifest", manifestPath) } + if scanLLMReachability { + pyArgs = append(pyArgs, "--llm-reachability") + } // Pass repository metadata from project context so reports don't show // [NOT PROVIDED] placeholders. 
From 6a5f5980427fe4de8394af603dc6ae34ea78de64 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 23:01:50 +0300 Subject: [PATCH 3/8] fix: address review findings on LLM reachability stage - scanner.py: forward-declare app_context_path before step 1.5 so the LLM reachability block doesn't hit a NameError when --llm-reachability is enabled (the block ran before the app-context step that defined it). - llm_reachability._chunk: non-positive batch_size used to reference an unbound loop variable; now collapses to a single batch covering all items. Adds a regression test. - Help text (Python CLI + Go CLI): note that --llm-reachability may incur additional LLM cost, per cost-safety review. --- apps/openant-cli/cmd/scan.go | 2 +- libs/openant-core/core/llm_reachability.py | 11 ++++++++--- libs/openant-core/core/scanner.py | 6 +++++- libs/openant-core/openant/cli.py | 3 ++- libs/openant-core/tests/test_llm_reachability.py | 9 +++++++++ 5 files changed, 25 insertions(+), 6 deletions(-) diff --git a/apps/openant-cli/cmd/scan.go b/apps/openant-cli/cmd/scan.go index c41206b..deb2390 100644 --- a/apps/openant-cli/cmd/scan.go +++ b/apps/openant-cli/cmd/scan.go @@ -80,7 +80,7 @@ func registerScanFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&scanDiffBase, "diff-base", "", "Incremental mode: filter pipeline to units overlapping diff vs this ref (e.g. origin/main, HEAD~5)") cmd.Flags().IntVar(&scanPR, "pr", 0, "Incremental mode against a GitHub PR number (requires gh; mutex with --diff-base)") cmd.Flags().StringVar(&scanDiffScope, "diff-scope", "changed_functions", "Diff scope: changed_files, changed_functions, callers") - cmd.Flags().BoolVar(&scanLLMReachability, "llm-reachability", false, "Enable the LLM reachability review stage (Opus). Surfaces additional entry points and external-input sites beyond the structural pass. 
Off by default (cost-controlled).") + cmd.Flags().BoolVar(&scanLLMReachability, "llm-reachability", false, "Enable the LLM reachability review stage (Opus). Surfaces additional entry points and external-input sites beyond the structural pass. Off by default — enabling this may incur additional LLM cost (one Opus call per ~25 units).") } func runScan(cmd *cobra.Command, args []string) { diff --git a/libs/openant-core/core/llm_reachability.py b/libs/openant-core/core/llm_reachability.py index 19334a0..dccda34 100644 --- a/libs/openant-core/core/llm_reachability.py +++ b/libs/openant-core/core/llm_reachability.py @@ -289,9 +289,14 @@ def parse_response( def _chunk(items: List[Any], size: int) -> List[List[Any]]: - return [items[i : i + size]] if size <= 0 else [ - items[i : i + size] for i in range(0, len(items), size) - ] + """Split ``items`` into batches of ``size``. + + A non-positive ``size`` is treated as "everything in one batch" so callers + that disable batching never hit a NameError or empty-output surprise. + """ + if size <= 0: + return [list(items)] if items else [] + return [items[i : i + size] for i in range(0, len(items), size)] def analyze_reachability( diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index 2f3a555..983fc0b 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -173,6 +173,11 @@ def _step_label(name: str) -> str: # Active dataset path — may be updated by enhance step active_dataset_path = parse_result.dataset_path + # Forward-declared so step 1.5 (LLM reachability) can reference it before + # step 2 (app-context) populates it. The LLM reachability block uses + # app_context only if a file already exists on disk from a prior run. 
+ app_context_path: str | None = None + # --------------------------------------------------------------- # Step 1.5: LLM Reachability review (optional, opt-in) # --------------------------------------------------------------- @@ -254,7 +259,6 @@ def _step_label(name: str) -> str: # --------------------------------------------------------------- # Step 2: Application Context (optional) # --------------------------------------------------------------- - app_context_path = None if generate_context and HAS_APP_CONTEXT: print(_step_label("Generating application context..."), file=sys.stderr) diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py index 2d60074..d60dd1a 100644 --- a/libs/openant-core/openant/cli.py +++ b/libs/openant-core/openant/cli.py @@ -995,7 +995,8 @@ def main(): dest="llm_reachability", help="Enable the LLM reachability review stage (Opus). " "Surfaces additional entry points and external-input sites " - "beyond the structural pass. Off by default (cost-controlled).", + "beyond the structural pass. Off by default — enabling this " + "may incur additional LLM cost (one Opus call per ~25 units).", ) scan_p.set_defaults(func=cmd_scan) diff --git a/libs/openant-core/tests/test_llm_reachability.py b/libs/openant-core/tests/test_llm_reachability.py index 801fe42..627d084 100644 --- a/libs/openant-core/tests/test_llm_reachability.py +++ b/libs/openant-core/tests/test_llm_reachability.py @@ -240,6 +240,15 @@ def test_batch_size_chunks_units(self): # 7 units / 3 per batch = 3 calls assert len(client.calls) == 3 + def test_non_positive_batch_size_uses_single_batch(self): + """``batch_size <= 0`` historically tripped a NameError. 
Guard the + contract: non-positive size collapses to a single batch covering all + units (and never raises).""" + dataset = {"units": [_make_unit(f"a:{i}") for i in range(4)]} + client = FakeClient(['{"signals": []}']) + analyze_reachability(dataset, client=client, batch_size=0) + assert len(client.calls) == 1 + def test_client_exception_does_not_crash(self): class Boom: def analyze_sync(self, *a, **kw): From a55301d9bc66112b98ed061d9734dff37fee818e Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Mon, 4 May 2026 23:03:50 +0300 Subject: [PATCH 4/8] refactor: run LLM reachability after app-context, not before MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The LLM reachability stage threads app_context into its prompt to help the model reason about expected entry points (web_app vs cli_tool, etc). The previous ordering ran it before app-context generation, so the app_context_path was always None at the call site — the prompt threading silently no-op'd. Reordering the steps makes the threading actually work. This also retires the temporary forward-declaration introduced in the previous commit; app_context_path is now defined naturally by the preceding step before the LLM reachability block reads it. --- libs/openant-core/core/scanner.py | 80 +++++++++++++++---------------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index 983fc0b..ef246f7 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -173,18 +173,49 @@ def _step_label(name: str) -> str: # Active dataset path — may be updated by enhance step active_dataset_path = parse_result.dataset_path - # Forward-declared so step 1.5 (LLM reachability) can reference it before - # step 2 (app-context) populates it. The LLM reachability block uses - # app_context only if a file already exists on disk from a prior run. 
+ # --------------------------------------------------------------- + # Step 2: Application Context (optional) + # --------------------------------------------------------------- app_context_path: str | None = None + if generate_context and HAS_APP_CONTEXT: + print(_step_label("Generating application context..."), file=sys.stderr) + + with step_context("app-context", output_dir, inputs={ + "repo_path": repo_path, + }) as ctx: + try: + context = generate_application_context(Path(repo_path)) + app_context_path = os.path.join(output_dir, "application_context.json") + save_context(context, Path(app_context_path)) + result.app_context_path = app_context_path + ctx.summary = {"application_type": context.application_type} + ctx.outputs = {"app_context_path": app_context_path} + print(f" App type: {context.application_type}", file=sys.stderr) + except Exception as e: + print(f" WARNING: App context generation failed: {e}", file=sys.stderr) + print(" Continuing without app context.", file=sys.stderr) + ctx.summary = {"skipped": True, "reason": str(e)} + + collected_step_reports.append(_load_step_report(output_dir, "app-context")) + elif generate_context: + print(_step_label("Skipping application context (module not available)."), + file=sys.stderr) + result.skipped_steps.append("app-context") + else: + print(_step_label("Skipping application context (--no-context)."), + file=sys.stderr) + result.skipped_steps.append("app-context") + print(file=sys.stderr) # --------------------------------------------------------------- - # Step 1.5: LLM Reachability review (optional, opt-in) + # Step 2.5: LLM Reachability review (optional, opt-in) # --------------------------------------------------------------- - # Runs after structural reachability (parse) and before enhance/analyze. - # Signals are advisory and PROMOTE-ONLY: they may flag additional entry - # points or external-input sites the structural pass missed, but never - # demote a unit that structural analysis already kept. 
+ # Runs after parse + app-context and before enhance/analyze. Signals are + # advisory and PROMOTE-ONLY: they may flag additional entry points or + # external-input sites the structural pass missed, but never demote a + # unit that structural analysis already kept. Threading app_context into + # the LLM prompt helps the model reason about expected entry points + # (e.g. "this is a web_app, look for HTTP handlers"). if llm_reachability: from core.llm_reachability import ( analyze_reachability, @@ -256,39 +287,6 @@ def _step_label(name: str) -> str: result.skipped_steps.append("llm-reachability") print(file=sys.stderr) - # --------------------------------------------------------------- - # Step 2: Application Context (optional) - # --------------------------------------------------------------- - if generate_context and HAS_APP_CONTEXT: - print(_step_label("Generating application context..."), file=sys.stderr) - - with step_context("app-context", output_dir, inputs={ - "repo_path": repo_path, - }) as ctx: - try: - context = generate_application_context(Path(repo_path)) - app_context_path = os.path.join(output_dir, "application_context.json") - save_context(context, Path(app_context_path)) - result.app_context_path = app_context_path - ctx.summary = {"application_type": context.application_type} - ctx.outputs = {"app_context_path": app_context_path} - print(f" App type: {context.application_type}", file=sys.stderr) - except Exception as e: - print(f" WARNING: App context generation failed: {e}", file=sys.stderr) - print(" Continuing without app context.", file=sys.stderr) - ctx.summary = {"skipped": True, "reason": str(e)} - - collected_step_reports.append(_load_step_report(output_dir, "app-context")) - elif generate_context: - print(_step_label("Skipping application context (module not available)."), - file=sys.stderr) - result.skipped_steps.append("app-context") - else: - print(_step_label("Skipping application context (--no-context)."), - file=sys.stderr) - 
result.skipped_steps.append("app-context") - print(file=sys.stderr) - # --------------------------------------------------------------- # Step 3: Enhance (optional) # --------------------------------------------------------------- From d90aec9b8cde19f438bc5537093dadfa1162f6b8 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Tue, 12 May 2026 08:30:46 +0300 Subject: [PATCH 5/8] fix: run LLM reachability on full codebase before structural filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stage was running after the structural filter had already discarded units that weren't reachable from heuristic entry points — meaning the LLM could never find the missed entry points that are the feature's main value prop. Fix: - When --llm-reachability is set, parse with processing_level="all" so every unit is visible to the LLM. - After apply_signals promotes LLM-identified entry points, re-run the structural reachability filter (apply_reachability_filter) with those unit IDs added as extra BFS seeds. - The final dataset is filtered to the user's requested processing_level but expanded by any entry points the LLM found. Also: - Expose apply_reachability_filter as a public function in parser_adapter.py with an extra_entry_points parameter; preserve any is_entry_point=True already set by the LLM stage when re-stamping. - Update help text in cli.py and scan.go to reflect that cost scales with total repo size, not the filtered unit count. - Update llm_reachability.py docstring to document the correct pipeline ordering. Addresses review comment on PR #50. 
Co-Authored-By: Claude Sonnet 4.6 --- apps/openant-cli/cmd/scan.go | 2 +- libs/openant-core/core/llm_reachability.py | 23 +++++--- libs/openant-core/core/parser_adapter.py | 23 ++++++-- libs/openant-core/core/scanner.py | 69 +++++++++++++++++----- libs/openant-core/openant/cli.py | 9 ++- 5 files changed, 97 insertions(+), 29 deletions(-) diff --git a/apps/openant-cli/cmd/scan.go b/apps/openant-cli/cmd/scan.go index deb2390..a747e8c 100644 --- a/apps/openant-cli/cmd/scan.go +++ b/apps/openant-cli/cmd/scan.go @@ -80,7 +80,7 @@ func registerScanFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&scanDiffBase, "diff-base", "", "Incremental mode: filter pipeline to units overlapping diff vs this ref (e.g. origin/main, HEAD~5)") cmd.Flags().IntVar(&scanPR, "pr", 0, "Incremental mode against a GitHub PR number (requires gh; mutex with --diff-base)") cmd.Flags().StringVar(&scanDiffScope, "diff-scope", "changed_functions", "Diff scope: changed_files, changed_functions, callers") - cmd.Flags().BoolVar(&scanLLMReachability, "llm-reachability", false, "Enable the LLM reachability review stage (Opus). Surfaces additional entry points and external-input sites beyond the structural pass. Off by default — enabling this may incur additional LLM cost (one Opus call per ~25 units).") + cmd.Flags().BoolVar(&scanLLMReachability, "llm-reachability", false, "Enable the LLM reachability review stage (Opus). Surfaces entry points and external-input sites the structural pass would miss by reviewing the full codebase before the reachability filter is applied. 
Off by default — enabling this incurs cost proportional to total repo size, not the filtered unit count (~one Opus call per 25 units across the whole codebase).") } func runScan(cmd *cobra.Command, args []string) { diff --git a/libs/openant-core/core/llm_reachability.py b/libs/openant-core/core/llm_reachability.py index dccda34..e0b0b86 100644 --- a/libs/openant-core/core/llm_reachability.py +++ b/libs/openant-core/core/llm_reachability.py @@ -1,9 +1,9 @@ """ LLM-based reachability review stage. -A complementary, advisory pass over the parsed dataset that uses a strong -LLM (Opus by default) to surface additional reachability signals beyond -what the structural reachability analysis catches: +A complementary, advisory pass over the **full, unfiltered** codebase that +uses a strong LLM (Opus by default) to surface reachability signals beyond +what the structural analysis catches: - Likely entry points the structural pass missed (framework-specific handlers, plugin registrations, lambdas, message handlers, etc.). @@ -11,10 +11,19 @@ reads, env/argv, IPC channels). - Cross-process or async data flow indicators. -Signals are **advisory only** — they may PROMOTE a unit's reachability -(e.g. set ``is_entry_point = True`` for a unit the structural pass didn't -flag), but they never DEMOTE a unit that structural analysis already -kept. This matches the "complements, not replaces" intent in issue #17. +Pipeline ordering (managed by ``core/scanner.py``): + +1. Parse with ``processing_level="all"`` so every unit is available. +2. ``analyze_reachability`` reviews all units and returns signals. +3. ``apply_signals`` promotes high-confidence ``entry_point`` signals by + setting ``is_entry_point=True`` on the target unit. +4. The structural reachability filter re-runs with LLM-promoted entry + points added as extra BFS seeds, yielding a dataset filtered to the + user's requested ``processing_level`` but expanded by LLM findings. 
+ +Signals are **promote-only** — they never DEMOTE a unit that structural +analysis already kept. This matches the "complements, not replaces" intent +in issue #17. Output: - ``analyze_reachability(...)`` returns a list of ``ReachabilitySignal`` diff --git a/libs/openant-core/core/parser_adapter.py b/libs/openant-core/core/parser_adapter.py index f2f8174..f149c81 100644 --- a/libs/openant-core/core/parser_adapter.py +++ b/libs/openant-core/core/parser_adapter.py @@ -194,10 +194,11 @@ def _maybe_apply_diff_filter( # Reachability filter (shared by Python path; JS/Go handle it internally) # --------------------------------------------------------------------------- -def _apply_reachability_filter( +def apply_reachability_filter( dataset: dict, output_dir: str, processing_level: str, + extra_entry_points: "set[str] | None" = None, ) -> dict: """Filter dataset units to only those reachable from entry points. @@ -205,6 +206,12 @@ def _apply_reachability_filter( detects entry points, computes reachability via BFS, and removes unreachable units from the dataset. + ``extra_entry_points`` supplements the structurally-detected seed set. + Pass LLM-promoted unit IDs here so the BFS propagates from them even if + the structural heuristics missed them. Any unit that already has + ``is_entry_point=True`` in the dataset (e.g. set by the LLM reachability + stage) keeps that flag — this function never demotes it. + For ``codeql`` and ``exploitable`` levels the reachability filter is still applied (it is a prerequisite), but the additional CodeQL / LLM-classification filters are not yet wired into the Python path @@ -214,6 +221,7 @@ def _apply_reachability_filter( dataset: The full, unfiltered dataset dict (mutated in place). output_dir: Directory containing call_graph.json from the parser. processing_level: One of "reachable", "codeql", "exploitable". + extra_entry_points: Additional unit IDs to seed the BFS (e.g. from LLM). Returns: The (possibly filtered) dataset dict. 
@@ -251,9 +259,11 @@ def _load_module(name, filename): call_graph = call_graph_data.get("call_graph", {}) reverse_call_graph = call_graph_data.get("reverse_call_graph", {}) - # Detect entry points + # Detect entry points structurally, then seed with any extras (e.g. LLM-promoted). detector = EntryPointDetector(functions, call_graph) entry_points = detector.detect_entry_points() + if extra_entry_points: + entry_points = entry_points | extra_entry_points # Compute reachable set (BFS forward from entry points) reachability = ReachabilityAnalyzer( @@ -271,8 +281,9 @@ def _load_module(name, filename): unit_id = u.get("id", "") if unit_id in reachable_ids: u["reachable"] = True - u["is_entry_point"] = unit_id in entry_points - if unit_id in entry_points: + # Preserve any is_entry_point=True already set (e.g. by LLM stage). + u["is_entry_point"] = (unit_id in entry_points) or u.get("is_entry_point", False) + if unit_id in entry_points and not u.get("entry_point_reason"): u["entry_point_reason"] = detector.get_entry_point_reason(unit_id) filtered_units.append(u) @@ -316,6 +327,10 @@ def _load_module(name, filename): return dataset +# Private alias kept for the Python parser path which calls it directly. +_apply_reachability_filter = apply_reachability_filter + + # --------------------------------------------------------------------------- # Python parser # --------------------------------------------------------------------------- diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index ef246f7..2da274c 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -126,19 +126,31 @@ def _step_label(name: str) -> str: # --------------------------------------------------------------- from core.parser_adapter import parse_repository + # When LLM reachability is enabled the stage must see ALL units so it can + # identify entry points the structural pass would miss. 
Parse with "all" + # here; the structural filter is re-applied after LLM signals are merged. + effective_parse_level = ( + "all" if (llm_reachability and processing_level != "all") else processing_level + ) + print(_step_label("Parsing repository..."), file=sys.stderr) + if effective_parse_level != processing_level: + print( + " [LLM reachability] parsing all units; structural filter runs after LLM signals", + file=sys.stderr, + ) with step_context("parse", output_dir, inputs={ "repo_path": repo_path, "language": language, - "processing_level": processing_level, + "processing_level": effective_parse_level, "skip_tests": skip_tests, }) as ctx: parse_result = parse_repository( repo_path=repo_path, output_dir=output_dir, language=language, - processing_level=processing_level, + processing_level=effective_parse_level, skip_tests=skip_tests, diff_manifest=diff_manifest, ) @@ -210,12 +222,15 @@ def _step_label(name: str) -> str: # --------------------------------------------------------------- # Step 2.5: LLM Reachability review (optional, opt-in) # --------------------------------------------------------------- - # Runs after parse + app-context and before enhance/analyze. Signals are - # advisory and PROMOTE-ONLY: they may flag additional entry points or - # external-input sites the structural pass missed, but never demote a - # unit that structural analysis already kept. Threading app_context into - # the LLM prompt helps the model reason about expected entry points - # (e.g. "this is a web_app, look for HTTP handlers"). + # Runs after parse + app-context and before enhance/analyze. Because parse + # was done with processing_level="all" (when filtering is requested), the + # LLM sees every unit in the codebase and can identify entry points the + # structural heuristics would miss. 
After signals are applied the + # structural reachability filter is re-run with LLM-promoted entry points + # added as extra BFS seeds, so the final dataset honours the user's + # requested processing_level. Threading app_context into the prompt helps + # the model reason about expected entry points (e.g. "this is a web_app, + # look for HTTP handlers"). if llm_reachability: from core.llm_reachability import ( analyze_reachability, @@ -253,11 +268,6 @@ def _step_label(name: str) -> str: ) summary = apply_signals(dataset, signals) - # Persist mutated dataset (so downstream stages see the - # promoted entry points and the per-unit signals). - with open(active_dataset_path, "w", encoding="utf-8") as f: - json.dump(dataset, f, indent=2) - signals_path = os.path.join(output_dir, "llm_reachability.json") with open(signals_path, "w", encoding="utf-8") as f: json.dump( @@ -266,11 +276,37 @@ def _step_label(name: str) -> str: indent=2, ) + pre_filter_count = len(dataset.get("units", [])) + + # Re-apply the structural reachability filter using + # LLM-promoted entry points as additional BFS seeds. + if processing_level != "all": + from core.parser_adapter import apply_reachability_filter + llm_promoted_ids = { + u["id"] for u in dataset.get("units", []) + if u.get("is_entry_point") and u.get("id") + } + dataset = apply_reachability_filter( + dataset, + output_dir, + processing_level, + extra_entry_points=llm_promoted_ids, + ) + result.units_count = len(dataset.get("units", [])) + + # Persist final dataset so downstream stages see promoted + # entry points, per-unit signals, and the applied filter. 
+ with open(active_dataset_path, "w", encoding="utf-8") as f: + json.dump(dataset, f, indent=2) + + post_filter_count = len(dataset.get("units", [])) + ctx.summary = { - "units_reviewed": len(dataset.get("units", [])), + "units_reviewed": pre_filter_count, "signals_added": summary["signals_applied"], "entry_points_promoted": summary["entry_points_promoted"], "units_touched": summary["units_touched"], + "post_filter_units": post_filter_count, } ctx.outputs = {"signals_path": signals_path} @@ -279,6 +315,11 @@ def _step_label(name: str) -> str: f"{summary['entry_points_promoted']} new entry points", file=sys.stderr, ) + if processing_level != "all": + print( + f" After reachability filter: {post_filter_count} units", + file=sys.stderr, + ) collected_step_reports.append( _load_step_report(output_dir, "llm-reachability") diff --git a/libs/openant-core/openant/cli.py b/libs/openant-core/openant/cli.py index d60dd1a..510134c 100644 --- a/libs/openant-core/openant/cli.py +++ b/libs/openant-core/openant/cli.py @@ -994,9 +994,12 @@ def main(): action="store_true", dest="llm_reachability", help="Enable the LLM reachability review stage (Opus). " - "Surfaces additional entry points and external-input sites " - "beyond the structural pass. Off by default — enabling this " - "may incur additional LLM cost (one Opus call per ~25 units).", + "Surfaces entry points and external-input sites the structural " + "pass would miss by reviewing the full codebase before the " + "reachability filter is applied. 
Off by default — enabling " + "this incurs cost proportional to total repo size, not the " + "filtered unit count (~one Opus call per 25 units across the " + "whole codebase).", ) scan_p.set_defaults(func=cmd_scan) From 3df8f0b74c3f4612f7e87dbf53e39e6f5b817632 Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Tue, 12 May 2026 16:09:18 +0300 Subject: [PATCH 6/8] fix: address round-2 review findings (High + Mediums + Lows) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit High — JS/Go/C/Ruby/PHP don't persist call_graph.json so the post-LLM re-filter was silently falling back to returning the full unfiltered dataset. Guard the re-filter call with an os.path.exists check on call_graph.json; when absent, print a prominent WARNING with the unit count and cost implication so the user knows --level was not applied. Also add refilter_supported to the step report summary. Medium 1 — align MODEL_PRIMARY from "claude-opus-4-20250514" to "claude-opus-4-6" to match analyzer.py and the rest of the Opus-using stages. Fix the misleading comment. Medium 2 — remove max_units=limit from the analyze_reachability call. --limit governs the analyze stage; the LLM reachability pass must review the full codebase to find missed entry points. Coverage is controlled by batch_size. Low 1 — remove unused `field` import from llm_reachability.py. Low 2 — fix prompt projection: reachable_from_entry (never written by any path) → reachable (the actual field name written by the reachability filter). Low 5 — set entry_point_reason on LLM-promoted units in apply_signals so the promotion source is visible without cross-referencing llm_reachability_signals. Also surface the actual MODEL_PRIMARY constant in the step_context inputs instead of the hardcoded string "opus". 
Co-Authored-By: Claude Sonnet 4.6 --- libs/openant-core/core/llm_reachability.py | 9 ++-- libs/openant-core/core/scanner.py | 59 +++++++++++++++------- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/libs/openant-core/core/llm_reachability.py b/libs/openant-core/core/llm_reachability.py index e0b0b86..79a130a 100644 --- a/libs/openant-core/core/llm_reachability.py +++ b/libs/openant-core/core/llm_reachability.py @@ -44,12 +44,12 @@ import json import re import sys -from dataclasses import dataclass, field, asdict +from dataclasses import dataclass, asdict from typing import Any, Callable, Dict, List, Optional -# Models — matches the convention in core/analyzer.py / utilities/llm_client.py. -MODEL_PRIMARY = "claude-opus-4-20250514" +# Models — aligns with core/analyzer.py which uses "claude-opus-4-6" for Opus. +MODEL_PRIMARY = "claude-opus-4-6" MODEL_SECONDARY = "claude-sonnet-4-20250514" @@ -171,7 +171,7 @@ def _unit_for_prompt(unit: Dict[str, Any]) -> Dict[str, Any]: "unit_id": unit.get("id", ""), "unit_type": unit.get("unit_type", "function"), "is_entry_point": bool(unit.get("is_entry_point", False)), - "reachable_from_entry": unit.get("reachable_from_entry"), + "reachable": unit.get("reachable"), "code": _trim_code(code_blob), } @@ -430,6 +430,7 @@ def apply_signals( and not unit.get("is_entry_point", False) ): unit["is_entry_point"] = True + unit["entry_point_reason"] = f"llm_reachability: {sig.reason}" promoted += 1 return { diff --git a/libs/openant-core/core/scanner.py b/libs/openant-core/core/scanner.py index 2da274c..5a558d6 100644 --- a/libs/openant-core/core/scanner.py +++ b/libs/openant-core/core/scanner.py @@ -233,6 +233,7 @@ def _step_label(name: str) -> str: # look for HTTP handlers"). 
if llm_reachability: from core.llm_reachability import ( + MODEL_PRIMARY as _LLM_REACH_MODEL, analyze_reachability, apply_signals, signals_to_json, @@ -242,7 +243,7 @@ def _step_label(name: str) -> str: with step_context("llm-reachability", output_dir, inputs={ "dataset_path": active_dataset_path, - "model": "opus", + "model": _LLM_REACH_MODEL, }) as ctx: try: with open(active_dataset_path, encoding="utf-8") as f: @@ -261,10 +262,12 @@ def _step_label(name: str) -> str: except (OSError, json.JSONDecodeError): app_ctx_payload = None + # --limit governs the analyze stage, not how many units the + # LLM reachability pass reviews — it must see the full + # codebase to find missed entry points. signals = analyze_reachability( dataset=dataset, app_context=app_ctx_payload, - max_units=limit, ) summary = apply_signals(dataset, signals) @@ -277,36 +280,58 @@ def _step_label(name: str) -> str: ) pre_filter_count = len(dataset.get("units", [])) + post_filter_count = pre_filter_count + refilter_supported = False # Re-apply the structural reachability filter using # LLM-promoted entry points as additional BFS seeds. + # Only possible when call_graph.json was written by the parser + # (Python and Zig paths do this; JS/Go/C/Ruby/PHP handle + # reachability filtering internally and don't persist it). 
if processing_level != "all": - from core.parser_adapter import apply_reachability_filter - llm_promoted_ids = { - u["id"] for u in dataset.get("units", []) - if u.get("is_entry_point") and u.get("id") - } - dataset = apply_reachability_filter( - dataset, - output_dir, - processing_level, - extra_entry_points=llm_promoted_ids, - ) - result.units_count = len(dataset.get("units", [])) + call_graph_path = os.path.join(output_dir, "call_graph.json") + if os.path.exists(call_graph_path): + from core.parser_adapter import apply_reachability_filter + llm_promoted_ids = { + u["id"] for u in dataset.get("units", []) + if u.get("is_entry_point") and u.get("id") + } + dataset = apply_reachability_filter( + dataset, + output_dir, + processing_level, + extra_entry_points=llm_promoted_ids, + ) + post_filter_count = len(dataset.get("units", [])) + result.units_count = post_filter_count + refilter_supported = True + else: + # Parser doesn't persist call_graph.json — the full + # unfiltered dataset will flow to downstream stages. + # Warn loudly so the cost impact is visible. + print( + f"\n WARNING: --llm-reachability with " + f"--level {processing_level}: " + f"{parse_result.language} does not yet support " + f"post-LLM re-filtering (call_graph.json not found). " + f"Downstream stages will process all " + f"{pre_filter_count} units instead of the filtered " + f"subset — this may significantly increase cost.", + file=sys.stderr, + ) # Persist final dataset so downstream stages see promoted # entry points, per-unit signals, and the applied filter. 
with open(active_dataset_path, "w", encoding="utf-8") as f: json.dump(dataset, f, indent=2) - post_filter_count = len(dataset.get("units", [])) - ctx.summary = { "units_reviewed": pre_filter_count, "signals_added": summary["signals_applied"], "entry_points_promoted": summary["entry_points_promoted"], "units_touched": summary["units_touched"], "post_filter_units": post_filter_count, + "refilter_supported": refilter_supported, } ctx.outputs = {"signals_path": signals_path} @@ -315,7 +340,7 @@ def _step_label(name: str) -> str: f"{summary['entry_points_promoted']} new entry points", file=sys.stderr, ) - if processing_level != "all": + if processing_level != "all" and refilter_supported: print( f" After reachability filter: {post_filter_count} units", file=sys.stderr, From 2b0dddf752dddc39364333535ec2f5167e0a29cd Mon Sep 17 00:00:00 2001 From: joshbouncesecurity Date: Tue, 12 May 2026 16:32:35 +0300 Subject: [PATCH 7/8] feat: write call_graph.json for all parser languages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enables the post-LLM reachability re-filter to work on JS, Go, C, Ruby, and PHP repositories, not just Python and Zig. C / Ruby / PHP (trivial — 3 lines each): call_graph.json is written immediately after CallGraphBuilder.export() in test_pipeline.py. graph_result already contains functions, call_graph, and reverse_call_graph in the format apply_reachability_filter expects. JavaScript (easy): call_graph.json is written at the end of run_typescript_analyzer(), right after analyzer_output.json is available. Keys are normalised from camelCase (callGraph, reverseCallGraph) to snake_case so the Python-side filter can read them without any extra handling. Go (moderate): call_graph.json is written at the end of run_go_parser_all(), after both analyzer_output.json and dataset.json are available. 
Functions are normalised to the camelCase shape EntryPointDetector expects (same conversion already done in apply_reachability_filter). Call graph edges are reconstructed from unit metadata.direct_calls / direct_callers (the same source apply_reachability_filter used). Tests (test_call_graph_output.py): - TestApplyReachabilityFilterPublicAPI (5 tests, always run): verifies the public apply_reachability_filter API — basic filtering, extra entry points expand the reachable set, is_entry_point stamping, LLM-promoted flag preservation, missing-file graceful return. - TestPythonCallGraphOutput (2 tests, always run): parse_repository with processing_level=all and reachable both produce call_graph.json. - TestJavaScript/Go/C/Ruby/PHP CallGraphOutput (2 tests each, skip-guarded): same assertions, skip when the parser's runtime dependency is absent. Co-Authored-By: Claude Sonnet 4.6 --- libs/openant-core/parsers/c/test_pipeline.py | 5 + libs/openant-core/parsers/go/test_pipeline.py | 55 +++ .../parsers/javascript/test_pipeline.py | 19 + .../openant-core/parsers/php/test_pipeline.py | 5 + .../parsers/ruby/test_pipeline.py | 5 + .../tests/test_call_graph_output.py | 419 ++++++++++++++++++ 6 files changed, 508 insertions(+) create mode 100644 libs/openant-core/tests/test_call_graph_output.py diff --git a/libs/openant-core/parsers/c/test_pipeline.py b/libs/openant-core/parsers/c/test_pipeline.py index 5072d68..1ca1e6e 100644 --- a/libs/openant-core/parsers/c/test_pipeline.py +++ b/libs/openant-core/parsers/c/test_pipeline.py @@ -184,6 +184,11 @@ def run_parser_pipeline(self) -> bool: analyzer_output = generator.generate_analyzer_output() write_json(self.analyzer_output_file, analyzer_output) + # Write call graph for post-LLM reachability re-filtering + call_graph_file = os.path.join(self.output_dir, 'call_graph.json') + with open(call_graph_file, 'w') as f: + json.dump(graph_result, f, indent=2) + elapsed = (datetime.now() - start_time).total_seconds() summary = { diff --git 
a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 7e2aa11..2de25ac 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -283,6 +283,61 @@ def run_go_parser_all(self) -> bool: print(f"Warning: Could not apply dataset name: {e}") self.results['stages']['go_parser'] = result + + # Write call_graph.json immediately after parsing so the post-LLM + # reachability re-filter can use it regardless of processing_level. + # Go's analyzer_output.json has functions; the call graph edges live + # in each unit's metadata.direct_calls / direct_callers. + if ( + result.get('success', False) + and self.analyzer_output_file and os.path.exists(self.analyzer_output_file) + and self.dataset_file and os.path.exists(self.dataset_file) + ): + try: + with open(self.analyzer_output_file, 'r') as f: + analyzer = json.load(f) + with open(self.dataset_file, 'r') as f: + dataset_for_cg = json.load(f) + + raw_functions = analyzer.get("functions", {}) + # Normalise to the camelCase shape EntryPointDetector expects. 
+ normalized_functions = { + func_id: { + 'name': fd.get('name', ''), + 'unitType': fd.get('unit_type', fd.get('unitType', 'function')), + 'code': fd.get('code', ''), + 'filePath': fd.get('file_path', fd.get('filePath', '')), + 'startLine': fd.get('start_line', fd.get('startLine', 0)), + 'endLine': fd.get('end_line', fd.get('endLine', 0)), + 'package': fd.get('package', ''), + 'receiver': fd.get('receiver', ''), + 'isExported': fd.get('is_exported', fd.get('isExported', False)), + } + for func_id, fd in raw_functions.items() + } + + call_graph: dict = {} + reverse_call_graph: dict = {} + for unit in dataset_for_cg.get('units', []): + unit_id = unit.get('id') + metadata = unit.get('metadata', {}) + direct_calls = metadata.get('direct_calls', metadata.get('directCalls', [])) + direct_callers = metadata.get('direct_callers', metadata.get('directCallers', [])) + if direct_calls: + call_graph[unit_id] = direct_calls + if direct_callers: + reverse_call_graph[unit_id] = direct_callers + + call_graph_file = os.path.join(self.output_dir, 'call_graph.json') + with open(call_graph_file, 'w') as f: + json.dump({ + "functions": normalized_functions, + "call_graph": call_graph, + "reverse_call_graph": reverse_call_graph, + }, f, indent=2) + except (OSError, json.JSONDecodeError, KeyError) as e: + print(f"Warning: could not write call_graph.json: {e}") + return result.get('success', False) def apply_reachability_filter(self) -> bool: diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index 667bf1f..16a53f5 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -307,6 +307,25 @@ def run_typescript_analyzer(self, files: list = None) -> bool: ) self.results['stages']['typescript_analyzer'] = result + + # Write call_graph.json immediately after the analyzer output is + # available so the post-LLM reachability re-filter can use it + # 
regardless of processing_level (which may be "all"). + if result.get('success', False) and os.path.exists(self.analyzer_output_file): + try: + with open(self.analyzer_output_file, 'r') as f: + analyzer = json.load(f) + call_graph_data = { + "functions": analyzer.get("functions", {}), + "call_graph": analyzer.get("call_graph", analyzer.get("callGraph", {})), + "reverse_call_graph": analyzer.get("reverse_call_graph", analyzer.get("reverseCallGraph", {})), + } + call_graph_file = os.path.join(self.output_dir, 'call_graph.json') + with open(call_graph_file, 'w') as f: + json.dump(call_graph_data, f, indent=2) + except (OSError, json.JSONDecodeError, KeyError) as e: + print(f"Warning: could not write call_graph.json: {e}") + return result.get('success', False) def run_stage_with_stdout_capture(self, name: str, command: list, output_file: str) -> dict: diff --git a/libs/openant-core/parsers/php/test_pipeline.py b/libs/openant-core/parsers/php/test_pipeline.py index 7529ea9..1ab62be 100644 --- a/libs/openant-core/parsers/php/test_pipeline.py +++ b/libs/openant-core/parsers/php/test_pipeline.py @@ -184,6 +184,11 @@ def run_parser_pipeline(self) -> bool: analyzer_output = generator.generate_analyzer_output() write_json(self.analyzer_output_file, analyzer_output) + # Write call graph for post-LLM reachability re-filtering + call_graph_file = os.path.join(self.output_dir, 'call_graph.json') + with open(call_graph_file, 'w') as f: + json.dump(graph_result, f, indent=2) + elapsed = (datetime.now() - start_time).total_seconds() summary = { diff --git a/libs/openant-core/parsers/ruby/test_pipeline.py b/libs/openant-core/parsers/ruby/test_pipeline.py index 947d495..1461156 100644 --- a/libs/openant-core/parsers/ruby/test_pipeline.py +++ b/libs/openant-core/parsers/ruby/test_pipeline.py @@ -184,6 +184,11 @@ def run_parser_pipeline(self) -> bool: analyzer_output = generator.generate_analyzer_output() write_json(self.analyzer_output_file, analyzer_output) + # Write call graph for 
post-LLM reachability re-filtering + call_graph_file = os.path.join(self.output_dir, 'call_graph.json') + with open(call_graph_file, 'w') as f: + json.dump(graph_result, f, indent=2) + elapsed = (datetime.now() - start_time).total_seconds() summary = { diff --git a/libs/openant-core/tests/test_call_graph_output.py b/libs/openant-core/tests/test_call_graph_output.py new file mode 100644 index 0000000..288180e --- /dev/null +++ b/libs/openant-core/tests/test_call_graph_output.py @@ -0,0 +1,419 @@ +"""Tests that each parser writes call_graph.json to the output directory. + +The call_graph.json file is required by apply_reachability_filter (and the +post-LLM re-filter path) so it must be present regardless of processing_level, +including when --llm-reachability causes a parse with processing_level="all". + +Structure expected by apply_reachability_filter: + { + "functions": {<func_id>: {<func_data>}, ...}, + "call_graph": {<func_id>: [<callee_id>, ...], ...}, + "reverse_call_graph": {<func_id>: [<caller_id>, ...], ...}, + } + +Parser availability gates (identical to patterns used in test_js_parser.py): +- Python: always available +- JavaScript: requires Node.js + parsers/javascript/node_modules +- Go: requires parsers/go/go_parser/go_parser binary +- C: requires tree_sitter_c Python package +- Ruby: requires tree_sitter_ruby Python package +- PHP: requires tree_sitter_php Python package +""" + +from __future__ import annotations + +import json +import shutil +import sys +from pathlib import Path + +import pytest + +from core.parser_adapter import apply_reachability_filter, parse_repository + +TESTS_DIR = Path(__file__).parent +FIXTURES_DIR = TESTS_DIR / "fixtures" +PARSERS_DIR = Path(__file__).parent.parent / "parsers" + +# --------------------------------------------------------------------------- +# Availability checks (used by skipif marks) +# --------------------------------------------------------------------------- + +def _node_available() -> bool: + return bool(shutil.which("node")) and (PARSERS_DIR / "javascript" / 
"node_modules").exists() + +def _go_parser_available() -> bool: + binary = PARSERS_DIR / "go" / "go_parser" / "go_parser" + if not binary.exists() or binary.stat().st_size == 0: + return False + import subprocess + try: + subprocess.run([str(binary), "--help"], capture_output=True, timeout=5) + return True + except (OSError, subprocess.TimeoutExpired): + return False + +def _ts_c_available() -> bool: + try: + import tree_sitter_c # noqa: F401 + return True + except ImportError: + return False + +def _ts_ruby_available() -> bool: + try: + import tree_sitter_ruby # noqa: F401 + return True + except ImportError: + return False + +def _ts_php_available() -> bool: + try: + import tree_sitter_php # noqa: F401 + return True + except ImportError: + return False + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_REQUIRED_KEYS = {"functions", "call_graph", "reverse_call_graph"} + + +def _assert_call_graph_valid(output_dir: str) -> dict: + """Load call_graph.json from output_dir and assert it has the right shape.""" + cg_path = Path(output_dir) / "call_graph.json" + assert cg_path.exists(), f"call_graph.json not found in {output_dir}" + with open(cg_path) as f: + data = json.load(f) + assert _REQUIRED_KEYS <= data.keys(), ( + f"call_graph.json missing keys: {_REQUIRED_KEYS - data.keys()}" + ) + assert isinstance(data["functions"], dict) + assert isinstance(data["call_graph"], dict) + assert isinstance(data["reverse_call_graph"], dict) + return data + + +# --------------------------------------------------------------------------- +# apply_reachability_filter unit tests (always run — no external deps) +# --------------------------------------------------------------------------- + + +class TestApplyReachabilityFilterPublicAPI: + """apply_reachability_filter is the consumer of call_graph.json. 
+ These tests verify it works correctly with a synthetic fixture.""" + + def _make_call_graph_json(self, tmp_path: Path) -> None: + """Write a minimal call_graph.json that apply_reachability_filter can parse. + + route_handler uses the ``@app.route`` decorator pattern that + EntryPointDetector recognises, making it a structural entry point. + """ + cg = { + "functions": { + "app.py:route_handler": { + "name": "route_handler", + "filePath": "app.py", + "unitType": "function", + "isExported": False, + "decorators": ["@app.route('/foo')"], + }, + "app.py:helper": { + "name": "helper", + "filePath": "app.py", + "unitType": "function", + "isExported": False, + "decorators": [], + }, + "app.py:orphan": { + "name": "orphan", + "filePath": "app.py", + "unitType": "function", + "isExported": False, + "decorators": [], + }, + }, + "call_graph": { + "app.py:route_handler": ["app.py:helper"], + }, + "reverse_call_graph": { + "app.py:helper": ["app.py:route_handler"], + }, + } + (tmp_path / "call_graph.json").write_text(json.dumps(cg)) + + def _make_dataset(self, unit_ids: list[str]) -> dict: + return { + "units": [ + {"id": uid, "code": {"primary_code": "pass"}, "unit_type": "function"} + for uid in unit_ids + ] + } + + def test_filters_to_reachable_units(self, tmp_path): + self._make_call_graph_json(tmp_path) + dataset = self._make_dataset( + ["app.py:route_handler", "app.py:helper", "app.py:orphan"] + ) + result = apply_reachability_filter(dataset, str(tmp_path), "reachable") + unit_ids = {u["id"] for u in result["units"]} + assert "app.py:route_handler" in unit_ids + assert "app.py:helper" in unit_ids + assert "app.py:orphan" not in unit_ids + + def test_extra_entry_points_expand_reachable_set(self, tmp_path): + self._make_call_graph_json(tmp_path) + dataset = self._make_dataset( + ["app.py:route_handler", "app.py:helper", "app.py:orphan"] + ) + # Promote orphan as an extra entry point (simulating LLM signal). 
+ result = apply_reachability_filter( + dataset, str(tmp_path), "reachable", + extra_entry_points={"app.py:orphan"}, + ) + unit_ids = {u["id"] for u in result["units"]} + assert "app.py:orphan" in unit_ids + + def test_is_entry_point_set_on_structural_entry_points(self, tmp_path): + self._make_call_graph_json(tmp_path) + dataset = self._make_dataset(["app.py:route_handler", "app.py:helper"]) + result = apply_reachability_filter(dataset, str(tmp_path), "reachable") + by_id = {u["id"]: u for u in result["units"]} + assert by_id["app.py:route_handler"]["is_entry_point"] is True + assert by_id["app.py:helper"]["is_entry_point"] is False + + def test_llm_promoted_is_entry_point_preserved(self, tmp_path): + self._make_call_graph_json(tmp_path) + dataset = self._make_dataset(["app.py:route_handler", "app.py:helper"]) + # Pre-set is_entry_point=True on helper (simulating LLM promotion). + dataset["units"][1]["is_entry_point"] = True + result = apply_reachability_filter( + dataset, str(tmp_path), "reachable", + extra_entry_points={"app.py:helper"}, + ) + by_id = {u["id"]: u for u in result["units"]} + assert by_id["app.py:helper"]["is_entry_point"] is True + + def test_missing_call_graph_returns_dataset_unchanged(self, tmp_path): + dataset = self._make_dataset(["app.py:route_handler"]) + result = apply_reachability_filter(dataset, str(tmp_path), "reachable") + assert len(result["units"]) == 1 + + +# --------------------------------------------------------------------------- +# Python parser — always runs +# --------------------------------------------------------------------------- + + +class TestPythonCallGraphOutput: + def test_call_graph_json_written(self, sample_python_repo, tmp_output_dir): + parse_repository( + repo_path=sample_python_repo, + output_dir=tmp_output_dir, + language="python", + processing_level="all", + ) + _assert_call_graph_valid(tmp_output_dir) + + def test_call_graph_json_written_with_reachable_level( + self, sample_python_repo, tmp_output_dir + ): + 
parse_repository( + repo_path=sample_python_repo, + output_dir=tmp_output_dir, + language="python", + processing_level="reachable", + ) + _assert_call_graph_valid(tmp_output_dir) + + +# --------------------------------------------------------------------------- +# JavaScript parser +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif(not _node_available(), reason="Node.js or JS parser npm deps not available") +class TestJavaScriptCallGraphOutput: + def test_call_graph_json_written(self, sample_js_repo, tmp_output_dir): + parse_repository( + repo_path=sample_js_repo, + output_dir=tmp_output_dir, + language="javascript", + processing_level="all", + ) + _assert_call_graph_valid(tmp_output_dir) + + def test_call_graph_json_written_with_reachable_level( + self, sample_js_repo, tmp_output_dir + ): + parse_repository( + repo_path=sample_js_repo, + output_dir=tmp_output_dir, + language="javascript", + processing_level="reachable", + ) + _assert_call_graph_valid(tmp_output_dir) + + +# --------------------------------------------------------------------------- +# Go parser +# --------------------------------------------------------------------------- + + +@pytest.fixture +def sample_go_repo(tmp_path): + """Minimal Go repository fixture.""" + repo = tmp_path / "go_repo" + repo.mkdir() + (repo / "go.mod").write_text("module example.com/myapp\n\ngo 1.21\n") + (repo / "main.go").write_text( + 'package main\n\nimport "fmt"\n\n' + "func main() {\n\tgreet()\n}\n\n" + 'func greet() {\n\tfmt.Println("hello")\n}\n' + ) + return str(repo) + + +@pytest.mark.skipif(not _go_parser_available(), reason="go_parser binary not available") +class TestGoCallGraphOutput: + def test_call_graph_json_written(self, sample_go_repo, tmp_output_dir): + parse_repository( + repo_path=sample_go_repo, + output_dir=tmp_output_dir, + language="go", + processing_level="all", + ) + _assert_call_graph_valid(tmp_output_dir) + + def 
test_call_graph_json_written_with_reachable_level( + self, sample_go_repo, tmp_output_dir + ): + parse_repository( + repo_path=sample_go_repo, + output_dir=tmp_output_dir, + language="go", + processing_level="reachable", + ) + _assert_call_graph_valid(tmp_output_dir) + + +# --------------------------------------------------------------------------- +# C parser +# --------------------------------------------------------------------------- + + +@pytest.fixture +def sample_c_repo(tmp_path): + """Minimal C repository fixture.""" + repo = tmp_path / "c_repo" + repo.mkdir() + (repo / "main.c").write_text( + "#include \n\nvoid greet() {\n printf(\"hello\\n\");\n}\n\n" + "int main() {\n greet();\n return 0;\n}\n" + ) + return str(repo) + + +@pytest.mark.skipif(not _ts_c_available(), reason="tree_sitter_c not installed") +class TestCCallGraphOutput: + def test_call_graph_json_written(self, sample_c_repo, tmp_output_dir): + parse_repository( + repo_path=sample_c_repo, + output_dir=tmp_output_dir, + language="c", + processing_level="all", + ) + _assert_call_graph_valid(tmp_output_dir) + + def test_call_graph_json_written_with_reachable_level( + self, sample_c_repo, tmp_output_dir + ): + parse_repository( + repo_path=sample_c_repo, + output_dir=tmp_output_dir, + language="c", + processing_level="reachable", + ) + _assert_call_graph_valid(tmp_output_dir) + + +# --------------------------------------------------------------------------- +# Ruby parser +# --------------------------------------------------------------------------- + + +@pytest.fixture +def sample_ruby_repo(tmp_path): + """Minimal Ruby repository fixture.""" + repo = tmp_path / "ruby_repo" + repo.mkdir() + (repo / "app.rb").write_text( + "def greet\n puts 'hello'\nend\n\ndef main\n greet\nend\n" + ) + return str(repo) + + +@pytest.mark.skipif(not _ts_ruby_available(), reason="tree_sitter_ruby not installed") +class TestRubyCallGraphOutput: + def test_call_graph_json_written(self, sample_ruby_repo, tmp_output_dir): 
+ parse_repository( + repo_path=sample_ruby_repo, + output_dir=tmp_output_dir, + language="ruby", + processing_level="all", + ) + _assert_call_graph_valid(tmp_output_dir) + + def test_call_graph_json_written_with_reachable_level( + self, sample_ruby_repo, tmp_output_dir + ): + parse_repository( + repo_path=sample_ruby_repo, + output_dir=tmp_output_dir, + language="ruby", + processing_level="reachable", + ) + _assert_call_graph_valid(tmp_output_dir) + + +# --------------------------------------------------------------------------- +# PHP parser +# --------------------------------------------------------------------------- + + +@pytest.fixture +def sample_php_repo(tmp_path): + """Minimal PHP repository fixture.""" + repo = tmp_path / "php_repo" + repo.mkdir() + (repo / "index.php").write_text( + " Date: Tue, 12 May 2026 16:43:38 +0300 Subject: [PATCH 8/8] ci: ensure go_parser binary is built in python-tests job Add Go toolchain setup and per-platform go_parser build steps to the python-tests CI job so call_graph.json tests for Go and JS don't silently skip on all platforms. Also harden _go_parser_available() to try-execute the binary (catching WinError 193 for cross-platform Linux ELF) and check both go_parser and go_parser.exe. 
Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/test.yaml | 16 ++++++++++++++++ libs/openant-core/parsers/c/test_pipeline.py | 3 +-- libs/openant-core/parsers/go/test_pipeline.py | 17 +++++++---------- .../parsers/javascript/test_pipeline.py | 6 ++---- libs/openant-core/parsers/php/test_pipeline.py | 3 +-- libs/openant-core/parsers/ruby/test_pipeline.py | 3 +-- .../tests/test_call_graph_output.py | 7 +++++-- 7 files changed, 33 insertions(+), 22 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 2bb6841..134016f 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -52,6 +52,12 @@ jobs: cache: "npm" cache-dependency-path: libs/openant-core/parsers/javascript/package-lock.json + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: libs/openant-core/parsers/go/go_parser/go.mod + cache-dependency-path: libs/openant-core/parsers/go/go_parser/go.mod + - name: Install Python dependencies working-directory: libs/openant-core run: pip install -r requirements.txt && pip install ".[dev]" @@ -68,6 +74,16 @@ jobs: working-directory: libs/openant-core/parsers/javascript run: npm ci + - name: Build go_parser binary (Linux/macOS) + if: runner.os != 'Windows' + working-directory: libs/openant-core/parsers/go/go_parser + run: go build -o go_parser . + + - name: Build go_parser binary (Windows) + if: runner.os == 'Windows' + working-directory: libs/openant-core/parsers/go/go_parser + run: go build -o go_parser.exe . 
+ - name: Run Python and parser tests working-directory: libs/openant-core run: python -m pytest tests/ -v diff --git a/libs/openant-core/parsers/c/test_pipeline.py b/libs/openant-core/parsers/c/test_pipeline.py index 1ca1e6e..b8f94a2 100644 --- a/libs/openant-core/parsers/c/test_pipeline.py +++ b/libs/openant-core/parsers/c/test_pipeline.py @@ -186,8 +186,7 @@ def run_parser_pipeline(self) -> bool: # Write call graph for post-LLM reachability re-filtering call_graph_file = os.path.join(self.output_dir, 'call_graph.json') - with open(call_graph_file, 'w') as f: - json.dump(graph_result, f, indent=2) + write_json(call_graph_file, graph_result) elapsed = (datetime.now() - start_time).total_seconds() diff --git a/libs/openant-core/parsers/go/test_pipeline.py b/libs/openant-core/parsers/go/test_pipeline.py index 2de25ac..5abdf83 100644 --- a/libs/openant-core/parsers/go/test_pipeline.py +++ b/libs/openant-core/parsers/go/test_pipeline.py @@ -294,10 +294,8 @@ def run_go_parser_all(self) -> bool: and self.dataset_file and os.path.exists(self.dataset_file) ): try: - with open(self.analyzer_output_file, 'r') as f: - analyzer = json.load(f) - with open(self.dataset_file, 'r') as f: - dataset_for_cg = json.load(f) + analyzer = read_json(self.analyzer_output_file) + dataset_for_cg = read_json(self.dataset_file) raw_functions = analyzer.get("functions", {}) # Normalise to the camelCase shape EntryPointDetector expects. 
@@ -329,12 +327,11 @@ def run_go_parser_all(self) -> bool: reverse_call_graph[unit_id] = direct_callers call_graph_file = os.path.join(self.output_dir, 'call_graph.json') - with open(call_graph_file, 'w') as f: - json.dump({ - "functions": normalized_functions, - "call_graph": call_graph, - "reverse_call_graph": reverse_call_graph, - }, f, indent=2) + write_json(call_graph_file, { + "functions": normalized_functions, + "call_graph": call_graph, + "reverse_call_graph": reverse_call_graph, + }) except (OSError, json.JSONDecodeError, KeyError) as e: print(f"Warning: could not write call_graph.json: {e}") diff --git a/libs/openant-core/parsers/javascript/test_pipeline.py b/libs/openant-core/parsers/javascript/test_pipeline.py index 16a53f5..2eee6bd 100644 --- a/libs/openant-core/parsers/javascript/test_pipeline.py +++ b/libs/openant-core/parsers/javascript/test_pipeline.py @@ -313,16 +313,14 @@ def run_typescript_analyzer(self, files: list = None) -> bool: # regardless of processing_level (which may be "all"). 
if result.get('success', False) and os.path.exists(self.analyzer_output_file): try: - with open(self.analyzer_output_file, 'r') as f: - analyzer = json.load(f) + analyzer = read_json(self.analyzer_output_file) call_graph_data = { "functions": analyzer.get("functions", {}), "call_graph": analyzer.get("call_graph", analyzer.get("callGraph", {})), "reverse_call_graph": analyzer.get("reverse_call_graph", analyzer.get("reverseCallGraph", {})), } call_graph_file = os.path.join(self.output_dir, 'call_graph.json') - with open(call_graph_file, 'w') as f: - json.dump(call_graph_data, f, indent=2) + write_json(call_graph_file, call_graph_data) except (OSError, json.JSONDecodeError, KeyError) as e: print(f"Warning: could not write call_graph.json: {e}") diff --git a/libs/openant-core/parsers/php/test_pipeline.py b/libs/openant-core/parsers/php/test_pipeline.py index 1ab62be..ae34aa2 100644 --- a/libs/openant-core/parsers/php/test_pipeline.py +++ b/libs/openant-core/parsers/php/test_pipeline.py @@ -186,8 +186,7 @@ def run_parser_pipeline(self) -> bool: # Write call graph for post-LLM reachability re-filtering call_graph_file = os.path.join(self.output_dir, 'call_graph.json') - with open(call_graph_file, 'w') as f: - json.dump(graph_result, f, indent=2) + write_json(call_graph_file, graph_result) elapsed = (datetime.now() - start_time).total_seconds() diff --git a/libs/openant-core/parsers/ruby/test_pipeline.py b/libs/openant-core/parsers/ruby/test_pipeline.py index 1461156..c0b4d25 100644 --- a/libs/openant-core/parsers/ruby/test_pipeline.py +++ b/libs/openant-core/parsers/ruby/test_pipeline.py @@ -186,8 +186,7 @@ def run_parser_pipeline(self) -> bool: # Write call graph for post-LLM reachability re-filtering call_graph_file = os.path.join(self.output_dir, 'call_graph.json') - with open(call_graph_file, 'w') as f: - json.dump(graph_result, f, indent=2) + write_json(call_graph_file, graph_result) elapsed = (datetime.now() - start_time).total_seconds() diff --git 
a/libs/openant-core/tests/test_call_graph_output.py b/libs/openant-core/tests/test_call_graph_output.py index 288180e..8aac7a6 100644 --- a/libs/openant-core/tests/test_call_graph_output.py +++ b/libs/openant-core/tests/test_call_graph_output.py @@ -43,8 +43,11 @@ def _node_available() -> bool: return bool(shutil.which("node")) and (PARSERS_DIR / "javascript" / "node_modules").exists() def _go_parser_available() -> bool: - binary = PARSERS_DIR / "go" / "go_parser" / "go_parser" - if not binary.exists() or binary.stat().st_size == 0: + go_dir = PARSERS_DIR / "go" / "go_parser" + # Check both Unix and Windows binary names. + candidates = [go_dir / "go_parser", go_dir / "go_parser.exe"] + binary = next((p for p in candidates if p.exists() and p.stat().st_size > 0), None) + if binary is None: return False import subprocess try: