diff --git a/README.md b/README.md index cbb72f4..b1e134d 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ print(f"Replay with: agent-strace replay {meta.session_id}") | `curve` | Personal cost-efficiency curve | | `a2a-tree` | Cross-agent trace correlation (A2A protocol) | | `mcp` | MCP server: expose traces as queryable tools for a debugging agent | +| `timeline` | Structured phase-by-phase view of a session with costs, retries, and wasted spend | | `config-watch` | Snapshot and diff AGENTS.md and other config files; find affected sessions | ``` @@ -238,6 +239,8 @@ agent-strace oncall --rotation-start DATE On-call readiness for agent-modi agent-strace curve [--export csv] Personal agent cost-efficiency curve agent-strace inflation [--compare m1,m2] Token inflation calculator across model versions agent-strace a2a-tree [session-id] Visualise A2A agent call graph +agent-strace timeline [session-id] [--format text|json] [--model MODEL] + Structured phase-by-phase session view with costs and retries agent-strace config-watch snapshot [--label TEXT] [--watch PATH] Snapshot current config file state agent-strace config-watch check [--format text|json] [--watch PATH] @@ -446,6 +449,55 @@ Rules are configurable via `.agent-strace-lint.json`: | `error-retry-loop` | WARN | Same tool errored and was retried 3+ times | | `no-output` | WARN | Session completed with no write or file-modifying tool calls | +### Session timeline + +A structured, phase-by-phase view of what happened in a session — tool calls, file operations, LLM requests, errors, retries, and a wasted-spend callout for failed phases. 
+ +```bash +# Timeline for the latest session +agent-strace timeline + +# Timeline for a specific session (full ID or prefix) +agent-strace timeline abc123def456 + +# Machine-readable output +agent-strace timeline --format json + +# Use a different model for cost estimates +agent-strace timeline --model opus +``` + +Example output: + +``` +Session: abc123def456 | 2026-05-19 14:32 | 4m 12s | $0.0043 | 3 errors + +Phase 1: Setup (0:00–0:45) $0.0008 + ✓ Read src/auth/handler.go + ✓ Read src/auth/middleware.go + +Phase 2: Implementation (0:45–2:10) $0.0031 + · Run Bash (1.2s) + pytest tests/test_auth.py + ✗ Error: Bash + FAILED tests/test_auth.py::TestAuthHandler + · Run Bash (attempt 2) (1.1s) + pytest tests/test_auth.py + ✗ Error: Bash + FAILED tests/test_auth.py::TestAuthHandler + ✓ Write src/auth/handler.go +3 lines + ✓ Run Bash (0.9s) + + ⚠ 2 retries in this phase + +⚠ Wasted spend: 2 retries on failed phases = ~$0.0008 (19% of session cost) +``` + +| Flag | Default | Description | +|---|---|---| +| `--format` | `text` | `text` or `json` | +| `--model` | `sonnet` | Pricing model for cost estimates: `sonnet`, `opus`, `haiku`, `gpt4`, `gpt4o` | + ### Config change detector Track changes to AGENTS.md and other agent configuration files. Snapshot the current state before a change, then check what drifted and which sessions ran after it. 
diff --git a/src/agent_trace/__init__.py b/src/agent_trace/__init__.py index 101e0ea..071f1f3 100644 --- a/src/agent_trace/__init__.py +++ b/src/agent_trace/__init__.py @@ -1,3 +1,3 @@ """agent-trace: strace for AI agents.""" -__version__ = "0.50.0" +__version__ = "0.51.0" diff --git a/src/agent_trace/cli.py b/src/agent_trace/cli.py index 9371fac..4fb6d62 100644 --- a/src/agent_trace/cli.py +++ b/src/agent_trace/cli.py @@ -49,6 +49,7 @@ from .integrations import detect_and_instrument, _INTEGRATIONS from .budget_report import cmd_budget_report from .compare import cmd_compare +from .timeline import cmd_timeline from .config_watch import cmd_config_watch from .lint import cmd_lint from .retention import cmd_retention @@ -572,6 +573,17 @@ def build_parser() -> argparse.ArgumentParser: p_explain = sub.add_parser("explain", help="explain a session in plain English") p_explain.add_argument("session_id", nargs="?", help="session ID or prefix (default: latest)") + # timeline + p_timeline = sub.add_parser("timeline", + help="structured chronological view of a session by phase") + p_timeline.add_argument("session_id", nargs="?", + help="session ID or prefix (default: latest)") + p_timeline.add_argument("--model", default="sonnet", + choices=["sonnet", "opus", "haiku", "gpt4", "gpt4o"], + help="model pricing for cost estimates (default: sonnet)") + p_timeline.add_argument("--format", choices=["text", "json"], default="text", + help="output format (default: text)") + # diff p_diff = sub.add_parser("diff", help="compare two sessions structurally") p_diff.add_argument("session_a", help="first session ID or prefix") @@ -1055,6 +1067,7 @@ def main() -> None: "stats": cmd_stats, "import": cmd_import, "explain": cmd_explain, + "timeline": cmd_timeline, "cost": cmd_cost, "diff": cmd_diff, "why": cmd_why, diff --git a/src/agent_trace/timeline.py b/src/agent_trace/timeline.py new file mode 100644 index 0000000..b774408 --- /dev/null +++ b/src/agent_trace/timeline.py @@ -0,0 +1,449 @@ 
+"""Session timeline view. + +Renders a structured, chronological view of a session grouped into phases. +Shows tool calls, file operations, LLM requests, errors, retries, and a +wasted-spend callout for failed phases. + +Builds on explain.py (phase detection) and cost.py (token/cost estimation). +""" + +from __future__ import annotations + +import argparse +import json +import sys +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, TextIO + +from .cost import DEFAULT_MODEL, PRICING, _dollars, _estimate_tokens, _event_tokens +from .explain import ExplainResult, Phase, explain_session +from .models import EventType, TraceEvent +from .store import TraceStore + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class TimelineEntry: + """One rendered line in the timeline.""" + offset: float # seconds from session start + status: str # "ok", "fail", "info" + label: str # short description + detail: str = "" # optional second line + duration_ms: float | None = None + tokens: int = 0 + cost: float = 0.0 + + +@dataclass +class TimelinePhase: + index: int + name: str + start_offset: float + end_offset: float + entries: list[TimelineEntry] = field(default_factory=list) + failed: bool = False + retry_count: int = 0 + wasted_cost: float = 0.0 + total_cost: float = 0.0 + + +@dataclass +class TimelineResult: + session_id: str + started_at: float + total_duration: float # seconds + total_events: int + phases: list[TimelinePhase] + total_cost: float + wasted_cost: float + error_count: int + retry_count: int + + +# --------------------------------------------------------------------------- +# Builder +# --------------------------------------------------------------------------- + +def _fmt_offset(seconds: float) -> str: + m = int(seconds) // 60 + s = int(seconds) % 60 + return 
def _fmt_offset(seconds: float) -> str:
    """Format an offset in seconds as ``M:SS``.

    Fractions are truncated toward zero. A negative offset (an event stamped
    before the base timestamp) is rendered with a leading ``-`` rather than
    wrapping through floor division, which used to turn ``-5`` into ``-1:55``.
    """
    whole = int(seconds)
    sign = "-" if whole < 0 else ""
    minutes, secs = divmod(abs(whole), 60)
    return f"{sign}{minutes}:{secs:02d}"


