From bcc1c8235150abe481363cfb3aceba71a3f55b67 Mon Sep 17 00:00:00 2001 From: marquesds Date: Thu, 18 Jun 2026 20:00:55 -0300 Subject: [PATCH] feat(observe): make telemetry live Web and TUI could lag, hide tool semantics, require manual project paths, and spike CPU on every hook.\n\nStream SQLite changes, bound transcript ingestion and derived updates, batch search work, and expose session insights without writing target repositories. --- CHANGELOG.md | 14 +- Cargo.lock | 1 + Cargo.toml | 1 + docs/config.md | 2 +- docs/experiments.md | 2 +- docs/mcp.md | 6 +- docs/retro.md | 2 +- docs/tui.md | 3 +- docs/tutorial/02-observe.md | 4 +- docs/tutorial/06-experiments.md | 2 +- docs/usage-observe.md | 11 +- docs/usage-setup.md | 9 +- docs/usage.md | 4 +- docs/web.md | 12 +- src/collect/hooks/normalize.rs | 111 +++----------- src/collect/hooks/normalize_tests.rs | 93 ++++++++++++ src/collect/tail/budget_tests.rs | 86 +++++++++++ src/collect/tail/claude_code.rs | 61 ++++---- src/collect/tail/codex_desktop.rs | 50 +++--- src/collect/tail/mod.rs | 103 ++++++++++++- src/core/event.rs | 38 +++++ src/core/workspace.rs | 15 +- src/daemon/scanner_task.rs | 11 +- src/daemon/worker.rs | 1 - src/extensions/aggregates.rs | 45 ++++++ src/extensions/diffs.rs | 18 +++ src/mcp/handler.rs | 18 +-- src/search/writer.rs | 22 ++- src/search/writer_tests.rs | 27 ++++ src/shell/cli.rs | 143 +++++++++++++----- src/shell/cli_tests.rs | 83 ++++++++++ src/shell/ingest.rs | 3 + src/shell/mod.rs | 3 + src/store/sqlite/event_batch.rs | 41 +++++ src/store/sqlite/event_extensions.rs | 19 +++ src/store/sqlite/event_projector.rs | 15 +- src/store/sqlite/event_write.rs | 53 +++---- src/store/sqlite/events.rs | 2 +- src/store/sqlite/mod.rs | 2 + src/store/sqlite/rows.rs | 3 +- src/store/sqlite/tests/events.rs | 59 +++++++- src/store/sqlite/visualization/aggregates.rs | 17 ++- src/store/sqlite/visualization/sessions.rs | 6 +- .../tool_span_index/persistence/write.rs | 8 +- src/visualization/build.rs | 3 +- 
src/web/assets.rs | 9 +- src/web/assets/index.html | 24 +++ src/web/assets/kaizen-render.js | 30 ++++ src/web/assets/kaizen-state.js | 2 - src/web/assets/kaizen.css | 16 +- src/web/assets/kaizen.js | 66 ++++---- src/web/live.rs | 81 ++++++++++ src/web/mod.rs | 1 + src/web/server.rs | 30 ++-- tests/machine_registry.rs | 26 ++++ tests/spec/visualization_report.rs | 42 +++++ tests/web_live.rs | 136 +++++++++++++++++ 57 files changed, 1363 insertions(+), 332 deletions(-) create mode 100644 src/collect/hooks/normalize_tests.rs create mode 100644 src/collect/tail/budget_tests.rs create mode 100644 src/search/writer_tests.rs create mode 100644 src/shell/cli_tests.rs create mode 100644 src/store/sqlite/event_batch.rs create mode 100644 src/store/sqlite/event_extensions.rs create mode 100644 src/web/live.rs create mode 100644 tests/web_live.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b1050..f187efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ here explicitly. ### Changed -- **Project data moved out of repo** — all per-workspace artifacts (SQLite DB, config, search index, reports, telemetry NDJSON, backups, sampler stop files) now live under `~/.kaizen/projects/<slug>/` instead of `/.kaizen/`. Slug = canonical path with `/` → `-`. `KAIZEN_HOME` overrides `~/.kaizen`. Existing in-repo `.kaizen/` directories auto-migrate on first use; a `MIGRATED.txt` marker is left behind and the old directory is safe to delete. Identity (workspace key) is unchanged. See [ADR 007](docs/adr/007-project-data-in-home.md). +- **Target repositories are read-only** — all per-workspace artifacts live under `~/.kaizen/projects/<slug>/`; `KAIZEN_HOME` overrides `~/.kaizen`. Legacy in-repo `.kaizen/` data is copied without moving, deleting, or marking the source. Host hooks live in user-level agent configuration. See [ADR 011](docs/adr/011-target-repositories-read-only.md). ### Added @@ -32,9 +32,9 @@ here explicitly. 
### Changed -- **Machine-local project registry** lives in `~/.kaizen/machine.db` (SQLite) instead of `workspaces.json`; `kaizen init` upserts the current repo. Legacy `workspaces.json` is imported once and renamed to `workspaces.json.migrated`. `kaizen doctor` reports registry status. `--all-workspaces` still merges per-repo stores and now includes inited projects that do not yet have `.kaizen/kaizen.db`. +- **Machine-local project registry** lives in `~/.kaizen/machine.db` (SQLite) instead of `workspaces.json`; `kaizen init` upserts the current repo. Legacy `workspaces.json` is imported once and renamed to `workspaces.json.migrated`. `kaizen doctor` reports registry status. `--all-workspaces` merges available project databases and ignores missing or unsafe roots. - **Telemetry wire format (third-party sinks):** PostHog capture is one event per canonical row (`kaizen.event`, `kaizen.tool_span`, `kaizen.repo_snapshot_chunk`, `kaizen.workspace_fact_snapshot`); Datadog uses the [Logs API v2](https://docs.datadoghq.com/api/latest/logs/) (`POST /api/v2/logs`) with one JSON log per canonical item instead of the Events API. OTLP remains a placeholder with `tracing::debug` of expanded item counts. Primary Kaizen `POST` ingest and outbox JSON shapes for `events` / `tool_spans` / `repo_snapshots` are unchanged; when sync is enabled, workspace skill/rule discovery can enqueue **`workspace_facts`** for the new `/v1/workspace-facts` path. -- **CLI — read paths and telemetry:** `summary`, `insights`, `metrics`, `guidance`, and `retro` accept `--source local|provider|mixed` (default `local`). With `provider` or `mixed`, a background provider pull runs when `[telemetry.query].cache_ttl_seconds` has expired, or when you pass `--refresh` (in addition to transcript rescan where applicable). New `kaizen telemetry` subcommands: `init` (alias of `configure`), `doctor`, `pull --days`, and `print-schema`. MCP tools keep the previous local-only behavior. 
+- **CLI — read paths and telemetry:** `summary`, `insights`, `metrics`, `guidance`, and `retro` accept `--source local|provider|mixed` (default `local`). With `provider` or `mixed`, a background provider pull runs when `[telemetry.query].cache_ttl_seconds` has expired, or when you pass `--refresh` (in addition to bounded local tail ingest where applicable). New `kaizen telemetry` subcommands: `init` (alias of `configure`), `doctor`, `pull --days`, and `print-schema`. MCP tools keep the previous local-only behavior. - `Cargo.toml` no longer excludes `assets/` so `cargo publish` / docs.rs builds resolve `include_str!` for embedded defaults and the retro skill template. - Release workflow **`update-homebrew-tap`**: `scripts/render-homebrew-tap-formula.sh` + push to @@ -49,6 +49,10 @@ here explicitly. ### Fixed +- The Web dashboard now selects the most recently active valid project, refreshes from SQLite/WAL changes within one second, preserves the selected session, and exposes bounded session details plus tool, attention, and telemetry-coverage insights. +- Hook-backed tool activity appears as named `ToolCall` and `ToolResult` events in Web and TUI views; legacy stored hook rows are normalized at read time. +- Hook ingestion no longer commits and merges the Tantivy index after every event. Search writes use one indexing worker and one merge worker, commit in bounded batches, and flush when a session stops. +- Transcript refreshes cap recent files and growing Claude/Codex JSONL tails, append only unseen events in one transaction, and avoid repeated Git enrichment and full derived-row rebuilds. - `kaizen daemon status` now exits successfully with a stable `status: stopped` line after the daemon has been stopped, instead of surfacing a raw Unix socket connection error. - `kaizen sessions tree ` now prints a non-empty placeholder for existing sessions with no @@ -72,9 +76,9 @@ here explicitly. 
### Added -- `kaizen init` now creates `.cursor/hooks.json` and `.claude/settings.json` from scratch when absent (in addition to patching them when present), so a single command is enough to instrument a fresh workspace for both Cursor and Claude Code. +- `kaizen init` now creates or patches `~/.cursor/hooks.json` and `~/.claude/settings.json`, so one user-level setup covers every workspace without writing to the target repository. - Local retention: `[retention].hot_days` (default 30) prunes old sessions from SQLite after rescans (throttled to once per 24h); `hot_days = 0` disables auto-prune. `kaizen gc` with optional `--days` and `--vacuum`. -- `[scan].min_rescan_seconds` (default 300) skips full transcript rescans; `--refresh` / `-r` on `sessions list`, `summary`, `insights`, `metrics`, and `retro` forces a rescan. MCP tools accept `refresh=true` for the same behavior. +- `[scan].min_rescan_seconds` (default 300) throttles bounded transcript-tail ingestion; `--refresh` / `-r` on `sessions list`, `summary`, `insights`, `metrics`, and `retro` ingests recently changed tails. MCP tools accept `refresh=true` for the same behavior. - Composite index `sessions(workspace, started_at_ms)` for faster session listing. - `docs/telemetry-journey.md` — end-to-end “session → data” learning path; README and `docs/` index point to it. 
Root `README` clarifies that long-form docs live in the GitHub `docs/` diff --git a/Cargo.lock b/Cargo.lock index 72cefe9..b0bf8f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,6 +1869,7 @@ dependencies = [ "indicatif", "itf", "libc", + "memchr", "notify", "proptest", "quint-connect", diff --git a/Cargo.toml b/Cargo.toml index 2516c39..f0750e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -354,6 +354,7 @@ clap_complete = "4" indicatif = "0.18" arboard = "3" anyhow = "1" +memchr = "2" tempfile = "3" tar = "0.4" rusqlite = { version = "0.40", features = ["bundled"] } diff --git a/docs/config.md b/docs/config.md index ea7ba6f..b92f7ca 100644 --- a/docs/config.md +++ b/docs/config.md @@ -50,7 +50,7 @@ When you pass **`--all-workspaces`** (or MCP `all_workspaces: true`), Kaizen loa | Key | Default | Purpose | |-----|---------|--------| | `roots` | `["~/.cursor/projects"]` | Transcript index roots (Cursor projects layout) | -| `min_rescan_seconds` | `300` | Minimum seconds between full transcript rescans when a command is already in refresh mode (`--refresh` on the CLI or `refresh=true` over MCP). The daemon uses the same value for its workspace scanner loop after `kaizen init`. | +| `min_rescan_seconds` | `300` | Minimum seconds between bounded incremental transcript scans in refresh mode (`--refresh` on the CLI or `refresh=true` over MCP). The daemon uses the same value after `kaizen init`; Claude, Codex, and Cursor discovery caps each source at 32 recent transcripts, while Claude and Codex growing JSONL reads are capped at 256 KiB. 
| ## `[retention]` diff --git a/docs/experiments.md b/docs/experiments.md index 197c332..8b7bf8a 100644 --- a/docs/experiments.md +++ b/docs/experiments.md @@ -125,7 +125,7 @@ kaizen exp list kaizen exp status kaizen exp tag --variant treatment # manual override kaizen exp report # markdown + bootstrap CI + sequential decision -kaizen exp report --refresh # optional: full transcript rescan first; can take a while +kaizen exp report --refresh # optional: ingest changed transcript tails first kaizen exp conclude # Running → Concluded kaizen exp archive # Concluded → Archived ``` diff --git a/docs/mcp.md b/docs/mcp.md index 79cd7e4..08563d1 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -120,7 +120,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code |------|----------------|--------| | `kaizen_capabilities` | (no CLI; static text) | Read first: which tool to use for cost rollups vs repo metrics, sessions, retro, etc. | | `kaizen_ingest_hook` | `kaizen ingest hook` | Pass hook JSON in `payload` (not stdin). `source`: `cursor` or `claude`. | -| `kaizen_sessions_list` | `kaizen sessions list` | Optional `json: true`, `refresh: true` (full transcript rescan; matches `--refresh`), `all_workspaces: true`, `limit` (cap rows, newest first). | +| `kaizen_sessions_list` | `kaizen sessions list` | Optional `json: true`, `refresh: true` (bounded changed-tail ingest; matches `--refresh`), `all_workspaces: true`, `limit` (cap rows, newest first). | | `kaizen_session_show` | `kaizen sessions show` | `id` + optional `workspace`. | | `kaizen_search_sessions` | `kaizen search` | Structured BM25 event search. Args: `query`, optional `since`, `agent`, `kind`, `limit`, `workspace`. `kaizen sessions search` remains a compatible CLI alias. Returns `hits[]` with session id, seq, ts, score, snippet, paths, skills, and `tokens_total`. | | `kaizen_query` | `kaizen query` | Structured trace query. 
| @@ -143,7 +143,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code | `kaizen_exp_list` | `kaizen exp list` | | | `kaizen_exp_status` | `kaizen exp status` | | | `kaizen_exp_tag` | `kaizen exp tag` | | -| `kaizen_exp_report` | `kaizen exp report` | `json` and optional `refresh: true` (full rescan before report; matches CLI `--refresh`). Includes `sequential_decision` and `srm_warning`. | +| `kaizen_exp_report` | `kaizen exp report` | `json` and optional `refresh: true` (bounded changed-tail ingest before report; matches CLI `--refresh`). Includes `sequential_decision` and `srm_warning`. | | `kaizen_exp_conclude` | `kaizen exp conclude` | Running → Concluded. | | `kaizen_exp_archive` | `kaizen exp archive` | Concluded → Archived. | | `kaizen_retro` | `kaizen retro` | `json`, `refresh`, etc. Set `json: true` for the same `Report` JSON as `kaizen retro --json`. | @@ -152,7 +152,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code - **Workspace**: most tools accept optional `workspace` (string path) or `project` (short project name — resolved from existing rows shown by `kaizen projects`; mutually exclusive with `workspace`). If neither is given, the server uses the process current directory, matching CLI defaults. Use `kaizen projects --include-missing` to inspect stale registry rows; MCP workspace reads ignore them. - **Data source**: `kaizen_summary`, `kaizen_insights`, `kaizen_metrics`, and `kaizen_retro` use the local DB only (`DataSource::Local`), matching CLI default `--source local`. The MCP server does not expose CLI `--source` switches; use the CLI if you need another source. -- **Rescan**: list/summary/insights/metrics/retro stay on the cached local DB unless you pass `refresh: true` (same as CLI `--refresh`). `kaizen_exp_report` defaults to cache-first as well; set `refresh: true` to force a full transcript rescan before computing the report. 
+- **Refresh**: list/summary/insights/metrics/retro stay on the cached local DB unless you pass `refresh: true` (same as CLI `--refresh`). `kaizen_exp_report` defaults to cache-first as well; refresh ingests only bounded, recently changed transcript tails. - **Aggregation**: `kaizen_sessions_list`, `kaizen_summary`, `kaizen_insights`, and `kaizen_metrics` accept `all_workspaces: true`. Kaizen opens each existing registered workspace DB separately and merges the results in memory. - **Blocking work** is run on a blocking thread pool so the async MCP runtime is not starved; long `retro` or metrics runs may take time. - **Version** in the MCP `initialize` response is the built-in string configured for the server (keep in sync with releases when using strict client checks). diff --git a/docs/retro.md b/docs/retro.md index 2c4f02a..71fae6e 100644 --- a/docs/retro.md +++ b/docs/retro.md @@ -3,7 +3,7 @@ `kaizen retro` reads recent sessions and cached local repo facts, then produces a ranked Markdown report of changes that may make agents cheaper, faster, or more accurate in this codebase. Default runs are cache-first. Use `--refresh` -when the local store may be stale; it rescans agent transcripts first and can +when the local store may be stale; it ingests bounded changed tails first and can take a while on large workspaces. The engine is deterministic and pure: diff --git a/docs/tui.md b/docs/tui.md index 8d848e7..a61c206 100644 --- a/docs/tui.md +++ b/docs/tui.md @@ -20,6 +20,8 @@ kaizen tui --workspace /path/to/project The TUI requires an interactive terminal. It watches the SQLite WAL and coalesces refreshes, so active sessions update without a busy polling loop. +Hook-backed `PreToolUse` and `PostToolUse` rows appear as named tool calls and +results; lifecycle hooks remain lifecycle events. ## Keys @@ -54,4 +56,3 @@ help view instead of terminating the interface. | Wrong repository appears | Start with `--workspace /path/to/project`. 
| | Navigation seems stuck | Press `Esc` to close overlays, then `Tab` to select the intended pane. | | Terminal display is damaged after a crash | Run `reset`, then restart `kaizen tui`. | - diff --git a/docs/tutorial/02-observe.md b/docs/tutorial/02-observe.md index 1cc1b83..c312bff 100644 --- a/docs/tutorial/02-observe.md +++ b/docs/tutorial/02-observe.md @@ -22,7 +22,7 @@ kaizen sessions list --json kaizen sessions list --refresh ``` -**`--json`** is for scripts and MCP-shaped tooling. **`--refresh`** forces a full transcript rescan (subject to min interval). +**`--json`** is for scripts and MCP-shaped tooling. **`--refresh`** ingests bounded, recently changed transcript tails (subject to the minimum interval). ## Show one session @@ -43,7 +43,7 @@ The JSON shape includes rollups by agent and model, total cost, and when availab ## Data source: local, provider, or mixed -Most of this tutorial assumes the default **`--source local`**: numbers come from the workspace’s local SQLite store (and the usual transcript rescan rules with `--refresh`). +Most of this tutorial assumes the default **`--source local`**: numbers come from the workspace’s local SQLite store (and the bounded changed-tail rules used by `--refresh`). 
When you have **[sync]** identity in config (`team_id`, `team_salt_hex`, …) *and* a **[telemetry.query](https://github.com/marquesds/kaizen/blob/main/docs/config.md#telemetryquery) provider** (PostHog or Datadog) configured, you can ask read commands to fold in **provider-pulled events** cached under `remote_events` in the same DB: diff --git a/docs/tutorial/06-experiments.md b/docs/tutorial/06-experiments.md index 312e10b..6e19de1 100644 --- a/docs/tutorial/06-experiments.md +++ b/docs/tutorial/06-experiments.md @@ -23,7 +23,7 @@ kaizen exp status kaizen exp tag --session --variant treatment kaizen exp report kaizen exp report --json -kaizen exp report --refresh # full transcript rescan before report if the store may be stale +kaizen exp report --refresh # ingest changed transcript tails if the store may be stale kaizen exp conclude ``` diff --git a/docs/usage-observe.md b/docs/usage-observe.md index dd3c967..1acd515 100644 --- a/docs/usage-observe.md +++ b/docs/usage-observe.md @@ -2,9 +2,9 @@ [Back to CLI index](usage.md). -These commands are cache-first. Pass `--refresh` when they should rescan local -transcripts before rendering. With `--source provider|mixed`, refresh can also -refresh a configured remote provider cache. +These commands are cache-first. Pass `--refresh` to ingest recently changed, +bounded local transcript tails before rendering. With +`--source provider|mixed`, refresh can also refresh a configured remote cache. ## `kaizen sessions` @@ -33,7 +33,10 @@ Text output shows a placeholder when no spans exist; JSON returns `[]`. `search` uses the rebuildable Tantivy index at `~/.kaizen/projects/<slug>/search/`. It indexes redacted event text. Payload -bodies remain in SQLite and are not copied into the index. Rebuild with: +bodies remain in SQLite and are not copied into the index. Daemon-backed hook +sessions commit search batches at 256 documents, on the first event after a +60-second batch window, or immediately when the session stops. 
SQLite session +and event views remain live before that secondary-index commit. Rebuild with: ```bash kaizen search reindex diff --git a/docs/usage-setup.md b/docs/usage-setup.md index fc20168..129de6c 100644 --- a/docs/usage-setup.md +++ b/docs/usage-setup.md @@ -19,7 +19,7 @@ Every command resolves a workspace through one of three mechanisms: opens each registered project database separately and merges results. See [machine-local registry](config.md#machine-local-registry). -After a full transcript rescan, Kaizen may delete sessions older than +After a transcript refresh, Kaizen may delete sessions older than `[retention].hot_days`, at most once per 24 hours. Set `hot_days = 0` to disable automatic pruning; use `kaizen gc` for explicit pruning. @@ -93,8 +93,11 @@ kaizen sessions load --json ``` Use `load` after installing or upgrading when existing sessions should appear -in reports. Use `sessions list --refresh` when one read command should rescan -before rendering. +in reports. Claude, Codex, and Cursor imports consider at most 32 recent +transcripts per source. Claude and Codex decode at most the latest 256 KiB of a +growing JSONL file. `load` also enriches imported sessions with Git binding +metadata. Use `sessions list --refresh` to ingest recently changed tails before +one read. ## `kaizen outcomes` diff --git a/docs/usage.md b/docs/usage.md index 22d229d..74efd66 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -12,8 +12,8 @@ Use `--no-daemon` or `KAIZEN_DAEMON=0` for direct SQLite mode. Both modes use the same project database. See [daemon.md](daemon.md). Cache-first reads use the local SQLite database without rescanning agent -transcripts. Pass `--refresh` when a command should ingest new transcript data -before rendering. +transcripts. Pass `--refresh` to ingest recently changed transcript tails before +rendering. Refresh work is bounded; it does not replay all historical files. 
## Reference diff --git a/docs/web.md b/docs/web.md index aef7f7f..c74cb3a 100644 --- a/docs/web.md +++ b/docs/web.md @@ -23,16 +23,19 @@ kaizen open --no-browser The dashboard provides: -- project selection, including a manual local path; +- automatic selection of the most recently active valid project, plus a manual + local-path fallback; - session, active-session, error, and cost totals; +- project-level tool, attention, and telemetry-coverage insights; - the latest 30 sessions for the selected project; - selected-session facts, recent events, nested tool spans, touched files, and top tools; - the exact bounded report under **Developer details**. Selected-session detail is capped at 40 events, 40 spans, and 40 files. Those -limits keep refresh latency and memory use predictable. The page refreshes every -20 seconds while connected; **Refresh now** requests an immediate snapshot. +limits keep refresh latency and memory use predictable. The server watches the +selected project's SQLite database and WAL; a committed change requests a new +snapshot within one second. **Refresh now** remains available for manual checks. Web is an Observe-only surface. It cannot mutate experiments, guidance, sync, configuration, or local data. Use the CLI or MCP for those workflows. @@ -55,6 +58,5 @@ for protocol and runtime-file details. | Browser did not open | Run `kaizen open --no-browser` and open the printed URL. | | Page says connection failed | Run `kaizen daemon status`; restart with `kaizen daemon stop` followed by `kaizen open`. | | URL has no valid token | Run `kaizen open --no-browser` again instead of editing the URL. | -| Expected project is missing | Open its path manually or run `kaizen sessions list --refresh` from that repository. | +| Expected project is missing | Run one Kaizen command from that repository to register it, then reload. Unsafe roots containing `KAIZEN_HOME` and missing paths are ignored. 
| | Default port is busy | Use the URL Kaizen prints; the daemon automatically chooses another loopback port. | - diff --git a/src/collect/hooks/normalize.rs b/src/collect/hooks/normalize.rs index 562c567..878dfa3 100644 --- a/src/collect/hooks/normalize.rs +++ b/src/collect/hooks/normalize.rs @@ -17,9 +17,9 @@ pub fn hook_to_event(h: &HookEvent, seq: u64) -> Event { seq, ts_ms: h.ts_ms, ts_exact: true, - kind: lifecycle.map_or(EventKind::Hook, |_| EventKind::Lifecycle), + kind: core_event_kind(&h.kind), source: EventSource::Hook, - tool: None, + tool: hook_tool(&h.payload), tool_call_id: hook_tool_id(&h.payload), tokens_in: u32_field(&h.payload, "input_tokens"), tokens_out: u32_field(&h.payload, "output_tokens"), @@ -40,6 +40,22 @@ pub fn hook_to_event(h: &HookEvent, seq: u64) -> Event { } } +fn core_event_kind(kind: &HookKind) -> EventKind { + match kind { + HookKind::PreToolUse => EventKind::ToolCall, + HookKind::PostToolUse => EventKind::ToolResult, + HookKind::Unknown(_) => EventKind::Hook, + _ => EventKind::Lifecycle, + } +} + +fn hook_tool(payload: &serde_json::Value) -> Option { + ["tool_name", "tool"] + .iter() + .find_map(|key| payload.get(key).and_then(|value| value.as_str())) + .map(ToOwned::to_owned) +} + fn hook_tool_id(payload: &serde_json::Value) -> Option { ["tool_call_id", "tool_use_id", "call_id", "id"] .iter() @@ -49,6 +65,8 @@ fn hook_tool_id(payload: &serde_json::Value) -> Option { fn lifecycle_type(kind: &HookKind) -> Option<&'static str> { match kind { + HookKind::SessionStart => Some("session_start"), + HookKind::Stop => Some("session_stop"), HookKind::PermissionRequest => Some("permission_request"), HookKind::UserPromptSubmit => Some("user_prompt_submit"), HookKind::Notification => Some("notification"), @@ -103,90 +121,5 @@ pub fn hook_to_status(kind: &HookKind) -> Option { } #[cfg(test)] -mod tests { - use super::*; - use crate::collect::hooks::HookEvent; - use serde_json::json; - - fn make_event(kind: HookKind) -> HookEvent { - HookEvent 
{ - kind, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({}), - } - } - - #[test] - fn session_start_maps_running() { - assert_eq!( - hook_to_status(&HookKind::SessionStart), - Some(SessionStatus::Running) - ); - } - - #[test] - fn pre_tool_use_maps_waiting() { - assert_eq!( - hook_to_status(&HookKind::PreToolUse), - Some(SessionStatus::Waiting) - ); - } - - #[test] - fn post_tool_use_maps_running() { - assert_eq!( - hook_to_status(&HookKind::PostToolUse), - Some(SessionStatus::Running) - ); - } - - #[test] - fn stop_maps_done() { - assert_eq!(hook_to_status(&HookKind::Stop), Some(SessionStatus::Done)); - } - - #[test] - fn unknown_maps_none() { - assert_eq!(hook_to_status(&HookKind::Unknown("x".to_string())), None); - } - - #[test] - fn hook_to_event_kind_is_hook() { - let h = make_event(HookKind::Stop); - let ev = hook_to_event(&h, 5); - assert_eq!(ev.kind, EventKind::Hook); - assert_eq!(ev.seq, 5); - assert_eq!(ev.session_id, "s1"); - } - - #[test] - fn hook_to_event_maps_total_cost_usd_to_microdollars() { - let h = HookEvent { - kind: HookKind::Stop, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({ "total_cost_usd": 0.042 }), - }; - let ev = hook_to_event(&h, 0); - assert_eq!(ev.cost_usd_e6, Some(42_000)); - } - - #[test] - fn permission_request_maps_lifecycle_wait() { - let h = HookEvent { - kind: HookKind::PermissionRequest, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({"permission_wait_ms": 250}), - }; - let ev = hook_to_event(&h, 0); - assert_eq!(ev.kind, EventKind::Lifecycle); - assert_eq!(ev.latency_ms, Some(250)); - assert_eq!(ev.payload["type"], "permission_request"); - assert_eq!( - hook_to_status(&HookKind::PermissionRequest), - Some(SessionStatus::Waiting) - ); - } -} +#[path = "normalize_tests.rs"] +mod tests; diff --git a/src/collect/hooks/normalize_tests.rs b/src/collect/hooks/normalize_tests.rs new file mode 100644 index 0000000..99605cc --- /dev/null +++ b/src/collect/hooks/normalize_tests.rs 
@@ -0,0 +1,93 @@ +use super::*; +use crate::collect::hooks::HookEvent; +use serde_json::json; + +fn make_event(kind: HookKind) -> HookEvent { + HookEvent { + kind, + session_id: "s1".to_string(), + ts_ms: 1000, + payload: json!({}), + } +} + +#[test] +fn session_start_maps_running() { + assert_eq!( + hook_to_status(&HookKind::SessionStart), + Some(SessionStatus::Running) + ); +} + +#[test] +fn pre_tool_use_maps_waiting() { + assert_eq!( + hook_to_status(&HookKind::PreToolUse), + Some(SessionStatus::Waiting) + ); +} + +#[test] +fn post_tool_use_maps_running() { + assert_eq!( + hook_to_status(&HookKind::PostToolUse), + Some(SessionStatus::Running) + ); +} + +#[test] +fn stop_maps_done() { + assert_eq!(hook_to_status(&HookKind::Stop), Some(SessionStatus::Done)); +} + +#[test] +fn unknown_maps_none() { + assert_eq!(hook_to_status(&HookKind::Unknown("x".to_string())), None); +} + +#[test] +fn stop_maps_to_lifecycle_event() { + let ev = hook_to_event(&make_event(HookKind::Stop), 5); + assert_eq!((ev.kind, ev.seq), (EventKind::Lifecycle, 5)); + assert_eq!(ev.payload["type"], "session_stop"); +} + +#[test] +fn pre_tool_hook_maps_to_named_tool_call() { + let ev = named_tool_event(HookKind::PreToolUse, 0); + assert_eq!(ev.kind, EventKind::ToolCall); + assert_eq!(ev.tool.as_deref(), Some("Read")); +} + +#[test] +fn post_tool_hook_maps_to_named_tool_result() { + let ev = named_tool_event(HookKind::PostToolUse, 1); + assert_eq!(ev.kind, EventKind::ToolResult); + assert_eq!(ev.tool.as_deref(), Some("Read")); +} + +fn named_tool_event(kind: HookKind, seq: u64) -> Event { + let mut hook = make_event(kind); + hook.payload = json!({"tool_name":"Read","tool_use_id":"call-1"}); + hook_to_event(&hook, seq) +} + +#[test] +fn hook_to_event_maps_total_cost_usd_to_microdollars() { + let mut hook = make_event(HookKind::Stop); + hook.payload = json!({"total_cost_usd":0.042}); + assert_eq!(hook_to_event(&hook, 0).cost_usd_e6, Some(42_000)); +} + +#[test] +fn 
permission_request_maps_lifecycle_wait() { + let mut hook = make_event(HookKind::PermissionRequest); + hook.payload = json!({"permission_wait_ms":250}); + let event = hook_to_event(&hook, 0); + assert_eq!( + (event.kind, event.latency_ms), + (EventKind::Lifecycle, Some(250)) + ); + assert_eq!(event.payload["type"], "permission_request"); + assert_eq!(hook_to_status(&hook.kind), Some(SessionStatus::Waiting)); +} diff --git a/src/collect/tail/budget_tests.rs b/src/collect/tail/budget_tests.rs new file mode 100644 index 0000000..bf50f47 --- /dev/null +++ b/src/collect/tail/budget_tests.rs @@ -0,0 +1,86 @@ +use super::MAX_RECENT_TRANSCRIPTS; +use serde_json::json; +use std::fs::{FileTimes, OpenOptions}; +use std::path::Path; +use std::time::{Duration, UNIX_EPOCH}; + +#[test] +fn claude_scan_caps_recent_transcripts() { + let temp = tempfile::tempdir().unwrap(); + let workspace = temp.path().join("repo"); + let project = temp.path().join("claude"); + std::fs::create_dir_all(&workspace).unwrap(); + std::fs::create_dir_all(&project).unwrap(); + write_claude_rows(&project, &workspace, 40); + + let rows = super::claude_code::scan_claude_project_dir(&project, &workspace).unwrap(); + + assert_eq!(rows.len(), MAX_RECENT_TRANSCRIPTS); +} + +#[test] +fn codex_scan_caps_recent_transcripts() { + let temp = tempfile::tempdir().unwrap(); + let workspace = temp.path().join("repo"); + let root = temp.path().join("codex/2026/06/18"); + std::fs::create_dir_all(&workspace).unwrap(); + std::fs::create_dir_all(&root).unwrap(); + write_codex_rows(&root, &workspace, 40); + + let rows = super::codex_desktop::scan_codex_sessions_root(&root, &workspace).unwrap(); + + assert_eq!(rows.len(), MAX_RECENT_TRANSCRIPTS); +} + +#[test] +fn recent_path_budget_excludes_unchanged_files() { + let temp = tempfile::tempdir().unwrap(); + let stale = temp.path().join("stale.jsonl"); + let recent = temp.path().join("recent.jsonl"); + std::fs::write(&stale, "{}").unwrap(); + std::fs::write(&recent, 
"{}").unwrap(); + set_modified(&stale, 1); + set_modified(&recent, 3); + + let rows = super::newest_paths_since(vec![stale, recent.clone()], 2_000); + + assert_eq!(rows, vec![recent]); +} + +#[test] +fn transcript_reader_excludes_large_old_prefix() { + let temp = tempfile::tempdir().unwrap(); + let transcript = temp.path().join("large.jsonl"); + std::fs::write(&transcript, "{}\n".repeat(800_000)).unwrap(); + + let (first_seq, content) = super::read_recent_jsonl(&transcript).unwrap(); + + assert!(first_seq > 0); + assert!(content.len() <= super::MAX_TRANSCRIPT_READ_BYTES as usize); +} + +fn write_claude_rows(dir: &Path, workspace: &Path, count: usize) { + (0..count).for_each(|index| { + let row = json!({ + "type":"user", "timestamp":"2026-06-18T00:00:00Z", + "cwd":workspace, "sessionId":format!("claude-{index:02}") + }); + std::fs::write(dir.join(format!("{index:02}.jsonl")), row.to_string()).unwrap(); + }); +} + +fn write_codex_rows(dir: &Path, workspace: &Path, count: usize) { + (0..count).for_each(|index| { + let row = json!({ + "type":"session_meta", "timestamp":"2026-06-18T00:00:00Z", + "payload":{"id":format!("codex-{index:02}"), "cwd":workspace} + }); + std::fs::write(dir.join(format!("{index:02}.jsonl")), row.to_string()).unwrap(); + }); +} + +fn set_modified(path: &Path, seconds: u64) { + let file = OpenOptions::new().write(true).open(path).unwrap(); + let times = FileTimes::new().set_modified(UNIX_EPOCH + Duration::from_secs(seconds)); + file.set_times(times).unwrap(); +} diff --git a/src/collect/tail/claude_code.rs b/src/collect/tail/claude_code.rs index 0cb3083..b228674 100644 --- a/src/collect/tail/claude_code.rs +++ b/src/collect/tail/claude_code.rs @@ -20,20 +20,28 @@ struct Meta { pub fn scan_claude_project_dir( project_dir: &Path, workspace: &Path, +) -> Result)>> { + scan_claude_project_dir_since(project_dir, workspace, 0) +} + +pub(crate) fn scan_claude_project_dir_since( + project_dir: &Path, + workspace: &Path, + since_ms: u64, ) -> Result)>> { 
if !project_dir.exists() { return Ok(Vec::new()); } let target = crate::core::paths::canonical(workspace); let mut out = Vec::new(); - for file in top_level_jsonl(project_dir)? { + for file in top_level_jsonl(project_dir, since_ms)? { push_if_target( &mut out, scan_claude_session_file(&file, None, Some(workspace))?, &target, ); } - for (file, parent) in subagent_jsonl(project_dir)? { + for (file, parent) in subagent_jsonl(project_dir, since_ms)? { push_if_target( &mut out, scan_claude_session_file(&file, Some(parent), Some(workspace))?, @@ -48,19 +56,25 @@ pub fn scan_claude_session_file( parent_session_id: Option, workspace_fallback: Option<&Path>, ) -> Result<(SessionRecord, Vec)> { - let content = std::fs::read_to_string(path) - .with_context(|| format!("read claude file: {}", path.display()))?; - let mut meta = content.lines().fold(Meta::default(), read_meta); + let first = super::read_first_jsonl_line(path) + .with_context(|| format!("read claude metadata: {}", path.display()))?; + let (first_seq, content) = super::read_recent_jsonl(path) + .with_context(|| format!("read claude tail: {}", path.display()))?; + let mut meta = std::iter::once(first.as_str()) + .chain(content.lines()) + .fold(Meta::default(), read_meta); if meta.workspace.is_none() { meta.workspace = workspace_fallback.map(|p| p.to_string_lossy().to_string()); } - let id = meta.id.clone().unwrap_or_else(|| file_stem(path)); - let base = meta.started_ms.unwrap_or_else(|| file_mtime_ms(path)); + let id = meta.id.clone().unwrap_or_else(|| super::file_stem(path)); + let base = meta + .started_ms + .unwrap_or_else(|| super::file_mtime_ms(path)); let events = content .lines() .enumerate() .filter_map(|(i, line)| { - crate::collect::tail::claude::parse_claude_line(&id, i as u64, base, line) + crate::collect::tail::claude::parse_claude_line(&id, first_seq + i as u64, base, line) .ok() .flatten() }) @@ -68,16 +82,15 @@ pub fn scan_claude_session_file( Ok((record(path, id, meta, base, parent_session_id), 
events)) } -fn top_level_jsonl(project_dir: &Path) -> Result> { - let mut out = std::fs::read_dir(project_dir)? +fn top_level_jsonl(project_dir: &Path, since_ms: u64) -> Result> { + let out = std::fs::read_dir(project_dir)? .filter_map(|e| e.ok().map(|e| e.path())) .filter(|p| p.is_file() && p.extension().and_then(|x| x.to_str()) == Some("jsonl")) .collect::>(); - out.sort(); - Ok(out) + Ok(super::newest_paths_since(out, since_ms)) } -fn subagent_jsonl(project_dir: &Path) -> Result> { +fn subagent_jsonl(project_dir: &Path, since_ms: u64) -> Result> { let mut out = Vec::new(); for entry in std::fs::read_dir(project_dir)? { let path = entry?.path(); @@ -86,8 +99,7 @@ fn subagent_jsonl(project_dir: &Path) -> Result> { }; collect_subagents(&path.join("subagents"), &parent, &mut out)?; } - out.sort(); - Ok(out) + Ok(super::newest_by_path_since(out, |(path, _)| path, since_ms)) } fn collect_subagents(dir: &Path, parent: &str, out: &mut Vec<(PathBuf, String)>) -> Result<()> { @@ -176,22 +188,3 @@ fn line_ts(obj: &serde_json::Map) -> Option { fn text(v: &Value, key: &str) -> Option { v.get(key).and_then(Value::as_str).map(ToOwned::to_owned) } - -fn file_stem(path: &Path) -> String { - path.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("") - .to_string() -} - -fn file_mtime_ms(path: &Path) -> u64 { - path.metadata() - .ok() - .and_then(|m| m.modified().ok()) - .map(|t| { - t.duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 - }) - .unwrap_or(0) -} diff --git a/src/collect/tail/codex_desktop.rs b/src/collect/tail/codex_desktop.rs index f930028..784d886 100644 --- a/src/collect/tail/codex_desktop.rs +++ b/src/collect/tail/codex_desktop.rs @@ -21,6 +21,14 @@ struct Meta { pub fn scan_codex_sessions_root( root: &Path, workspace: &Path, +) -> Result)>> { + scan_codex_sessions_root_since(root, workspace, 0) +} + +pub(crate) fn scan_codex_sessions_root_since( + root: &Path, + workspace: &Path, + since_ms: u64, ) -> Result)>> { if 
!root.exists() { return Ok(Vec::new()); @@ -28,8 +36,7 @@ pub fn scan_codex_sessions_root( let target = crate::core::paths::canonical(workspace); let mut paths = Vec::new(); collect_jsonl(root, &mut paths)?; - paths.sort(); - Ok(paths + Ok(super::newest_paths_since(paths, since_ms) .into_iter() .filter_map(|p| scan_codex_session_file(&p).ok()) .filter(|(r, _)| workspace_matches(&r.workspace, &target)) @@ -37,15 +44,23 @@ pub fn scan_codex_sessions_root( } pub fn scan_codex_session_file(path: &Path) -> Result<(SessionRecord, Vec)> { - let content = std::fs::read_to_string(path) - .with_context(|| format!("read codex file: {}", path.display()))?; - let meta = content.lines().fold(Meta::default(), read_meta); - let id = meta.id.clone().unwrap_or_else(|| file_stem(path)); - let base = meta.started_ms.unwrap_or_else(|| file_mtime_ms(path)); + let first = super::read_first_jsonl_line(path) + .with_context(|| format!("read codex metadata: {}", path.display()))?; + let (first_seq, content) = super::read_recent_jsonl(path) + .with_context(|| format!("read codex tail: {}", path.display()))?; + let meta = std::iter::once(first.as_str()) + .chain(content.lines()) + .fold(Meta::default(), read_meta); + let id = meta.id.clone().unwrap_or_else(|| super::file_stem(path)); + let base = meta + .started_ms + .unwrap_or_else(|| super::file_mtime_ms(path)); let events = content .lines() .enumerate() - .filter_map(|(i, line)| parse_modern_line(&id, i as u64, base, meta.model.as_deref(), line)) + .filter_map(|(i, line)| { + parse_modern_line(&id, first_seq + i as u64, base, meta.model.as_deref(), line) + }) .collect(); Ok((record(path, id, meta, base), events)) } @@ -137,22 +152,3 @@ fn workspace_matches(found: &str, target: &Path) -> bool { fn text(v: &Value, key: &str) -> Option { v.get(key).and_then(Value::as_str).map(ToOwned::to_owned) } - -fn file_stem(path: &Path) -> String { - path.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("") - .to_string() -} - -fn file_mtime_ms(path: 
&Path) -> u64 { - path.metadata() - .ok() - .and_then(|m| m.modified().ok()) - .map(|t| { - t.duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 - }) - .unwrap_or(0) -} diff --git a/src/collect/tail/mod.rs b/src/collect/tail/mod.rs index 7c552fb..b59b13f 100644 --- a/src/collect/tail/mod.rs +++ b/src/collect/tail/mod.rs @@ -22,7 +22,108 @@ pub mod opencode; pub mod pi; pub mod vibe; -use std::path::Path; +pub(crate) const MAX_RECENT_TRANSCRIPTS: usize = 32; + +#[cfg(test)] +mod budget_tests; + +use std::cmp::Reverse; +use std::fs::File; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; + +pub(crate) const MAX_TRANSCRIPT_READ_BYTES: u64 = 256 * 1024; + +pub(crate) fn newest_paths(paths: Vec) -> Vec { + newest_by_path(paths, PathBuf::as_path) +} + +pub(crate) fn newest_paths_since(paths: Vec, since_ms: u64) -> Vec { + newest_by_path_since(paths, PathBuf::as_path, since_ms) +} + +pub(crate) fn newest_by_path(mut values: Vec, path: impl Fn(&T) -> &Path) -> Vec { + values.sort_by_key(|value| Reverse(modified_ms(path(value)))); + values.truncate(MAX_RECENT_TRANSCRIPTS); + values +} + +pub(crate) fn newest_by_path_since(values: Vec, path: F, since_ms: u64) -> Vec +where + F: Fn(&T) -> &Path + Copy, +{ + let recent = values + .into_iter() + .filter(|value| modified_ms(path(value)) >= u128::from(since_ms)) + .collect(); + newest_by_path(recent, path) +} + +fn modified_ms(path: &Path) -> u128 { + path.metadata() + .and_then(|metadata| metadata.modified()) + .ok() + .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok()) + .map_or(0, |duration| duration.as_millis()) +} + +pub(crate) fn file_mtime_ms(path: &Path) -> u64 { + u64::try_from(modified_ms(path)).unwrap_or(u64::MAX) +} + +pub(crate) fn file_stem(path: &Path) -> String { + path.file_stem() + .and_then(|stem| stem.to_str()) + .unwrap_or_default() + .to_string() +} + +pub(crate) fn read_first_jsonl_line(path: &Path) -> std::io::Result { + 
let mut line = String::new(); + BufReader::new(File::open(path)?).read_line(&mut line)?; + Ok(line) +} + +pub(crate) fn read_recent_jsonl(path: &Path) -> std::io::Result<(u64, String)> { + let file = File::open(path)?; + let start = file + .metadata()? + .len() + .saturating_sub(MAX_TRANSCRIPT_READ_BYTES); + let mut reader = BufReader::new(file); + let mut first_seq = count_newlines(&mut reader, start)?; + reader.seek(SeekFrom::Start(start))?; + first_seq += discard_partial_line(&mut reader, start)?; + let mut content = String::new(); + reader.read_to_string(&mut content)?; + Ok((first_seq, content)) +} + +fn count_newlines(reader: &mut BufReader, end: u64) -> std::io::Result { + reader.seek(SeekFrom::Start(0))?; + let mut remaining = end; + let mut count = 0; + let mut buffer = [0_u8; 64 * 1024]; + while remaining > 0 { + let limit = usize::try_from(remaining.min(buffer.len() as u64)).unwrap_or(buffer.len()); + let read = reader.read(&mut buffer[..limit])?; + if read == 0 { + break; + } + count += memchr::memchr_iter(b'\n', &buffer[..read]).count() as u64; + remaining -= read as u64; + } + Ok(count) +} + +fn discard_partial_line(reader: &mut BufReader, start: u64) -> std::io::Result { + if start == 0 { + return Ok(0); + } + let mut ignored = Vec::new(); + reader.read_until(b'\n', &mut ignored)?; + Ok(1) +} /// Earliest mtime (ms) of `.jsonl` files in `dir`. Returns 0 on failure. 
pub fn dir_mtime_ms(dir: &Path) -> u64 { diff --git a/src/core/event.rs b/src/core/event.rs index 0f683cd..74d679e 100644 --- a/src/core/event.rs +++ b/src/core/event.rs @@ -48,6 +48,44 @@ pub struct Event { pub payload: serde_json::Value, } +impl Event { + pub fn normalize_legacy_hook(mut self) -> Self { + if !self.is_legacy_hook() { + return self; + } + self.kind = legacy_hook_kind(&self.payload).unwrap_or(EventKind::Hook); + self.tool = self.tool.or_else(|| legacy_hook_tool(&self.payload)); + self + } + + fn is_legacy_hook(&self) -> bool { + self.source == EventSource::Hook && self.kind == EventKind::Hook + } +} + +fn legacy_hook_kind(payload: &serde_json::Value) -> Option { + match hook_name(payload)? { + "PreToolUse" | "pre_tool_use" => Some(EventKind::ToolCall), + "PostToolUse" | "post_tool_use" => Some(EventKind::ToolResult), + "SessionStart" | "session_start" | "Stop" | "stop" => Some(EventKind::Lifecycle), + _ => None, + } +} + +fn legacy_hook_tool(payload: &serde_json::Value) -> Option { + ["tool_name", "tool"] + .iter() + .find_map(|key| payload.get(key).and_then(|value| value.as_str())) + .map(ToOwned::to_owned) +} + +fn hook_name(payload: &serde_json::Value) -> Option<&str> { + payload + .get("hook_event_name") + .or_else(|| payload.get("event")) + .and_then(|value| value.as_str()) +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum SessionStatus { Running, diff --git a/src/core/workspace.rs b/src/core/workspace.rs index 486aad1..7d5ba00 100644 --- a/src/core/workspace.rs +++ b/src/core/workspace.rs @@ -54,14 +54,7 @@ pub fn machine_workspaces(seed: Option<&Path>) -> Result> { if let Some(path) = seed.as_ref() { push_unique(&mut roots, path.clone()); } - roots.retain(|p| { - if seed.as_ref() == Some(p) { - return true; - } - p.exists() - && (db_path(p).ok().is_some_and(|d| d.exists()) - || crate::core::machine_registry::is_registered(p)) - }); + roots.retain(|path| seed.as_ref() == Some(path) || 
usable_registered_workspace(path)); if roots.is_empty() && let Some(path) = seed { @@ -70,6 +63,12 @@ pub fn machine_workspaces(seed: Option<&Path>) -> Result> { Ok(roots) } +fn usable_registered_workspace(path: &Path) -> bool { + path.exists() + && db_path(path) + .is_ok_and(|db| db.exists() || crate::core::machine_registry::is_registered(path)) +} + pub fn db_path(workspace: &Path) -> Result { let path = crate::core::paths::project_data_child(workspace, Path::new("kaizen.db"))?; ["kaizen.db-journal", "kaizen.db-wal", "kaizen.db-shm"] diff --git a/src/daemon/scanner_task.rs b/src/daemon/scanner_task.rs index 82e3e43..59a323c 100644 --- a/src/daemon/scanner_task.rs +++ b/src/daemon/scanner_task.rs @@ -4,11 +4,12 @@ use crate::store::Store; use anyhow::Result; use std::path::{Path, PathBuf}; +use std::sync::{Mutex, OnceLock}; pub(super) async fn scanner_loop(ws: PathBuf) { loop { - scan_once(ws.clone()).await; tokio::time::sleep(scan_interval(&ws)).await; + scan_once(ws.clone()).await; } } @@ -19,6 +20,9 @@ async fn scan_once(ws: PathBuf) { } fn scan_workspace(ws: &Path) -> Result<()> { + let _guard = scan_lock() + .lock() + .unwrap_or_else(|error| error.into_inner()); let cfg = crate::core::config::load(ws)?; let store = Store::open(&crate::core::workspace::db_path(ws)?)?; let ws_str = ws.to_string_lossy().to_string(); @@ -26,6 +30,11 @@ fn scan_workspace(ws: &Path) -> Result<()> { crate::shell::cli::maybe_auto_prune_after_scan(&store, &cfg) } +fn scan_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) +} + fn scan_interval(ws: &Path) -> std::time::Duration { let secs = crate::core::config::load(ws) .map(|cfg| cfg.scan.min_rescan_seconds.max(5)) diff --git a/src/daemon/worker.rs b/src/daemon/worker.rs index 7cb9f3d..6388d79 100644 --- a/src/daemon/worker.rs +++ b/src/daemon/worker.rs @@ -80,7 +80,6 @@ fn handle_request(request: DaemonRequest, stores: &mut StoreCache) -> Result Result<()> { + if 
get(store, &event.session_id)?.is_none() { + upsert_session(store, &event.session_id)?; + return Ok(()); + } + store.conn().execute( + APPLY_EVENT_SQL, + params![ + event.session_id, + i64::from(event.kind == EventKind::ToolCall), + i64::from(event.kind == EventKind::Error), + i64::from(event.tokens_in.unwrap_or(0)), + i64::from(event.tokens_out.unwrap_or(0)), + i64::from(event.reasoning_tokens.unwrap_or(0)), + i64::from(event.cache_read_tokens.unwrap_or(0)), + i64::from(event.cache_creation_tokens.unwrap_or(0)), + event.cost_usd_e6.unwrap_or(0), + event.ts_ms as i64, + now_ms() as i64, + ], + )?; + Ok(()) +} + pub fn rebuild_workspace(store: &Store, workspace: &str) -> Result { store .list_sessions(workspace)? diff --git a/src/extensions/diffs.rs b/src/extensions/diffs.rs index 420aba4..04b71bc 100644 --- a/src/extensions/diffs.rs +++ b/src/extensions/diffs.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use crate::store::Store; +use crate::store::tool_span_index::ToolSpanRecord; use anyhow::Result; use rusqlite::params; use serde::{Deserialize, Serialize}; @@ -15,6 +16,23 @@ pub struct StepDiff { pub raw_patch_stored: bool, } +pub fn upsert_tool_span(store: &Store, span: &ToolSpanRecord) -> Result<()> { + if span.paths.is_empty() { + return Ok(()); + } + insert( + store, + &StepDiff { + session_id: span.session_id.clone(), + span_id: span.span_id.clone(), + files: span.paths.clone(), + added_lines: 0, + removed_lines: 0, + raw_patch_stored: false, + }, + ) +} + pub fn refresh_session( store: &Store, session_id: &str, diff --git a/src/mcp/handler.rs b/src/mcp/handler.rs index 86b63c1..5a49fee 100644 --- a/src/mcp/handler.rs +++ b/src/mcp/handler.rs @@ -19,11 +19,11 @@ const MCP_CAPABILITIES: &str = r#"Kaizen MCP exposes most `kaizen` CLI workflows - kaizen_summary — Session counts, USD cost, by-agent/model, top tools. Use for spend and volume. Optional json=true. - kaizen_metrics — Code hotspots, slow tools (p95), token-heavy tools, churn. 
Use for **repository** and tool latency. Optional json. -- kaizen_sessions_list / kaizen_session_show — Session list and one session metadata. Optional json on list; optional `limit` caps rows (newest first). `kaizen_exp_report` supports `refresh: true` for a full transcript rescan before computing the report (matches CLI `kaizen exp report --refresh`). +- kaizen_sessions_list / kaizen_session_show — Session list and one session metadata. Optional json on list; optional `limit` caps rows (newest first). `kaizen_exp_report` supports `refresh: true` for bounded changed-tail ingest before computing the report (matches CLI `kaizen exp report --refresh`). - kaizen_search_sessions — BM25 event search over current workspace. Supports since, agent, kind, limit. - kaizen_insights — Activity dashboard (7d). kaizen_retro — weekly bets. kaizen_exp_* — experiments. - kaizen_query / kaizen_cases_* / kaizen_rules_* / kaizen_alerts_check / kaizen_review_* — local trace-to-case automation loop. -- List/summary/insights/metrics/retro are cache-first; set refresh=true to force a full transcript rescan (matches CLI --refresh). +- List/summary/insights/metrics/retro are cache-first; set refresh=true for bounded changed-tail ingest (matches CLI --refresh). - sessions_list/summary/insights/metrics also accept all_workspaces=true to aggregate across registered project DBs. - kaizen_ingest_hook — same as `kaizen ingest hook` (rare; hooks call this). - kaizen_init — idempotent user-level hooks and home project data; target repos stay read-only. kaizen_sync_* — outbox. kaizen_tui — not available (returns JSON stub). @@ -83,7 +83,7 @@ struct WorkspaceJsonArg { /// When true, return the same pretty JSON as `kaizen sessions list --json` or `kaizen summary --json`. #[serde(default)] json: bool, - /// When true, run a full agent transcript rescan (matches `kaizen ... --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen ... --refresh`). 
#[serde(default)] refresh: bool, /// Cap sessions returned (newest first); only `kaizen_sessions_list` uses this. @@ -148,7 +148,7 @@ struct MetricsArg { json: bool, #[serde(default)] force: bool, - /// When true, run a full agent transcript rescan (matches `kaizen metrics --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen metrics --refresh`). #[serde(default)] refresh: bool, } @@ -190,7 +190,7 @@ struct RetroArg { json: bool, #[serde(default)] force: bool, - /// When true, run a full agent transcript rescan (matches `kaizen retro --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen retro --refresh`). #[serde(default)] refresh: bool, } @@ -201,7 +201,7 @@ struct InsightsArg { ws: WorkspaceArg, #[serde(default)] all_workspaces: bool, - /// When true, run a full agent transcript rescan (matches `kaizen insights --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen insights --refresh`). #[serde(default)] refresh: bool, } @@ -262,7 +262,7 @@ struct ExpReportArg { id: String, #[serde(default)] json: bool, - /// Full transcript rescan before computing the report. + /// Ingest bounded changed transcript tails before computing the report. #[serde(default)] refresh: bool, } @@ -397,7 +397,7 @@ impl KaizenMcp { #[tool( name = "kaizen_sessions_list", - description = "List agent sessions in the workspace. Set json=true for structured output. Optional limit caps rows after sort (newest first). Use refresh=true for a full transcript rescan." + description = "List agent sessions in the workspace. Set json=true for structured output. Optional limit caps rows after sort (newest first). Use refresh=true to ingest bounded, recently changed transcript tails." )] async fn kaizen_sessions_list( &self, @@ -917,7 +917,7 @@ impl KaizenMcp { #[tool( name = "kaizen_exp_report", - description = "Experiment report (kaizen exp report). 
Optional refresh: true forces a full transcript rescan before computing the report." + description = "Experiment report (kaizen exp report). Optional refresh: true ingests bounded, recently changed transcript tails before computing the report." )] async fn kaizen_exp_report( &self, diff --git a/src/search/writer.rs b/src/search/writer.rs index 37621c0..7d69795 100644 --- a/src/search/writer.rs +++ b/src/search/writer.rs @@ -7,12 +7,13 @@ use anyhow::{Context, Result}; use std::fs::File; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; +use tantivy::indexer::IndexWriterOptions; use tantivy::schema::Term; use tantivy::{Index, IndexWriter, TantivyDocument, doc}; const HEAP_BYTES: usize = 50_000_000; -const BATCH_DOCS: usize = 1000; -const BATCH_SECS: u64 = 1; +const BATCH_DOCS: usize = 256; +const BATCH_SECS: u64 = 60; pub struct PendingWriter { writer: IndexWriter, @@ -29,7 +30,7 @@ impl PendingWriter { let lock = lock_file(&dir)?; let (schema, fields) = build_schema(); let index = open_or_create(&dir, schema)?; - let writer = index.writer(HEAP_BYTES)?; + let writer = index.writer_with_options(writer_options())?; Ok(Self { writer, fields, @@ -40,6 +41,9 @@ impl PendingWriter { } pub fn add(&mut self, doc: &SearchDoc) -> Result<()> { + if self.pending == 0 { + self.last_commit = Instant::now(); + } self.writer.delete_term(Term::from_field_text( self.fields.event_key, &event_key(&doc.session_id, doc.seq), @@ -87,6 +91,14 @@ impl PendingWriter { } } +fn writer_options() -> IndexWriterOptions { + IndexWriterOptions::builder() + .memory_budget_per_thread(HEAP_BYTES) + .num_worker_threads(1) + .num_merge_threads(1) + .build() +} + pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> { if ids.is_empty() { return Ok(()); @@ -129,3 +141,7 @@ fn reject_tree_symlinks(root: &Path) -> Result<()> { fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result { Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?) 
} + +#[cfg(test)] +#[path = "writer_tests.rs"] +mod tests; diff --git a/src/search/writer_tests.rs b/src/search/writer_tests.rs new file mode 100644 index 0000000..ca96008 --- /dev/null +++ b/src/search/writer_tests.rs @@ -0,0 +1,27 @@ +use super::*; + +#[test] +fn sparse_hooks_do_not_commit_every_ten_seconds() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = 1; + writer.last_commit = Instant::now() - Duration::from_secs(10); + assert!(!writer.should_commit()); +} + +#[test] +fn active_session_commits_after_one_minute() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = 1; + writer.last_commit = Instant::now() - Duration::from_secs(BATCH_SECS); + assert!(writer.should_commit()); +} + +#[test] +fn full_batch_commits_without_waiting() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = BATCH_DOCS; + assert!(writer.should_commit()); +} diff --git a/src/shell/cli.rs b/src/shell/cli.rs index 32544af..1a58b2d 100644 --- a/src/shell/cli.rs +++ b/src/shell/cli.rs @@ -3,9 +3,9 @@ use crate::collect::tail::antigravity::scan_antigravity_workspace; use crate::collect::tail::claude::scan_claude_session_dir; -use crate::collect::tail::claude_code::scan_claude_project_dir; +use crate::collect::tail::claude_code::scan_claude_project_dir_since; use crate::collect::tail::codex::scan_codex_session_dir; -use crate::collect::tail::codex_desktop::scan_codex_sessions_root; +use crate::collect::tail::codex_desktop::scan_codex_sessions_root_since; use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace; use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace; use crate::collect::tail::cursor::scan_session_dir_all; @@ -70,7 +70,6 @@ impl AgentScanStats { self.sessions_found += 1; self.sessions_upserted += 1; self.events_found += event_count as u64; 
- self.events_upserted += event_count as u64; self.agents.insert(record.agent.clone()); } @@ -129,6 +128,8 @@ fn now_ms_u64() -> u64 { /// Minimum interval between automatic local DB prunes after a successful rescan (24h). const AUTO_PRUNE_INTERVAL_MS: u64 = 86_400_000; +const INITIAL_SCAN_LOOKBACK_MS: u64 = 30 * 86_400_000; +const SCAN_OVERLAP_MS: u64 = 60_000; pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> { if cfg.retention.hot_days == 0 { @@ -146,7 +147,7 @@ pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) - Ok(()) } -/// Full transcript rescan unless throttled by `[scan].min_rescan_seconds` or `refresh` is true. +/// Import recently changed transcripts unless throttled by the scan interval. pub(crate) fn maybe_scan_all_agents( ws: &Path, cfg: &config::Config, @@ -163,11 +164,20 @@ pub(crate) fn maybe_scan_all_agents( { return Ok(()); } - scan_all_agents(ws, cfg, ws_str, store)?; - store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?; + let since_ms = scan_since_ms(store, now)?; + scan_all_agents_since(ws, cfg, ws_str, store, since_ms)?; + store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now)?; Ok(()) } +fn scan_since_ms(store: &Store, now_ms: u64) -> Result { + Ok(store + .sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)? 
+ .map_or(now_ms.saturating_sub(INITIAL_SCAN_LOOKBACK_MS), |last| { + last.saturating_sub(SCAN_OVERLAP_MS) + })) +} + pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> { if !refresh { return Ok(()); @@ -738,13 +748,14 @@ pub fn cmd_summary( Ok(()) } -pub(crate) fn scan_all_agents( +fn scan_all_agents_since( ws: &Path, cfg: &config::Config, ws_str: &str, store: &Store, + since_ms: u64, ) -> Result<()> { - scan_all_agents_with_stats(ws, cfg, ws_str, store).map(|_| ()) + scan_all_agents_with_stats_since(ws, cfg, ws_str, store, since_ms, false).map(|_| ()) } pub(crate) fn scan_all_agents_with_stats( @@ -752,19 +763,31 @@ pub(crate) fn scan_all_agents_with_stats( cfg: &config::Config, ws_str: &str, store: &Store, +) -> Result { + scan_all_agents_with_stats_since(ws, cfg, ws_str, store, 0, true) +} + +fn scan_all_agents_with_stats_since( + ws: &Path, + cfg: &config::Config, + ws_str: &str, + store: &Store, + since_ms: u64, + enrich_repo: bool, ) -> Result { let _spin = ScanSpinner::start("Scanning agent sessions…"); let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf()); - let sessions = collect_all_agent_sessions(ws, cfg, ws_str)?; - let stats = persist_session_batch(store, sessions, sync_ctx.as_ref())?; + let sessions = collect_all_agent_sessions_since(ws, cfg, ws_str, since_ms)?; + let stats = persist_session_batch_mode(store, sessions, sync_ctx.as_ref(), enrich_repo)?; maybe_auto_prune_after_scan(store, cfg)?; Ok(stats) } -pub(crate) fn collect_all_agent_sessions( +fn collect_all_agent_sessions_since( ws: &Path, cfg: &config::Config, ws_str: &str, + since_ms: u64, ) -> Result)>> { let mut out = Vec::new(); let slug = workspace_slug(ws_str); @@ -794,7 +817,11 @@ pub(crate) fn collect_all_agent_sessions( let claude_project = PathBuf::from(&home) .join(".claude/projects") .join(&claude_slug); - out.extend(scan_claude_project_dir(&claude_project, ws)?); + out.extend(scan_claude_project_dir_since( + &claude_project, + 
ws, + since_ms, + )?); let claude_dir = claude_project.join("sessions"); out.extend(collect_agent_dirs(&claude_dir, |p| { scan_claude_session_dir(p).map(|(mut r, evs)| { @@ -810,9 +837,10 @@ pub(crate) fn collect_all_agent_sessions( vec![(r, evs)] }) })?); - out.extend(scan_codex_sessions_root( + out.extend(scan_codex_sessions_root_since( &PathBuf::from(&home).join(".codex/sessions"), ws, + since_ms, )?); let tail = &cfg.sources.tail; @@ -862,39 +890,76 @@ fn bind_workspace( .collect() } +#[cfg(test)] pub(crate) fn persist_session_batch( store: &Store, sessions: Vec<(SessionRecord, Vec)>, sync_ctx: Option<&crate::sync::SyncIngestContext>, +) -> Result { + persist_session_batch_mode(store, sessions, sync_ctx, true) +} + +fn persist_session_batch_mode( + store: &Store, + sessions: Vec<(SessionRecord, Vec)>, + sync_ctx: Option<&crate::sync::SyncIngestContext>, + enrich_repo: bool, ) -> Result { let mut stats = AgentScanStats::default(); for (mut record, events) in sessions { stats.record(&record, events.len()); - if record.start_commit.is_none() && !record.workspace.is_empty() { - let binding = crate::core::repo::binding_for_session( - Path::new(&record.workspace), - record.started_at_ms, - record.ended_at_ms, - ); - record.start_commit = binding.start_commit; - record.end_commit = binding.end_commit; - record.branch = binding.branch; - record.dirty_start = binding.dirty_start; - record.dirty_end = binding.dirty_end; - record.repo_binding_source = binding.source; - } + let existing = store.get_session(&record.id)?; + inherit_repo_binding(&mut record, existing.as_ref()); + enrich_repo_binding(&mut record, enrich_repo); store.upsert_session(&record)?; + let last_seq = store.last_event_seq_for_session(&record.id)?; let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms); - for ev in events { - store.append_event_with_sync(&ev, sync_ctx)?; - } - if record.status == crate::core::event::SessionStatus::Done { - store.flush_projector_session(&record.id, flush_ms)?; 
- } + let unseen = events + .into_iter() + .filter(|event| last_seq.is_none_or(|seq| event.seq > seq)) + .collect::>(); + let flush = (record.status == crate::core::event::SessionStatus::Done).then_some(flush_ms); + stats.events_upserted += store.append_scanned_event_batch(&unseen, sync_ctx, flush)? as u64; } Ok(stats) } +fn inherit_repo_binding(record: &mut SessionRecord, existing: Option<&SessionRecord>) { + let Some(existing) = existing else { return }; + record.start_commit = record + .start_commit + .take() + .or_else(|| existing.start_commit.clone()); + record.end_commit = record + .end_commit + .take() + .or_else(|| existing.end_commit.clone()); + record.branch = record.branch.take().or_else(|| existing.branch.clone()); + record.dirty_start = record.dirty_start.or(existing.dirty_start); + record.dirty_end = record.dirty_end.or(existing.dirty_end); + record.repo_binding_source = record + .repo_binding_source + .take() + .or_else(|| existing.repo_binding_source.clone()); +} + +fn enrich_repo_binding(record: &mut SessionRecord, enabled: bool) { + if !enabled || record.start_commit.is_some() || record.workspace.is_empty() { + return; + } + let binding = crate::core::repo::binding_for_session( + Path::new(&record.workspace), + record.started_at_ms, + record.ended_at_ms, + ); + record.start_commit = binding.start_commit; + record.end_commit = binding.end_commit; + record.branch = binding.branch; + record.dirty_start = binding.dirty_start; + record.dirty_end = binding.dirty_end; + record.repo_binding_source = binding.source; +} + pub(crate) fn collect_agent_dirs( dir: &Path, scanner: F, @@ -905,14 +970,16 @@ where if !dir.exists() { return Ok(Vec::new()); } + let paths = std::fs::read_dir(dir)? 
+ .filter_map(|entry| entry.ok()) + .filter(|entry| entry.file_type().is_ok_and(|kind| kind.is_dir())) + .map(|entry| entry.path()) + .collect(); let mut out = Vec::new(); - for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) { - if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { - continue; - } - match scanner(&entry.path()) { + for path in crate::collect::tail::newest_paths(paths) { + match scanner(&path) { Ok(sessions) => out.extend(sessions), - Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()), + Err(e) => tracing::warn!("scan {path:?}: {e}"), } } Ok(out) diff --git a/src/shell/cli_tests.rs b/src/shell/cli_tests.rs new file mode 100644 index 0000000..940c8bd --- /dev/null +++ b/src/shell/cli_tests.rs @@ -0,0 +1,83 @@ +use super::cli::persist_session_batch; +use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus}; +use crate::store::Store; +use serde_json::json; + +#[test] +fn scanned_batch_appends_only_unseen_suffix() { + let temp = tempfile::tempdir().unwrap(); + let store = Store::open(&temp.path().join("kaizen.db")).unwrap(); + persist_session_batch(&store, vec![(record(), vec![event(0), event(1)])], None).unwrap(); + let mut stale = event(0); + stale.payload = json!({"changed": true}); + let stats = + persist_session_batch(&store, vec![(record(), vec![stale, event(2)])], None).unwrap(); + let rows = store.list_events_for_session("scan").unwrap(); + assert_eq!((stats.events_found, stats.events_upserted), (2, 1)); + assert_eq!((rows.len(), &rows[0].payload), (3, &json!({}))); +} + +#[test] +fn repeated_scan_preserves_repo_binding() { + let temp = tempfile::tempdir().unwrap(); + let store = Store::open(&temp.path().join("kaizen.db")).unwrap(); + persist_session_batch(&store, vec![(record(), Vec::new())], None).unwrap(); + let mut rescanned = record(); + rescanned.start_commit = None; + persist_session_batch(&store, vec![(rescanned, Vec::new())], None).unwrap(); + let stored = 
store.get_session("scan").unwrap().unwrap(); + assert_eq!(stored.start_commit.as_deref(), Some("abc")); +} + +fn record() -> SessionRecord { + SessionRecord { + id: "scan".into(), + agent: "codex".into(), + model: None, + workspace: String::new(), + started_at_ms: 1, + ended_at_ms: None, + status: SessionStatus::Running, + trace_path: "/trace".into(), + start_commit: Some("abc".into()), + end_commit: None, + branch: None, + dirty_start: None, + dirty_end: None, + repo_binding_source: None, + prompt_fingerprint: None, + parent_session_id: None, + agent_version: None, + os: None, + arch: None, + repo_file_count: None, + repo_total_loc: None, + } +} + +fn event(seq: u64) -> Event { + Event { + session_id: "scan".into(), + seq, + ts_ms: seq + 1, + ts_exact: true, + kind: EventKind::Message, + source: EventSource::Tail, + tool: None, + tool_call_id: None, + tokens_in: None, + tokens_out: None, + reasoning_tokens: None, + cost_usd_e6: None, + stop_reason: None, + latency_ms: None, + ttft_ms: None, + retry_count: None, + context_used_tokens: None, + context_max_tokens: None, + cache_creation_tokens: None, + cache_read_tokens: None, + system_prompt_tokens: None, + payload: json!({}), + } +} diff --git a/src/shell/ingest.rs b/src/shell/ingest.rs index 0c6c558..ec6f77a 100644 --- a/src/shell/ingest.rs +++ b/src/shell/ingest.rs @@ -157,6 +157,9 @@ pub(crate) fn ingest_hook_with_store( )?; } store.append_event_with_sync(&ev, sync_ctx.as_ref())?; + if matches!(event.kind, collect::hooks::EventKind::Stop) { + store.flush_search()?; + } post_ingest_detached(&event, &cfg, ws)?; Ok(()) } diff --git a/src/shell/mod.rs b/src/shell/mod.rs index e033ecd..3461b23 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -36,3 +36,6 @@ pub mod sync; pub mod telemetry; pub mod telemetry_tail; pub mod upgrade; + +#[cfg(test)] +mod cli_tests; diff --git a/src/store/sqlite/event_batch.rs b/src/store/sqlite/event_batch.rs new file mode 100644 index 0000000..395df23 --- /dev/null +++ 
b/src/store/sqlite/event_batch.rs @@ -0,0 +1,41 @@ +use super::*; + +impl Store { + pub(crate) fn append_scanned_event_batch( + &self, + events: &[Event], + ctx: Option<&SyncIngestContext>, + flush_ms: Option, + ) -> Result { + let Some(session_id) = batch_session(events)? else { + return Ok(0); + }; + let transaction = self.conn.unchecked_transaction()?; + events + .iter() + .try_for_each(|event| self.append_event_deferred(event, ctx))?; + self.finish_scanned_batch(session_id, flush_ms)?; + transaction.commit()?; + Ok(events.len()) + } + + fn finish_scanned_batch(&self, session_id: &str, flush_ms: Option) -> Result<()> { + if let Some(timestamp) = flush_ms { + self.flush_projector_session(session_id, timestamp)?; + } + self.refresh_extension_session(session_id) + } +} + +fn batch_session(events: &[Event]) -> Result> { + let Some(first) = events.first() else { + return Ok(None); + }; + anyhow::ensure!( + events + .iter() + .all(|event| event.session_id == first.session_id), + "scanned event batch mixes sessions" + ); + Ok(Some(&first.session_id)) +} diff --git a/src/store/sqlite/event_extensions.rs b/src/store/sqlite/event_extensions.rs new file mode 100644 index 0000000..35f9e77 --- /dev/null +++ b/src/store/sqlite/event_extensions.rs @@ -0,0 +1,19 @@ +use super::*; + +impl Store { + pub(super) fn index_extension_event(&self, event: &Event) -> Result<()> { + crate::extensions::hash_chain::store_event_hash(self, event) + } + + pub(super) fn apply_live_extension_event(&self, event: &Event) -> Result<()> { + crate::extensions::aggregates::apply_event(self, event) + } + + pub(super) fn refresh_extension_session(&self, session_id: &str) -> Result<()> { + crate::extensions::aggregates::upsert_session(self, session_id)?; + if let Err(error) = crate::extensions::diffs::refresh_session(self, session_id, false) { + tracing::warn!(%session_id, "step diff attribution skipped: {error:#}"); + } + Ok(()) + } +} diff --git a/src/store/sqlite/event_projector.rs 
b/src/store/sqlite/event_projector.rs index 6bd547a..da4ba59 100644 --- a/src/store/sqlite/event_projector.rs +++ b/src/store/sqlite/event_projector.rs @@ -1,6 +1,8 @@ use super::events::*; use super::*; +const PROJECTOR_HYDRATE_EVENTS: usize = 512; + impl Store { pub(super) fn sync_projector_session( &self, @@ -10,7 +12,15 @@ impl Store { if self.projector.borrow().last_seq(session_id) == last_seq { return Ok(()); } - self.replay_projector_session(session_id) + self.hydrate_projector_session(session_id) + } + + fn hydrate_projector_session(&self, session_id: &str) -> Result<()> { + self.projector.borrow_mut().reset_session(session_id); + for event in self.list_latest_events_for_session(session_id, PROJECTOR_HYDRATE_EVENTS)? { + self.projector.borrow_mut().apply(&event); + } + Ok(()) } pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> { @@ -60,7 +70,8 @@ impl Store { for delta in deltas { match delta { ProjectorEvent::SpanClosed(span, sample) => { - upsert_tool_span_record(&self.conn, span)?; + let persisted = upsert_tool_span_record(&self.conn, span)?; + crate::extensions::diffs::upsert_tool_span(self, &persisted)?; tracing::debug!( session_id = %sample.session_id, span_id = %sample.span_id, diff --git a/src/store/sqlite/event_write.rs b/src/store/sqlite/event_write.rs index 001b299..a61b16f 100644 --- a/src/store/sqlite/event_write.rs +++ b/src/store/sqlite/event_write.rs @@ -19,6 +19,23 @@ impl Store { /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row. 
pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> { + self.append_event_inner(e, ctx, true) + } + + pub(super) fn append_event_deferred( + &self, + e: &Event, + ctx: Option<&SyncIngestContext>, + ) -> Result<()> { + self.append_event_inner(e, ctx, false) + } + + fn append_event_inner( + &self, + e: &Event, + ctx: Option<&SyncIngestContext>, + refresh_session: bool, + ) -> Result<()> { let last_before = if projector_legacy_mode() { None } else { @@ -39,27 +56,7 @@ impl Store { ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22 ) - ON CONFLICT(session_id, seq) DO UPDATE SET - ts_ms = excluded.ts_ms, - ts_exact = excluded.ts_exact, - kind = excluded.kind, - source = excluded.source, - tool = excluded.tool, - tool_call_id = excluded.tool_call_id, - tokens_in = excluded.tokens_in, - tokens_out = excluded.tokens_out, - reasoning_tokens = excluded.reasoning_tokens, - cost_usd_e6 = excluded.cost_usd_e6, - payload = excluded.payload, - stop_reason = excluded.stop_reason, - latency_ms = excluded.latency_ms, - ttft_ms = excluded.ttft_ms, - retry_count = excluded.retry_count, - context_used_tokens = excluded.context_used_tokens, - context_max_tokens = excluded.context_max_tokens, - cache_creation_tokens = excluded.cache_creation_tokens, - cache_read_tokens = excluded.cache_read_tokens, - system_prompt_tokens = excluded.system_prompt_tokens", + ON CONFLICT(session_id, seq) DO NOTHING", params![ e.session_id, e.seq as i64, @@ -112,7 +109,10 @@ impl Store { self.invalidate_span_tree_cache(); } self.append_search_event(e); - self.refresh_extension_rows(e)?; + self.index_extension_event(e)?; + if refresh_session { + self.apply_live_extension_event(e)?; + } let Some(ctx) = ctx else { return Ok(()); }; @@ -144,15 +144,6 @@ impl Store { Ok(()) } - fn refresh_extension_rows(&self, e: &Event) -> Result<()> { - crate::extensions::hash_chain::store_event_hash(self, e)?; - 
crate::extensions::aggregates::upsert_session(self, &e.session_id)?; - if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) { - tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}"); - } - Ok(()) - } - pub(super) fn append_search_event(&self, e: &Event) { if let Err(err) = self.try_append_search_event(e) { tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}"); diff --git a/src/store/sqlite/events.rs b/src/store/sqlite/events.rs index 3ea3783..1fca7e2 100644 --- a/src/store/sqlite/events.rs +++ b/src/store/sqlite/events.rs @@ -5,7 +5,7 @@ pub(super) fn projector_legacy_mode() -> bool { } pub(super) fn is_stop_event(e: &Event) -> bool { - if !matches!(e.kind, EventKind::Hook) { + if !matches!(e.kind, EventKind::Hook | EventKind::Lifecycle) { return false; } e.payload diff --git a/src/store/sqlite/mod.rs b/src/store/sqlite/mod.rs index 8c4d698..de43e3a 100644 --- a/src/store/sqlite/mod.rs +++ b/src/store/sqlite/mod.rs @@ -56,6 +56,8 @@ mod artifact_windows; mod constants; mod contracts; mod evals; +mod event_batch; +mod event_extensions; mod event_projector; mod event_read; mod event_write; diff --git a/src/store/sqlite/rows.rs b/src/store/sqlite/rows.rs index e892354..82019ea 100644 --- a/src/store/sqlite/rows.rs +++ b/src/store/sqlite/rows.rs @@ -76,7 +76,8 @@ pub(super) fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { cache_creation_tokens: row.get::<_, Option>(19)?.map(|v| v as u32), cache_read_tokens: row.get::<_, Option>(20)?.map(|v| v as u32), system_prompt_tokens: row.get::<_, Option>(21)?.map(|v| v as u32), - }) + } + .normalize_legacy_hook()) } pub(super) fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> { diff --git a/src/store/sqlite/tests/events.rs b/src/store/sqlite/tests/events.rs index 1248c15..26717d7 100644 --- a/src/store/sqlite/tests/events.rs +++ b/src/store/sqlite/tests/events.rs @@ -1,5 +1,5 @@ use 
super::super::Store; -use super::{EventKind, SessionStatus, make_event, make_session}; +use super::{EventKind, EventSource, SessionStatus, make_event, make_session}; use serde_json::json; use tempfile::TempDir; @@ -30,6 +30,26 @@ fn list_events_for_session_round_trip() { assert_eq!(events[1].seq, 1); } +#[test] +fn legacy_hook_rows_read_as_semantic_tool_events() { + let dir = TempDir::new().unwrap(); + let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); + store.upsert_session(&make_session("legacy-hook")).unwrap(); + let mut event = make_event("legacy-hook", 0); + event.kind = EventKind::Hook; + event.source = EventSource::Hook; + event.tool = None; + event.payload = json!({"hook_event_name":"PreToolUse","tool_name":"Read"}); + store.append_event(&event).unwrap(); + + let event = store + .list_events_for_session("legacy-hook") + .unwrap() + .remove(0); + assert_eq!(event.kind, EventKind::ToolCall); + assert_eq!(event.tool.as_deref(), Some("Read")); +} + #[test] fn list_events_page_uses_inclusive_seq_cursor() { let dir = TempDir::new().unwrap(); @@ -52,9 +72,44 @@ fn append_event_dedup() { let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); store.upsert_session(&make_session("s5")).unwrap(); store.append_event(&make_event("s5", 0)).unwrap(); - store.append_event(&make_event("s5", 0)).unwrap(); + let mut duplicate = make_event("s5", 0); + duplicate.tokens_in = Some(42); + store.append_event(&duplicate).unwrap(); let events = store.list_events_for_session("s5").unwrap(); assert_eq!(events.len(), 1); + assert_eq!(events[0].tokens_in, None); + let aggregate = crate::extensions::aggregates::get(&store, "s5") + .unwrap() + .unwrap(); + assert_eq!(aggregate.event_count, 1); +} + +#[test] +fn append_event_backfills_missing_aggregate_before_incrementing() { + let dir = TempDir::new().unwrap(); + let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); + store + .upsert_session(&make_session("legacy-aggregate")) + .unwrap(); + store + 
.append_event(&make_event("legacy-aggregate", 0)) + .unwrap(); + store + .conn() + .execute( + "DELETE FROM session_aggregates WHERE session_id = ?1", + ["legacy-aggregate"], + ) + .unwrap(); + + store + .append_event(&make_event("legacy-aggregate", 1)) + .unwrap(); + + let aggregate = crate::extensions::aggregates::get(&store, "legacy-aggregate") + .unwrap() + .unwrap(); + assert_eq!(aggregate.event_count, 2); } #[test] diff --git a/src/store/sqlite/visualization/aggregates.rs b/src/store/sqlite/visualization/aggregates.rs index f2db792..00fef88 100644 --- a/src/store/sqlite/visualization/aggregates.rs +++ b/src/store/sqlite/visualization/aggregates.rs @@ -4,13 +4,18 @@ use crate::visualization::{DataQuality, TokenTotals, VisualizationTotals}; use anyhow::Result; const TOTALS_SQL: &str = " -WITH ws AS (SELECT id, status FROM sessions WHERE workspace = ?1) +WITH ws AS (SELECT id, status FROM sessions WHERE workspace = ?1), +last_events AS ( + SELECT e.session_id, MAX(e.ts_ms) last_event_ms FROM events e + JOIN ws ON ws.id = e.session_id GROUP BY e.session_id +) SELECT (SELECT COUNT(*) FROM ws), - (SELECT COUNT(*) FROM ws WHERE status IN ('Running', 'Waiting', 'Idle')), + (SELECT COUNT(*) FROM ws JOIN last_events l ON l.session_id = ws.id + WHERE ws.status IN ('Running', 'Waiting', 'Idle') AND l.last_event_ms >= ?2), COUNT(e.id), COALESCE(SUM(e.kind = 'Error'), 0), - COALESCE(SUM(e.kind = 'ToolCall'), 0), + (SELECT COUNT(*) FROM tool_spans t JOIN ws ON ws.id = t.session_id), COALESCE(SUM(e.cost_usd_e6), 0), COALESCE(SUM(e.tokens_in), 0), COALESCE(SUM(e.tokens_out), 0), @@ -28,9 +33,13 @@ impl Store { pub(crate) fn visualization_totals( &self, workspace: &str, + active_since_ms: u64, ) -> Result<(VisualizationTotals, DataQuality)> { let mut statement = self.conn().prepare(TOTALS_SQL)?; - let row = statement.query_row([workspace], totals_row)?; + let row = statement.query_row( + rusqlite::params![workspace, active_since_ms as i64], + totals_row, + )?; 
Ok((totals(&row), quality(&row))) } } diff --git a/src/store/sqlite/visualization/sessions.rs b/src/store/sqlite/visualization/sessions.rs index 3ad9434..2cf22d7 100644 --- a/src/store/sqlite/visualization/sessions.rs +++ b/src/store/sqlite/visualization/sessions.rs @@ -32,9 +32,9 @@ ORDER BY r.started_at_ms DESC, r.id ASC"; const TOP_TOOLS_SQL: &str = " WITH recent AS MATERIALIZED (SELECT id FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC, id ASC LIMIT ?2), counts AS ( - SELECT e.session_id, e.tool, COUNT(*) count FROM events e - JOIN recent r ON r.id = e.session_id WHERE e.tool IS NOT NULL - GROUP BY e.session_id, e.tool + SELECT t.session_id, t.tool, COUNT(*) count FROM tool_spans t + JOIN recent r ON r.id = t.session_id WHERE t.tool <> '' + GROUP BY t.session_id, t.tool ), ranked AS ( SELECT session_id, tool, count, ROW_NUMBER() OVER ( PARTITION BY session_id ORDER BY count DESC, tool ASC) rank FROM counts diff --git a/src/store/tool_span_index/persistence/write.rs b/src/store/tool_span_index/persistence/write.rs index 694bdc1..f57d670 100644 --- a/src/store/tool_span_index/persistence/write.rs +++ b/src/store/tool_span_index/persistence/write.rs @@ -41,10 +41,14 @@ pub(crate) fn clear_session_spans(conn: &Connection, session_id: &str) -> Result Ok(()) } -pub(crate) fn upsert_tool_span_record(conn: &Connection, span: &ToolSpanRecord) -> Result<()> { +pub(crate) fn upsert_tool_span_record( + conn: &Connection, + span: &ToolSpanRecord, +) -> Result { let persisted = namespaced_record(span); upsert_span(conn, &persisted)?; - replace_paths(conn, &persisted) + replace_paths(conn, &persisted)?; + Ok(persisted) } fn namespaced_record(span: &ToolSpanRecord) -> ToolSpanRecord { diff --git a/src/visualization/build.rs b/src/visualization/build.rs index fcdb6a5..ff7bf9e 100644 --- a/src/visualization/build.rs +++ b/src/visualization/build.rs @@ -56,7 +56,8 @@ pub(crate) fn build_report_observed( query: VisualizationQuery, ) -> Result { 
validate(&query.limits)?; - let (totals, quality) = store.visualization_totals(&query.workspace)?; + let active_since_ms = query.now_ms.saturating_sub(ACTIVE_TTL_MS); + let (totals, quality) = store.visualization_totals(&query.workspace, active_since_ms)?; let sessions = store.visualization_sessions(&query.workspace, query.limits.sessions, query.now_ms)?; let selected = selected_detail(store, &query, sessions.first())?; diff --git a/src/web/assets.rs b/src/web/assets.rs index 00c2396..b4c9543 100644 --- a/src/web/assets.rs +++ b/src/web/assets.rs @@ -118,6 +118,10 @@ mod tests { "id=\"detail-spans\"", "id=\"detail-files\"", "id=\"detail-tools\"", + "id=\"project-insights\"", + "id=\"insight-tools\"", + "id=\"insight-attention\"", + "id=\"insight-coverage\"", "id=\"developer-raw\"", "/assets/kaizen-tokens.css", ] { @@ -127,12 +131,15 @@ mod tests { "kaizen_sessions_list", "all_workspaces", "visibilitychange", - "AUTO_REFRESH_MS", + "type: \"subscribe\"", + "message.type === \"changed\"", + "message.id !== state.snapshotPending", "Authorization required", ] { assert!(JS.contains(needle), "js missing {needle}"); } assert!(!JS.contains("setInterval(")); + assert!(RENDER_JS.contains("renderInsights")); } #[test] diff --git a/src/web/assets/index.html b/src/web/assets/index.html index 115bffa..b5cf579 100644 --- a/src/web/assets/index.html +++ b/src/web/assets/index.html @@ -77,6 +77,30 @@

See what your coding agents are doing.

+
+
+

Fast read

+

What stands out

+
+
+
+ Tool pattern + Waiting for activity +

Recent tool use appears here.

+
+
+ Needs attention + Waiting for sessions +

Errors and stale work appear here.

+
+
+ Telemetry coverage + Waiting for events +

Token and cost coverage appear here.

+
+
+
+
diff --git a/src/web/assets/kaizen-render.js b/src/web/assets/kaizen-render.js index f810fe7..19814cc 100644 --- a/src/web/assets/kaizen-render.js +++ b/src/web/assets/kaizen-render.js @@ -40,6 +40,7 @@ export function showManual(path = "") { export function renderReport(report) { renderTotals(report?.totals || {}); + renderInsights(report); renderSessions( report?.sessions || [], report?.selected?.session?.id, @@ -48,6 +49,35 @@ export function renderReport(report) { renderDetail(report); } +function renderInsights(report) { + const sessions = report?.sessions || []; + const [tool, calls] = topTool(sessions); + const attention = sessions.filter(row => ["errored", "orphaned"].includes(row.status)).length; + const quality = report?.quality || {}; + setInsight("tools", tool ? `${label(tool)} leads` : "No tool calls yet", tool ? `${count(calls)} calls in visible sessions` : "Live tool use appears here."); + setInsight("attention", attention ? `${count(attention)} need attention` : "No recent warnings", `${count(sessions.length)} visible sessions checked`); + setInsight("coverage", `${percent(quality.token_coverage_pct)} token coverage`, `${percent(quality.cost_coverage_pct)} cost coverage`); +} + +function topTool(sessions) { + const counts = sessions.flatMap(row => row.top_tools || []).reduce(addTool, new Map()); + return [...counts].sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]))[0] || ["", 0]; +} + +function addTool(counts, [tool, calls]) { + counts.set(tool, (counts.get(tool) || 0) + calls); + return counts; +} + +function setInsight(id, title, note) { + $(`#insight-${id}`).textContent = title; + $(`#insight-${id}-note`).textContent = note; +} + +function percent(value) { + return `${Math.round(Number(value) || 0)}%`; +} + function renderTotals(totals) { $("#total-sessions").textContent = count(totals.session_count); $("#active-sessions").textContent = count(totals.running_count); diff --git a/src/web/assets/kaizen-state.js b/src/web/assets/kaizen-state.js 
index b8d6bc8..8f076fb 100644 --- a/src/web/assets/kaizen-state.js +++ b/src/web/assets/kaizen-state.js @@ -1,5 +1,3 @@ -export const AUTO_REFRESH_MS = 20_000; - export function decodeOutput(output) { const value = output?.value; if (typeof value !== "string") return value || {}; diff --git a/src/web/assets/kaizen.css b/src/web/assets/kaizen.css index dee11f5..de7028b 100644 --- a/src/web/assets/kaizen.css +++ b/src/web/assets/kaizen.css @@ -82,6 +82,17 @@ main { width: min(var(--content-width), 100%); margin: auto; padding: var(--spac .summary-grid article { padding: var(--space-4); border: 1px solid var(--rule-strong); background: var(--paper-raised); } .summary-grid span { display: block; color: var(--ink-faint); font-size: 0.75rem; font-weight: 800; text-transform: uppercase; } .summary-grid strong { display: block; margin-top: var(--space-2); font-family: var(--font-display); font-size: 2rem; font-weight: 500; } +.insight-panel { margin-bottom: var(--space-4); border: 1px solid var(--rule-strong); background: var(--ink); color: var(--paper-raised); box-shadow: var(--shadow); } +.insight-heading { padding: var(--space-3) var(--space-4); border-bottom: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } +.insight-heading p, .insight-heading h2 { margin: 0; } +.insight-heading .kicker { color: color-mix(in srgb, var(--paper) 72%, transparent); } +.insight-heading h2 { margin-top: var(--space-1); font-family: var(--font-display); font-size: 1.45rem; font-weight: 500; } +.insight-grid { display: grid; grid-template-columns: repeat(3, 1fr); } +.insight-grid article { padding: var(--space-4); border-right: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } +.insight-grid article:last-child { border-right: 0; } +.insight-grid span { color: color-mix(in srgb, var(--paper) 68%, transparent); font-size: 0.72rem; font-weight: 800; letter-spacing: 0.07em; text-transform: uppercase; } +.insight-grid strong { display: block; margin-top: var(--space-2); 
font-family: var(--font-display); font-size: 1.35rem; font-weight: 500; } +.insight-grid p { margin: var(--space-1) 0 0; color: color-mix(in srgb, var(--paper) 72%, transparent); font-size: 0.78rem; } .observe-grid { display: grid; grid-template-columns: minmax(0, 1.35fr) minmax(320px, 0.65fr); gap: var(--space-4); align-items: start; } .notebook-panel { min-width: 0; border: 1px solid var(--rule-strong); background: var(--paper-raised); box-shadow: var(--shadow); } .panel-heading { display: flex; align-items: start; justify-content: space-between; gap: var(--space-3); padding: var(--space-4); border-bottom: 1px solid var(--rule); } @@ -102,7 +113,7 @@ td strong { color: var(--ink); } .status-label[data-tone="danger"] { color: var(--rust); border-color: var(--rust); } .status-label[data-tone="ready"] { color: var(--field-green); border-color: var(--field-green); } .empty-note { margin: 0; padding: var(--space-5); color: var(--ink-soft); } -.detail-panel { display: grid; } +.detail-panel { position: sticky; top: var(--space-4); display: grid; max-height: calc(100vh - (2 * var(--space-4))); overflow: auto; } .detail-facts { display: grid; grid-template-columns: auto 1fr; gap: var(--space-2) var(--space-3); margin: 0; padding: var(--space-4); border-bottom: 1px solid var(--rule); } .detail-facts dt { color: var(--ink-faint); font-size: 0.75rem; text-transform: uppercase; } .detail-facts dd { margin: 0; color: var(--ink); overflow-wrap: anywhere; } @@ -128,6 +139,7 @@ td strong { color: var(--ink); } } @media (max-width: 980px) { .observe-grid { grid-template-columns: 1fr; } + .detail-panel { position: static; max-height: none; } .project-controls { grid-template-columns: 1fr auto; } .project-controls details { grid-column: 1 / -1; } } @@ -136,6 +148,8 @@ td strong { color: var(--ink); } main { padding: var(--space-5) var(--space-3) var(--space-6); } .site-header, .panel-heading { align-items: start; } .summary-grid, .project-controls { grid-template-columns: 1fr 1fr; 
} + .insight-grid { grid-template-columns: 1fr; } + .insight-grid article { border-right: 0; border-bottom: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } .control-field, .project-controls details { grid-column: 1 / -1; } .manual-row { grid-template-columns: 1fr; } } diff --git a/src/web/assets/kaizen.js b/src/web/assets/kaizen.js index 43382d2..4dddeae 100644 --- a/src/web/assets/kaizen.js +++ b/src/web/assets/kaizen.js @@ -1,4 +1,4 @@ -import { AUTO_REFRESH_MS, chooseProject, decodeOutput, projectPaths } from "./kaizen-state.js"; +import { chooseProject, decodeOutput, projectPaths } from "./kaizen-state.js"; import { bindRawReport, setRawReport } from "./kaizen-raw.js"; import { createTransport } from "./kaizen-transport.js"; import { @@ -19,9 +19,8 @@ const state = { projects: [], workspace: "", selected: "", - snapshotPending: false, - refreshTimer: 0, - lastRefresh: 0, + snapshotPending: "", + refreshQueued: false, }; const transport = createTransport({ url: socketUrl, @@ -46,7 +45,7 @@ function bindControls() { $("#project-select").addEventListener("change", event => activateProject(event.target.value)); $("#session-rows").addEventListener("click", selectSession); $("#manual-form").addEventListener("submit", openManualPath); - document.addEventListener("visibilitychange", visibilityChanged); + document.addEventListener("visibilitychange", flushQueued); } function socketUrl() { const scheme = location.protocol === "https:" ? 
"wss" : "ws"; @@ -57,9 +56,8 @@ function connected() { discoverProjects(); } function disconnected() { - state.snapshotPending = false; + state.snapshotPending = ""; setBusy(false); - clearRefresh(); setConnection("Reconnecting", "danger"); setJourney("error", "Connection lost", "Trying the secure local connection again."); } @@ -86,7 +84,8 @@ function receive(raw) { } if (message.type === "result") return receiveResult(message); if (message.type === "visualization_snapshot") return receiveSnapshot(message); - if (message.type === "error") return fail(message.error || "Request failed."); + if (message.type === "changed") return receiveChanged(message); + if (message.type === "error") return receiveError(message); } function receiveResult(message) { const purpose = state.pending.get(message.id); @@ -111,19 +110,22 @@ function activateProject(workspace) { if (!workspace) return; state.workspace = workspace; state.selected = ""; + state.snapshotPending = ""; + state.refreshQueued = false; localStorage.kaizenWorkspace = workspace; $("#manual-path").value = workspace; renderProjects(state.projects.includes(workspace) ? 
state.projects : [workspace, ...state.projects], workspace); + transport.send({ type: "subscribe", workspace }); requestSnapshot(true); } function requestSnapshot(announce) { if (!state.workspace || state.snapshotPending) return; if (!transport.isOpen()) return fail("Local connection is not ready."); - clearRefresh(); - state.snapshotPending = true; + const request = snapshotRequest(); + state.snapshotPending = request.id; setBusy(true); if (announce) setJourney("neutral", "Loading observations", "Reading recent local telemetry."); - transport.send(snapshotRequest()); + if (!transport.send(request)) fail("Local connection is not ready."); } function snapshotRequest() { return { @@ -134,15 +136,33 @@ function snapshotRequest() { }; } function receiveSnapshot(message) { + if (message.id !== state.snapshotPending) return; const report = message.report || {}; - state.snapshotPending = false; + state.snapshotPending = ""; state.selected = report.selected?.session?.id || report.sessions?.[0]?.id || ""; - state.lastRefresh = Date.now(); setBusy(false); setRawReport(report); renderReport(report); report.sessions?.length ? 
ready(report) : empty(report); - scheduleRefresh(); + if (state.refreshQueued) return refreshQueued(); +} +function receiveError(message) { + if (String(message.id || "").startsWith("snapshot-") && message.id !== state.snapshotPending) return; + fail(message.error || "Request failed."); +} +function receiveChanged(message) { + if (message.workspace !== state.workspace) return; + if (document.hidden || state.snapshotPending) state.refreshQueued = true; + else requestSnapshot(false); +} + +function refreshQueued() { + state.refreshQueued = false; + requestSnapshot(false); +} + +function flushQueued() { + if (!document.hidden && state.refreshQueued && !state.snapshotPending) refreshQueued(); } function ready(report) { const at = new Date(report.generated_at_ms || Date.now()).toLocaleTimeString(); @@ -167,27 +187,11 @@ function openManualPath(event) { if (!path) return fail("Project path is required."); activateProject(path); } -function visibilityChanged() { - clearRefresh(); - if (document.hidden || !state.workspace) return; - const remaining = AUTO_REFRESH_MS - (Date.now() - state.lastRefresh); - remaining <= 0 ? requestSnapshot(false) : scheduleRefresh(remaining); -} -function scheduleRefresh(delay = AUTO_REFRESH_MS) { - clearRefresh(); - if (document.hidden || !state.workspace || !transport.isOpen()) return; - state.refreshTimer = setTimeout(() => requestSnapshot(false), Math.max(1_000, delay)); -} -function clearRefresh() { - clearTimeout(state.refreshTimer); - state.refreshTimer = 0; -} function fail(message) { - state.snapshotPending = false; + state.snapshotPending = ""; setBusy(false); setJourney("error", "Could not load observations", message); showManual(state.workspace); - scheduleRefresh(); } function showAuth(message) { setBusy(false); diff --git a/src/web/live.rs b/src/web/live.rs new file mode 100644 index 0000000..78a9140 --- /dev/null +++ b/src/web/live.rs @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! 
Cheap per-connection SQLite revision tracking for live Web updates. + +use anyhow::Result; +use serde_json::{Value, json}; +use std::path::{Path, PathBuf}; +use std::time::UNIX_EPOCH; + +#[derive(Default)] +pub(super) struct Subscription { + workspace: Option<String>, + revision: Option<Revision>, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Revision { + database: FileStamp, + wal: FileStamp, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +struct FileStamp { + len: u64, + modified_ns: u128, +} + +impl Subscription { + pub(super) fn set(&mut self, workspace: Option<String>) -> Result<()> { + self.revision = workspace.as_deref().map(revision).transpose()?; + self.workspace = workspace; + Ok(()) + } + + pub(super) fn clear(&mut self) { + self.workspace = None; + self.revision = None; + } + + pub(super) fn is_active(&self) -> bool { + self.workspace.is_some() + } + + pub(super) fn changed(&mut self) -> Option<Value> { + let workspace = self.workspace.as_deref()?; + let next = revision(workspace).ok()?; + if self.revision == Some(next) { + return None; + } + self.revision = Some(next); + Some(json!({"type":"changed", "workspace":workspace})) + } +} + +fn revision(workspace: &str) -> Result<Revision> { + let database = crate::core::workspace::db_path(Path::new(workspace))?; + let wal = sidecar(&database, "-wal"); + Ok(Revision { + database: stamp(&database), + wal: stamp(&wal), + }) +} + +fn sidecar(database: &Path, suffix: &str) -> PathBuf { + PathBuf::from(format!("{}{suffix}", database.to_string_lossy())) +} + +fn stamp(path: &Path) -> FileStamp { + path.metadata().map_or_else( + |_| FileStamp::default(), + |meta| FileStamp { + len: meta.len(), + modified_ns: meta.modified().ok().and_then(since_epoch).unwrap_or(0), + }, + ) +} + +fn since_epoch(time: std::time::SystemTime) -> Option<u128> { + time.duration_since(UNIX_EPOCH) + .ok() + .map(|value| value.as_nanos()) +} diff --git a/src/web/mod.rs b/src/web/mod.rs index c174f90..00f52e3 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -3,6
+3,7 @@ mod assets; pub mod features; +mod live; mod server; mod snapshot; mod token; diff --git a/src/web/server.rs b/src/web/server.rs index 1587a60..732df1d 100644 --- a/src/web/server.rs +++ b/src/web/server.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Axum router for the local daemon web app. -use super::{assets, features, snapshot, tools}; +use super::{assets, features, live::Subscription, snapshot, tools}; use axum::Router; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::extract::{Query, State}; @@ -35,6 +35,8 @@ enum ClientMessage { Subscribe { #[serde(default)] id: Option<String>, + #[serde(default)] + workspace: Option<String>, }, Unsubscribe, Ping { @@ -80,18 +82,19 @@ async fn ws( } async fn socket_loop(mut socket: WebSocket) { - let mut subscribed = false; - let mut tick = tokio::time::interval(std::time::Duration::from_secs(3)); + let mut subscription = Subscription::default(); + let mut tick = tokio::time::interval(std::time::Duration::from_millis(250)); loop { tokio::select!
{ msg = socket.recv() => { let Some(Ok(Message::Text(text))) = msg else { break; }; - if !handle_text(&mut socket, &text, &mut subscribed).await { + if !handle_text(&mut socket, &text, &mut subscription).await { break; } } - _ = tick.tick(), if subscribed => { - if send(&mut socket, status_msg(None)).await.is_err() { + _ = tick.tick(), if subscription.is_active() => { + if let Some(value) = subscription.changed() + && send(&mut socket, value).await.is_err() { break; } } @@ -99,18 +102,25 @@ async fn socket_loop(mut socket: WebSocket) { } } -async fn handle_text(socket: &mut WebSocket, text: &str, subscribed: &mut bool) -> bool { +async fn handle_text(socket: &mut WebSocket, text: &str, subscription: &mut Subscription) -> bool { match serde_json::from_str::<ClientMessage>(text) { Ok(ClientMessage::Call { id, tool, args }) => { let value = call_msg(&id, &tool, args.unwrap_or_else(|| json!({}))).await; send(socket, value).await.is_ok() } - Ok(ClientMessage::Subscribe { id }) => { - *subscribed = true; + Ok(ClientMessage::Subscribe { id, workspace }) => { + if let Err(err) = subscription.set(workspace) { + return send( + socket, + json!({"type":"error","id":id,"error":err.to_string()}), + ) + .await + .is_ok(); + } send(socket, status_msg(id.as_deref())).await.is_ok() } Ok(ClientMessage::Unsubscribe) => { - *subscribed = false; + subscription.clear(); send( socket, json!({"type":"result","output":{"kind":"json","value":{"subscribed":false}}}), diff --git a/tests/machine_registry.rs b/tests/machine_registry.rs index 67dbebd..c670f1b 100644 --- a/tests/machine_registry.rs +++ b/tests/machine_registry.rs @@ -30,6 +30,32 @@ fn default_list_omits_missing_workspace_without_deleting_row() -> anyhow::Result Ok(()) } +#[test] +fn machine_scope_ignores_workspace_containing_kaizen_home() -> anyhow::Result<()> { + let _home = test_home::TestHome::new()?; + let tmp = tempfile::tempdir()?; + let workspace = tmp.path().join("repo"); + std::fs::create_dir(&workspace)?; +
kaizen::core::machine_registry::upsert_from_resolve(&workspace)?; + insert_legacy_home_workspace()?; + + let roots = kaizen::core::workspace::machine_workspaces(Some(&workspace))?; + + assert_eq!(roots, vec![std::fs::canonicalize(workspace)?]); + Ok(()) +} + +fn insert_legacy_home_workspace() -> anyhow::Result<()> { + let home = std::fs::canonicalize(std::env::var("HOME")?)?; + let db = kaizen::core::machine_registry::db_path().expect("test home"); + let connection = rusqlite::Connection::open(db)?; + connection.execute( + "INSERT INTO projects(path, name, first_seen_ms, last_seen_ms) VALUES (?1, 'home', 0, 1)", + [home.to_string_lossy().as_ref()], + )?; + Ok(()) +} + fn assert_raw_row(workspace: std::path::PathBuf) -> anyhow::Result<()> { let rows = kaizen::core::machine_registry::list_paths_including_missing()?; assert_eq!(rows, vec![workspace]); diff --git a/tests/spec/visualization_report.rs b/tests/spec/visualization_report.rs index 99a73c6..319e018 100644 --- a/tests/spec/visualization_report.rs +++ b/tests/spec/visualization_report.rs @@ -47,6 +47,38 @@ fn report_skips_activity_when_disabled() -> anyhow::Result<()> { Ok(()) } +#[test] +fn active_now_excludes_stale_open_sessions() -> anyhow::Result<()> { + let tmp = tempfile::tempdir()?; + let store = Store::open(&tmp.path().join("k.db"))?; + store.upsert_session(&session("active", SessionStatus::Running))?; + store.upsert_session(&session("stale", SessionStatus::Running))?; + store.append_event(&event_at("active", 0, 999_500))?; + store.append_event(&event_at("stale", 0, 1))?; + let mut query = query(None); + query.now_ms = 1_000_000; + + let report = build_report(&store, query)?; + + assert_eq!(report.totals.running_count, 1); + Ok(()) +} + +#[test] +fn legacy_hook_spans_feed_tool_insights() -> anyhow::Result<()> { + let tmp = tempfile::tempdir()?; + let store = Store::open(&tmp.path().join("k.db"))?; + store.upsert_session(&session("legacy", SessionStatus::Done))?; + 
store.append_event(&legacy_hook("legacy", 0, "PreToolUse"))?; + store.append_event(&legacy_hook("legacy", 1, "PostToolUse"))?; + + let report = build_report(&store, query(None))?; + + assert_eq!(report.totals.tool_call_count, 1); + assert_eq!(report.sessions[0].top_tools, vec![("Read".into(), 1)]); + Ok(()) +} + #[test] fn activity_bins_preserve_time_boundaries() -> anyhow::Result<()> { let tmp = tempfile::tempdir()?; @@ -151,3 +183,13 @@ fn event_at(session_id: &str, seq: u64, ts_ms: u64) -> Event { payload: json!({"input": {"path": "src/lib.rs"}}), } } + +fn legacy_hook(session_id: &str, seq: u64, name: &str) -> Event { + let mut event = event_at(session_id, seq, 2_000 + seq); + event.kind = EventKind::Hook; + event.source = EventSource::Hook; + event.tool = None; + event.tool_call_id = Some("legacy-call".into()); + event.payload = json!({"hook_event_name":name,"tool_name":"Read"}); + event +} diff --git a/tests/web_live.rs b/tests/web_live.rs new file mode 100644 index 0000000..af3a331 --- /dev/null +++ b/tests/web_live.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use futures_util::{SinkExt, StreamExt}; +use kaizen::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus}; +use kaizen::store::Store; +use serde_json::{Value, json}; +use tokio::net::TcpListener; +use tokio::time::{Duration, timeout}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +#[tokio::test] +async fn subscribed_workspace_reports_database_changes_within_one_second() -> anyhow::Result<()> { + let fixture = Fixture::new()?; + let listener = TcpListener::bind("127.0.0.1:0").await?; + let (endpoint, _task) = kaizen::web::start_with_listener(listener).await?; + let (mut socket, _) = connect_async(fixture.ws_url(&endpoint)).await?; + socket + .send(Message::Text(fixture.subscribe().into())) + .await?; + assert_eq!(recv(&mut socket).await?["type"], "status"); + + fixture.append_event()?; + let changed = 
timeout(Duration::from_secs(1), recv(&mut socket)).await??; + + assert_eq!(changed["type"], "changed"); + assert_eq!(changed["workspace"], fixture.workspace_string()); + Ok(()) +} + +struct Fixture { + _temp: tempfile::TempDir, + workspace: std::path::PathBuf, + store: Store, +} + +impl Fixture { + fn new() -> anyhow::Result<Self> { + let temp = tempfile::tempdir()?; + let workspace = temp.path().join("repo"); + std::fs::create_dir_all(&workspace)?; + unsafe { + std::env::set_var("HOME", temp.path()); + std::env::set_var("KAIZEN_HOME", temp.path().join(".kaizen")); + std::env::set_var("KAIZEN_DAEMON", "0"); + } + let workspace = std::fs::canonicalize(workspace)?; + let store = Store::open(&kaizen::core::workspace::db_path(&workspace)?)?; + store.upsert_session(&session(&workspace))?; + Ok(Self { + _temp: temp, + workspace, + store, + }) + } + + fn ws_url(&self, endpoint: &kaizen::ipc::WebEndpoint) -> String { + format!("ws://{}/ws?token={}", endpoint.listen, endpoint.token) + } + + fn subscribe(&self) -> String { + json!({"type":"subscribe", "id":"live", "workspace":self.workspace}).to_string() + } + + fn append_event(&self) -> anyhow::Result<()> { + self.store.append_event(&event()) + } + + fn workspace_string(&self) -> String { + self.workspace.to_string_lossy().into_owned() + } +} + +fn session(workspace: &std::path::Path) -> SessionRecord { + SessionRecord { + id: "live-session".into(), + agent: "claude".into(), + model: None, + workspace: workspace.to_string_lossy().into_owned(), + started_at_ms: 1, + ended_at_ms: None, + status: SessionStatus::Running, + trace_path: String::new(), + start_commit: None, + end_commit: None, + branch: None, + dirty_start: None, + dirty_end: None, + repo_binding_source: None, + prompt_fingerprint: None, + parent_session_id: None, + agent_version: None, + os: None, + arch: None, + repo_file_count: None, + repo_total_loc: None, + } +} + +fn event() -> Event { + Event { + session_id: "live-session".into(), + seq: 0, + ts_ms: 2, +
ts_exact: true, + kind: EventKind::ToolCall, + source: EventSource::Hook, + tool: Some("Read".into()), + tool_call_id: Some("call-1".into()), + tokens_in: None, + tokens_out: None, + reasoning_tokens: None, + cost_usd_e6: None, + stop_reason: None, + latency_ms: None, + ttft_ms: None, + retry_count: None, + context_used_tokens: None, + context_max_tokens: None, + cache_creation_tokens: None, + cache_read_tokens: None, + system_prompt_tokens: None, + payload: json!({}), + } +} + +async fn recv( + socket: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >, +) -> anyhow::Result<Value> { + let Some(Ok(Message::Text(text))) = socket.next().await else { + anyhow::bail!("missing WebSocket message"); + }; + Ok(serde_json::from_str(&text)?) +}