diff --git a/guides/projects/security-code-reviewer.mdx b/guides/projects/security-code-reviewer.mdx new file mode 100644 index 0000000..3dee34b --- /dev/null +++ b/guides/projects/security-code-reviewer.mdx @@ -0,0 +1,1285 @@ +--- +title: "Building a Two-Agent Security Code Reviewer" +slug: security-code-reviewer-venice +"og:title": "Building a Two-Agent Security Code Reviewer with Venice AI" +"og:description": "A practical guide to building a Python security agent that finds atomic vulnerabilities and chains them into exploit paths using Venice AI, an AST repo map, and Pydantic guardrails." + +--- +import { AuthorByline } from "/snippets/authorByline.jsx"; + + + +Most static security tools find bugs in isolation. They scan one file, list the issues, and move on. The problem is that the most damaging vulnerabilities in modern codebases are rarely a single bug. They're a chain: a hardcoded signing key, a missing authorization check, and a SQL injection, each of which looks manageable on its own. Together they're an account-takeover path. + +This is exactly the kind of cross-cutting reasoning LLMs are good at, if you give them the right structure. In this article, we'll build a two-agent security code reviewer using Python and the Venice AI API. By the end, you'll have a CLI you can point at any Python codebase to produce a Markdown report with atomic findings and exploit chains. + +Interested in the full code implementation? Check out [the GitHub repo.](https://github.com/joshua-mo-143/venice-security-agent-demo) + +Before we continue, you'll need a Venice API key. 
Export it as an environment variable: + +```bash +export VENICE_API_KEY= +``` + +## What We're Building + +The reviewer is a small Python project with a few clear parts: + +| Part | What it does | +| --- | --- | +| Pydantic models | Define `Evidence`, `Finding`, and `Chain`, and give us a hard validation boundary between the LLM and the rest of the program | +| Venice client | Wraps the OpenAI Python SDK pointed at Venice's OpenAI-compatible endpoint | +| AST repo map | Walks the target tree with Python's `ast` module and builds a deterministic map of every module's public symbols and import edges | +| Scanner agent | Reads one Python file at a time plus a per-file neighbourhood slice of the repo map, and emits atomic vulnerability findings with file:line evidence | +| Chainer agent | Reads the union of findings plus a condensed full repo map, and emits exploit chains that combine two or more findings | +| Reference validator | Drops any chain that references a finding ID the Scanner did not produce, or names a file none of its referenced findings actually came from | +| Markdown report | Renders findings and chains into a human-readable report | +| CLI | Wires everything together with Typer | + +The flow looks like this: + +1. Walk the target directory for `.py` files. +2. Build a deterministic repo map (imports, public symbols, signatures). +3. For each file, send the Scanner its source plus a per-file neighbourhood slice of the map and collect atomic findings. +4. Send the union of findings plus the condensed repo map to the Chainer and collect exploit chains. +5. Drop any chain that references a finding ID the Scanner did not produce, or that names a file none of its referenced findings actually came from. +6. Write a Markdown report. + +Two design decisions are worth flagging before we start writing code. + +The first is **why two agents instead of one**. 
A single-agent scanner that tries to do everything in one prompt has to balance being thorough about per-file bugs against being clever about combinatorial reasoning. Splitting the work means the Scanner can be relentless and noisy, and the Chainer can be selective and quiet. Adding one extra LLM call dedicated to combining findings unlocks an entire class of bug for very little extra code. + +The second is **why a repo map**. Real codebases live across many files. A bug that consists of "the validator runs but doesn't apply per-iteration in the fetcher, and the fetcher's response ends up in the renderer" is invisible to a per-file scanner. Before any LLM call, we walk the target tree with Python's `ast` and build a structural map. The Scanner sees a per-file *neighbourhood* (who imports from this file, what this file imports, signatures of those external symbols). The Chainer sees a *condensed* full map (every module, every public symbol, every import edge, no source). That's the smallest amount of context engineering we have found that lets the Chainer construct chains whose data flow crosses module boundaries, without paying the token cost of stuffing the whole codebase into every prompt. + +## Pre-requisites + +- Python 3.12+ +- A Venice API key from [venice.ai](https://venice.ai) +- Basic familiarity with Pydantic, Python's `ast` module, and the OpenAI Python SDK + +The reference repo uses [`uv`](https://docs.astral.sh/uv/) for dependency management, but a regular virtual environment works just as well. 
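
As a quick taste of the repo-map idea from the design notes above, here is a minimal sketch of what "walking a file with `ast`" means in practice. The `SOURCE` string and the `summarize` helper are made up for illustration and are not part of the reference repo; the real extractor built later in this guide records more detail (line numbers, signatures, constants).

```python
import ast

# Hypothetical module source; it is only parsed, never executed.
SOURCE = '''
import os
from pkg.validators import check_url

def fetch(url: str) -> bytes:
    ...

def _helper():
    ...

class Client(Base):
    ...
'''


def summarize(source: str) -> dict[str, list[str]]:
    """Collect public top-level symbols and imported module names."""
    tree = ast.parse(source)
    symbols: list[str] = []
    imports: list[str] = []
    for node in tree.body:  # top-level statements only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            if not node.name.startswith("_"):
                symbols.append(node.name)
        elif isinstance(node, ast.Import):
            imports.extend(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            imports.append(node.module)
    return {"symbols": symbols, "imports": imports}


summary = summarize(SOURCE)
print(summary)
```

Because `ast.parse` only builds a syntax tree, the undefined `Base` class and the imports in `SOURCE` are never resolved, which is why this approach is safe to run against untrusted code.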
+ +## Setting Up the Project + +Create a new project and install the dependencies: + +```bash +mkdir venice-security-reviewer +cd venice-security-reviewer +uv init +uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0" +``` + +If you prefer `pip`, create a virtual environment instead: + +```bash +python -m venv .venv +source .venv/bin/activate +pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0" +``` + +Create a `.env` file for local development: + +```bash +VENICE_API_KEY=your-venice-api-key-here +# Optional overrides +# VENICE_BASE_URL=https://api.venice.ai/api/v1 +# VENICE_MODEL=zai-org-glm-5 +``` + +We'll lay the source out under `src/venice_security_reviewer/` to keep it importable as a package, with prompts under `prompts/` at the repo root so they can be reviewed and diffed like any other source artefact: + +``` +src/venice_security_reviewer/ + __init__.py + models.py # Pydantic models + client.py # Venice client factory + repo_map.py # AST-built repo map + scanner.py # Scanner agent + chainer.py # Chainer agent + report.py # Jinja2 Markdown rendering + cli.py # Typer CLI + templates/ + report.md.j2 +prompts/ + scanner.md + chainer.md +``` + +## Setting Up the Venice Client + +Venice is OpenAI-compatible, so we can use the official OpenAI Python SDK and just point its `base_url` at Venice. Centralising the client construction in one file means the rest of the code never has to know which provider it's talking to: swapping backends would only touch this one module. 
+ +Create `src/venice_security_reviewer/client.py`: + +```python +from __future__ import annotations + +import os +from dataclasses import dataclass + +from dotenv import load_dotenv +from openai import OpenAI + +DEFAULT_BASE_URL = "https://api.venice.ai/api/v1" +DEFAULT_MODEL = "zai-org-glm-5" + + +class VeniceConfigError(RuntimeError): + """Raised when Venice client config is missing or invalid.""" + + +@dataclass(frozen=True, slots=True) +class VeniceConfig: + api_key: str + base_url: str + model: str + + @classmethod + def from_env(cls) -> "VeniceConfig": + load_dotenv() + api_key = os.getenv("VENICE_API_KEY") + if not api_key: + raise VeniceConfigError( + "VENICE_API_KEY is not set. Copy .env.example to .env and add your key, " + "or export VENICE_API_KEY in your shell." + ) + return cls( + api_key=api_key, + base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL), + model=os.getenv("VENICE_MODEL", DEFAULT_MODEL), + ) + + +def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]: + cfg = config or VeniceConfig.from_env() + client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url) + return client, cfg.model +``` + +A few things worth noting: + +- We default to `zai-org-glm-5` because it's a strong general-purpose Venice model, but you can override it with the `VENICE_MODEL` environment variable. For larger or more nuanced codebases, swapping in a stronger model can make the Chainer notably better at narrative quality. +- `build_client` returns the client *and* the model id, so callers don't have to read environment variables themselves and tests can inject a fake config without monkeypatching. + +## Defining the Data Models + +The whole point of using Pydantic here, rather than passing raw dicts around, is that we get a hard validation boundary between the LLM and the rest of the program. 
If the model returns malformed JSON or output that violates the schema, validation fails loudly, and a separate cross-reference check (shown later in this section) catches chains that cite finding IDs the Scanner never produced, so hallucinations never reach the report. + +Create `src/venice_security_reviewer/models.py`: + +```python +from __future__ import annotations + +from pathlib import Path +from typing import Literal, Self + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +Severity = Literal["low", "medium", "high", "critical"] +ChainSeverity = Literal["high", "critical"] + + +class Evidence(BaseModel): + """A concrete code span that justifies a finding.""" + + model_config = ConfigDict(frozen=True) + + file: Path + start_line: int = Field(ge=1) + end_line: int = Field(ge=1) + snippet: str + + @model_validator(mode="after") + def _check_line_range(self) -> Self: + if self.end_line < self.start_line: + raise ValueError( + f"end_line ({self.end_line}) must be >= start_line ({self.start_line})" + ) + return self + + +class Finding(BaseModel): + """An atomic vulnerability surfaced by the Scanner agent.""" + + model_config = ConfigDict(frozen=True) + + id: str = Field(pattern=r"^F-\d{3,}$") + title: str = Field(min_length=1) + severity: Severity + description: str = Field(min_length=1) + cwe: str | None = None + evidence: Evidence + + +class Chain(BaseModel): + """An exploit chain combining two or more atomic findings.""" + + model_config = ConfigDict(frozen=True) + + id: str = Field(pattern=r"^C-\d{3,}$") + findings: list[str] = Field(min_length=2) + narrative: str = Field(min_length=1) + severity: ChainSeverity + files_involved: list[Path] = Field(min_length=1) +``` + +The constraints are doing real work here: + +- `Finding.id` and `Chain.id` are constrained by regex patterns that accept IDs such as `F-001` and `C-001`. If the model gets creative with the format, validation fails. +- `Chain.findings` requires at least two entries: a "chain" of one finding is just a finding. +- `Chain.severity` is restricted to `high` or `critical`. 
A combination of findings that doesn't raise the impact above the highest individual severity isn't a chain worth reporting. +- `Evidence` enforces that `end_line >= start_line` so the model can't return nonsensical line ranges. + +That's the *shape* validation. We also need *cross-reference* validation: a chain that references a finding ID the Scanner never produced is meaningless. Add this function to `models.py`: + +```python +def validate_chain_references( + chains: list[Chain], findings: list[Finding] +) -> tuple[list[Chain], list[Chain]]: + findings_by_id = {f.id: f for f in findings} + valid: list[Chain] = [] + dropped: list[Chain] = [] + for chain in chains: + if not all(ref in findings_by_id for ref in chain.findings): + dropped.append(chain) + continue + chain_evidence_files = { + findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings + } + if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved): + dropped.append(chain) + continue + valid.append(chain) + return valid, dropped +``` + +This is the deterministic guardrail that keeps the Chainer honest. It can only reference findings the Scanner actually produced, and it can only claim files involved in the chain that one of those findings actually came from. Returning the dropped chains rather than silently filtering them lets the CLI surface a warning when the model tries to invent something. + +## Building the AST Repo Map + +The repo map is the structural skeleton of a Python codebase: every module's public surface, every import edge, and a reverse index from "module M" to "modules that import from M". It's built once per scan run with Python's `ast`, never via execution, so it's safe to run on adversarial code: the parser doesn't import or invoke anything from the scanned tree. + +We'll consume the map in two shapes. The Scanner gets a per-file *neighbourhood* slice so its prompts stay bounded in size. 
The Chainer gets a *condensed* full map so it can construct chains across files. + +Create `src/venice_security_reviewer/repo_map.py` and start with the Pydantic models that describe the map: + +```python +from __future__ import annotations + +import ast +import logging +from collections.abc import Iterable +from pathlib import Path +from typing import Literal + +from pydantic import BaseModel, ConfigDict, Field + +logger = logging.getLogger(__name__) + +SymbolKind = Literal["function", "class", "constant"] +_SIGNATURE_CHAR_CAP = 200 + +SKIP_DIR_NAMES: frozenset[str] = frozenset({ + ".git", ".venv", "venv", "env", "__pycache__", "node_modules", + "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache", + "site-packages", +}) + + +class SymbolDef(BaseModel): + model_config = ConfigDict(frozen=True) + name: str + kind: SymbolKind + line: int = Field(ge=1) + signature: str | None = None + + +class ImportEdge(BaseModel): + model_config = ConfigDict(frozen=True) + from_module: str + imported_names: list[str] + line: int = Field(ge=1) + + +class ModuleEntry(BaseModel): + model_config = ConfigDict(frozen=True) + path: Path + module_name: str + defines: list[SymbolDef] + imports: list[ImportEdge] + exports: list[str] +``` + +Now the helpers that walk the tree, skip directories we shouldn't index, and turn file paths into dotted module names: + +```python +def _iter_python_files(root: Path) -> Iterable[Path]: + for path in sorted(root.rglob("*.py")): + # Compare against parts relative to root, so a parent directory + # named e.g. "build" or "env" doesn't exclude the whole tree. + if any(part in SKIP_DIR_NAMES for part in path.relative_to(root).parts): + continue + if path.is_file(): + yield path + + +def _path_to_module_name(path: Path, root: Path) -> str: + rel = path.relative_to(root) + parts = list(rel.with_suffix("").parts) + if parts and parts[-1] == "__init__": + parts = parts[:-1] + return ".".join(parts) +``` + +For each file we want three things out of the AST: the top-level symbols it defines, the import edges, and an explicit `__all__` list if one is present. 
Function signatures and class headers get rendered as compact strings the LLM can read directly: + +```python +def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str: + try: + prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def " + args = ast.unparse(node.args) + returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else "" + sig = f"{prefix}{node.name}({args}){returns}" + if len(sig) > _SIGNATURE_CHAR_CAP: + return f"{prefix}{node.name}(...)" + return sig + except Exception: + return f"def {node.name}(...)" + + +def _render_class_header(node: ast.ClassDef) -> str: + try: + bases = [ast.unparse(b) for b in node.bases] + sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}" + if len(sig) > _SIGNATURE_CHAR_CAP: + return f"class {node.name}(...)" + return sig + except Exception: + return f"class {node.name}" +``` + +The `_SIGNATURE_CHAR_CAP` of 200 preserves typical real signatures (including type hints) while preventing pathological cases like a 200-line typed union from blowing up the prompt. + +Next, the extractor that pulls the structural data out of a parsed module. We handle `ast.FunctionDef`, `ast.ClassDef`, top-level `ast.Assign` and `ast.AnnAssign` for constants, and both `ast.Import` and `ast.ImportFrom` for the import edges. 
Relative imports get resolved into their absolute dotted form so the Chainer can match them against module names later: + +```python +def _resolve_relative_package( + *, importer_module: str, importer_is_init: bool, level: int +) -> str | None: + if level <= 0: + return None + importer_parts = importer_module.split(".") if importer_module else [] + base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1] + steps_up = level - 1 + if steps_up > len(base_parts): + return None + package_parts = ( + base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts) + ) + return ".".join(package_parts) +``` + +The full extraction logic walks `tree.body` and emits `SymbolDef` and `ImportEdge` entries for each top-level node. The reference repo's `_extract` function in [`repo_map.py`](https://github.com/joshua-mo-143/venice-security-agent-demo/blob/main/src/venice_security_reviewer/repo_map.py) covers the full implementation. The shape that comes out is a list of `ModuleEntry` objects, one per file. + +The interesting part is what we do with those entries. 
Wrap them in a `RepoMap`. The two methods consumers call are `neighborhood` and `condensed_dict`; `by_module_name` and `importers_of` are lookup helpers. The `ImportingRef` and `ModuleNeighborhood` models and the `resolve_imports_in_repo` method are omitted from this excerpt, so consult the reference repo's `repo_map.py` for them: + +```python +class RepoMap(BaseModel): + model_config = ConfigDict(frozen=True) + root: Path + modules: list[ModuleEntry] + + def by_module_name(self, module_name: str) -> ModuleEntry | None: + for m in self.modules: + if m.module_name == module_name: + return m + return None + + def importers_of(self, module_name: str) -> list["ImportingRef"]: + refs: list["ImportingRef"] = [] + for m in self.modules: + for edge in m.imports: + if edge.from_module == module_name: + refs.append( + ImportingRef( + importer_path=m.path, + importer_module=m.module_name, + imported_names=list(edge.imported_names), + line=edge.line, + ) + ) + return refs + + def neighborhood(self, path: Path) -> "ModuleNeighborhood | None": + m = next((mod for mod in self.modules if mod.path == path), None) + if m is None: + return None + return ModuleNeighborhood( + this_module=m, + imported_by=self.importers_of(m.module_name), + imports_from_repo=self.resolve_imports_in_repo(m.module_name), + ) + + def condensed_dict(self) -> dict[str, object]: + return { + "modules": [ + { + "path": str(m.path), + "module": m.module_name, + "exports": list(m.exports), + "imports": [ + {"from": e.from_module, "names": list(e.imported_names)} + for e in m.imports + ], + } + for m in self.modules + ] + } +``` + +`neighborhood(path)` is what the Scanner calls for each file. It returns a `ModuleNeighborhood` object containing the module itself, every other module that imports from it, and every in-repo symbol it imports from elsewhere (with their resolved signatures). That gives the Scanner enough context to flag findings that are only obvious in cross-file context, without dragging the whole codebase into the prompt. + +`condensed_dict()` is what the Chainer gets. Snippets and signatures are dropped; only paths, module names, public exports, and import edges remain. 
That's the smallest representation that still lets the Chainer reason about cross-module data flow. + +Finally, the entry point that builds the whole thing: + +```python +def build_repo_map(root: Path) -> RepoMap: + root = root.resolve() + modules: list[ModuleEntry] = [] + for path in _iter_python_files(root): + rel = path.relative_to(root) + module_name = _path_to_module_name(path, root) + is_init = path.stem == "__init__" + try: + source = path.read_text(encoding="utf-8") + tree = ast.parse(source) + except (OSError, SyntaxError, UnicodeDecodeError) as exc: + logger.warning("repo_map: skipping %s: %s", rel, exc) + continue + defines, imports, explicit_all = _extract( + tree, importer_module=module_name, importer_is_init=is_init + ) + exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")] + modules.append( + ModuleEntry( + path=rel, + module_name=module_name, + defines=defines, + imports=imports, + exports=exports, + ) + ) + return RepoMap(root=root, modules=modules) +``` + +Files we can't read or that fail to parse get logged and skipped. We return a partial map rather than failing the whole run; the worst case is that a Scanner call sees no neighbourhood for one file, which is still a working scan. + +## Writing the Scanner Agent + +The Scanner walks a target path, picks up Python source files, and asks Venice to identify atomic vulnerabilities one file at a time. Per-file scanning keeps the prompt small and makes failures isolated: one bad file doesn't kill the whole run. + +We'll keep the prompt itself in a separate file so it can be reviewed and diffed like any other source artefact. Create `prompts/scanner.md`: + +````markdown +You are a static security analyst reviewing a single Python source file for +vulnerabilities. 
You will be given the file path, its full contents, and a +*neighborhood* slice of the surrounding repo: which other modules import +from this file (and what symbols they pull), and which in-repo symbols this +file imports from elsewhere. You must respond with a JSON object that lists +every distinct vulnerability you can identify, with concrete file:line +evidence for each. + +# Rules + +1. Output a single JSON object. No prose before or after. No markdown fences. +2. The object must match this schema exactly: + +```json +{ + "findings": [ + { + "id": "F-001", + "title": "Short imperative title, e.g. 'Hardcoded session signing key'", + "severity": "low | medium | high | critical", + "description": "One to three sentences explaining the vulnerability and why it matters.", + "cwe": "CWE-798 or null if not applicable", + "evidence": { + "file": "{filename}", + "start_line": 12, + "end_line": 14, + "snippet": "the exact lines from the source, copied verbatim including whitespace" + } + } + ] +} +``` + +3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc. +4. The `file` field in evidence must equal the filename you were given, exactly. +5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given. +6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate. +7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool. +8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files. +9. If the file contains no vulnerabilities, return `{"findings": []}`. 
+```` + +The full prompt in the [reference repo](https://github.com/joshua-mo-143/venice-security-agent-demo/blob/main/prompts/scanner.md) also contains a "What to look for" section listing common vulnerability classes (hardcoded secrets, SQL injection, command injection, SSRF, insecure deserialization, etc.) and a "How to use the neighborhood" section explaining how the model should consume the cross-file context. + +A few prompt design notes: + +- We tell the model to emit JSON only, with no prose or fences. The OpenAI SDK supports a `response_format={"type": "json_object"}` parameter that enforces this on the API side, but reinforcing it in the prompt cuts down on edge cases. +- We explicitly forbid the Scanner from producing cross-file chains. Chains are the Chainer's job, and asking the Scanner to do both blurs the responsibility. +- We require the snippet to be copied verbatim. This means the report can quote the exact bytes the model claims to have seen, and a reviewer can spot-check a finding by comparing the snippet to the source. + +Now the agent code. 
Create `src/venice_security_reviewer/scanner.py` and start with the file walker and prompt loader: + +```python +from __future__ import annotations + +import json +import logging +from collections.abc import Iterable, Iterator +from pathlib import Path + +from openai import OpenAI +from pydantic import ValidationError + +from .models import Finding +from .repo_map import ModuleNeighborhood, RepoMap + +logger = logging.getLogger(__name__) + +DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"}) + +SKIP_DIR_NAMES: frozenset[str] = frozenset({ + ".git", ".venv", "venv", "env", "__pycache__", "node_modules", + "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache", + "site-packages", +}) + +MAX_FILE_BYTES = 200_000 + + +def _load_prompt_template(name: str) -> str: + here = Path(__file__).resolve() + return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8") + + +def iter_source_files( + root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS +) -> Iterator[Path]: + exts = {e.lower() for e in extensions} + for path in sorted(root.rglob("*")): + if not path.is_file(): + continue + if path.suffix.lower() not in exts: + continue + # Compare against parts relative to root, so a parent directory + # named e.g. "build" or "env" doesn't exclude the whole tree. + if any(part in SKIP_DIR_NAMES for part in path.relative_to(root).parts): + continue + try: + if path.stat().st_size > MAX_FILE_BYTES: + logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES) + continue + except OSError: + continue + yield path +``` + +`MAX_FILE_BYTES` is a safety cap. Beyond ~200 KB we skip rather than send a huge prompt that's likely to be both expensive and low quality. + +The next piece is the prompt builder. 
The template uses `{filename}`, `{source}`, and `{neighborhood}` as placeholders; we use `str.replace` rather than `.format()` because the template contains JSON examples with literal braces: + +```python +def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str: + if neighborhood is None: + return "null" + return neighborhood.model_dump_json(indent=2) + + +def _build_prompt( + template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None +) -> str: + return ( + template.replace("{filename}", filename) + .replace("{source}", source) + .replace("{neighborhood}", _render_neighborhood(neighborhood)) + ) +``` + +Now the parser. We deserialise the JSON, validate each finding through Pydantic, and drop individual malformed findings rather than failing the whole file. One bad finding shouldn't lose us the good ones: + +```python +def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]: + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise ValueError(f"model did not return valid JSON: {exc}") from exc + + if not isinstance(data, dict) or "findings" not in data: + raise ValueError("model JSON missing 'findings' key") + + findings: list[Finding] = [] + for entry in data["findings"]: + try: + findings.append(Finding.model_validate(entry)) + except ValidationError as exc: + logger.warning("dropping malformed finding from %s: %s", source_file, exc) + return findings +``` + +The Scanner emits IDs like `F-001` per file, but the Chainer needs to reference findings across the whole repo. 
We re-issue the IDs against a monotonic counter so they're globally unique: + +```python +def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]: + renumbered: list[Finding] = [] + for i, f in enumerate(findings): + new_id = f"F-{offset + i + 1:03d}" + renumbered.append(f.model_copy(update={"id": new_id})) + return renumbered, offset + len(findings) +``` + +The single-file scan call combines all of this. We read the file, build the prompt, send it to Venice with `response_format={"type": "json_object"}` and a low temperature, and parse the result: + +```python +def scan_file( + client: OpenAI, + model: str, + path: Path, + *, + prompt_template: str, + repo_root: Path, + repo_map: RepoMap, + max_retries: int = 1, +) -> list[Finding]: + try: + source = path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError) as exc: + logger.warning("could not read %s: %s", path, exc) + return [] + + rel = path.relative_to(repo_root) + neighborhood = repo_map.neighborhood(rel) + prompt = _build_prompt( + prompt_template, filename=str(rel), source=source, neighborhood=neighborhood + ) + + last_error: Exception | None = None + for attempt in range(max_retries + 1): + try: + response = client.chat.completions.create( + model=model, + messages=[ + { + "role": "system", + "content": ( + "You are a precise static security analyst. You respond " + "only with valid JSON matching the schema in the user prompt." 
+ ), + }, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + temperature=0.1, + ) + except Exception as exc: + logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc) + last_error = exc + continue + + content = response.choices[0].message.content or "" + try: + findings = _parse_findings(content, source_file=path) + except ValueError as exc: + logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc) + last_error = exc + continue + + return [ + f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})}) + for f in findings + ] + + logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error) + return [] +``` + +Two details worth highlighting: + +- We patch the evidence file path to be relative to `repo_root` *after* parsing, since the model echoes back whatever filename we gave it but we want a single canonical form throughout the report. +- `temperature=0.1` is intentionally low. We want the Scanner to be conservative and consistent across runs; creativity is the Chainer's job. 
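
The renumbering step described earlier is easy to check in isolation. The sketch below mirrors the logic of `_renumber_findings`, but swaps the Pydantic `Finding` for a hypothetical stdlib dataclass (`MiniFinding`) so it runs without any dependencies; the finding titles are made up.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniFinding:
    """Hypothetical stand-in for the Finding model (id and title only)."""

    id: str
    title: str


def renumber(findings: list[MiniFinding], offset: int) -> tuple[list[MiniFinding], int]:
    """Re-issue per-file IDs against a running counter."""
    renumbered = [
        replace(f, id=f"F-{offset + i + 1:03d}") for i, f in enumerate(findings)
    ]
    return renumbered, offset + len(findings)


# Two files, each numbered from F-001 by the model.
file_a = [MiniFinding("F-001", "Hardcoded key"), MiniFinding("F-002", "SQL injection")]
file_b = [MiniFinding("F-001", "Missing auth check")]

offset = 0
all_findings: list[MiniFinding] = []
for per_file in (file_a, file_b):
    fixed, offset = renumber(per_file, offset)
    all_findings.extend(fixed)

ids = [f.id for f in all_findings]
print(ids)
```

The second file's `F-001` becomes `F-003`, so the Chainer never sees two findings sharing an ID.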
+ +Finally, the orchestrator that scans every eligible file under the root: + +```python +def scan_path( + client: OpenAI, + model: str, + root: Path, + repo_map: RepoMap, + *, + extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS, +) -> list[Finding]: + template = _load_prompt_template("scanner.md") + all_findings: list[Finding] = [] + offset = 0 + for path in iter_source_files(root, extensions=extensions): + logger.info("scanning %s", path.relative_to(root)) + findings = scan_file( + client, model, path, + prompt_template=template, + repo_root=root, + repo_map=repo_map, + ) + renumbered, offset = _renumber_findings(findings, offset) + all_findings.extend(renumbered) + return all_findings +``` + +The repo map gets built once by the caller and reused for every file, so the Scanner sees a consistent global structure even when individual files fail to parse or get skipped. + +## Writing the Chainer Agent + +The Chainer takes the union of Scanner findings plus the condensed repo map and asks Venice whether any of the findings combine into a real exploit chain. Two deterministic guardrails sit between the LLM and the report: + +1. Every chain must reference only finding IDs the Scanner produced. +2. Every chain must claim only files that at least one referenced finding's evidence touches. + +Chains that violate either rule get dropped at parse time. This stops the model from hallucinating chains "just in case" and from claiming a chain spans files it has no evidence for. + +The Chainer prompt lives at `prompts/chainer.md`. The core of it looks like this: + +````markdown +You are a senior offensive security engineer. You are given a list of atomic +vulnerability findings discovered in a single codebase, plus a structural map +of that codebase showing every module's public symbols and import edges. Your +job is to identify whether any subset of the findings can be combined into a +real, end-to-end exploit chain. + +# Rules + +1. Output a single JSON object. 
No prose before or after. No markdown fences. +2. The object must match this schema exactly: + +```json +{ + "chains": [ + { + "id": "C-001", + "findings": ["F-001", "F-003"], + "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.", + "severity": "high | critical", + "files_involved": ["pkg/validators.py", "pkg/fetcher.py"] + } + ] +} +``` + +3. Chain IDs must be sequential: C-001, C-002, C-003, etc. +4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs. +5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain. +6. A chain must reference at least two distinct findings. +7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting. +8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain. +```` + +The full prompt in the [reference repo](https://github.com/joshua-mo-143/venice-security-agent-demo/blob/main/prompts/chainer.md) also explains how to read the repo map, how to decide what goes in `files_involved`, and crucially, when *not* to chain. Telling the model "it is correct and expected for many codebases to have findings that do not chain" is what stops it from inventing chains to look productive. + +Now the agent code. 
Create `src/venice_security_reviewer/chainer.py`: + +```python +from __future__ import annotations + +import json +import logging +from pathlib import Path + +from openai import OpenAI +from pydantic import ValidationError + +from .models import Chain, Finding, validate_chain_references +from .repo_map import RepoMap + +logger = logging.getLogger(__name__) + +MAX_REPO_MAP_CHARS = 8000 + + +def _load_prompt_template(name: str) -> str: + here = Path(__file__).resolve() + return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8") +``` + +`MAX_REPO_MAP_CHARS = 8000` is a soft ceiling for the JSON-rendered repo map block in the Chainer prompt. At roughly 4 chars per token, that's ~2000 tokens, which sits comfortably inside any Venice model's context window even with findings and the narrative budget on top. + +We serialise findings into a compact JSON block. Note we strip the `snippet` from evidence here on purpose: the Chainer doesn't need raw bytes to decide whether two findings combine, and including them roughly doubles the token cost on real codebases: + +```python +def _findings_to_input_json(findings: list[Finding]) -> str: + payload = [ + { + "id": f.id, + "title": f.title, + "severity": f.severity, + "description": f.description, + "cwe": f.cwe, + "evidence": { + "file": str(f.evidence.file), + "start_line": f.evidence.start_line, + "end_line": f.evidence.end_line, + }, + } + for f in findings + ] + return json.dumps(payload, indent=2) +``` + +For larger codebases the full condensed repo map can blow past our character budget. When that happens, we prune to finding-bearing modules plus their direct neighbours. 
That preserves enough structure for the Chainer to reason about chains we have evidence for, and discards the rest: + +```python +def _prune_for_budget( + repo_map: RepoMap, findings: list[Finding], *, char_budget: int +) -> dict[str, object]: + full = repo_map.condensed_dict() + if len(json.dumps(full)) <= char_budget: + return full + + finding_files = {f.evidence.file for f in findings} + keep_modules = { + m.module_name for m in repo_map.modules if m.path in finding_files + } + if not keep_modules: + return full + + neighbours: set[str] = set() + for m in repo_map.modules: + if m.module_name in keep_modules: + for edge in m.imports: + neighbours.add(edge.from_module) + for edge in m.imports: + if edge.from_module in keep_modules: + neighbours.add(m.module_name) + keep_modules.update(neighbours) + + pruned_modules = [ + { + "path": str(m.path), + "module": m.module_name, + "exports": list(m.exports), + "imports": [ + {"from": e.from_module, "names": list(e.imported_names)} + for e in m.imports + ], + } + for m in repo_map.modules + if m.module_name in keep_modules + ] + return { + "modules": pruned_modules, + "_pruned": True, + "_kept": len(pruned_modules), + "_total": len(repo_map.modules), + } + + +def _render_repo_map( + repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS +) -> str: + payload = _prune_for_budget(repo_map, findings, char_budget=char_budget) + if payload.get("_pruned"): + logger.info( + "chainer: repo map pruned for token budget (kept %s of %s modules)", + payload.get("_kept"), + payload.get("_total"), + ) + return json.dumps(payload, indent=2) +``` + +The pruning strategy is intentionally simple: keep the modules our findings live in, and keep their direct import-graph neighbours. Anything further out has no plausible role in a chain we currently have evidence for, so it can be dropped without losing reasoning power. 
We also annotate the payload with `_pruned`, `_kept`, and `_total` markers, so the Chainer prompt can warn the model when the map has been trimmed. + +Parsing the response is the same shape as the Scanner: deserialise, validate each chain through Pydantic, drop malformed entries: + +```python +def _parse_chains(raw: str) -> list[Chain]: + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise ValueError(f"chainer did not return valid JSON: {exc}") from exc + + if not isinstance(data, dict) or "chains" not in data: + raise ValueError("chainer JSON missing 'chains' key") + + chains: list[Chain] = [] + for entry in data["chains"]: + try: + chains.append(Chain.model_validate(entry)) + except ValidationError as exc: + logger.warning("dropping malformed chain: %s", exc) + return chains +``` + +Then the agent itself: + +```python +def find_chains( + client: OpenAI, + model: str, + findings: list[Finding], + repo_map: RepoMap, + *, + max_retries: int = 1, +) -> tuple[list[Chain], list[Chain]]: + if len(findings) < 2: + return [], [] + + template = _load_prompt_template("chainer.md") + prompt = template.replace( + "{findings_json}", _findings_to_input_json(findings) + ).replace("{repo_map}", _render_repo_map(repo_map, findings)) + + last_error: Exception | None = None + for attempt in range(max_retries + 1): + try: + response = client.chat.completions.create( + model=model, + messages=[ + { + "role": "system", + "content": ( + "You are a senior offensive security engineer. You respond " + "only with valid JSON matching the schema in the user prompt." 
+ ), + }, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + temperature=0.2, + ) + except Exception as exc: + logger.warning("Venice call failed on attempt %d: %s", attempt, exc) + last_error = exc + continue + + content = response.choices[0].message.content or "" + try: + chains = _parse_chains(content) + except ValueError as exc: + logger.warning("chainer parse failure on attempt %d: %s", attempt, exc) + last_error = exc + continue + + valid, dropped = validate_chain_references(chains, findings) + if dropped: + logger.warning( + "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s", + len(dropped), + [c.id for c in dropped], + ) + return valid, dropped + + logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error) + return [], [] +``` + +A couple of things worth pointing out: + +- We bail out before calling the model when there are fewer than two findings. You can't chain a single finding, and skipping the call means we don't burn tokens on a guaranteed-empty result. +- `temperature=0.2` is slightly higher than the Scanner's `0.1`. The Chainer benefits from a touch more creativity to spot non-obvious combinations, but we still want it grounded in the findings and map it was given. +- After parsing, `validate_chain_references` runs the deterministic cross-reference check we wrote earlier. Anything that survives is safe to render; anything that doesn't gets logged so the operator knows the model tried to invent something. + +That cross-reference check is the most important piece of the whole project. It's the boundary between "useful security tool" and "occasionally confidently wrong AI report." With it in place, even if the model hallucinates, the wrong chain never reaches the report. 
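
To make the two guardrails concrete, here is a minimal, self-contained sketch of how such a cross-reference check behaves when the model invents a finding ID or claims a file it has no evidence for. `ToyFinding`, `ToyChain`, and `split_by_references` are hypothetical stand-ins written for this illustration; they are not the project's `Finding`, `Chain`, or `validate_chain_references`, and they only assume that each finding carries one evidence file.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToyFinding:
    """Hypothetical stand-in for the project's Finding model."""
    id: str
    file: str


@dataclass(frozen=True)
class ToyChain:
    """Hypothetical stand-in for the project's Chain model."""
    id: str
    findings: tuple[str, ...]
    files_involved: tuple[str, ...]


def split_by_references(
    chains: list[ToyChain], findings: list[ToyFinding]
) -> tuple[list[ToyChain], list[ToyChain]]:
    """Return (valid, dropped) by checking both guardrails deterministically."""
    file_by_id = {f.id: f.file for f in findings}
    valid: list[ToyChain] = []
    dropped: list[ToyChain] = []
    for chain in chains:
        # Guardrail 1: every referenced finding ID must come from the Scanner.
        ids_known = all(fid in file_by_id for fid in chain.findings)
        # Guardrail 2: every claimed file must be the evidence file of at
        # least one finding this chain references.
        backed_files = {file_by_id[fid] for fid in chain.findings if fid in file_by_id}
        files_backed = all(path in backed_files for path in chain.files_involved)
        (valid if ids_known and files_backed else dropped).append(chain)
    return valid, dropped


findings = [ToyFinding("F-001", "pkg/auth.py"), ToyFinding("F-002", "pkg/db.py")]
chains = [
    ToyChain("C-001", ("F-001", "F-002"), ("pkg/auth.py", "pkg/db.py")),  # grounded
    ToyChain("C-002", ("F-001", "F-009"), ("pkg/auth.py",)),  # invented finding ID
    ToyChain("C-003", ("F-001", "F-002"), ("pkg/admin.py",)),  # file with no evidence
]
valid, dropped = split_by_references(chains, findings)
print([c.id for c in valid])    # ['C-001']
print([c.id for c in dropped])  # ['C-002', 'C-003']
```

Neither check involves the model, so a chain is either fully grounded in Scanner output or it is dropped; there is no partial credit for a chain that is "mostly" right.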
+ +## Rendering the Markdown Report + +Keeping rendering separate from agent logic means the same `Finding` and `Chain` objects can later be fed into other formats (JSON, SARIF, HTML) without touching the Scanner or Chainer. + +We'll use Jinja2 with a small template file. Create `src/venice_security_reviewer/templates/report.md.j2`: + +````jinja +# Security Review Report + +**Target:** `{{ target }}` +**Scanned at:** {{ scanned_at }} +**Model:** `{{ model }}` + +--- + +## Summary + +- **Atomic findings:** {{ findings | length }} +- **Exploit chains:** {{ chains | length }} +{%- if dropped_chains %} +- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }} +{%- endif %} + +--- + +## Exploit Chains + +{% if not chains %} +_No exploit chains were identified by the Chainer agent._ +{% else %} +{% for c in chains %} +### {{ c.id }} — {{ c.severity | upper }} + +**Findings combined:** {{ c.findings | join(', ') }} +**Files involved:** {{ c.files_involved | map('string') | join(', ') }} + +{{ c.narrative }} + +{% endfor %} +{% endif %} + +--- + +## Atomic Findings + +{% for f in findings %} +### {{ f.id }} — {{ f.title }} + +- **Severity:** {{ f.severity }} +{%- if f.cwe %} +- **CWE:** {{ f.cwe }} +{%- endif %} +- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}` + +{{ f.description }} + +``` +{{ f.evidence.snippet }} +``` + +{% endfor %} +```` + +Then the renderer at `src/venice_security_reviewer/report.py`: + +```python +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +from jinja2 import Environment, PackageLoader, select_autoescape + +from .models import Chain, Finding + + +def _build_env() -> Environment: + return Environment( + loader=PackageLoader("venice_security_reviewer", "templates"), + autoescape=select_autoescape(enabled_extensions=("html",)), + keep_trailing_newline=True, + ) + + +def render_report( + *, + target: Path, + model: str, + 
findings: list[Finding], + chains: list[Chain], + dropped_chains: list[Chain] | None = None, +) -> str: + env = _build_env() + template = env.get_template("report.md.j2") + return template.render( + target=str(target), + scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"), + model=model, + findings=findings, + chains=chains, + dropped_chains=dropped_chains or [], + ) +``` + +Autoescape stays off for the Markdown template (Markdown isn't HTML), but we leave it enabled for any future `.html` templates by extension. + +## Wiring the CLI + +The CLI is the orchestrator: build the repo map, scan, chain, render. We'll use Typer to handle argument parsing and Rich to print a nice summary table. + +Create `src/venice_security_reviewer/cli.py`: + +```python +from __future__ import annotations + +import logging +import sys +from pathlib import Path +from typing import Annotated + +import typer +from rich.console import Console +from rich.table import Table + +from .chainer import find_chains +from .client import VeniceConfigError, build_client +from .models import Chain, Finding +from .repo_map import build_repo_map +from .report import render_report +from .scanner import scan_path + +app = typer.Typer( + add_completion=False, + help="Two-agent security code reviewer powered by Venice AI.", + no_args_is_help=True, +) +console = Console() + + +@app.callback() +def _root() -> None: + """Force Typer to keep `scan` as a named subcommand.""" + + +def _configure_logging(verbose: bool) -> None: + logging.basicConfig( + level=logging.DEBUG if verbose else logging.INFO, + format="%(levelname)s %(name)s: %(message)s", + stream=sys.stderr, + ) + + +def _print_summary( + findings: list[Finding], chains: list[Chain], dropped: list[Chain] +) -> None: + table = Table(title="Scan summary", show_header=True, header_style="bold") + table.add_column("Metric") + table.add_column("Count", justify="right") + table.add_row("Atomic findings", str(len(findings))) + table.add_row("Exploit 
chains", str(len(chains))) + if dropped: + table.add_row("Chains dropped (bad refs)", str(len(dropped))) + console.print(table) + + +@app.command() +def scan( + path: Annotated[ + Path, + typer.Argument( + exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True, + help="Path to the codebase to scan.", + ), + ], + out: Annotated[ + Path, typer.Option("--out", "-o", help="Where to write the Markdown report.") + ] = Path("report.md"), + verbose: Annotated[ + bool, typer.Option("--verbose", "-v", help="Enable debug logging.") + ] = False, +) -> None: + """Scan a codebase for vulnerabilities and exploit chains.""" + _configure_logging(verbose) + + try: + client, model = build_client() + except VeniceConfigError as exc: + console.print(f"[red]error:[/red] {exc}") + raise typer.Exit(code=2) from exc + + console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...") + repo_map = build_repo_map(path) + edge_count = sum(len(m.imports) for m in repo_map.modules) + console.print( + f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), " + f"[bold]{edge_count}[/bold] import edge(s)." 
+ ) + + console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...") + findings = scan_path(client, model, path, repo_map) + console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).") + + console.print("[bold]Chaining[/bold] findings...") + chains, dropped = find_chains(client, model, findings, repo_map) + console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).") + + report = render_report( + target=path, model=model, + findings=findings, chains=chains, dropped_chains=dropped, + ) + out.write_text(report, encoding="utf-8") + console.print(f"Report written to [green]{out}[/green]") + _print_summary(findings, chains, dropped) + + +def main() -> None: + app() + + +if __name__ == "__main__": + main() +``` + +Add the script entry point to `pyproject.toml`: + +```toml +[project.scripts] +venice-security-reviewer = "venice_security_reviewer.cli:main" +``` + +That's the whole pipeline wired up. + +## Running the Project + +To try it on a real codebase, point the CLI at a directory of Python source: + +```bash +uv run venice-security-reviewer scan path/to/your/code +``` + +Or install it into your virtualenv with `pip install -e .` and run `venice-security-reviewer scan path/to/your/code`. + +The output looks roughly like this: + +```text +Indexing /path/to/code (AST repo map)... +Repo map: 6 module(s), 14 import edge(s). +Scanning /path/to/code with model zai-org-glm-5... +Scanner produced 4 finding(s). +Chaining findings... +Chainer produced 1 chain(s). +Report written to report.md + Scan summary +┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓ +┃ Metric ┃ Count ┃ +┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩ +│ Atomic findings │ 4 │ +│ Exploit chains │ 1 │ +└───────────────────────────┴───────┘ +``` + +The Markdown report shows each chain at the top with its narrative, then every individual finding underneath with severity, CWE, file location, description, and the verbatim snippet the model claims to have read. 
+ +The reference repo also ships with four bundled demo targets that each exercise a different shape of reasoning the Chainer has to do: + +- `examples/vulnerable_app` — a multi-file Flask app with three "low" findings, two of which combine into a critical privilege-escalation chain across files. Tests whether the Chainer is selective about what it combines. +- `examples/url_preview` — a multi-file URL-fetcher with a defensive allowlist that doesn't apply per-iteration. Tests cross-file data-flow reasoning combined with deployment topology (link-local IPs are cloud-credential gateways). +- `examples/csv_query` — a single-file CSV filter with an `eval` sandbox escape via `__class__.__base__.__subclasses__()`. Tests language-level reasoning rather than HTTP flow. +- `examples/webhook_handler` — a single-file HMAC verifier with a JSON parser-differential vulnerability. Tests reasoning across multiple specifications. + +Try them with: + +```bash +uv run venice-security-reviewer scan examples/vulnerable_app +uv run venice-security-reviewer scan examples/csv_query +``` + +If you ever see the CLI log `chainer referenced N unknown finding id(s) or file(s); chains dropped`, that's the cross-reference validator catching the model in the act of inventing a chain. The dropped chains never make it into the report; you just get a warning that you can use to adjust the prompt or sample additional Chainer runs. + +## Extending This Example + +The two-agent shape generalises well. A few directions worth exploring: + +- **More languages.** The Scanner is language-agnostic at the prompt level; the AST builder is what's Python-specific. Swap in `tree-sitter` and you can build the same neighbourhood/condensed-map shapes for TypeScript, Go, or Rust. +- **A third agent for fixes.** Once you have a chain, asking a Patcher agent to draft a unified diff that defangs *one* of the constituent findings is a small step. 
Pydantic-validate the diff against the same evidence-file set and you get the same hallucination guard for free. +- **Output formats.** `render_report` is the only place that knows about Markdown. Add a SARIF renderer and the same findings can drop into GitHub code scanning. Add a JSON renderer and you can pipe results into a downstream system. +- **Caching by file hash.** The Scanner's per-file calls are independent and idempotent. Caching by `(file_hash, prompt_hash, model)` means re-scanning a repo where one file changed only re-runs the Scanner on that one file. +- **Sampling for the Chainer.** For high-stakes runs, call the Chainer N times at slightly higher temperature and intersect the results. Chains the model finds consistently are more likely to be real; chains it finds once and never again are likely noise. +- **Stronger models.** `zai-org-glm-5` is the default because it strikes a good balance between cost and quality for combinatorial reasoning, but for harder codebases swapping in a stronger Venice model (set via `VENICE_MODEL`) can make the Chainer's narratives noticeably tighter. + +## Finishing Up + +Thanks for reading! Hopefully this helped you understand how to structure an AI security tool that's actually trustworthy. The pattern we used here generalises beyond security too: any time you want an LLM to reason across files in a way that has to ground out in real evidence, the recipe is the same. Build a deterministic structural map, hand the model a slice of it that fits in context, validate the model's references back against the structure, and drop anything it can't ground. + +By using Python with the Venice AI API, we can build agents that combine LLM reasoning with hard validation boundaries, and ship something that gives a useful answer instead of a confident-sounding one.
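
As a footnote to the caching idea in the extensions list, here is one way the `(file_hash, prompt_hash, model)` key could be derived. This is a sketch under assumptions, not code from the reference repo: `scanner_cache_key` is a hypothetical helper, and because the Scanner also receives a neighbourhood slice of the repo map, a real cache would probably need to fold that slice into the key as well.

```python
import hashlib


def scanner_cache_key(source: bytes, prompt_template: str, model: str) -> str:
    """Derive a stable key from the inputs that determine a Scanner call."""
    digest = hashlib.sha256()
    for part in (source, prompt_template.encode("utf-8"), model.encode("utf-8")):
        # Length-prefix each part so ("ab", "c") and ("a", "bc") never collide.
        digest.update(len(part).to_bytes(8, "big"))
        digest.update(part)
    return digest.hexdigest()


key_a = scanner_cache_key(b"print('hi')\n", "scanner prompt v1", "zai-org-glm-5")
key_b = scanner_cache_key(b"print('hi')\n", "scanner prompt v1", "zai-org-glm-5")
key_c = scanner_cache_key(b"print('bye')\n", "scanner prompt v1", "zai-org-glm-5")
print(key_a == key_b)  # True: identical inputs reuse the cached findings
print(key_a == key_c)  # False: a changed file forces a fresh Scanner call
```

Editing the prompt template or switching models changes the key too, so stale findings from an older prompt are never served by accident.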