def _tool_label(event: TraceEvent) -> tuple[str, str]:
    """Return ``(label, detail)`` for a tool_call event.

    Reads ``tool_name`` and ``arguments`` from ``event.data``. Well-known
    tools (bash, read, write/edit, glob, grep, webfetch, websearch) get a
    dedicated label; any other tool shows its first non-empty string argument.

    Missing or ``None`` argument values render as an empty string, and a
    non-dict ``arguments`` payload is treated as "no arguments", so a
    malformed event never raises while the timeline is being rendered.
    """
    data = event.data if isinstance(event.data, dict) else {}
    name = str(data.get("tool_name") or "?")
    raw_args = data.get("arguments")
    args = raw_args if isinstance(raw_args, dict) else {}
    kind = name.lower()

    def text(key: str) -> str:
        # Coerce to str so the slicing below is safe for any JSON value.
        value = args.get(key)
        return "" if value is None else str(value)

    if kind == "bash":
        cmd = text("command").strip()
        detail = cmd[:120] + ("..." if len(cmd) > 120 else "")
        return f"Run {name}", detail

    if kind == "read":
        path = text("file_path").strip()
        return f"Read {path}", ""

    if kind in ("write", "edit"):
        path = text("file_path").strip()
        # Prefer ``new_string``; fall back to ``content`` when it is absent.
        payload = args.get("new_string", args.get("content", ""))
        line_count = str(payload).count("\n") + 1 if payload else 0
        # A single-line payload is not worth a "+N lines" note.
        detail = f"+{line_count} lines" if line_count > 1 else ""
        return f"Write {path}", detail

    if kind == "glob":
        pattern = text("pattern")
        return f"Glob {pattern}", ""

    if kind == "grep":
        pattern = text("pattern")
        return f"Grep /{pattern}/", text("path")

    if kind == "webfetch":
        url = text("url")[:80]
        return f"Fetch {url}", ""

    if kind == "websearch":
        query = text("query")[:80]
        return f'Search "{query}"', ""

    # Generic: show first string arg
    for key, value in args.items():
        if isinstance(value, str) and value:
            return f"{name} {key}={value[:60]}", ""

    return name, ""


def _phase_cost(phase: Phase, model: str) -> float:
    """Return the estimated dollar cost of every event in *phase*.

    Input and output token counts from ``_event_tokens`` are summed across
    ``phase.events`` and priced once with ``_dollars`` for *model*.
    """
    total_in = 0
    total_out = 0
    for event in phase.events:
        tokens_in, tokens_out = _event_tokens(event)
        total_in += tokens_in
        total_out += tokens_out
    return _dollars(total_in, total_out, model)
