Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion anton/chat.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
import json as _json
import os
import urllib.error
import re as _re
Expand Down
38 changes: 38 additions & 0 deletions anton/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ def main(
resume: bool = typer.Option(
False, "--resume", "-r", help="Resume a previous chat session"
),
prompt: str | None = typer.Option(
Comment thread
velocitybolt marked this conversation as resolved.
None, "--prompt", "-p", help="Run a single prompt in headless mode and exit"
),
output_format: str = typer.Option(
Comment thread
velocitybolt marked this conversation as resolved.
"text", "--output-format", help="Output format: text or json (headless mode only)"
),
stdin: bool = typer.Option(
False, "--stdin", help="Read prompt from stdin (headless mode)"
),
) -> None:
"""Anton — a self-evolving autonomous system."""
_ensure_dependencies(console)
Expand All @@ -287,6 +296,35 @@ def main(
settings = AntonSettings()
settings.resolve_workspace(folder)

# Headless mode: --prompt or --stdin (mutually exclusive)
if prompt and stdin:
import sys as _sys
print("Error: --prompt and --stdin are mutually exclusive", file=_sys.stderr)
raise typer.Exit(1)

headless_prompt = prompt
if stdin:
import sys as _sys
headless_prompt = _sys.stdin.read().strip()
if not headless_prompt:
print("Error: no input received from stdin", file=_sys.stderr)
raise typer.Exit(1)

if headless_prompt:
# Headless mode — skip terms consent, banner, update check
if not _has_api_key(settings):
print("Error: no API key configured. Run `anton setup` first.", file=sys.stderr)
raise typer.Exit(1)

ctx.ensure_object(dict)
ctx.obj["settings"] = settings

from anton.commands.headless import run_headless

_ensure_workspace(settings)
run_headless(console, settings, prompt=headless_prompt, output_format=output_format)
raise typer.Exit(0)

if not settings.terms_consent:
_ensure_terms_consent(console, settings)

Expand Down
154 changes: 154 additions & 0 deletions anton/commands/headless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Headless single-shot prompt execution."""
from __future__ import annotations

import asyncio
import json as _json
import sys
from pathlib import Path
from typing import TYPE_CHECKING

from anton.chat_session import build_runtime_context
from anton.data_vault import DataVault
from anton.datasource_registry import DatasourceRegistry
from anton.llm.provider import (
StreamComplete,
StreamTextDelta,
StreamToolUseDelta,
StreamToolUseEnd,
StreamToolUseStart,
)
from anton.utils.datasources import register_secret_vars

if TYPE_CHECKING:
from rich.console import Console

from anton.config.settings import AntonSettings


def run_headless(
    console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text"
) -> None:
    """Run a single prompt in headless mode and exit.

    Delegates the actual work to the async ``_headless`` coroutine; an empty
    prompt is rejected up front with a message on stderr and exit status 1.
    """
    if prompt:
        asyncio.run(_headless(console, settings, prompt=prompt, output_format=output_format))
        return

    print("Error: headless mode requires a prompt via --prompt or --stdin", file=sys.stderr)
    raise SystemExit(1)


async def _headless(
    console: Console, settings: AntonSettings, *, prompt: str, output_format: str = "text"
) -> None:
    """Execute a single prompt without interactive elements.

    Builds a fully wired ``ChatSession`` (LLM client, workspace, memory,
    datasource env vars), streams exactly one turn for *prompt*, and prints
    the result to stdout as plain text or a JSON document.  Any failure is
    reported tersely on stderr and converted to ``SystemExit(1)`` so the CLI
    process exits non-zero.
    """
    try:
        session = _build_session(settings)
        response_text, tool_calls, usage_data = await _collect_turn(session, prompt)
        _emit_result(response_text, tool_calls, usage_data, output_format)
    except Exception as exc:
        # Headless mode favors a short stderr message over a traceback;
        # chain the cause so debuggers can still reach the original error.
        print(f"Error: {exc}", file=sys.stderr)
        raise SystemExit(1) from exc


def _build_session(settings: AntonSettings):
    """Assemble a ChatSession wired with workspace, memory and datasources."""
    from anton.chat import ChatSession
    from anton.context.self_awareness import SelfAwarenessContext
    from anton.llm.client import LLMClient
    from anton.memory.cortex import Cortex
    from anton.memory.episodes import EpisodicMemory
    from anton.memory.history_store import HistoryStore
    from anton.workspace import Workspace

    llm_client = LLMClient.from_settings(settings)

    self_awareness = SelfAwarenessContext(Path(settings.context_dir))
    workspace = Workspace(settings.workspace_path)
    workspace.apply_env_to_process()

    # Inject datasource env vars (connection credentials + secret registration).
    dv = DataVault()
    dreg = DatasourceRegistry()
    for conn in dv.list_connections():
        dv.inject_env(conn["engine"], conn["name"])
        edef = dreg.get(conn["engine"])
        if edef is not None:
            register_secret_vars(edef, engine=conn["engine"], name=conn["name"])
    del dv, dreg

    global_memory_dir = Path.home() / ".anton" / "memory"
    project_memory_dir = settings.workspace_path / ".anton" / "memory"

    cortex = Cortex(
        global_dir=global_memory_dir,
        project_dir=project_memory_dir,
        mode=settings.memory_mode,
        llm_client=llm_client,
    )

    episodes_dir = settings.workspace_path / ".anton" / "episodes"
    episodic = EpisodicMemory(episodes_dir, enabled=settings.episodic_memory)
    if episodic.enabled:
        episodic.start_session()

    history_store = HistoryStore(episodes_dir)
    current_session_id = episodic._session_id if episodic.enabled else None

    runtime_context = build_runtime_context(settings)
    coding_api_key = (
        settings.anthropic_api_key
        if settings.coding_provider == "anthropic"
        else settings.openai_api_key
    ) or ""

    return ChatSession(
        llm_client,
        self_awareness=self_awareness,
        cortex=cortex,
        episodic=episodic,
        runtime_context=runtime_context,
        workspace=workspace,
        console=None,  # no interactive console in headless mode
        coding_provider=settings.coding_provider,
        coding_api_key=coding_api_key,
        coding_base_url=settings.openai_base_url or "",
        history_store=history_store,
        session_id=current_session_id,
        proactive_dashboards=False,
    )


async def _collect_turn(session, prompt: str):
    """Drive one streamed turn, accumulating text, tool calls and usage.

    Returns ``(response_text, tool_calls, usage_data)``.  Tool-call input JSON
    arrives in fragments (StreamToolUseDelta) and is parsed when the matching
    StreamToolUseEnd event is seen.
    """
    response_text = ""
    tool_calls: list[dict] = []
    usage_data: dict = {}

    async for event in session.turn_stream(prompt):
        if isinstance(event, StreamTextDelta):
            response_text += event.text
        elif isinstance(event, StreamToolUseStart):
            tool_calls.append({"name": event.name, "id": event.id, "input": {}})
        elif isinstance(event, StreamToolUseDelta):
            if tool_calls:
                last = tool_calls[-1]
                last.setdefault("_raw_input", "")
                last["_raw_input"] += event.json_delta
        elif isinstance(event, StreamToolUseEnd):
            if tool_calls:
                last = tool_calls[-1]
                raw = last.pop("_raw_input", "{}")
                try:
                    last["input"] = _json.loads(raw)
                except _json.JSONDecodeError:
                    # Keep malformed JSON verbatim rather than dropping it.
                    last["input"] = raw
        elif isinstance(event, StreamComplete):
            usage_data = {
                "input_tokens": event.response.usage.input_tokens,
                "output_tokens": event.response.usage.output_tokens,
            }

    return response_text, tool_calls, usage_data


def _emit_result(response_text, tool_calls, usage_data, output_format) -> None:
    """Print the collected turn to stdout as plain text or a JSON document."""
    if output_format == "json":
        result = {
            "response": response_text,
            "tool_calls": [
                {"name": tc["name"], "input": tc.get("input", {})} for tc in tool_calls
            ],
            "usage": usage_data,
        }
        print(_json.dumps(result))
    else:
        print(response_text)
166 changes: 166 additions & 0 deletions tests/test_headless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Tests for headless mode (--prompt flag)."""
from __future__ import annotations

import json
from io import StringIO
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from anton.commands.headless import _headless
from anton.llm.provider import (
LLMResponse,
StreamComplete,
StreamTextDelta,
StreamToolUseEnd,
StreamToolUseStart,
ToolCall,
Usage,
)


def _text_response(text: str) -> LLMResponse:
    """Build a plain-text LLMResponse with fixed token usage for tests."""
    usage = Usage(input_tokens=10, output_tokens=20)
    return LLMResponse(content=text, tool_calls=[], usage=usage, stop_reason="end_turn")


def _mock_settings():
settings = MagicMock()
settings.workspace_path = MagicMock()
settings.workspace_path.__truediv__ = lambda self, other: MagicMock()
settings.context_dir = "/tmp/test-anton-context"
settings.memory_mode = "off"
settings.episodic_memory = False
settings.coding_provider = "anthropic"
settings.anthropic_api_key = "test-key"
settings.openai_api_key = None
settings.openai_base_url = None
settings.proactive_dashboards = False
return settings


def _patches(mock_session):
"""Common patches for headless tests. Returns context manager stack."""
mock_dv = MagicMock()
mock_dv.return_value.list_connections.return_value = []
mock_ep = MagicMock()
mock_ep.return_value.enabled = False
mock_ep.return_value._session_id = None

return (
patch("anton.llm.client.LLMClient.from_settings", return_value=MagicMock()),
patch("anton.context.self_awareness.SelfAwarenessContext"),
patch("anton.workspace.Workspace"),
patch("anton.data_vault.DataVault", mock_dv),
patch("anton.datasource_registry.DatasourceRegistry"),
patch("anton.memory.cortex.Cortex"),
patch("anton.memory.episodes.EpisodicMemory", mock_ep),
patch("anton.memory.history_store.HistoryStore"),
patch("anton.chat_session.build_runtime_context", return_value=""),
patch("anton.chat.ChatSession", return_value=mock_session),
)


class TestHeadlessTextOutput:
    @pytest.mark.asyncio
    async def test_basic_text_response(self, capsys):
        """Plain-text output mode prints the accumulated response text."""
        mock_session = AsyncMock()

        async def fake_stream(prompt):
            yield StreamTextDelta("The answer is 42.")
            yield StreamComplete(_text_response("The answer is 42."))

        mock_session.turn_stream = fake_stream

        # ExitStack replaces the brittle `with patches[0], patches[1], ...` chain.
        from contextlib import ExitStack

        with ExitStack() as stack:
            for p in _patches(mock_session):
                stack.enter_context(p)
            from rich.console import Console

            console = Console(file=StringIO())
            await _headless(console, _mock_settings(), prompt="question", output_format="text")

        captured = capsys.readouterr()
        assert "The answer is 42." in captured.out


class TestHeadlessJsonOutput:
    @pytest.mark.asyncio
    async def test_json_response(self, capsys):
        """JSON output mode emits response text, tool_calls list and usage."""
        mock_session = AsyncMock()

        async def fake_stream(prompt):
            yield StreamTextDelta("Hello world")
            yield StreamComplete(_text_response("Hello world"))

        mock_session.turn_stream = fake_stream

        # ExitStack replaces the brittle `with patches[0], patches[1], ...` chain.
        from contextlib import ExitStack

        with ExitStack() as stack:
            for p in _patches(mock_session):
                stack.enter_context(p)
            from rich.console import Console

            console = Console(file=StringIO())
            await _headless(console, _mock_settings(), prompt="say hello", output_format="json")

        captured = capsys.readouterr()
        result = json.loads(captured.out)
        assert result["response"] == "Hello world"
        assert isinstance(result["tool_calls"], list)
        assert result["usage"]["input_tokens"] == 10
        assert result["usage"]["output_tokens"] == 20


class TestHeadlessToolCalls:
    @pytest.mark.asyncio
    async def test_tool_calls_in_json_output(self, capsys):
        """Tool-use stream events surface in the JSON `tool_calls` array."""
        mock_session = AsyncMock()

        async def fake_stream(prompt):
            yield StreamToolUseStart(id="tc_1", name="scratchpad")
            yield StreamToolUseEnd(id="tc_1")
            yield StreamTextDelta("Result: 55")
            yield StreamComplete(LLMResponse(
                content="Result: 55",
                tool_calls=[ToolCall(id="tc_1", name="scratchpad", input={"action": "exec"})],
                usage=Usage(input_tokens=50, output_tokens=100),
                stop_reason="end_turn",
            ))

        mock_session.turn_stream = fake_stream

        # ExitStack replaces the brittle `with patches[0], patches[1], ...` chain.
        from contextlib import ExitStack

        with ExitStack() as stack:
            for p in _patches(mock_session):
                stack.enter_context(p)
            from rich.console import Console

            console = Console(file=StringIO())
            await _headless(console, _mock_settings(), prompt="fibonacci", output_format="json")

        captured = capsys.readouterr()
        result = json.loads(captured.out)
        assert len(result["tool_calls"]) == 1
        assert result["tool_calls"][0]["name"] == "scratchpad"
        assert "55" in result["response"]


class TestHeadlessNoInteractive:
    @pytest.mark.asyncio
    async def test_completes_without_interactive_input(self):
        """Headless mode completes without prompt_toolkit or interactive console."""
        mock_session = AsyncMock()

        async def fake_stream(prompt):
            yield StreamTextDelta("ok")
            yield StreamComplete(_text_response("ok"))

        mock_session.turn_stream = fake_stream

        # ExitStack replaces the brittle `with patches[0], patches[1], ...` chain.
        from contextlib import ExitStack

        with ExitStack() as stack:
            for p in _patches(mock_session):
                stack.enter_context(p)
            from rich.console import Console

            console = Console(file=StringIO())
            # Should complete without hanging on interactive input.
            await _headless(console, _mock_settings(), prompt="test", output_format="text")
Loading