def _build_entries(phase: Phase, base_ts: float, model: str) -> list[TimelineEntry]:
    """Convert the events of *phase* into timeline entries.

    Args:
        phase: Phase whose ``events`` are rendered, in order.
        base_ts: Timestamp subtracted from each event to get its offset.
        model: Pricing model name passed to ``_dollars``.

    Returns:
        One entry per tool call, LLM request/response, file read/write,
        error, decision, user prompt and assistant response. Tool results do
        not produce an entry of their own; they resolve the status of a
        pending tool-call entry. Other event types are ignored.
    """
    entries: list[TimelineEntry] = []
    # Tool-call entries still waiting for a result, most recent last.
    open_calls: list[TimelineEntry] = []
    # Normalised "tool:arguments" key -> number of times it has been called.
    call_counts: dict[str, int] = {}

    for event in phase.events:
        offset = event.timestamp - base_ts
        inp, out = _event_tokens(event)
        tokens = inp + out
        cost = _dollars(inp, out, model)
        kind = event.event_type
        data = event.data

        if kind == EventType.TOOL_CALL:
            label, detail = _tool_label(event)

            # Retry detection: same tool+args key seen before.
            # default=str keeps non-JSON-serialisable arguments from raising.
            arg_blob = json.dumps(data.get("arguments", {}), sort_keys=True, default=str)
            key = f"{data.get('tool_name','')}:{arg_blob}"
            attempt = call_counts.get(key, 0) + 1
            call_counts[key] = attempt
            retry_tag = f" (attempt {attempt})" if attempt > 1 else ""

            call_entry = TimelineEntry(
                offset=offset,
                status="info",
                label=label + retry_tag,
                detail=detail,
                duration_ms=event.duration_ms,
                tokens=tokens,
                cost=cost,
            )
            entries.append(call_entry)
            open_calls.append(call_entry)

        elif kind == EventType.TOOL_RESULT:
            result_text = str(data.get("result", ""))
            is_error = bool(data.get("is_error", False)) or "error" in result_text.lower()[:50]
            # Resolve the most recent unresolved *tool call* only. Scanning
            # every "info" entry would let a result overwrite the status of
            # an LLM request, decision or user prompt.
            if open_calls:
                call_entry = open_calls.pop()
                call_entry.status = "fail" if is_error else "ok"
                if event.duration_ms:
                    call_entry.duration_ms = event.duration_ms

        elif kind == EventType.LLM_REQUEST:
            model_name = data.get("model", "")
            msg_count = data.get("message_count", 0)
            if model_name:
                detail = f"{model_name}, {msg_count} messages"
            else:
                detail = f"{msg_count} messages"
            entries.append(TimelineEntry(
                offset=offset,
                status="info",
                label="LLM request",
                detail=detail,
                tokens=tokens,
                cost=cost,
            ))

        elif kind == EventType.LLM_RESPONSE:
            # Prefer the recorded total; fall back to the estimate.
            tok = data.get("total_tokens", tokens)
            entries.append(TimelineEntry(
                offset=offset,
                status="ok",
                label="LLM response",
                detail=f"{tok} tokens",
                duration_ms=event.duration_ms,
                tokens=tok,
                cost=cost,
            ))

        elif kind == EventType.FILE_READ:
            uri = data.get("uri", "")
            entries.append(TimelineEntry(
                offset=offset,
                status="ok",
                label=f"Read {uri}",
            ))

        elif kind == EventType.FILE_WRITE:
            uri = data.get("uri", "")
            entries.append(TimelineEntry(
                offset=offset,
                status="ok",
                label=f"Write {uri}",
            ))

        elif kind == EventType.ERROR:
            msg = data.get("message", "") or data.get("error", "")
            tool = data.get("tool_name", "")
            entries.append(TimelineEntry(
                offset=offset,
                status="fail",
                label=f"Error: {tool}" if tool else "Error",
                detail=str(msg)[:120],
            ))

        elif kind == EventType.DECISION:
            choice = data.get("choice", "")
            # Coerce before slicing so a non-string reason cannot raise.
            reason = str(data.get("reason") or "")
            entries.append(TimelineEntry(
                offset=offset,
                status="info",
                label=f"Decision: {choice}",
                detail=reason[:80],
            ))

        elif kind == EventType.USER_PROMPT:
            text = str(data.get("prompt") or "")
            preview = text[:100].replace("\n", " ")
            if len(text) > 100:
                preview += "..."
            entries.append(TimelineEntry(
                offset=offset,
                status="info",
                label="User prompt",
                detail=f'"{preview}"',
            ))

        elif kind == EventType.ASSISTANT_RESPONSE:
            text = str(data.get("text") or "")
            preview = text[:100].replace("\n", " ")
            if len(text) > 100:
                preview += "..."
            entries.append(TimelineEntry(
                offset=offset,
                status="ok",
                label="Response",
                detail=f'"{preview}"' if preview else "",
                tokens=tokens,
                cost=cost,
            ))

    return entries


def _is_retry_label(label: str) -> bool:
    """Return True when *label* ends with an ``(attempt N)`` tag and N > 1."""
    _, marker, tail = label.rpartition(" (attempt ")
    if not marker or not tail.endswith(")"):
        return False
    attempt = tail[:-1]
    return attempt.isdecimal() and int(attempt) > 1


def build_timeline(
    store: TraceStore,
    session_id: str,
    model: str = DEFAULT_MODEL,
) -> TimelineResult:
    """Build a structured timeline for *session_id*.

    Phases come from ``explain_session``. Each phase is priced with
    ``_phase_cost`` and expanded with ``_build_entries``. The full cost of a
    phase flagged ``failed`` counts as wasted spend.

    Args:
        store: Trace store holding the session.
        session_id: Full session ID to render.
        model: Pricing model used for cost estimates.

    Returns:
        A ``TimelineResult`` with per-phase entries plus session totals for
        cost, wasted cost, errors (entries with status ``"fail"``) and
        retries.
    """
    explain = explain_session(store, session_id)
    meta = store.load_meta(session_id)
    if meta:
        base_ts = meta.started_at
    else:
        # Without metadata, anchor on the first event of the first
        # non-empty phase; an empty phase must not raise IndexError.
        base_ts = next(
            (p.events[0].timestamp for p in explain.phases if p.events),
            0.0,
        )

    total_cost = 0.0
    wasted_cost = 0.0
    error_count = 0
    retry_count = 0
    phases: list[TimelinePhase] = []

    for phase in explain.phases:
        phase_cost = _phase_cost(phase, model)
        entries = _build_entries(phase, base_ts, model)

        phase_errors = sum(1 for e in entries if e.status == "fail")
        # Only a trailing "(attempt N)" tag with N > 1 counts, so argument
        # text that happens to contain "(attempt " is not mistaken for one.
        phase_retries = sum(1 for e in entries if _is_retry_label(e.label))
        wasted = phase_cost if phase.failed else 0.0

        phases.append(TimelinePhase(
            index=phase.index,
            name=phase.name,
            start_offset=phase.start_offset,
            end_offset=phase.end_offset,
            entries=entries,
            failed=phase.failed,
            retry_count=phase_retries,
            wasted_cost=wasted,
            total_cost=phase_cost,
        ))

        total_cost += phase_cost
        wasted_cost += wasted
        error_count += phase_errors
        retry_count += phase_retries

    return TimelineResult(
        session_id=session_id,
        started_at=base_ts,
        total_duration=explain.total_duration,
        total_events=explain.total_events,
        phases=phases,
        total_cost=total_cost,
        wasted_cost=wasted_cost,
        error_count=error_count,
        retry_count=retry_count,
    )
def _fmt_duration(ms: float | None) -> str:
    """Format a duration as `` (Nms)`` or `` (N.NNs)``; empty when unknown.

    The threshold is applied to the rounded value, so 999.7 ms prints as
    ``(1.00s)`` rather than ``(1000ms)``.
    """
    if ms is None:
        return ""
    if round(ms) < 1000:
        return f" ({ms:.0f}ms)"
    return f" ({ms / 1000:.2f}s)"


def _status_icon(status: str) -> str:
    """Map an entry status to its icon; unknown statuses use the info dot."""
    icons = {"ok": "✓", "fail": "✗", "info": "·"}
    return icons.get(status, "·")


def format_timeline(result: TimelineResult, out: TextIO | None = None) -> None:
    """Write a human-readable timeline to *out*.

    Args:
        result: Timeline to render.
        out: Destination stream. ``None`` (the default) means the *current*
            ``sys.stdout``, looked up at call time so stdout redirection and
            test capture are honoured.
    """
    if out is None:
        out = sys.stdout
    w = out.write

    started = datetime.fromtimestamp(result.started_at, tz=timezone.utc)
    minutes, seconds = divmod(int(result.total_duration), 60)
    dur_str = f"{minutes}m {seconds:02d}s" if minutes else f"{seconds}s"

    cost_str = f"${result.total_cost:.4f}" if result.total_cost else ""
    if result.error_count:
        plural = "s" if result.error_count != 1 else ""
        err_str = f"{result.error_count} error{plural}"
    else:
        err_str = ""
    meta_parts = [part for part in (dur_str, cost_str, err_str) if part]

    w(f"\nSession: {result.session_id} | "
      f"{started.strftime('%Y-%m-%d %H:%M')} | "
      f"{' | '.join(meta_parts)}\n\n")

    # NOTE(review): spacing inside the literals below is reproduced as seen
    # in the diff; confirm indentation widths against the real file.
    for phase in result.phases:
        time_range = f"{_fmt_offset(phase.start_offset)}–{_fmt_offset(phase.end_offset)}"
        failed_tag = " — FAILED" if phase.failed else ""
        cost_tag = f" ${phase.total_cost:.4f}" if phase.total_cost else ""
        w(f"Phase {phase.index}: {phase.name}{failed_tag} ({time_range}){cost_tag}\n")

        for entry in phase.entries:
            icon = _status_icon(entry.status)
            dur = _fmt_duration(entry.duration_ms)
            # Sub-0.0001 costs would print as $0.0000, so they are omitted.
            cost = f" ${entry.cost:.4f}" if entry.cost >= 0.0001 else ""
            w(f" {icon} {entry.label}{dur}{cost}\n")
            if entry.detail:
                w(f" {entry.detail}\n")

        if phase.retry_count:
            noun = "retries" if phase.retry_count != 1 else "retry"
            w(f"\n ⚠ {phase.retry_count} {noun} in this phase\n")

        w("\n")

    if result.wasted_cost > 0 and result.total_cost > 0:
        wasted_pct = result.wasted_cost / result.total_cost * 100
        # The callout says "on failed phases", so count only those retries,
        # not the session-wide total.
        failed_retries = sum(p.retry_count for p in result.phases if p.failed)
        noun = "retries" if failed_retries != 1 else "retry"
        w(f"⚠ Wasted spend: {failed_retries} {noun} "
          f"on failed phases = ~${result.wasted_cost:.4f} ({wasted_pct:.0f}% of session cost)\n\n")


def format_timeline_json(result: TimelineResult, out: TextIO | None = None) -> None:
    """Write the timeline as indented JSON, followed by a newline, to *out*.

    Offsets and durations are rounded to 3 decimals, costs to 6. Entry keys
    ``detail``, ``duration_ms``, ``tokens`` and ``cost`` are omitted when
    empty, zero or (for ``duration_ms``) ``None``.

    Args:
        result: Timeline to serialise.
        out: Destination stream. ``None`` (the default) means the *current*
            ``sys.stdout``, looked up at call time.
    """
    if out is None:
        out = sys.stdout

    def _entry_dict(e: TimelineEntry) -> dict:
        item: dict = {
            "offset": round(e.offset, 3),
            "status": e.status,
            "label": e.label,
        }
        if e.detail:
            item["detail"] = e.detail
        if e.duration_ms is not None:
            item["duration_ms"] = e.duration_ms
        if e.tokens:
            item["tokens"] = e.tokens
        if e.cost:
            item["cost"] = round(e.cost, 6)
        return item

    def _phase_dict(p: TimelinePhase) -> dict:
        return {
            "index": p.index,
            "name": p.name,
            "start_offset": round(p.start_offset, 3),
            "end_offset": round(p.end_offset, 3),
            "failed": p.failed,
            "retry_count": p.retry_count,
            "total_cost": round(p.total_cost, 6),
            "wasted_cost": round(p.wasted_cost, 6),
            "entries": [_entry_dict(e) for e in p.entries],
        }

    payload = {
        "session_id": result.session_id,
        "started_at": result.started_at,
        "total_duration": round(result.total_duration, 3),
        "total_events": result.total_events,
        "total_cost": round(result.total_cost, 6),
        "wasted_cost": round(result.wasted_cost, 6),
        "error_count": result.error_count,
        "retry_count": result.retry_count,
        "phases": [_phase_dict(p) for p in result.phases],
    }
    out.write(json.dumps(payload, indent=2) + "\n")
def cmd_timeline(args: argparse.Namespace) -> int:
    """Handle ``agent-strace timeline``.

    Resolves the requested session (falling back to the store's latest one
    when no ID or prefix is given), builds its timeline and prints it as
    text or JSON.

    Returns:
        0 on success; 1 when the store has no sessions or the requested
        session cannot be found (a message is written to stderr).
    """
    store = TraceStore(args.trace_dir)

    # An absent or empty session argument means "use the latest session".
    requested = getattr(args, "session_id", None) or store.get_latest_session_id()
    if not requested:
        sys.stderr.write("No sessions found.\n")
        return 1

    # Expand a prefix to the full session ID.
    full_id = store.find_session(requested)
    if not full_id:
        sys.stderr.write(f"Session not found: {requested}\n")
        return 1

    model = getattr(args, "model", DEFAULT_MODEL) or DEFAULT_MODEL
    fmt = getattr(args, "format", "text") or "text"

    result = build_timeline(store, full_id, model=model)

    render = format_timeline_json if fmt == "json" else format_timeline
    render(result)
    return 0
(tool_results or []): + events.append(ev(EventType.TOOL_RESULT, tr, dur=50.0)) + + for _ in range(llm_requests): + events.append(ev(EventType.LLM_REQUEST, {"model": "claude-3-5-sonnet", "message_count": 3})) + + for _ in range(llm_responses): + events.append(ev(EventType.LLM_RESPONSE, {"total_tokens": 500}, dur=800.0)) + + for path in (file_reads or []): + events.append(ev(EventType.FILE_READ, {"uri": path})) + + for path in (file_writes or []): + events.append(ev(EventType.FILE_WRITE, {"uri": path})) + + for msg in (errors or []): + events.append(ev(EventType.ERROR, {"message": msg})) + + for text in (decisions or []): + events.append(ev(EventType.DECISION, {"choice": text, "reason": "because"})) + + events.append(ev(EventType.SESSION_END, {"exit_code": 0})) + + for e in events: + store.append_event(sid, e) + + return sid + + +# --------------------------------------------------------------------------- +# build_timeline +# --------------------------------------------------------------------------- + +class TestBuildTimeline(unittest.TestCase): + + def test_returns_timeline_result(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + self.assertIsInstance(result, TimelineResult) + + def test_session_id_preserved(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + self.assertEqual(result.session_id, sid) + + def test_has_at_least_one_phase(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Bash", "arguments": {"command": "ls"}}]) + result = build_timeline(store, sid) + self.assertGreater(len(result.phases), 0) + + def test_phases_are_timeline_phase(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + for phase in result.phases: + self.assertIsInstance(phase, TimelinePhase) + + def test_total_duration_positive(self): + store = _make_store() + sid = _add_session(store, duration_ms=3000.0) + result 
= build_timeline(store, sid) + self.assertGreaterEqual(result.total_duration, 0) + + def test_total_events_positive(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Read", "arguments": {"file_path": "x.py"}}]) + result = build_timeline(store, sid) + self.assertGreater(result.total_events, 0) + + def test_error_count_increments(self): + store = _make_store() + sid = _add_session(store, errors=["something broke", "also this"]) + result = build_timeline(store, sid) + self.assertGreaterEqual(result.error_count, 2) + + def test_no_errors_gives_zero_error_count(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + self.assertEqual(result.error_count, 0) + + def test_total_cost_non_negative(self): + store = _make_store() + sid = _add_session(store, llm_requests=2, llm_responses=2) + result = build_timeline(store, sid) + self.assertGreaterEqual(result.total_cost, 0.0) + + def test_wasted_cost_non_negative(self): + store = _make_store() + sid = _add_session(store, errors=["fail"]) + result = build_timeline(store, sid) + self.assertGreaterEqual(result.wasted_cost, 0.0) + + def test_wasted_cost_lte_total_cost(self): + store = _make_store() + sid = _add_session(store, llm_requests=1, errors=["fail"]) + result = build_timeline(store, sid) + self.assertLessEqual(result.wasted_cost, result.total_cost + 1e-9) + + def test_multiple_phases_from_user_prompts(self): + store = _make_store() + sid = _add_session(store, user_prompts=["first task", "second task"]) + result = build_timeline(store, sid) + self.assertGreaterEqual(len(result.phases), 2) + + def test_phase_offsets_non_negative(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + for phase in result.phases: + self.assertGreaterEqual(phase.start_offset, 0.0) + self.assertGreaterEqual(phase.end_offset, 0.0) + + def test_phase_end_gte_start(self): + store = _make_store() + sid = _add_session(store) + 
result = build_timeline(store, sid) + for phase in result.phases: + self.assertGreaterEqual(phase.end_offset, phase.start_offset) + + def test_entries_are_timeline_entry(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Bash", "arguments": {"command": "echo hi"}}]) + result = build_timeline(store, sid) + for phase in result.phases: + for entry in phase.entries: + self.assertIsInstance(entry, TimelineEntry) + + def test_entry_status_values(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Read", "arguments": {"file_path": "a.py"}}]) + result = build_timeline(store, sid) + valid = {"ok", "fail", "info"} + for phase in result.phases: + for entry in phase.entries: + self.assertIn(entry.status, valid) + + def test_bash_tool_label(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Bash", "arguments": {"command": "pytest"}}]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("Bash" in l or "Run" in l for l in labels)) + + def test_read_tool_label(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Read", "arguments": {"file_path": "src/main.py"}}]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("src/main.py" in l for l in labels)) + + def test_write_tool_label(self): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Write", "arguments": {"file_path": "out.py", "new_string": "x\ny\nz"}}]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("out.py" in l for l in labels)) + + def test_error_entry_status_fail(self): + store = _make_store() + sid = _add_session(store, errors=["test failed"]) + result = build_timeline(store, sid) + fail_entries = [e for p in result.phases for e in p.entries if 
e.status == "fail"] + self.assertGreater(len(fail_entries), 0) + + def test_error_entry_label_contains_error(self): + store = _make_store() + sid = _add_session(store, errors=["something broke"]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries if e.status == "fail"] + self.assertTrue(any("Error" in l or "error" in l for l in labels)) + + def test_llm_request_entry_present(self): + store = _make_store() + sid = _add_session(store, llm_requests=1) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("LLM" in l or "llm" in l for l in labels)) + + def test_llm_response_entry_present(self): + store = _make_store() + sid = _add_session(store, llm_responses=1) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("response" in l.lower() for l in labels)) + + def test_decision_entry_present(self): + store = _make_store() + sid = _add_session(store, decisions=["use approach A"]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("Decision" in l or "decision" in l for l in labels)) + + def test_file_read_entry_present(self): + store = _make_store() + sid = _add_session(store, file_reads=["README.md"]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("README.md" in l for l in labels)) + + def test_file_write_entry_present(self): + store = _make_store() + sid = _add_session(store, file_writes=["output.txt"]) + result = build_timeline(store, sid) + labels = [e.label for p in result.phases for e in p.entries] + self.assertTrue(any("output.txt" in l for l in labels)) + + def test_custom_model_affects_cost(self): + store = _make_store() + sid = _add_session(store, llm_requests=2, llm_responses=2) + result_sonnet = build_timeline(store, sid, 
model="sonnet") + result_opus = build_timeline(store, sid, model="opus") + # opus is more expensive than sonnet + self.assertGreaterEqual(result_opus.total_cost, result_sonnet.total_cost) + + def test_empty_session_no_crash(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + self.assertIsInstance(result, TimelineResult) + + def test_retry_count_non_negative(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + self.assertGreaterEqual(result.retry_count, 0) + + +# --------------------------------------------------------------------------- +# format_timeline (text) +# --------------------------------------------------------------------------- + +class TestFormatTimeline(unittest.TestCase): + + def _result(self, **kwargs) -> TimelineResult: + store = _make_store() + sid = _add_session(store, **kwargs) + return build_timeline(store, sid) + + def test_output_contains_session_id(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + out = io.StringIO() + format_timeline(result, out) + self.assertIn(sid, out.getvalue()) + + def test_output_contains_phase_header(self): + result = self._result() + out = io.StringIO() + format_timeline(result, out) + self.assertIn("Phase", out.getvalue()) + + def test_output_contains_ok_icon(self): + result = self._result(llm_responses=1) + out = io.StringIO() + format_timeline(result, out) + self.assertIn("✓", out.getvalue()) + + def test_output_contains_fail_icon_on_error(self): + result = self._result(errors=["boom"]) + out = io.StringIO() + format_timeline(result, out) + self.assertIn("✗", out.getvalue()) + + def test_output_contains_wasted_spend_on_error(self): + store = _make_store() + sid = _add_session(store, llm_requests=2, errors=["fail"]) + result = build_timeline(store, sid) + # Manually mark a phase as failed to trigger wasted cost output + if result.phases: + result.phases[0].failed = True + 
result.wasted_cost = result.phases[0].total_cost + result.total_cost = max(result.total_cost, result.wasted_cost + 0.001) + out = io.StringIO() + format_timeline(result, out) + text = out.getvalue() + # Either wasted spend callout or FAILED tag should appear + self.assertTrue("Wasted" in text or "FAILED" in text or "✗" in text) + + def test_output_contains_tool_label(self): + result = self._result(tool_calls=[{"tool_name": "Bash", "arguments": {"command": "ls -la"}}]) + out = io.StringIO() + format_timeline(result, out) + self.assertIn("ls -la", out.getvalue()) + + def test_output_contains_error_message(self): + result = self._result(errors=["test suite failed"]) + out = io.StringIO() + format_timeline(result, out) + self.assertIn("test suite failed", out.getvalue()) + + def test_output_non_empty(self): + result = self._result() + out = io.StringIO() + format_timeline(result, out) + self.assertGreater(len(out.getvalue()), 0) + + def test_multiple_phases_all_shown(self): + result = self._result(user_prompts=["task one", "task two"]) + out = io.StringIO() + format_timeline(result, out) + text = out.getvalue() + self.assertIn("Phase 1", text) + self.assertIn("Phase 2", text) + + def test_cost_shown_when_nonzero(self): + store = _make_store() + sid = _add_session(store, llm_requests=3, llm_responses=3) + result = build_timeline(store, sid) + # Force a non-zero cost for display + result.total_cost = 0.0042 + out = io.StringIO() + format_timeline(result, out) + self.assertIn("$", out.getvalue()) + + +# --------------------------------------------------------------------------- +# format_timeline_json +# --------------------------------------------------------------------------- + +class TestFormatTimelineJson(unittest.TestCase): + + def _json_result(self, **kwargs) -> dict: + store = _make_store() + sid = _add_session(store, **kwargs) + result = build_timeline(store, sid) + out = io.StringIO() + format_timeline_json(result, out) + return json.loads(out.getvalue()) + + 
def test_valid_json(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + out = io.StringIO() + format_timeline_json(result, out) + data = json.loads(out.getvalue()) + self.assertIsInstance(data, dict) + + def test_json_has_session_id(self): + data = self._json_result() + self.assertIn("session_id", data) + + def test_json_has_phases(self): + data = self._json_result() + self.assertIn("phases", data) + self.assertIsInstance(data["phases"], list) + + def test_json_has_total_duration(self): + data = self._json_result() + self.assertIn("total_duration", data) + + def test_json_has_total_cost(self): + data = self._json_result() + self.assertIn("total_cost", data) + + def test_json_has_error_count(self): + data = self._json_result() + self.assertIn("error_count", data) + + def test_json_has_retry_count(self): + data = self._json_result() + self.assertIn("retry_count", data) + + def test_json_has_wasted_cost(self): + data = self._json_result() + self.assertIn("wasted_cost", data) + + def test_json_phase_has_entries(self): + data = self._json_result(tool_calls=[{"tool_name": "Read", "arguments": {"file_path": "x.py"}}]) + for phase in data["phases"]: + self.assertIn("entries", phase) + self.assertIsInstance(phase["entries"], list) + + def test_json_phase_has_name(self): + data = self._json_result() + for phase in data["phases"]: + self.assertIn("name", phase) + + def test_json_phase_has_index(self): + data = self._json_result() + for phase in data["phases"]: + self.assertIn("index", phase) + + def test_json_phase_has_failed_flag(self): + data = self._json_result() + for phase in data["phases"]: + self.assertIn("failed", phase) + + def test_json_entry_has_status(self): + data = self._json_result(tool_calls=[{"tool_name": "Bash", "arguments": {"command": "echo"}}]) + for phase in data["phases"]: + for entry in phase["entries"]: + self.assertIn("status", entry) + + def test_json_entry_has_label(self): + data = 
self._json_result(tool_calls=[{"tool_name": "Bash", "arguments": {"command": "echo"}}]) + for phase in data["phases"]: + for entry in phase["entries"]: + self.assertIn("label", entry) + + def test_json_entry_status_valid(self): + data = self._json_result(tool_calls=[{"tool_name": "Read", "arguments": {"file_path": "a.py"}}]) + valid = {"ok", "fail", "info"} + for phase in data["phases"]: + for entry in phase["entries"]: + self.assertIn(entry["status"], valid) + + def test_json_error_count_matches(self): + data = self._json_result(errors=["e1", "e2"]) + self.assertGreaterEqual(data["error_count"], 2) + + def test_json_total_events_positive(self): + data = self._json_result(tool_calls=[{"tool_name": "Read", "arguments": {}}]) + self.assertGreater(data["total_events"], 0) + + +# --------------------------------------------------------------------------- +# cmd_timeline +# --------------------------------------------------------------------------- + +class TestCmdTimeline(unittest.TestCase): + + def _args(self, store: TraceStore, session_id: str | None = None, + fmt: str = "text", model: str = "sonnet") -> argparse.Namespace: + return argparse.Namespace( + trace_dir=store.base_dir, + session_id=session_id, + format=fmt, + model=model, + ) + + def test_returns_0_on_success(self): + store = _make_store() + sid = _add_session(store) + args = self._args(store, sid) + self.assertEqual(cmd_timeline(args), 0) + + def test_returns_0_with_latest_session(self): + store = _make_store() + _add_session(store) + args = self._args(store, session_id=None) + self.assertEqual(cmd_timeline(args), 0) + + def test_returns_1_when_no_sessions(self): + store = _make_store() + args = self._args(store, session_id=None) + self.assertEqual(cmd_timeline(args), 1) + + def test_returns_1_for_unknown_session(self): + store = _make_store() + _add_session(store) + args = self._args(store, session_id="nonexistent000") + self.assertEqual(cmd_timeline(args), 1) + + def test_json_format_returns_0(self): + 
store = _make_store() + sid = _add_session(store) + args = self._args(store, sid, fmt="json") + self.assertEqual(cmd_timeline(args), 0) + + def test_prefix_lookup_works(self): + store = _make_store() + sid = _add_session(store) + args = self._args(store, session_id=sid[:6]) + self.assertEqual(cmd_timeline(args), 0) + + def test_text_output_written_to_stdout(self, capsys=None): + store = _make_store() + sid = _add_session(store, tool_calls=[{"tool_name": "Bash", "arguments": {"command": "ls"}}]) + result = build_timeline(store, sid) + buf = io.StringIO() + format_timeline(result, buf) + self.assertIn(sid, buf.getvalue()) + + def test_json_output_is_valid_json(self): + store = _make_store() + sid = _add_session(store) + result = build_timeline(store, sid) + buf = io.StringIO() + format_timeline_json(result, buf) + data = json.loads(buf.getvalue()) + self.assertIn("session_id", data) + + def test_opus_model_accepted(self): + store = _make_store() + sid = _add_session(store) + args = self._args(store, sid, model="opus") + self.assertEqual(cmd_timeline(args), 0) + + def test_haiku_model_accepted(self): + store = _make_store() + sid = _add_session(store) + args = self._args(store, sid, model="haiku") + self.assertEqual(cmd_timeline(args), 0) + + +if __name__ == "__main__": + unittest.main()