diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b1050..f187efe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ here explicitly. ### Changed -- **Project data moved out of repo** — all per-workspace artifacts (SQLite DB, config, search index, reports, telemetry NDJSON, backups, sampler stop files) now live under `~/.kaizen/projects//` instead of `/.kaizen/`. Slug = canonical path with `/` → `-`. `KAIZEN_HOME` overrides `~/.kaizen`. Existing in-repo `.kaizen/` directories auto-migrate on first use; a `MIGRATED.txt` marker is left behind and the old directory is safe to delete. Identity (workspace key) is unchanged. See [ADR 007](docs/adr/007-project-data-in-home.md). +- **Target repositories are read-only** — all per-workspace artifacts live under `~/.kaizen/projects//`; `KAIZEN_HOME` overrides `~/.kaizen`. Legacy in-repo `.kaizen/` data is copied without moving, deleting, or marking the source. Host hooks live in user-level agent configuration. See [ADR 011](docs/adr/011-target-repositories-read-only.md). ### Added @@ -32,9 +32,9 @@ here explicitly. ### Changed -- **Machine-local project registry** lives in `~/.kaizen/machine.db` (SQLite) instead of `workspaces.json`; `kaizen init` upserts the current repo. Legacy `workspaces.json` is imported once and renamed to `workspaces.json.migrated`. `kaizen doctor` reports registry status. `--all-workspaces` still merges per-repo stores and now includes inited projects that do not yet have `.kaizen/kaizen.db`. +- **Machine-local project registry** lives in `~/.kaizen/machine.db` (SQLite) instead of `workspaces.json`; `kaizen init` upserts the current repo. Legacy `workspaces.json` is imported once and renamed to `workspaces.json.migrated`. `kaizen doctor` reports registry status. `--all-workspaces` merges available project databases and ignores missing or unsafe roots. 
- **Telemetry wire format (third-party sinks):** PostHog capture is one event per canonical row (`kaizen.event`, `kaizen.tool_span`, `kaizen.repo_snapshot_chunk`, `kaizen.workspace_fact_snapshot`); Datadog uses the [Logs API v2](https://docs.datadoghq.com/api/latest/logs/) (`POST /api/v2/logs`) with one JSON log per canonical item instead of the Events API. OTLP remains a placeholder with `tracing::debug` of expanded item counts. Primary Kaizen `POST` ingest and outbox JSON shapes for `events` / `tool_spans` / `repo_snapshots` are unchanged; when sync is enabled, workspace skill/rule discovery can enqueue **`workspace_facts`** for the new `/v1/workspace-facts` path. -- **CLI — read paths and telemetry:** `summary`, `insights`, `metrics`, `guidance`, and `retro` accept `--source local|provider|mixed` (default `local`). With `provider` or `mixed`, a background provider pull runs when `[telemetry.query].cache_ttl_seconds` has expired, or when you pass `--refresh` (in addition to transcript rescan where applicable). New `kaizen telemetry` subcommands: `init` (alias of `configure`), `doctor`, `pull --days`, and `print-schema`. MCP tools keep the previous local-only behavior. +- **CLI — read paths and telemetry:** `summary`, `insights`, `metrics`, `guidance`, and `retro` accept `--source local|provider|mixed` (default `local`). With `provider` or `mixed`, a background provider pull runs when `[telemetry.query].cache_ttl_seconds` has expired, or when you pass `--refresh` (in addition to bounded local tail ingest where applicable). New `kaizen telemetry` subcommands: `init` (alias of `configure`), `doctor`, `pull --days`, and `print-schema`. MCP tools keep the previous local-only behavior. - `Cargo.toml` no longer excludes `assets/` so `cargo publish` / docs.rs builds resolve `include_str!` for embedded defaults and the retro skill template. - Release workflow **`update-homebrew-tap`**: `scripts/render-homebrew-tap-formula.sh` + push to @@ -49,6 +49,10 @@ here explicitly. 
### Fixed +- The Web dashboard now selects the most recently active valid project, refreshes from SQLite/WAL changes within one second, preserves the selected session, and exposes bounded session details plus tool, attention, and telemetry-coverage insights. +- Hook-backed tool activity appears as named `ToolCall` and `ToolResult` events in Web and TUI views; legacy stored hook rows are normalized at read time. +- Hook ingestion no longer commits and merges the Tantivy index after every event. Search writes use one indexing worker and one merge worker, commit in bounded batches, and flush when a session stops. +- Transcript refreshes cap recent files and growing Claude/Codex JSONL tails, append only unseen events in one transaction, and avoid repeated Git enrichment and full derived-row rebuilds. - `kaizen daemon status` now exits successfully with a stable `status: stopped` line after the daemon has been stopped, instead of surfacing a raw Unix socket connection error. - `kaizen sessions tree ` now prints a non-empty placeholder for existing sessions with no @@ -72,9 +76,9 @@ here explicitly. ### Added -- `kaizen init` now creates `.cursor/hooks.json` and `.claude/settings.json` from scratch when absent (in addition to patching them when present), so a single command is enough to instrument a fresh workspace for both Cursor and Claude Code. +- `kaizen init` now creates or patches `~/.cursor/hooks.json` and `~/.claude/settings.json`, so one user-level setup covers every workspace without writing to the target repository. - Local retention: `[retention].hot_days` (default 30) prunes old sessions from SQLite after rescans (throttled to once per 24h); `hot_days = 0` disables auto-prune. `kaizen gc` with optional `--days` and `--vacuum`. -- `[scan].min_rescan_seconds` (default 300) skips full transcript rescans; `--refresh` / `-r` on `sessions list`, `summary`, `insights`, `metrics`, and `retro` forces a rescan. MCP tools accept `refresh=true` for the same behavior. 
+- `[scan].min_rescan_seconds` (default 300) throttles bounded transcript-tail ingestion; `--refresh` / `-r` on `sessions list`, `summary`, `insights`, `metrics`, and `retro` ingests recently changed tails. MCP tools accept `refresh=true` for the same behavior. - Composite index `sessions(workspace, started_at_ms)` for faster session listing. - `docs/telemetry-journey.md` — end-to-end “session → data” learning path; README and `docs/` index point to it. Root `README` clarifies that long-form docs live in the GitHub `docs/` diff --git a/Cargo.lock b/Cargo.lock index 72cefe9..b0bf8f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,6 +1869,7 @@ dependencies = [ "indicatif", "itf", "libc", + "memchr", "notify", "proptest", "quint-connect", diff --git a/Cargo.toml b/Cargo.toml index 2516c39..f0750e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -354,6 +354,7 @@ clap_complete = "4" indicatif = "0.18" arboard = "3" anyhow = "1" +memchr = "2" tempfile = "3" tar = "0.4" rusqlite = { version = "0.40", features = ["bundled"] } diff --git a/docs/config.md b/docs/config.md index ea7ba6f..b92f7ca 100644 --- a/docs/config.md +++ b/docs/config.md @@ -50,7 +50,7 @@ When you pass **`--all-workspaces`** (or MCP `all_workspaces: true`), Kaizen loa | Key | Default | Purpose | |-----|---------|--------| | `roots` | `["~/.cursor/projects"]` | Transcript index roots (Cursor projects layout) | -| `min_rescan_seconds` | `300` | Minimum seconds between full transcript rescans when a command is already in refresh mode (`--refresh` on the CLI or `refresh=true` over MCP). The daemon uses the same value for its workspace scanner loop after `kaizen init`. | +| `min_rescan_seconds` | `300` | Minimum seconds between bounded incremental transcript scans in refresh mode (`--refresh` on the CLI or `refresh=true` over MCP). 
The daemon uses the same value after `kaizen init`; Claude, Codex, and Cursor discovery caps each source at 32 recent transcripts, while Claude and Codex growing JSONL reads are capped at 256 KiB. | ## `[retention]` diff --git a/docs/experiments.md b/docs/experiments.md index 197c332..8b7bf8a 100644 --- a/docs/experiments.md +++ b/docs/experiments.md @@ -125,7 +125,7 @@ kaizen exp list kaizen exp status kaizen exp tag --variant treatment # manual override kaizen exp report # markdown + bootstrap CI + sequential decision -kaizen exp report --refresh # optional: full transcript rescan first; can take a while +kaizen exp report --refresh # optional: ingest changed transcript tails first kaizen exp conclude # Running → Concluded kaizen exp archive # Concluded → Archived ``` diff --git a/docs/mcp.md b/docs/mcp.md index 79cd7e4..08563d1 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -120,7 +120,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code |------|----------------|--------| | `kaizen_capabilities` | (no CLI; static text) | Read first: which tool to use for cost rollups vs repo metrics, sessions, retro, etc. | | `kaizen_ingest_hook` | `kaizen ingest hook` | Pass hook JSON in `payload` (not stdin). `source`: `cursor` or `claude`. | -| `kaizen_sessions_list` | `kaizen sessions list` | Optional `json: true`, `refresh: true` (full transcript rescan; matches `--refresh`), `all_workspaces: true`, `limit` (cap rows, newest first). | +| `kaizen_sessions_list` | `kaizen sessions list` | Optional `json: true`, `refresh: true` (bounded changed-tail ingest; matches `--refresh`), `all_workspaces: true`, `limit` (cap rows, newest first). | | `kaizen_session_show` | `kaizen sessions show` | `id` + optional `workspace`. | | `kaizen_search_sessions` | `kaizen search` | Structured BM25 event search. Args: `query`, optional `since`, `agent`, `kind`, `limit`, `workspace`. `kaizen sessions search` remains a compatible CLI alias. 
Returns `hits[]` with session id, seq, ts, score, snippet, paths, skills, and `tokens_total`. | | `kaizen_query` | `kaizen query` | Structured trace query. | @@ -143,7 +143,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code | `kaizen_exp_list` | `kaizen exp list` | | | `kaizen_exp_status` | `kaizen exp status` | | | `kaizen_exp_tag` | `kaizen exp tag` | | -| `kaizen_exp_report` | `kaizen exp report` | `json` and optional `refresh: true` (full rescan before report; matches CLI `--refresh`). Includes `sequential_decision` and `srm_warning`. | +| `kaizen_exp_report` | `kaizen exp report` | `json` and optional `refresh: true` (bounded changed-tail ingest before report; matches CLI `--refresh`). Includes `sequential_decision` and `srm_warning`. | | `kaizen_exp_conclude` | `kaizen exp conclude` | Running → Concluded. | | `kaizen_exp_archive` | `kaizen exp archive` | Concluded → Archived. | | `kaizen_retro` | `kaizen retro` | `json`, `refresh`, etc. Set `json: true` for the same `Report` JSON as `kaizen retro --json`. | @@ -152,7 +152,7 @@ Set any value to `false` to skip that agent’s local scan (useful if a VS Code - **Workspace**: most tools accept optional `workspace` (string path) or `project` (short project name — resolved from existing rows shown by `kaizen projects`; mutually exclusive with `workspace`). If neither is given, the server uses the process current directory, matching CLI defaults. Use `kaizen projects --include-missing` to inspect stale registry rows; MCP workspace reads ignore them. - **Data source**: `kaizen_summary`, `kaizen_insights`, `kaizen_metrics`, and `kaizen_retro` use the local DB only (`DataSource::Local`), matching CLI default `--source local`. The MCP server does not expose CLI `--source` switches; use the CLI if you need another source. -- **Rescan**: list/summary/insights/metrics/retro stay on the cached local DB unless you pass `refresh: true` (same as CLI `--refresh`). 
`kaizen_exp_report` defaults to cache-first as well; set `refresh: true` to force a full transcript rescan before computing the report. +- **Refresh**: list/summary/insights/metrics/retro stay on the cached local DB unless you pass `refresh: true` (same as CLI `--refresh`). `kaizen_exp_report` defaults to cache-first as well; refresh ingests only bounded, recently changed transcript tails. - **Aggregation**: `kaizen_sessions_list`, `kaizen_summary`, `kaizen_insights`, and `kaizen_metrics` accept `all_workspaces: true`. Kaizen opens each existing registered workspace DB separately and merges the results in memory. - **Blocking work** is run on a blocking thread pool so the async MCP runtime is not starved; long `retro` or metrics runs may take time. - **Version** in the MCP `initialize` response is the built-in string configured for the server (keep in sync with releases when using strict client checks). diff --git a/docs/retro.md b/docs/retro.md index 2c4f02a..71fae6e 100644 --- a/docs/retro.md +++ b/docs/retro.md @@ -3,7 +3,7 @@ `kaizen retro` reads recent sessions and cached local repo facts, then produces a ranked Markdown report of changes that may make agents cheaper, faster, or more accurate in this codebase. Default runs are cache-first. Use `--refresh` -when the local store may be stale; it rescans agent transcripts first and can +when the local store may be stale; it ingests bounded changed tails first and can take a while on large workspaces. The engine is deterministic and pure: diff --git a/docs/tui.md b/docs/tui.md index 8d848e7..a61c206 100644 --- a/docs/tui.md +++ b/docs/tui.md @@ -20,6 +20,8 @@ kaizen tui --workspace /path/to/project The TUI requires an interactive terminal. It watches the SQLite WAL and coalesces refreshes, so active sessions update without a busy polling loop. +Hook-backed `PreToolUse` and `PostToolUse` rows appear as named tool calls and +results; lifecycle hooks remain lifecycle events. 
## Keys @@ -54,4 +56,3 @@ help view instead of terminating the interface. | Wrong repository appears | Start with `--workspace /path/to/project`. | | Navigation seems stuck | Press `Esc` to close overlays, then `Tab` to select the intended pane. | | Terminal display is damaged after a crash | Run `reset`, then restart `kaizen tui`. | - diff --git a/docs/tutorial/02-observe.md b/docs/tutorial/02-observe.md index 1cc1b83..c312bff 100644 --- a/docs/tutorial/02-observe.md +++ b/docs/tutorial/02-observe.md @@ -22,7 +22,7 @@ kaizen sessions list --json kaizen sessions list --refresh ``` -**`--json`** is for scripts and MCP-shaped tooling. **`--refresh`** forces a full transcript rescan (subject to min interval). +**`--json`** is for scripts and MCP-shaped tooling. **`--refresh`** ingests bounded, recently changed transcript tails (subject to the minimum interval). ## Show one session @@ -43,7 +43,7 @@ The JSON shape includes rollups by agent and model, total cost, and when availab ## Data source: local, provider, or mixed -Most of this tutorial assumes the default **`--source local`**: numbers come from the workspace’s local SQLite store (and the usual transcript rescan rules with `--refresh`). +Most of this tutorial assumes the default **`--source local`**: numbers come from the workspace’s local SQLite store (and the bounded changed-tail rules used by `--refresh`). 
When you have **[sync]** identity in config (`team_id`, `team_salt_hex`, …) *and* a **[telemetry.query](https://github.com/marquesds/kaizen/blob/main/docs/config.md#telemetryquery) provider** (PostHog or Datadog) configured, you can ask read commands to fold in **provider-pulled events** cached under `remote_events` in the same DB: diff --git a/docs/tutorial/06-experiments.md b/docs/tutorial/06-experiments.md index 312e10b..6e19de1 100644 --- a/docs/tutorial/06-experiments.md +++ b/docs/tutorial/06-experiments.md @@ -23,7 +23,7 @@ kaizen exp status kaizen exp tag --session --variant treatment kaizen exp report kaizen exp report --json -kaizen exp report --refresh # full transcript rescan before report if the store may be stale +kaizen exp report --refresh # ingest changed transcript tails if the store may be stale kaizen exp conclude ``` diff --git a/docs/usage-observe.md b/docs/usage-observe.md index dd3c967..1acd515 100644 --- a/docs/usage-observe.md +++ b/docs/usage-observe.md @@ -2,9 +2,9 @@ [Back to CLI index](usage.md). -These commands are cache-first. Pass `--refresh` when they should rescan local -transcripts before rendering. With `--source provider|mixed`, refresh can also -refresh a configured remote provider cache. +These commands are cache-first. Pass `--refresh` to ingest recently changed, +bounded local transcript tails before rendering. With +`--source provider|mixed`, refresh can also refresh a configured remote cache. ## `kaizen sessions` @@ -33,7 +33,10 @@ Text output shows a placeholder when no spans exist; JSON returns `[]`. `search` uses the rebuildable Tantivy index at `~/.kaizen/projects//search/`. It indexes redacted event text. Payload -bodies remain in SQLite and are not copied into the index. Rebuild with: +bodies remain in SQLite and are not copied into the index. Daemon-backed hook +sessions commit search batches at 256 documents, on the first event after a +60-second batch window, or immediately when the session stops. 
SQLite session +and event views remain live before that secondary-index commit. Rebuild with: ```bash kaizen search reindex diff --git a/docs/usage-setup.md b/docs/usage-setup.md index fc20168..129de6c 100644 --- a/docs/usage-setup.md +++ b/docs/usage-setup.md @@ -19,7 +19,7 @@ Every command resolves a workspace through one of three mechanisms: opens each registered project database separately and merges results. See [machine-local registry](config.md#machine-local-registry). -After a full transcript rescan, Kaizen may delete sessions older than +After a transcript refresh, Kaizen may delete sessions older than `[retention].hot_days`, at most once per 24 hours. Set `hot_days = 0` to disable automatic pruning; use `kaizen gc` for explicit pruning. @@ -93,8 +93,11 @@ kaizen sessions load --json ``` Use `load` after installing or upgrading when existing sessions should appear -in reports. Use `sessions list --refresh` when one read command should rescan -before rendering. +in reports. Claude, Codex, and Cursor imports consider at most 32 recent +transcripts per source. Claude and Codex decode at most the latest 256 KiB of a +growing JSONL file. `load` also enriches imported sessions with Git binding +metadata. Use `sessions list --refresh` to ingest recently changed tails before +one read. ## `kaizen outcomes` diff --git a/docs/usage.md b/docs/usage.md index 22d229d..74efd66 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -12,8 +12,8 @@ Use `--no-daemon` or `KAIZEN_DAEMON=0` for direct SQLite mode. Both modes use the same project database. See [daemon.md](daemon.md). Cache-first reads use the local SQLite database without rescanning agent -transcripts. Pass `--refresh` when a command should ingest new transcript data -before rendering. +transcripts. Pass `--refresh` to ingest recently changed transcript tails before +rendering. Refresh work is bounded; it does not replay all historical files. 
## Reference diff --git a/docs/web.md b/docs/web.md index aef7f7f..c74cb3a 100644 --- a/docs/web.md +++ b/docs/web.md @@ -23,16 +23,19 @@ kaizen open --no-browser The dashboard provides: -- project selection, including a manual local path; +- automatic selection of the most recently active valid project, plus a manual + local-path fallback; - session, active-session, error, and cost totals; +- project-level tool, attention, and telemetry-coverage insights; - the latest 30 sessions for the selected project; - selected-session facts, recent events, nested tool spans, touched files, and top tools; - the exact bounded report under **Developer details**. Selected-session detail is capped at 40 events, 40 spans, and 40 files. Those -limits keep refresh latency and memory use predictable. The page refreshes every -20 seconds while connected; **Refresh now** requests an immediate snapshot. +limits keep refresh latency and memory use predictable. The server watches the +selected project's SQLite database and WAL; a committed change requests a new +snapshot within one second. **Refresh now** remains available for manual checks. Web is an Observe-only surface. It cannot mutate experiments, guidance, sync, configuration, or local data. Use the CLI or MCP for those workflows. @@ -55,6 +58,5 @@ for protocol and runtime-file details. | Browser did not open | Run `kaizen open --no-browser` and open the printed URL. | | Page says connection failed | Run `kaizen daemon status`; restart with `kaizen daemon stop` followed by `kaizen open`. | | URL has no valid token | Run `kaizen open --no-browser` again instead of editing the URL. | -| Expected project is missing | Open its path manually or run `kaizen sessions list --refresh` from that repository. | +| Expected project is missing | Run one Kaizen command from that repository to register it, then reload. Unsafe roots containing `KAIZEN_HOME` and missing paths are ignored. 
| | Default port is busy | Use the URL Kaizen prints; the daemon automatically chooses another loopback port. | - diff --git a/src/collect/hooks/normalize.rs b/src/collect/hooks/normalize.rs index 562c567..878dfa3 100644 --- a/src/collect/hooks/normalize.rs +++ b/src/collect/hooks/normalize.rs @@ -17,9 +17,9 @@ pub fn hook_to_event(h: &HookEvent, seq: u64) -> Event { seq, ts_ms: h.ts_ms, ts_exact: true, - kind: lifecycle.map_or(EventKind::Hook, |_| EventKind::Lifecycle), + kind: core_event_kind(&h.kind), source: EventSource::Hook, - tool: None, + tool: hook_tool(&h.payload), tool_call_id: hook_tool_id(&h.payload), tokens_in: u32_field(&h.payload, "input_tokens"), tokens_out: u32_field(&h.payload, "output_tokens"), @@ -40,6 +40,22 @@ pub fn hook_to_event(h: &HookEvent, seq: u64) -> Event { } } +fn core_event_kind(kind: &HookKind) -> EventKind { + match kind { + HookKind::PreToolUse => EventKind::ToolCall, + HookKind::PostToolUse => EventKind::ToolResult, + HookKind::Unknown(_) => EventKind::Hook, + _ => EventKind::Lifecycle, + } +} + +fn hook_tool(payload: &serde_json::Value) -> Option { + ["tool_name", "tool"] + .iter() + .find_map(|key| payload.get(key).and_then(|value| value.as_str())) + .map(ToOwned::to_owned) +} + fn hook_tool_id(payload: &serde_json::Value) -> Option { ["tool_call_id", "tool_use_id", "call_id", "id"] .iter() @@ -49,6 +65,8 @@ fn hook_tool_id(payload: &serde_json::Value) -> Option { fn lifecycle_type(kind: &HookKind) -> Option<&'static str> { match kind { + HookKind::SessionStart => Some("session_start"), + HookKind::Stop => Some("session_stop"), HookKind::PermissionRequest => Some("permission_request"), HookKind::UserPromptSubmit => Some("user_prompt_submit"), HookKind::Notification => Some("notification"), @@ -103,90 +121,5 @@ pub fn hook_to_status(kind: &HookKind) -> Option { } #[cfg(test)] -mod tests { - use super::*; - use crate::collect::hooks::HookEvent; - use serde_json::json; - - fn make_event(kind: HookKind) -> HookEvent { - HookEvent 
{ - kind, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({}), - } - } - - #[test] - fn session_start_maps_running() { - assert_eq!( - hook_to_status(&HookKind::SessionStart), - Some(SessionStatus::Running) - ); - } - - #[test] - fn pre_tool_use_maps_waiting() { - assert_eq!( - hook_to_status(&HookKind::PreToolUse), - Some(SessionStatus::Waiting) - ); - } - - #[test] - fn post_tool_use_maps_running() { - assert_eq!( - hook_to_status(&HookKind::PostToolUse), - Some(SessionStatus::Running) - ); - } - - #[test] - fn stop_maps_done() { - assert_eq!(hook_to_status(&HookKind::Stop), Some(SessionStatus::Done)); - } - - #[test] - fn unknown_maps_none() { - assert_eq!(hook_to_status(&HookKind::Unknown("x".to_string())), None); - } - - #[test] - fn hook_to_event_kind_is_hook() { - let h = make_event(HookKind::Stop); - let ev = hook_to_event(&h, 5); - assert_eq!(ev.kind, EventKind::Hook); - assert_eq!(ev.seq, 5); - assert_eq!(ev.session_id, "s1"); - } - - #[test] - fn hook_to_event_maps_total_cost_usd_to_microdollars() { - let h = HookEvent { - kind: HookKind::Stop, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({ "total_cost_usd": 0.042 }), - }; - let ev = hook_to_event(&h, 0); - assert_eq!(ev.cost_usd_e6, Some(42_000)); - } - - #[test] - fn permission_request_maps_lifecycle_wait() { - let h = HookEvent { - kind: HookKind::PermissionRequest, - session_id: "s1".to_string(), - ts_ms: 1000, - payload: json!({"permission_wait_ms": 250}), - }; - let ev = hook_to_event(&h, 0); - assert_eq!(ev.kind, EventKind::Lifecycle); - assert_eq!(ev.latency_ms, Some(250)); - assert_eq!(ev.payload["type"], "permission_request"); - assert_eq!( - hook_to_status(&HookKind::PermissionRequest), - Some(SessionStatus::Waiting) - ); - } -} +#[path = "normalize_tests.rs"] +mod tests; diff --git a/src/collect/hooks/normalize_tests.rs b/src/collect/hooks/normalize_tests.rs new file mode 100644 index 0000000..99605cc --- /dev/null +++ b/src/collect/hooks/normalize_tests.rs 
@@ -0,0 +1,93 @@ +use super::*; +use crate::collect::hooks::HookEvent; +use serde_json::json; + +fn make_event(kind: HookKind) -> HookEvent { + HookEvent { + kind, + session_id: "s1".to_string(), + ts_ms: 1000, + payload: json!({}), + } +} + +#[test] +fn session_start_maps_running() { + assert_eq!( + hook_to_status(&HookKind::SessionStart), + Some(SessionStatus::Running) + ); +} + +#[test] +fn pre_tool_use_maps_waiting() { + assert_eq!( + hook_to_status(&HookKind::PreToolUse), + Some(SessionStatus::Waiting) + ); +} + +#[test] +fn post_tool_use_maps_running() { + assert_eq!( + hook_to_status(&HookKind::PostToolUse), + Some(SessionStatus::Running) + ); +} + +#[test] +fn stop_maps_done() { + assert_eq!(hook_to_status(&HookKind::Stop), Some(SessionStatus::Done)); +} + +#[test] +fn unknown_maps_none() { + assert_eq!(hook_to_status(&HookKind::Unknown("x".to_string())), None); +} + +#[test] +fn stop_maps_to_lifecycle_event() { + let ev = hook_to_event(&make_event(HookKind::Stop), 5); + assert_eq!((ev.kind, ev.seq), (EventKind::Lifecycle, 5)); + assert_eq!(ev.payload["type"], "session_stop"); +} + +#[test] +fn pre_tool_hook_maps_to_named_tool_call() { + let ev = named_tool_event(HookKind::PreToolUse, 0); + assert_eq!(ev.kind, EventKind::ToolCall); + assert_eq!(ev.tool.as_deref(), Some("Read")); +} + +#[test] +fn post_tool_hook_maps_to_named_tool_result() { + let ev = named_tool_event(HookKind::PostToolUse, 1); + assert_eq!(ev.kind, EventKind::ToolResult); + assert_eq!(ev.tool.as_deref(), Some("Read")); +} + +fn named_tool_event(kind: HookKind, seq: u64) -> Event { + let mut hook = make_event(kind); + hook.payload = json!({"tool_name":"Read","tool_use_id":"call-1"}); + hook_to_event(&hook, seq) +} + +#[test] +fn hook_to_event_maps_total_cost_usd_to_microdollars() { + let mut hook = make_event(HookKind::Stop); + hook.payload = json!({"total_cost_usd":0.042}); + assert_eq!(hook_to_event(&hook, 0).cost_usd_e6, Some(42_000)); +} + +#[test] +fn 
permission_request_maps_lifecycle_wait() { + let mut hook = make_event(HookKind::PermissionRequest); + hook.payload = json!({"permission_wait_ms":250}); + let event = hook_to_event(&hook, 0); + assert_eq!( + (event.kind, event.latency_ms), + (EventKind::Lifecycle, Some(250)) + ); + assert_eq!(event.payload["type"], "permission_request"); + assert_eq!(hook_to_status(&hook.kind), Some(SessionStatus::Waiting)); +} diff --git a/src/collect/tail/budget_tests.rs b/src/collect/tail/budget_tests.rs new file mode 100644 index 0000000..bf50f47 --- /dev/null +++ b/src/collect/tail/budget_tests.rs @@ -0,0 +1,86 @@ +use super::MAX_RECENT_TRANSCRIPTS; +use serde_json::json; +use std::fs::{FileTimes, OpenOptions}; +use std::path::Path; +use std::time::{Duration, UNIX_EPOCH}; + +#[test] +fn claude_scan_caps_recent_transcripts() { + let temp = tempfile::tempdir().unwrap(); + let workspace = temp.path().join("repo"); + let project = temp.path().join("claude"); + std::fs::create_dir_all(&workspace).unwrap(); + std::fs::create_dir_all(&project).unwrap(); + write_claude_rows(&project, &workspace, 40); + + let rows = super::claude_code::scan_claude_project_dir(&project, &workspace).unwrap(); + + assert_eq!(rows.len(), MAX_RECENT_TRANSCRIPTS); +} + +#[test] +fn codex_scan_caps_recent_transcripts() { + let temp = tempfile::tempdir().unwrap(); + let workspace = temp.path().join("repo"); + let root = temp.path().join("codex/2026/06/18"); + std::fs::create_dir_all(&workspace).unwrap(); + std::fs::create_dir_all(&root).unwrap(); + write_codex_rows(&root, &workspace, 40); + + let rows = super::codex_desktop::scan_codex_sessions_root(&root, &workspace).unwrap(); + + assert_eq!(rows.len(), MAX_RECENT_TRANSCRIPTS); +} + +#[test] +fn recent_path_budget_excludes_unchanged_files() { + let temp = tempfile::tempdir().unwrap(); + let stale = temp.path().join("stale.jsonl"); + let recent = temp.path().join("recent.jsonl"); + std::fs::write(&stale, "{}").unwrap(); + std::fs::write(&recent, 
"{}").unwrap(); + set_modified(&stale, 1); + set_modified(&recent, 3); + + let rows = super::newest_paths_since(vec![stale, recent.clone()], 2_000); + + assert_eq!(rows, vec![recent]); +} + +#[test] +fn transcript_reader_excludes_large_old_prefix() { + let temp = tempfile::tempdir().unwrap(); + let transcript = temp.path().join("large.jsonl"); + std::fs::write(&transcript, "{}\n".repeat(800_000)).unwrap(); + + let (first_seq, content) = super::read_recent_jsonl(&transcript).unwrap(); + + assert!(first_seq > 0); + assert!(content.len() <= super::MAX_TRANSCRIPT_READ_BYTES as usize); +} + +fn write_claude_rows(dir: &Path, workspace: &Path, count: usize) { + (0..count).for_each(|index| { + let row = json!({ + "type":"user", "timestamp":"2026-06-18T00:00:00Z", + "cwd":workspace, "sessionId":format!("claude-{index:02}") + }); + std::fs::write(dir.join(format!("{index:02}.jsonl")), row.to_string()).unwrap(); + }); +} + +fn write_codex_rows(dir: &Path, workspace: &Path, count: usize) { + (0..count).for_each(|index| { + let row = json!({ + "type":"session_meta", "timestamp":"2026-06-18T00:00:00Z", + "payload":{"id":format!("codex-{index:02}"), "cwd":workspace} + }); + std::fs::write(dir.join(format!("{index:02}.jsonl")), row.to_string()).unwrap(); + }); +} + +fn set_modified(path: &Path, seconds: u64) { + let file = OpenOptions::new().write(true).open(path).unwrap(); + let times = FileTimes::new().set_modified(UNIX_EPOCH + Duration::from_secs(seconds)); + file.set_times(times).unwrap(); +} diff --git a/src/collect/tail/claude_code.rs b/src/collect/tail/claude_code.rs index 0cb3083..b228674 100644 --- a/src/collect/tail/claude_code.rs +++ b/src/collect/tail/claude_code.rs @@ -20,20 +20,28 @@ struct Meta { pub fn scan_claude_project_dir( project_dir: &Path, workspace: &Path, +) -> Result)>> { + scan_claude_project_dir_since(project_dir, workspace, 0) +} + +pub(crate) fn scan_claude_project_dir_since( + project_dir: &Path, + workspace: &Path, + since_ms: u64, ) -> Result)>> { 
if !project_dir.exists() { return Ok(Vec::new()); } let target = crate::core::paths::canonical(workspace); let mut out = Vec::new(); - for file in top_level_jsonl(project_dir)? { + for file in top_level_jsonl(project_dir, since_ms)? { push_if_target( &mut out, scan_claude_session_file(&file, None, Some(workspace))?, &target, ); } - for (file, parent) in subagent_jsonl(project_dir)? { + for (file, parent) in subagent_jsonl(project_dir, since_ms)? { push_if_target( &mut out, scan_claude_session_file(&file, Some(parent), Some(workspace))?, @@ -48,19 +56,25 @@ pub fn scan_claude_session_file( parent_session_id: Option, workspace_fallback: Option<&Path>, ) -> Result<(SessionRecord, Vec)> { - let content = std::fs::read_to_string(path) - .with_context(|| format!("read claude file: {}", path.display()))?; - let mut meta = content.lines().fold(Meta::default(), read_meta); + let first = super::read_first_jsonl_line(path) + .with_context(|| format!("read claude metadata: {}", path.display()))?; + let (first_seq, content) = super::read_recent_jsonl(path) + .with_context(|| format!("read claude tail: {}", path.display()))?; + let mut meta = std::iter::once(first.as_str()) + .chain(content.lines()) + .fold(Meta::default(), read_meta); if meta.workspace.is_none() { meta.workspace = workspace_fallback.map(|p| p.to_string_lossy().to_string()); } - let id = meta.id.clone().unwrap_or_else(|| file_stem(path)); - let base = meta.started_ms.unwrap_or_else(|| file_mtime_ms(path)); + let id = meta.id.clone().unwrap_or_else(|| super::file_stem(path)); + let base = meta + .started_ms + .unwrap_or_else(|| super::file_mtime_ms(path)); let events = content .lines() .enumerate() .filter_map(|(i, line)| { - crate::collect::tail::claude::parse_claude_line(&id, i as u64, base, line) + crate::collect::tail::claude::parse_claude_line(&id, first_seq + i as u64, base, line) .ok() .flatten() }) @@ -68,16 +82,15 @@ pub fn scan_claude_session_file( Ok((record(path, id, meta, base, parent_session_id), 
events)) } -fn top_level_jsonl(project_dir: &Path) -> Result> { - let mut out = std::fs::read_dir(project_dir)? +fn top_level_jsonl(project_dir: &Path, since_ms: u64) -> Result> { + let out = std::fs::read_dir(project_dir)? .filter_map(|e| e.ok().map(|e| e.path())) .filter(|p| p.is_file() && p.extension().and_then(|x| x.to_str()) == Some("jsonl")) .collect::>(); - out.sort(); - Ok(out) + Ok(super::newest_paths_since(out, since_ms)) } -fn subagent_jsonl(project_dir: &Path) -> Result> { +fn subagent_jsonl(project_dir: &Path, since_ms: u64) -> Result> { let mut out = Vec::new(); for entry in std::fs::read_dir(project_dir)? { let path = entry?.path(); @@ -86,8 +99,7 @@ fn subagent_jsonl(project_dir: &Path) -> Result> { }; collect_subagents(&path.join("subagents"), &parent, &mut out)?; } - out.sort(); - Ok(out) + Ok(super::newest_by_path_since(out, |(path, _)| path, since_ms)) } fn collect_subagents(dir: &Path, parent: &str, out: &mut Vec<(PathBuf, String)>) -> Result<()> { @@ -176,22 +188,3 @@ fn line_ts(obj: &serde_json::Map) -> Option { fn text(v: &Value, key: &str) -> Option { v.get(key).and_then(Value::as_str).map(ToOwned::to_owned) } - -fn file_stem(path: &Path) -> String { - path.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("") - .to_string() -} - -fn file_mtime_ms(path: &Path) -> u64 { - path.metadata() - .ok() - .and_then(|m| m.modified().ok()) - .map(|t| { - t.duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 - }) - .unwrap_or(0) -} diff --git a/src/collect/tail/codex_desktop.rs b/src/collect/tail/codex_desktop.rs index f930028..784d886 100644 --- a/src/collect/tail/codex_desktop.rs +++ b/src/collect/tail/codex_desktop.rs @@ -21,6 +21,14 @@ struct Meta { pub fn scan_codex_sessions_root( root: &Path, workspace: &Path, +) -> Result)>> { + scan_codex_sessions_root_since(root, workspace, 0) +} + +pub(crate) fn scan_codex_sessions_root_since( + root: &Path, + workspace: &Path, + since_ms: u64, ) -> Result)>> { if 
!root.exists() { return Ok(Vec::new()); @@ -28,8 +36,7 @@ pub fn scan_codex_sessions_root( let target = crate::core::paths::canonical(workspace); let mut paths = Vec::new(); collect_jsonl(root, &mut paths)?; - paths.sort(); - Ok(paths + Ok(super::newest_paths_since(paths, since_ms) .into_iter() .filter_map(|p| scan_codex_session_file(&p).ok()) .filter(|(r, _)| workspace_matches(&r.workspace, &target)) @@ -37,15 +44,23 @@ pub fn scan_codex_sessions_root( } pub fn scan_codex_session_file(path: &Path) -> Result<(SessionRecord, Vec)> { - let content = std::fs::read_to_string(path) - .with_context(|| format!("read codex file: {}", path.display()))?; - let meta = content.lines().fold(Meta::default(), read_meta); - let id = meta.id.clone().unwrap_or_else(|| file_stem(path)); - let base = meta.started_ms.unwrap_or_else(|| file_mtime_ms(path)); + let first = super::read_first_jsonl_line(path) + .with_context(|| format!("read codex metadata: {}", path.display()))?; + let (first_seq, content) = super::read_recent_jsonl(path) + .with_context(|| format!("read codex tail: {}", path.display()))?; + let meta = std::iter::once(first.as_str()) + .chain(content.lines()) + .fold(Meta::default(), read_meta); + let id = meta.id.clone().unwrap_or_else(|| super::file_stem(path)); + let base = meta + .started_ms + .unwrap_or_else(|| super::file_mtime_ms(path)); let events = content .lines() .enumerate() - .filter_map(|(i, line)| parse_modern_line(&id, i as u64, base, meta.model.as_deref(), line)) + .filter_map(|(i, line)| { + parse_modern_line(&id, first_seq + i as u64, base, meta.model.as_deref(), line) + }) .collect(); Ok((record(path, id, meta, base), events)) } @@ -137,22 +152,3 @@ fn workspace_matches(found: &str, target: &Path) -> bool { fn text(v: &Value, key: &str) -> Option { v.get(key).and_then(Value::as_str).map(ToOwned::to_owned) } - -fn file_stem(path: &Path) -> String { - path.file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("") - .to_string() -} - -fn file_mtime_ms(path: 
&Path) -> u64 { - path.metadata() - .ok() - .and_then(|m| m.modified().ok()) - .map(|t| { - t.duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64 - }) - .unwrap_or(0) -} diff --git a/src/collect/tail/mod.rs b/src/collect/tail/mod.rs index 7c552fb..b59b13f 100644 --- a/src/collect/tail/mod.rs +++ b/src/collect/tail/mod.rs @@ -22,7 +22,108 @@ pub mod opencode; pub mod pi; pub mod vibe; -use std::path::Path; +pub(crate) const MAX_RECENT_TRANSCRIPTS: usize = 32; + +#[cfg(test)] +mod budget_tests; + +use std::cmp::Reverse; +use std::fs::File; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::path::{Path, PathBuf}; + +pub(crate) const MAX_TRANSCRIPT_READ_BYTES: u64 = 256 * 1024; + +pub(crate) fn newest_paths(paths: Vec) -> Vec { + newest_by_path(paths, PathBuf::as_path) +} + +pub(crate) fn newest_paths_since(paths: Vec, since_ms: u64) -> Vec { + newest_by_path_since(paths, PathBuf::as_path, since_ms) +} + +pub(crate) fn newest_by_path(mut values: Vec, path: impl Fn(&T) -> &Path) -> Vec { + values.sort_by_key(|value| Reverse(modified_ms(path(value)))); + values.truncate(MAX_RECENT_TRANSCRIPTS); + values +} + +pub(crate) fn newest_by_path_since(values: Vec, path: F, since_ms: u64) -> Vec +where + F: Fn(&T) -> &Path + Copy, +{ + let recent = values + .into_iter() + .filter(|value| modified_ms(path(value)) >= u128::from(since_ms)) + .collect(); + newest_by_path(recent, path) +} + +fn modified_ms(path: &Path) -> u128 { + path.metadata() + .and_then(|metadata| metadata.modified()) + .ok() + .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok()) + .map_or(0, |duration| duration.as_millis()) +} + +pub(crate) fn file_mtime_ms(path: &Path) -> u64 { + u64::try_from(modified_ms(path)).unwrap_or(u64::MAX) +} + +pub(crate) fn file_stem(path: &Path) -> String { + path.file_stem() + .and_then(|stem| stem.to_str()) + .unwrap_or_default() + .to_string() +} + +pub(crate) fn read_first_jsonl_line(path: &Path) -> std::io::Result { + 
let mut line = String::new(); + BufReader::new(File::open(path)?).read_line(&mut line)?; + Ok(line) +} + +pub(crate) fn read_recent_jsonl(path: &Path) -> std::io::Result<(u64, String)> { + let file = File::open(path)?; + let start = file + .metadata()? + .len() + .saturating_sub(MAX_TRANSCRIPT_READ_BYTES); + let mut reader = BufReader::new(file); + let mut first_seq = count_newlines(&mut reader, start)?; + reader.seek(SeekFrom::Start(start))?; + first_seq += discard_partial_line(&mut reader, start)?; + let mut content = String::new(); + reader.read_to_string(&mut content)?; + Ok((first_seq, content)) +} + +fn count_newlines(reader: &mut BufReader, end: u64) -> std::io::Result { + reader.seek(SeekFrom::Start(0))?; + let mut remaining = end; + let mut count = 0; + let mut buffer = [0_u8; 64 * 1024]; + while remaining > 0 { + let limit = usize::try_from(remaining.min(buffer.len() as u64)).unwrap_or(buffer.len()); + let read = reader.read(&mut buffer[..limit])?; + if read == 0 { + break; + } + count += memchr::memchr_iter(b'\n', &buffer[..read]).count() as u64; + remaining -= read as u64; + } + Ok(count) +} + +fn discard_partial_line(reader: &mut BufReader, start: u64) -> std::io::Result { + if start == 0 { + return Ok(0); + } + let mut ignored = Vec::new(); + reader.read_until(b'\n', &mut ignored)?; + Ok(1) +} /// Earliest mtime (ms) of `.jsonl` files in `dir`. Returns 0 on failure. 
pub fn dir_mtime_ms(dir: &Path) -> u64 { diff --git a/src/core/event.rs b/src/core/event.rs index 0f683cd..74d679e 100644 --- a/src/core/event.rs +++ b/src/core/event.rs @@ -48,6 +48,44 @@ pub struct Event { pub payload: serde_json::Value, } +impl Event { + pub fn normalize_legacy_hook(mut self) -> Self { + if !self.is_legacy_hook() { + return self; + } + self.kind = legacy_hook_kind(&self.payload).unwrap_or(EventKind::Hook); + self.tool = self.tool.or_else(|| legacy_hook_tool(&self.payload)); + self + } + + fn is_legacy_hook(&self) -> bool { + self.source == EventSource::Hook && self.kind == EventKind::Hook + } +} + +fn legacy_hook_kind(payload: &serde_json::Value) -> Option { + match hook_name(payload)? { + "PreToolUse" | "pre_tool_use" => Some(EventKind::ToolCall), + "PostToolUse" | "post_tool_use" => Some(EventKind::ToolResult), + "SessionStart" | "session_start" | "Stop" | "stop" => Some(EventKind::Lifecycle), + _ => None, + } +} + +fn legacy_hook_tool(payload: &serde_json::Value) -> Option { + ["tool_name", "tool"] + .iter() + .find_map(|key| payload.get(key).and_then(|value| value.as_str())) + .map(ToOwned::to_owned) +} + +fn hook_name(payload: &serde_json::Value) -> Option<&str> { + payload + .get("hook_event_name") + .or_else(|| payload.get("event")) + .and_then(|value| value.as_str()) +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum SessionStatus { Running, diff --git a/src/core/workspace.rs b/src/core/workspace.rs index 486aad1..7d5ba00 100644 --- a/src/core/workspace.rs +++ b/src/core/workspace.rs @@ -54,14 +54,7 @@ pub fn machine_workspaces(seed: Option<&Path>) -> Result> { if let Some(path) = seed.as_ref() { push_unique(&mut roots, path.clone()); } - roots.retain(|p| { - if seed.as_ref() == Some(p) { - return true; - } - p.exists() - && (db_path(p).ok().is_some_and(|d| d.exists()) - || crate::core::machine_registry::is_registered(p)) - }); + roots.retain(|path| seed.as_ref() == Some(path) || 
usable_registered_workspace(path)); if roots.is_empty() && let Some(path) = seed { @@ -70,6 +63,12 @@ pub fn machine_workspaces(seed: Option<&Path>) -> Result> { Ok(roots) } +fn usable_registered_workspace(path: &Path) -> bool { + path.exists() + && db_path(path) + .is_ok_and(|db| db.exists() || crate::core::machine_registry::is_registered(path)) +} + pub fn db_path(workspace: &Path) -> Result { let path = crate::core::paths::project_data_child(workspace, Path::new("kaizen.db"))?; ["kaizen.db-journal", "kaizen.db-wal", "kaizen.db-shm"] diff --git a/src/daemon/scanner_task.rs b/src/daemon/scanner_task.rs index 82e3e43..59a323c 100644 --- a/src/daemon/scanner_task.rs +++ b/src/daemon/scanner_task.rs @@ -4,11 +4,12 @@ use crate::store::Store; use anyhow::Result; use std::path::{Path, PathBuf}; +use std::sync::{Mutex, OnceLock}; pub(super) async fn scanner_loop(ws: PathBuf) { loop { - scan_once(ws.clone()).await; tokio::time::sleep(scan_interval(&ws)).await; + scan_once(ws.clone()).await; } } @@ -19,6 +20,9 @@ async fn scan_once(ws: PathBuf) { } fn scan_workspace(ws: &Path) -> Result<()> { + let _guard = scan_lock() + .lock() + .unwrap_or_else(|error| error.into_inner()); let cfg = crate::core::config::load(ws)?; let store = Store::open(&crate::core::workspace::db_path(ws)?)?; let ws_str = ws.to_string_lossy().to_string(); @@ -26,6 +30,11 @@ fn scan_workspace(ws: &Path) -> Result<()> { crate::shell::cli::maybe_auto_prune_after_scan(&store, &cfg) } +fn scan_lock() -> &'static Mutex<()> { + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| Mutex::new(())) +} + fn scan_interval(ws: &Path) -> std::time::Duration { let secs = crate::core::config::load(ws) .map(|cfg| cfg.scan.min_rescan_seconds.max(5)) diff --git a/src/daemon/worker.rs b/src/daemon/worker.rs index 7cb9f3d..6388d79 100644 --- a/src/daemon/worker.rs +++ b/src/daemon/worker.rs @@ -80,7 +80,6 @@ fn handle_request(request: DaemonRequest, stores: &mut StoreCache) -> Result Result<()> { + if 
get(store, &event.session_id)?.is_none() { + upsert_session(store, &event.session_id)?; + return Ok(()); + } + store.conn().execute( + APPLY_EVENT_SQL, + params![ + event.session_id, + i64::from(event.kind == EventKind::ToolCall), + i64::from(event.kind == EventKind::Error), + i64::from(event.tokens_in.unwrap_or(0)), + i64::from(event.tokens_out.unwrap_or(0)), + i64::from(event.reasoning_tokens.unwrap_or(0)), + i64::from(event.cache_read_tokens.unwrap_or(0)), + i64::from(event.cache_creation_tokens.unwrap_or(0)), + event.cost_usd_e6.unwrap_or(0), + event.ts_ms as i64, + now_ms() as i64, + ], + )?; + Ok(()) +} + pub fn rebuild_workspace(store: &Store, workspace: &str) -> Result { store .list_sessions(workspace)? diff --git a/src/extensions/diffs.rs b/src/extensions/diffs.rs index 420aba4..04b71bc 100644 --- a/src/extensions/diffs.rs +++ b/src/extensions/diffs.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use crate::store::Store; +use crate::store::tool_span_index::ToolSpanRecord; use anyhow::Result; use rusqlite::params; use serde::{Deserialize, Serialize}; @@ -15,6 +16,23 @@ pub struct StepDiff { pub raw_patch_stored: bool, } +pub fn upsert_tool_span(store: &Store, span: &ToolSpanRecord) -> Result<()> { + if span.paths.is_empty() { + return Ok(()); + } + insert( + store, + &StepDiff { + session_id: span.session_id.clone(), + span_id: span.span_id.clone(), + files: span.paths.clone(), + added_lines: 0, + removed_lines: 0, + raw_patch_stored: false, + }, + ) +} + pub fn refresh_session( store: &Store, session_id: &str, diff --git a/src/mcp/handler.rs b/src/mcp/handler.rs index 86b63c1..5a49fee 100644 --- a/src/mcp/handler.rs +++ b/src/mcp/handler.rs @@ -19,11 +19,11 @@ const MCP_CAPABILITIES: &str = r#"Kaizen MCP exposes most `kaizen` CLI workflows - kaizen_summary — Session counts, USD cost, by-agent/model, top tools. Use for spend and volume. Optional json=true. - kaizen_metrics — Code hotspots, slow tools (p95), token-heavy tools, churn. 
Use for **repository** and tool latency. Optional json. -- kaizen_sessions_list / kaizen_session_show — Session list and one session metadata. Optional json on list; optional `limit` caps rows (newest first). `kaizen_exp_report` supports `refresh: true` for a full transcript rescan before computing the report (matches CLI `kaizen exp report --refresh`). +- kaizen_sessions_list / kaizen_session_show — Session list and one session metadata. Optional json on list; optional `limit` caps rows (newest first). `kaizen_exp_report` supports `refresh: true` for bounded changed-tail ingest before computing the report (matches CLI `kaizen exp report --refresh`). - kaizen_search_sessions — BM25 event search over current workspace. Supports since, agent, kind, limit. - kaizen_insights — Activity dashboard (7d). kaizen_retro — weekly bets. kaizen_exp_* — experiments. - kaizen_query / kaizen_cases_* / kaizen_rules_* / kaizen_alerts_check / kaizen_review_* — local trace-to-case automation loop. -- List/summary/insights/metrics/retro are cache-first; set refresh=true to force a full transcript rescan (matches CLI --refresh). +- List/summary/insights/metrics/retro are cache-first; set refresh=true for bounded changed-tail ingest (matches CLI --refresh). - sessions_list/summary/insights/metrics also accept all_workspaces=true to aggregate across registered project DBs. - kaizen_ingest_hook — same as `kaizen ingest hook` (rare; hooks call this). - kaizen_init — idempotent user-level hooks and home project data; target repos stay read-only. kaizen_sync_* — outbox. kaizen_tui — not available (returns JSON stub). @@ -83,7 +83,7 @@ struct WorkspaceJsonArg { /// When true, return the same pretty JSON as `kaizen sessions list --json` or `kaizen summary --json`. #[serde(default)] json: bool, - /// When true, run a full agent transcript rescan (matches `kaizen ... --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen ... --refresh`). 
#[serde(default)] refresh: bool, /// Cap sessions returned (newest first); only `kaizen_sessions_list` uses this. @@ -148,7 +148,7 @@ struct MetricsArg { json: bool, #[serde(default)] force: bool, - /// When true, run a full agent transcript rescan (matches `kaizen metrics --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen metrics --refresh`). #[serde(default)] refresh: bool, } @@ -190,7 +190,7 @@ struct RetroArg { json: bool, #[serde(default)] force: bool, - /// When true, run a full agent transcript rescan (matches `kaizen retro --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen retro --refresh`). #[serde(default)] refresh: bool, } @@ -201,7 +201,7 @@ struct InsightsArg { ws: WorkspaceArg, #[serde(default)] all_workspaces: bool, - /// When true, run a full agent transcript rescan (matches `kaizen insights --refresh`). + /// When true, ingest bounded changed transcript tails (matches `kaizen insights --refresh`). #[serde(default)] refresh: bool, } @@ -262,7 +262,7 @@ struct ExpReportArg { id: String, #[serde(default)] json: bool, - /// Full transcript rescan before computing the report. + /// Ingest bounded changed transcript tails before computing the report. #[serde(default)] refresh: bool, } @@ -397,7 +397,7 @@ impl KaizenMcp { #[tool( name = "kaizen_sessions_list", - description = "List agent sessions in the workspace. Set json=true for structured output. Optional limit caps rows after sort (newest first). Use refresh=true for a full transcript rescan." + description = "List agent sessions in the workspace. Set json=true for structured output. Optional limit caps rows after sort (newest first). Use refresh=true to ingest bounded, recently changed transcript tails." )] async fn kaizen_sessions_list( &self, @@ -917,7 +917,7 @@ impl KaizenMcp { #[tool( name = "kaizen_exp_report", - description = "Experiment report (kaizen exp report). 
Optional refresh: true forces a full transcript rescan before computing the report." + description = "Experiment report (kaizen exp report). Optional refresh: true ingests bounded, recently changed transcript tails before computing the report." )] async fn kaizen_exp_report( &self, diff --git a/src/search/writer.rs b/src/search/writer.rs index 37621c0..7d69795 100644 --- a/src/search/writer.rs +++ b/src/search/writer.rs @@ -7,12 +7,13 @@ use anyhow::{Context, Result}; use std::fs::File; use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; +use tantivy::indexer::IndexWriterOptions; use tantivy::schema::Term; use tantivy::{Index, IndexWriter, TantivyDocument, doc}; const HEAP_BYTES: usize = 50_000_000; -const BATCH_DOCS: usize = 1000; -const BATCH_SECS: u64 = 1; +const BATCH_DOCS: usize = 256; +const BATCH_SECS: u64 = 60; pub struct PendingWriter { writer: IndexWriter, @@ -29,7 +30,7 @@ impl PendingWriter { let lock = lock_file(&dir)?; let (schema, fields) = build_schema(); let index = open_or_create(&dir, schema)?; - let writer = index.writer(HEAP_BYTES)?; + let writer = index.writer_with_options(writer_options())?; Ok(Self { writer, fields, @@ -40,6 +41,9 @@ impl PendingWriter { } pub fn add(&mut self, doc: &SearchDoc) -> Result<()> { + if self.pending == 0 { + self.last_commit = Instant::now(); + } self.writer.delete_term(Term::from_field_text( self.fields.event_key, &event_key(&doc.session_id, doc.seq), @@ -87,6 +91,14 @@ impl PendingWriter { } } +fn writer_options() -> IndexWriterOptions { + IndexWriterOptions::builder() + .memory_budget_per_thread(HEAP_BYTES) + .num_worker_threads(1) + .num_merge_threads(1) + .build() +} + pub fn delete_sessions(root: &Path, ids: &[String]) -> Result<()> { if ids.is_empty() { return Ok(()); @@ -129,3 +141,7 @@ fn reject_tree_symlinks(root: &Path) -> Result<()> { fn open_or_create(dir: &Path, schema: tantivy::schema::Schema) -> Result { Ok(Index::open_in_dir(dir).or_else(|_| Index::create_in_dir(dir, schema))?) 
} + +#[cfg(test)] +#[path = "writer_tests.rs"] +mod tests; diff --git a/src/search/writer_tests.rs b/src/search/writer_tests.rs new file mode 100644 index 0000000..ca96008 --- /dev/null +++ b/src/search/writer_tests.rs @@ -0,0 +1,27 @@ +use super::*; + +#[test] +fn sparse_hooks_do_not_commit_every_ten_seconds() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = 1; + writer.last_commit = Instant::now() - Duration::from_secs(10); + assert!(!writer.should_commit()); +} + +#[test] +fn active_session_commits_after_one_minute() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = 1; + writer.last_commit = Instant::now() - Duration::from_secs(BATCH_SECS); + assert!(writer.should_commit()); +} + +#[test] +fn full_batch_commits_without_waiting() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = PendingWriter::open(dir.path()).unwrap(); + writer.pending = BATCH_DOCS; + assert!(writer.should_commit()); +} diff --git a/src/shell/cli.rs b/src/shell/cli.rs index 32544af..1a58b2d 100644 --- a/src/shell/cli.rs +++ b/src/shell/cli.rs @@ -3,9 +3,9 @@ use crate::collect::tail::antigravity::scan_antigravity_workspace; use crate::collect::tail::claude::scan_claude_session_dir; -use crate::collect::tail::claude_code::scan_claude_project_dir; +use crate::collect::tail::claude_code::scan_claude_project_dir_since; use crate::collect::tail::codex::scan_codex_session_dir; -use crate::collect::tail::codex_desktop::scan_codex_sessions_root; +use crate::collect::tail::codex_desktop::scan_codex_sessions_root_since; use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace; use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace; use crate::collect::tail::cursor::scan_session_dir_all; @@ -70,7 +70,6 @@ impl AgentScanStats { self.sessions_found += 1; self.sessions_upserted += 1; self.events_found += event_count as u64; 
- self.events_upserted += event_count as u64; self.agents.insert(record.agent.clone()); } @@ -129,6 +128,8 @@ fn now_ms_u64() -> u64 { /// Minimum interval between automatic local DB prunes after a successful rescan (24h). const AUTO_PRUNE_INTERVAL_MS: u64 = 86_400_000; +const INITIAL_SCAN_LOOKBACK_MS: u64 = 30 * 86_400_000; +const SCAN_OVERLAP_MS: u64 = 60_000; pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> { if cfg.retention.hot_days == 0 { @@ -146,7 +147,7 @@ pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) - Ok(()) } -/// Full transcript rescan unless throttled by `[scan].min_rescan_seconds` or `refresh` is true. +/// Import recently changed transcripts unless throttled by the scan interval. pub(crate) fn maybe_scan_all_agents( ws: &Path, cfg: &config::Config, @@ -163,11 +164,20 @@ pub(crate) fn maybe_scan_all_agents( { return Ok(()); } - scan_all_agents(ws, cfg, ws_str, store)?; - store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?; + let since_ms = scan_since_ms(store, now)?; + scan_all_agents_since(ws, cfg, ws_str, store, since_ms)?; + store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now)?; Ok(()) } +fn scan_since_ms(store: &Store, now_ms: u64) -> Result { + Ok(store + .sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)? 
+ .map_or(now_ms.saturating_sub(INITIAL_SCAN_LOOKBACK_MS), |last| { + last.saturating_sub(SCAN_OVERLAP_MS) + })) +} + pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> { if !refresh { return Ok(()); @@ -738,13 +748,14 @@ pub fn cmd_summary( Ok(()) } -pub(crate) fn scan_all_agents( +fn scan_all_agents_since( ws: &Path, cfg: &config::Config, ws_str: &str, store: &Store, + since_ms: u64, ) -> Result<()> { - scan_all_agents_with_stats(ws, cfg, ws_str, store).map(|_| ()) + scan_all_agents_with_stats_since(ws, cfg, ws_str, store, since_ms, false).map(|_| ()) } pub(crate) fn scan_all_agents_with_stats( @@ -752,19 +763,31 @@ pub(crate) fn scan_all_agents_with_stats( cfg: &config::Config, ws_str: &str, store: &Store, +) -> Result { + scan_all_agents_with_stats_since(ws, cfg, ws_str, store, 0, true) +} + +fn scan_all_agents_with_stats_since( + ws: &Path, + cfg: &config::Config, + ws_str: &str, + store: &Store, + since_ms: u64, + enrich_repo: bool, ) -> Result { let _spin = ScanSpinner::start("Scanning agent sessions…"); let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf()); - let sessions = collect_all_agent_sessions(ws, cfg, ws_str)?; - let stats = persist_session_batch(store, sessions, sync_ctx.as_ref())?; + let sessions = collect_all_agent_sessions_since(ws, cfg, ws_str, since_ms)?; + let stats = persist_session_batch_mode(store, sessions, sync_ctx.as_ref(), enrich_repo)?; maybe_auto_prune_after_scan(store, cfg)?; Ok(stats) } -pub(crate) fn collect_all_agent_sessions( +fn collect_all_agent_sessions_since( ws: &Path, cfg: &config::Config, ws_str: &str, + since_ms: u64, ) -> Result)>> { let mut out = Vec::new(); let slug = workspace_slug(ws_str); @@ -794,7 +817,11 @@ pub(crate) fn collect_all_agent_sessions( let claude_project = PathBuf::from(&home) .join(".claude/projects") .join(&claude_slug); - out.extend(scan_claude_project_dir(&claude_project, ws)?); + out.extend(scan_claude_project_dir_since( + &claude_project, + 
ws, + since_ms, + )?); let claude_dir = claude_project.join("sessions"); out.extend(collect_agent_dirs(&claude_dir, |p| { scan_claude_session_dir(p).map(|(mut r, evs)| { @@ -810,9 +837,10 @@ pub(crate) fn collect_all_agent_sessions( vec![(r, evs)] }) })?); - out.extend(scan_codex_sessions_root( + out.extend(scan_codex_sessions_root_since( &PathBuf::from(&home).join(".codex/sessions"), ws, + since_ms, )?); let tail = &cfg.sources.tail; @@ -862,39 +890,76 @@ fn bind_workspace( .collect() } +#[cfg(test)] pub(crate) fn persist_session_batch( store: &Store, sessions: Vec<(SessionRecord, Vec)>, sync_ctx: Option<&crate::sync::SyncIngestContext>, +) -> Result { + persist_session_batch_mode(store, sessions, sync_ctx, true) +} + +fn persist_session_batch_mode( + store: &Store, + sessions: Vec<(SessionRecord, Vec)>, + sync_ctx: Option<&crate::sync::SyncIngestContext>, + enrich_repo: bool, ) -> Result { let mut stats = AgentScanStats::default(); for (mut record, events) in sessions { stats.record(&record, events.len()); - if record.start_commit.is_none() && !record.workspace.is_empty() { - let binding = crate::core::repo::binding_for_session( - Path::new(&record.workspace), - record.started_at_ms, - record.ended_at_ms, - ); - record.start_commit = binding.start_commit; - record.end_commit = binding.end_commit; - record.branch = binding.branch; - record.dirty_start = binding.dirty_start; - record.dirty_end = binding.dirty_end; - record.repo_binding_source = binding.source; - } + let existing = store.get_session(&record.id)?; + inherit_repo_binding(&mut record, existing.as_ref()); + enrich_repo_binding(&mut record, enrich_repo); store.upsert_session(&record)?; + let last_seq = store.last_event_seq_for_session(&record.id)?; let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms); - for ev in events { - store.append_event_with_sync(&ev, sync_ctx)?; - } - if record.status == crate::core::event::SessionStatus::Done { - store.flush_projector_session(&record.id, flush_ms)?; 
- } + let unseen = events + .into_iter() + .filter(|event| last_seq.is_none_or(|seq| event.seq > seq)) + .collect::>(); + let flush = (record.status == crate::core::event::SessionStatus::Done).then_some(flush_ms); + stats.events_upserted += store.append_scanned_event_batch(&unseen, sync_ctx, flush)? as u64; } Ok(stats) } +fn inherit_repo_binding(record: &mut SessionRecord, existing: Option<&SessionRecord>) { + let Some(existing) = existing else { return }; + record.start_commit = record + .start_commit + .take() + .or_else(|| existing.start_commit.clone()); + record.end_commit = record + .end_commit + .take() + .or_else(|| existing.end_commit.clone()); + record.branch = record.branch.take().or_else(|| existing.branch.clone()); + record.dirty_start = record.dirty_start.or(existing.dirty_start); + record.dirty_end = record.dirty_end.or(existing.dirty_end); + record.repo_binding_source = record + .repo_binding_source + .take() + .or_else(|| existing.repo_binding_source.clone()); +} + +fn enrich_repo_binding(record: &mut SessionRecord, enabled: bool) { + if !enabled || record.start_commit.is_some() || record.workspace.is_empty() { + return; + } + let binding = crate::core::repo::binding_for_session( + Path::new(&record.workspace), + record.started_at_ms, + record.ended_at_ms, + ); + record.start_commit = binding.start_commit; + record.end_commit = binding.end_commit; + record.branch = binding.branch; + record.dirty_start = binding.dirty_start; + record.dirty_end = binding.dirty_end; + record.repo_binding_source = binding.source; +} + pub(crate) fn collect_agent_dirs( dir: &Path, scanner: F, @@ -905,14 +970,16 @@ where if !dir.exists() { return Ok(Vec::new()); } + let paths = std::fs::read_dir(dir)? 
+ .filter_map(|entry| entry.ok()) + .filter(|entry| entry.file_type().is_ok_and(|kind| kind.is_dir())) + .map(|entry| entry.path()) + .collect(); let mut out = Vec::new(); - for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) { - if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) { - continue; - } - match scanner(&entry.path()) { + for path in crate::collect::tail::newest_paths(paths) { + match scanner(&path) { Ok(sessions) => out.extend(sessions), - Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()), + Err(e) => tracing::warn!("scan {path:?}: {e}"), } } Ok(out) diff --git a/src/shell/cli_tests.rs b/src/shell/cli_tests.rs new file mode 100644 index 0000000..940c8bd --- /dev/null +++ b/src/shell/cli_tests.rs @@ -0,0 +1,83 @@ +use super::cli::persist_session_batch; +use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus}; +use crate::store::Store; +use serde_json::json; + +#[test] +fn scanned_batch_appends_only_unseen_suffix() { + let temp = tempfile::tempdir().unwrap(); + let store = Store::open(&temp.path().join("kaizen.db")).unwrap(); + persist_session_batch(&store, vec![(record(), vec![event(0), event(1)])], None).unwrap(); + let mut stale = event(0); + stale.payload = json!({"changed": true}); + let stats = + persist_session_batch(&store, vec![(record(), vec![stale, event(2)])], None).unwrap(); + let rows = store.list_events_for_session("scan").unwrap(); + assert_eq!((stats.events_found, stats.events_upserted), (2, 1)); + assert_eq!((rows.len(), &rows[0].payload), (3, &json!({}))); +} + +#[test] +fn repeated_scan_preserves_repo_binding() { + let temp = tempfile::tempdir().unwrap(); + let store = Store::open(&temp.path().join("kaizen.db")).unwrap(); + persist_session_batch(&store, vec![(record(), Vec::new())], None).unwrap(); + let mut rescanned = record(); + rescanned.start_commit = None; + persist_session_batch(&store, vec![(rescanned, Vec::new())], None).unwrap(); + let stored = 
store.get_session("scan").unwrap().unwrap(); + assert_eq!(stored.start_commit.as_deref(), Some("abc")); +} + +fn record() -> SessionRecord { + SessionRecord { + id: "scan".into(), + agent: "codex".into(), + model: None, + workspace: String::new(), + started_at_ms: 1, + ended_at_ms: None, + status: SessionStatus::Running, + trace_path: "/trace".into(), + start_commit: Some("abc".into()), + end_commit: None, + branch: None, + dirty_start: None, + dirty_end: None, + repo_binding_source: None, + prompt_fingerprint: None, + parent_session_id: None, + agent_version: None, + os: None, + arch: None, + repo_file_count: None, + repo_total_loc: None, + } +} + +fn event(seq: u64) -> Event { + Event { + session_id: "scan".into(), + seq, + ts_ms: seq + 1, + ts_exact: true, + kind: EventKind::Message, + source: EventSource::Tail, + tool: None, + tool_call_id: None, + tokens_in: None, + tokens_out: None, + reasoning_tokens: None, + cost_usd_e6: None, + stop_reason: None, + latency_ms: None, + ttft_ms: None, + retry_count: None, + context_used_tokens: None, + context_max_tokens: None, + cache_creation_tokens: None, + cache_read_tokens: None, + system_prompt_tokens: None, + payload: json!({}), + } +} diff --git a/src/shell/ingest.rs b/src/shell/ingest.rs index 0c6c558..ec6f77a 100644 --- a/src/shell/ingest.rs +++ b/src/shell/ingest.rs @@ -157,6 +157,9 @@ pub(crate) fn ingest_hook_with_store( )?; } store.append_event_with_sync(&ev, sync_ctx.as_ref())?; + if matches!(event.kind, collect::hooks::EventKind::Stop) { + store.flush_search()?; + } post_ingest_detached(&event, &cfg, ws)?; Ok(()) } diff --git a/src/shell/mod.rs b/src/shell/mod.rs index e033ecd..3461b23 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -36,3 +36,6 @@ pub mod sync; pub mod telemetry; pub mod telemetry_tail; pub mod upgrade; + +#[cfg(test)] +mod cli_tests; diff --git a/src/store/sqlite/event_batch.rs b/src/store/sqlite/event_batch.rs new file mode 100644 index 0000000..395df23 --- /dev/null +++ 
b/src/store/sqlite/event_batch.rs @@ -0,0 +1,41 @@ +use super::*; + +impl Store { + pub(crate) fn append_scanned_event_batch( + &self, + events: &[Event], + ctx: Option<&SyncIngestContext>, + flush_ms: Option<u64>, + ) -> Result<usize> { + let Some(session_id) = batch_session(events)? else { + return Ok(0); + }; + let transaction = self.conn.unchecked_transaction()?; + events + .iter() + .try_for_each(|event| self.append_event_deferred(event, ctx))?; + self.finish_scanned_batch(session_id, flush_ms)?; + transaction.commit()?; + Ok(events.len()) + } + + fn finish_scanned_batch(&self, session_id: &str, flush_ms: Option<u64>) -> Result<()> { + if let Some(timestamp) = flush_ms { + self.flush_projector_session(session_id, timestamp)?; + } + self.refresh_extension_session(session_id) + } +} + +fn batch_session(events: &[Event]) -> Result<Option<&str>> { + let Some(first) = events.first() else { + return Ok(None); + }; + anyhow::ensure!( + events + .iter() + .all(|event| event.session_id == first.session_id), + "scanned event batch mixes sessions" + ); + Ok(Some(&first.session_id)) +} diff --git a/src/store/sqlite/event_extensions.rs b/src/store/sqlite/event_extensions.rs new file mode 100644 index 0000000..35f9e77 --- /dev/null +++ b/src/store/sqlite/event_extensions.rs @@ -0,0 +1,19 @@ +use super::*; + +impl Store { + pub(super) fn index_extension_event(&self, event: &Event) -> Result<()> { + crate::extensions::hash_chain::store_event_hash(self, event) + } + + pub(super) fn apply_live_extension_event(&self, event: &Event) -> Result<()> { + crate::extensions::aggregates::apply_event(self, event) + } + + pub(super) fn refresh_extension_session(&self, session_id: &str) -> Result<()> { + crate::extensions::aggregates::upsert_session(self, session_id)?; + if let Err(error) = crate::extensions::diffs::refresh_session(self, session_id, false) { + tracing::warn!(%session_id, "step diff attribution skipped: {error:#}"); + } + Ok(()) + } +} diff --git a/src/store/sqlite/event_projector.rs 
b/src/store/sqlite/event_projector.rs index 6bd547a..da4ba59 100644 --- a/src/store/sqlite/event_projector.rs +++ b/src/store/sqlite/event_projector.rs @@ -1,6 +1,8 @@ use super::events::*; use super::*; +const PROJECTOR_HYDRATE_EVENTS: usize = 512; + impl Store { pub(super) fn sync_projector_session( &self, @@ -10,7 +12,15 @@ impl Store { if self.projector.borrow().last_seq(session_id) == last_seq { return Ok(()); } - self.replay_projector_session(session_id) + self.hydrate_projector_session(session_id) + } + + fn hydrate_projector_session(&self, session_id: &str) -> Result<()> { + self.projector.borrow_mut().reset_session(session_id); + for event in self.list_latest_events_for_session(session_id, PROJECTOR_HYDRATE_EVENTS)? { + self.projector.borrow_mut().apply(&event); + } + Ok(()) } pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> { @@ -60,7 +70,8 @@ impl Store { for delta in deltas { match delta { ProjectorEvent::SpanClosed(span, sample) => { - upsert_tool_span_record(&self.conn, span)?; + let persisted = upsert_tool_span_record(&self.conn, span)?; + crate::extensions::diffs::upsert_tool_span(self, &persisted)?; tracing::debug!( session_id = %sample.session_id, span_id = %sample.span_id, diff --git a/src/store/sqlite/event_write.rs b/src/store/sqlite/event_write.rs index 001b299..a61b16f 100644 --- a/src/store/sqlite/event_write.rs +++ b/src/store/sqlite/event_write.rs @@ -19,6 +19,23 @@ impl Store { /// Append event; when `ctx` is set and sync is configured, enqueue one redacted outbox row. 
pub fn append_event_with_sync(&self, e: &Event, ctx: Option<&SyncIngestContext>) -> Result<()> { + self.append_event_inner(e, ctx, true) + } + + pub(super) fn append_event_deferred( + &self, + e: &Event, + ctx: Option<&SyncIngestContext>, + ) -> Result<()> { + self.append_event_inner(e, ctx, false) + } + + fn append_event_inner( + &self, + e: &Event, + ctx: Option<&SyncIngestContext>, + refresh_session: bool, + ) -> Result<()> { let last_before = if projector_legacy_mode() { None } else { @@ -39,27 +56,7 @@ impl Store { ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22 ) - ON CONFLICT(session_id, seq) DO UPDATE SET - ts_ms = excluded.ts_ms, - ts_exact = excluded.ts_exact, - kind = excluded.kind, - source = excluded.source, - tool = excluded.tool, - tool_call_id = excluded.tool_call_id, - tokens_in = excluded.tokens_in, - tokens_out = excluded.tokens_out, - reasoning_tokens = excluded.reasoning_tokens, - cost_usd_e6 = excluded.cost_usd_e6, - payload = excluded.payload, - stop_reason = excluded.stop_reason, - latency_ms = excluded.latency_ms, - ttft_ms = excluded.ttft_ms, - retry_count = excluded.retry_count, - context_used_tokens = excluded.context_used_tokens, - context_max_tokens = excluded.context_max_tokens, - cache_creation_tokens = excluded.cache_creation_tokens, - cache_read_tokens = excluded.cache_read_tokens, - system_prompt_tokens = excluded.system_prompt_tokens", + ON CONFLICT(session_id, seq) DO NOTHING", params![ e.session_id, e.seq as i64, @@ -112,7 +109,10 @@ impl Store { self.invalidate_span_tree_cache(); } self.append_search_event(e); - self.refresh_extension_rows(e)?; + self.index_extension_event(e)?; + if refresh_session { + self.apply_live_extension_event(e)?; + } let Some(ctx) = ctx else { return Ok(()); }; @@ -144,15 +144,6 @@ impl Store { Ok(()) } - fn refresh_extension_rows(&self, e: &Event) -> Result<()> { - crate::extensions::hash_chain::store_event_hash(self, e)?; - 
crate::extensions::aggregates::upsert_session(self, &e.session_id)?; - if let Err(err) = crate::extensions::diffs::refresh_session(self, &e.session_id, false) { - tracing::warn!(session_id = %e.session_id, "step diff attribution skipped: {err:#}"); - } - Ok(()) - } - pub(super) fn append_search_event(&self, e: &Event) { if let Err(err) = self.try_append_search_event(e) { tracing::warn!(session_id = %e.session_id, seq = e.seq, "search index skipped: {err:#}"); diff --git a/src/store/sqlite/events.rs b/src/store/sqlite/events.rs index 3ea3783..1fca7e2 100644 --- a/src/store/sqlite/events.rs +++ b/src/store/sqlite/events.rs @@ -5,7 +5,7 @@ pub(super) fn projector_legacy_mode() -> bool { } pub(super) fn is_stop_event(e: &Event) -> bool { - if !matches!(e.kind, EventKind::Hook) { + if !matches!(e.kind, EventKind::Hook | EventKind::Lifecycle) { return false; } e.payload diff --git a/src/store/sqlite/mod.rs b/src/store/sqlite/mod.rs index 8c4d698..de43e3a 100644 --- a/src/store/sqlite/mod.rs +++ b/src/store/sqlite/mod.rs @@ -56,6 +56,8 @@ mod artifact_windows; mod constants; mod contracts; mod evals; +mod event_batch; +mod event_extensions; mod event_projector; mod event_read; mod event_write; diff --git a/src/store/sqlite/rows.rs b/src/store/sqlite/rows.rs index e892354..82019ea 100644 --- a/src/store/sqlite/rows.rs +++ b/src/store/sqlite/rows.rs @@ -76,7 +76,8 @@ pub(super) fn event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { cache_creation_tokens: row.get::<_, Option>(19)?.map(|v| v as u32), cache_read_tokens: row.get::<_, Option>(20)?.map(|v| v as u32), system_prompt_tokens: row.get::<_, Option>(21)?.map(|v| v as u32), - }) + } + .normalize_legacy_hook()) } pub(super) fn search_tool_event_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, Event)> { diff --git a/src/store/sqlite/tests/events.rs b/src/store/sqlite/tests/events.rs index 1248c15..26717d7 100644 --- a/src/store/sqlite/tests/events.rs +++ b/src/store/sqlite/tests/events.rs @@ -1,5 +1,5 @@ use 
super::super::Store; -use super::{EventKind, SessionStatus, make_event, make_session}; +use super::{EventKind, EventSource, SessionStatus, make_event, make_session}; use serde_json::json; use tempfile::TempDir; @@ -30,6 +30,26 @@ fn list_events_for_session_round_trip() { assert_eq!(events[1].seq, 1); } +#[test] +fn legacy_hook_rows_read_as_semantic_tool_events() { + let dir = TempDir::new().unwrap(); + let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); + store.upsert_session(&make_session("legacy-hook")).unwrap(); + let mut event = make_event("legacy-hook", 0); + event.kind = EventKind::Hook; + event.source = EventSource::Hook; + event.tool = None; + event.payload = json!({"hook_event_name":"PreToolUse","tool_name":"Read"}); + store.append_event(&event).unwrap(); + + let event = store + .list_events_for_session("legacy-hook") + .unwrap() + .remove(0); + assert_eq!(event.kind, EventKind::ToolCall); + assert_eq!(event.tool.as_deref(), Some("Read")); +} + #[test] fn list_events_page_uses_inclusive_seq_cursor() { let dir = TempDir::new().unwrap(); @@ -52,9 +72,44 @@ fn append_event_dedup() { let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); store.upsert_session(&make_session("s5")).unwrap(); store.append_event(&make_event("s5", 0)).unwrap(); - store.append_event(&make_event("s5", 0)).unwrap(); + let mut duplicate = make_event("s5", 0); + duplicate.tokens_in = Some(42); + store.append_event(&duplicate).unwrap(); let events = store.list_events_for_session("s5").unwrap(); assert_eq!(events.len(), 1); + assert_eq!(events[0].tokens_in, None); + let aggregate = crate::extensions::aggregates::get(&store, "s5") + .unwrap() + .unwrap(); + assert_eq!(aggregate.event_count, 1); +} + +#[test] +fn append_event_backfills_missing_aggregate_before_incrementing() { + let dir = TempDir::new().unwrap(); + let store = Store::open(&dir.path().join("kaizen.db")).unwrap(); + store + .upsert_session(&make_session("legacy-aggregate")) + .unwrap(); + store + 
.append_event(&make_event("legacy-aggregate", 0)) + .unwrap(); + store + .conn() + .execute( + "DELETE FROM session_aggregates WHERE session_id = ?1", + ["legacy-aggregate"], + ) + .unwrap(); + + store + .append_event(&make_event("legacy-aggregate", 1)) + .unwrap(); + + let aggregate = crate::extensions::aggregates::get(&store, "legacy-aggregate") + .unwrap() + .unwrap(); + assert_eq!(aggregate.event_count, 2); } #[test] diff --git a/src/store/sqlite/visualization/aggregates.rs b/src/store/sqlite/visualization/aggregates.rs index f2db792..00fef88 100644 --- a/src/store/sqlite/visualization/aggregates.rs +++ b/src/store/sqlite/visualization/aggregates.rs @@ -4,13 +4,18 @@ use crate::visualization::{DataQuality, TokenTotals, VisualizationTotals}; use anyhow::Result; const TOTALS_SQL: &str = " -WITH ws AS (SELECT id, status FROM sessions WHERE workspace = ?1) +WITH ws AS (SELECT id, status FROM sessions WHERE workspace = ?1), +last_events AS ( + SELECT e.session_id, MAX(e.ts_ms) last_event_ms FROM events e + JOIN ws ON ws.id = e.session_id GROUP BY e.session_id +) SELECT (SELECT COUNT(*) FROM ws), - (SELECT COUNT(*) FROM ws WHERE status IN ('Running', 'Waiting', 'Idle')), + (SELECT COUNT(*) FROM ws JOIN last_events l ON l.session_id = ws.id + WHERE ws.status IN ('Running', 'Waiting', 'Idle') AND l.last_event_ms >= ?2), COUNT(e.id), COALESCE(SUM(e.kind = 'Error'), 0), - COALESCE(SUM(e.kind = 'ToolCall'), 0), + (SELECT COUNT(*) FROM tool_spans t JOIN ws ON ws.id = t.session_id), COALESCE(SUM(e.cost_usd_e6), 0), COALESCE(SUM(e.tokens_in), 0), COALESCE(SUM(e.tokens_out), 0), @@ -28,9 +33,13 @@ impl Store { pub(crate) fn visualization_totals( &self, workspace: &str, + active_since_ms: u64, ) -> Result<(VisualizationTotals, DataQuality)> { let mut statement = self.conn().prepare(TOTALS_SQL)?; - let row = statement.query_row([workspace], totals_row)?; + let row = statement.query_row( + rusqlite::params![workspace, active_since_ms as i64], + totals_row, + )?; 
Ok((totals(&row), quality(&row))) } } diff --git a/src/store/sqlite/visualization/sessions.rs b/src/store/sqlite/visualization/sessions.rs index 3ad9434..2cf22d7 100644 --- a/src/store/sqlite/visualization/sessions.rs +++ b/src/store/sqlite/visualization/sessions.rs @@ -32,9 +32,9 @@ ORDER BY r.started_at_ms DESC, r.id ASC"; const TOP_TOOLS_SQL: &str = " WITH recent AS MATERIALIZED (SELECT id FROM sessions WHERE workspace = ?1 ORDER BY started_at_ms DESC, id ASC LIMIT ?2), counts AS ( - SELECT e.session_id, e.tool, COUNT(*) count FROM events e - JOIN recent r ON r.id = e.session_id WHERE e.tool IS NOT NULL - GROUP BY e.session_id, e.tool + SELECT t.session_id, t.tool, COUNT(*) count FROM tool_spans t + JOIN recent r ON r.id = t.session_id WHERE t.tool <> '' + GROUP BY t.session_id, t.tool ), ranked AS ( SELECT session_id, tool, count, ROW_NUMBER() OVER ( PARTITION BY session_id ORDER BY count DESC, tool ASC) rank FROM counts diff --git a/src/store/tool_span_index/persistence/write.rs b/src/store/tool_span_index/persistence/write.rs index 694bdc1..f57d670 100644 --- a/src/store/tool_span_index/persistence/write.rs +++ b/src/store/tool_span_index/persistence/write.rs @@ -41,10 +41,14 @@ pub(crate) fn clear_session_spans(conn: &Connection, session_id: &str) -> Result Ok(()) } -pub(crate) fn upsert_tool_span_record(conn: &Connection, span: &ToolSpanRecord) -> Result<()> { +pub(crate) fn upsert_tool_span_record( + conn: &Connection, + span: &ToolSpanRecord, +) -> Result { let persisted = namespaced_record(span); upsert_span(conn, &persisted)?; - replace_paths(conn, &persisted) + replace_paths(conn, &persisted)?; + Ok(persisted) } fn namespaced_record(span: &ToolSpanRecord) -> ToolSpanRecord { diff --git a/src/visualization/build.rs b/src/visualization/build.rs index fcdb6a5..ff7bf9e 100644 --- a/src/visualization/build.rs +++ b/src/visualization/build.rs @@ -56,7 +56,8 @@ pub(crate) fn build_report_observed( query: VisualizationQuery, ) -> Result { 
validate(&query.limits)?; - let (totals, quality) = store.visualization_totals(&query.workspace)?; + let active_since_ms = query.now_ms.saturating_sub(ACTIVE_TTL_MS); + let (totals, quality) = store.visualization_totals(&query.workspace, active_since_ms)?; let sessions = store.visualization_sessions(&query.workspace, query.limits.sessions, query.now_ms)?; let selected = selected_detail(store, &query, sessions.first())?; diff --git a/src/web/assets.rs b/src/web/assets.rs index 00c2396..b4c9543 100644 --- a/src/web/assets.rs +++ b/src/web/assets.rs @@ -118,6 +118,10 @@ mod tests { "id=\"detail-spans\"", "id=\"detail-files\"", "id=\"detail-tools\"", + "id=\"project-insights\"", + "id=\"insight-tools\"", + "id=\"insight-attention\"", + "id=\"insight-coverage\"", "id=\"developer-raw\"", "/assets/kaizen-tokens.css", ] { @@ -127,12 +131,15 @@ mod tests { "kaizen_sessions_list", "all_workspaces", "visibilitychange", - "AUTO_REFRESH_MS", + "type: \"subscribe\"", + "message.type === \"changed\"", + "message.id !== state.snapshotPending", "Authorization required", ] { assert!(JS.contains(needle), "js missing {needle}"); } assert!(!JS.contains("setInterval(")); + assert!(RENDER_JS.contains("renderInsights")); } #[test] diff --git a/src/web/assets/index.html b/src/web/assets/index.html index 115bffa..b5cf579 100644 --- a/src/web/assets/index.html +++ b/src/web/assets/index.html @@ -77,6 +77,30 @@

See what your coding agents are doing.

+
+
+

Fast read

+

What stands out

+
+
+
+ Tool pattern + Waiting for activity +

Recent tool use appears here.

+
+
+ Needs attention + Waiting for sessions +

Errors and stale work appear here.

+
+
+ Telemetry coverage + Waiting for events +

Token and cost coverage appear here.

+
+
+
+
diff --git a/src/web/assets/kaizen-render.js b/src/web/assets/kaizen-render.js index f810fe7..19814cc 100644 --- a/src/web/assets/kaizen-render.js +++ b/src/web/assets/kaizen-render.js @@ -40,6 +40,7 @@ export function showManual(path = "") { export function renderReport(report) { renderTotals(report?.totals || {}); + renderInsights(report); renderSessions( report?.sessions || [], report?.selected?.session?.id, @@ -48,6 +49,35 @@ export function renderReport(report) { renderDetail(report); } +function renderInsights(report) { + const sessions = report?.sessions || []; + const [tool, calls] = topTool(sessions); + const attention = sessions.filter(row => ["errored", "orphaned"].includes(row.status)).length; + const quality = report?.quality || {}; + setInsight("tools", tool ? `${label(tool)} leads` : "No tool calls yet", tool ? `${count(calls)} calls in visible sessions` : "Live tool use appears here."); + setInsight("attention", attention ? `${count(attention)} need attention` : "No recent warnings", `${count(sessions.length)} visible sessions checked`); + setInsight("coverage", `${percent(quality.token_coverage_pct)} token coverage`, `${percent(quality.cost_coverage_pct)} cost coverage`); +} + +function topTool(sessions) { + const counts = sessions.flatMap(row => row.top_tools || []).reduce(addTool, new Map()); + return [...counts].sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]))[0] || ["", 0]; +} + +function addTool(counts, [tool, calls]) { + counts.set(tool, (counts.get(tool) || 0) + calls); + return counts; +} + +function setInsight(id, title, note) { + $(`#insight-${id}`).textContent = title; + $(`#insight-${id}-note`).textContent = note; +} + +function percent(value) { + return `${Math.round(Number(value) || 0)}%`; +} + function renderTotals(totals) { $("#total-sessions").textContent = count(totals.session_count); $("#active-sessions").textContent = count(totals.running_count); diff --git a/src/web/assets/kaizen-state.js b/src/web/assets/kaizen-state.js 
index b8d6bc8..8f076fb 100644 --- a/src/web/assets/kaizen-state.js +++ b/src/web/assets/kaizen-state.js @@ -1,5 +1,3 @@ -export const AUTO_REFRESH_MS = 20_000; - export function decodeOutput(output) { const value = output?.value; if (typeof value !== "string") return value || {}; diff --git a/src/web/assets/kaizen.css b/src/web/assets/kaizen.css index dee11f5..de7028b 100644 --- a/src/web/assets/kaizen.css +++ b/src/web/assets/kaizen.css @@ -82,6 +82,17 @@ main { width: min(var(--content-width), 100%); margin: auto; padding: var(--spac .summary-grid article { padding: var(--space-4); border: 1px solid var(--rule-strong); background: var(--paper-raised); } .summary-grid span { display: block; color: var(--ink-faint); font-size: 0.75rem; font-weight: 800; text-transform: uppercase; } .summary-grid strong { display: block; margin-top: var(--space-2); font-family: var(--font-display); font-size: 2rem; font-weight: 500; } +.insight-panel { margin-bottom: var(--space-4); border: 1px solid var(--rule-strong); background: var(--ink); color: var(--paper-raised); box-shadow: var(--shadow); } +.insight-heading { padding: var(--space-3) var(--space-4); border-bottom: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } +.insight-heading p, .insight-heading h2 { margin: 0; } +.insight-heading .kicker { color: color-mix(in srgb, var(--paper) 72%, transparent); } +.insight-heading h2 { margin-top: var(--space-1); font-family: var(--font-display); font-size: 1.45rem; font-weight: 500; } +.insight-grid { display: grid; grid-template-columns: repeat(3, 1fr); } +.insight-grid article { padding: var(--space-4); border-right: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } +.insight-grid article:last-child { border-right: 0; } +.insight-grid span { color: color-mix(in srgb, var(--paper) 68%, transparent); font-size: 0.72rem; font-weight: 800; letter-spacing: 0.07em; text-transform: uppercase; } +.insight-grid strong { display: block; margin-top: var(--space-2); 
font-family: var(--font-display); font-size: 1.35rem; font-weight: 500; } +.insight-grid p { margin: var(--space-1) 0 0; color: color-mix(in srgb, var(--paper) 72%, transparent); font-size: 0.78rem; } .observe-grid { display: grid; grid-template-columns: minmax(0, 1.35fr) minmax(320px, 0.65fr); gap: var(--space-4); align-items: start; } .notebook-panel { min-width: 0; border: 1px solid var(--rule-strong); background: var(--paper-raised); box-shadow: var(--shadow); } .panel-heading { display: flex; align-items: start; justify-content: space-between; gap: var(--space-3); padding: var(--space-4); border-bottom: 1px solid var(--rule); } @@ -102,7 +113,7 @@ td strong { color: var(--ink); } .status-label[data-tone="danger"] { color: var(--rust); border-color: var(--rust); } .status-label[data-tone="ready"] { color: var(--field-green); border-color: var(--field-green); } .empty-note { margin: 0; padding: var(--space-5); color: var(--ink-soft); } -.detail-panel { display: grid; } +.detail-panel { position: sticky; top: var(--space-4); display: grid; max-height: calc(100vh - (2 * var(--space-4))); overflow: auto; } .detail-facts { display: grid; grid-template-columns: auto 1fr; gap: var(--space-2) var(--space-3); margin: 0; padding: var(--space-4); border-bottom: 1px solid var(--rule); } .detail-facts dt { color: var(--ink-faint); font-size: 0.75rem; text-transform: uppercase; } .detail-facts dd { margin: 0; color: var(--ink); overflow-wrap: anywhere; } @@ -128,6 +139,7 @@ td strong { color: var(--ink); } } @media (max-width: 980px) { .observe-grid { grid-template-columns: 1fr; } + .detail-panel { position: static; max-height: none; } .project-controls { grid-template-columns: 1fr auto; } .project-controls details { grid-column: 1 / -1; } } @@ -136,6 +148,8 @@ td strong { color: var(--ink); } main { padding: var(--space-5) var(--space-3) var(--space-6); } .site-header, .panel-heading { align-items: start; } .summary-grid, .project-controls { grid-template-columns: 1fr 1fr; 
} + .insight-grid { grid-template-columns: 1fr; } + .insight-grid article { border-right: 0; border-bottom: 1px solid color-mix(in srgb, var(--paper) 25%, transparent); } .control-field, .project-controls details { grid-column: 1 / -1; } .manual-row { grid-template-columns: 1fr; } } diff --git a/src/web/assets/kaizen.js b/src/web/assets/kaizen.js index 43382d2..4dddeae 100644 --- a/src/web/assets/kaizen.js +++ b/src/web/assets/kaizen.js @@ -1,4 +1,4 @@ -import { AUTO_REFRESH_MS, chooseProject, decodeOutput, projectPaths } from "./kaizen-state.js"; +import { chooseProject, decodeOutput, projectPaths } from "./kaizen-state.js"; import { bindRawReport, setRawReport } from "./kaizen-raw.js"; import { createTransport } from "./kaizen-transport.js"; import { @@ -19,9 +19,8 @@ const state = { projects: [], workspace: "", selected: "", - snapshotPending: false, - refreshTimer: 0, - lastRefresh: 0, + snapshotPending: "", + refreshQueued: false, }; const transport = createTransport({ url: socketUrl, @@ -46,7 +45,7 @@ function bindControls() { $("#project-select").addEventListener("change", event => activateProject(event.target.value)); $("#session-rows").addEventListener("click", selectSession); $("#manual-form").addEventListener("submit", openManualPath); - document.addEventListener("visibilitychange", visibilityChanged); + document.addEventListener("visibilitychange", flushQueued); } function socketUrl() { const scheme = location.protocol === "https:" ? 
"wss" : "ws"; @@ -57,9 +56,8 @@ function connected() { discoverProjects(); } function disconnected() { - state.snapshotPending = false; + state.snapshotPending = ""; setBusy(false); - clearRefresh(); setConnection("Reconnecting", "danger"); setJourney("error", "Connection lost", "Trying the secure local connection again."); } @@ -86,7 +84,8 @@ function receive(raw) { } if (message.type === "result") return receiveResult(message); if (message.type === "visualization_snapshot") return receiveSnapshot(message); - if (message.type === "error") return fail(message.error || "Request failed."); + if (message.type === "changed") return receiveChanged(message); + if (message.type === "error") return receiveError(message); } function receiveResult(message) { const purpose = state.pending.get(message.id); @@ -111,19 +110,22 @@ function activateProject(workspace) { if (!workspace) return; state.workspace = workspace; state.selected = ""; + state.snapshotPending = ""; + state.refreshQueued = false; localStorage.kaizenWorkspace = workspace; $("#manual-path").value = workspace; renderProjects(state.projects.includes(workspace) ? 
state.projects : [workspace, ...state.projects], workspace); + transport.send({ type: "subscribe", workspace }); requestSnapshot(true); } function requestSnapshot(announce) { if (!state.workspace || state.snapshotPending) return; if (!transport.isOpen()) return fail("Local connection is not ready."); - clearRefresh(); - state.snapshotPending = true; + const request = snapshotRequest(); + state.snapshotPending = request.id; setBusy(true); if (announce) setJourney("neutral", "Loading observations", "Reading recent local telemetry."); - transport.send(snapshotRequest()); + if (!transport.send(request)) fail("Local connection is not ready."); } function snapshotRequest() { return { @@ -134,15 +136,33 @@ function snapshotRequest() { }; } function receiveSnapshot(message) { + if (message.id !== state.snapshotPending) return; const report = message.report || {}; - state.snapshotPending = false; + state.snapshotPending = ""; state.selected = report.selected?.session?.id || report.sessions?.[0]?.id || ""; - state.lastRefresh = Date.now(); setBusy(false); setRawReport(report); renderReport(report); report.sessions?.length ? 
ready(report) : empty(report); - scheduleRefresh(); + if (state.refreshQueued) return refreshQueued(); +} +function receiveError(message) { + if (String(message.id || "").startsWith("snapshot-") && message.id !== state.snapshotPending) return; + fail(message.error || "Request failed."); +} +function receiveChanged(message) { + if (message.workspace !== state.workspace) return; + if (document.hidden || state.snapshotPending) state.refreshQueued = true; + else requestSnapshot(false); +} + +function refreshQueued() { + state.refreshQueued = false; + requestSnapshot(false); +} + +function flushQueued() { + if (!document.hidden && state.refreshQueued && !state.snapshotPending) refreshQueued(); } function ready(report) { const at = new Date(report.generated_at_ms || Date.now()).toLocaleTimeString(); @@ -167,27 +187,11 @@ function openManualPath(event) { if (!path) return fail("Project path is required."); activateProject(path); } -function visibilityChanged() { - clearRefresh(); - if (document.hidden || !state.workspace) return; - const remaining = AUTO_REFRESH_MS - (Date.now() - state.lastRefresh); - remaining <= 0 ? requestSnapshot(false) : scheduleRefresh(remaining); -} -function scheduleRefresh(delay = AUTO_REFRESH_MS) { - clearRefresh(); - if (document.hidden || !state.workspace || !transport.isOpen()) return; - state.refreshTimer = setTimeout(() => requestSnapshot(false), Math.max(1_000, delay)); -} -function clearRefresh() { - clearTimeout(state.refreshTimer); - state.refreshTimer = 0; -} function fail(message) { - state.snapshotPending = false; + state.snapshotPending = ""; setBusy(false); setJourney("error", "Could not load observations", message); showManual(state.workspace); - scheduleRefresh(); } function showAuth(message) { setBusy(false); diff --git a/src/web/live.rs b/src/web/live.rs new file mode 100644 index 0000000..78a9140 --- /dev/null +++ b/src/web/live.rs @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! 
Cheap per-connection SQLite revision tracking for live Web updates. + +use anyhow::Result; +use serde_json::{Value, json}; +use std::path::{Path, PathBuf}; +use std::time::UNIX_EPOCH; + +#[derive(Default)] +pub(super) struct Subscription { + workspace: Option<String>, + revision: Option<Revision>, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Revision { + database: FileStamp, + wal: FileStamp, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +struct FileStamp { + len: u64, + modified_ns: u128, +} + +impl Subscription { + pub(super) fn set(&mut self, workspace: Option<String>) -> Result<()> { + self.revision = workspace.as_deref().map(revision).transpose()?; + self.workspace = workspace; + Ok(()) + } + + pub(super) fn clear(&mut self) { + self.workspace = None; + self.revision = None; + } + + pub(super) fn is_active(&self) -> bool { + self.workspace.is_some() + } + + pub(super) fn changed(&mut self) -> Option<Value> { + let workspace = self.workspace.as_deref()?; + let next = revision(workspace).ok()?; + if self.revision == Some(next) { + return None; + } + self.revision = Some(next); + Some(json!({"type":"changed", "workspace":workspace})) + } +} + +fn revision(workspace: &str) -> Result<Revision> { + let database = crate::core::workspace::db_path(Path::new(workspace))?; + let wal = sidecar(&database, "-wal"); + Ok(Revision { + database: stamp(&database), + wal: stamp(&wal), + }) +} + +fn sidecar(database: &Path, suffix: &str) -> PathBuf { + PathBuf::from(format!("{}{suffix}", database.to_string_lossy())) +} + +fn stamp(path: &Path) -> FileStamp { + path.metadata().map_or_else( + |_| FileStamp::default(), + |meta| FileStamp { + len: meta.len(), + modified_ns: meta.modified().ok().and_then(since_epoch).unwrap_or(0), + }, + ) +} + +fn since_epoch(time: std::time::SystemTime) -> Option<u128> { + time.duration_since(UNIX_EPOCH) + .ok() + .map(|value| value.as_nanos()) +} diff --git a/src/web/mod.rs b/src/web/mod.rs index c174f90..00f52e3 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -3,6 
+3,7 @@ mod assets; pub mod features; +mod live; mod server; mod snapshot; mod token; diff --git a/src/web/server.rs b/src/web/server.rs index 1587a60..732df1d 100644 --- a/src/web/server.rs +++ b/src/web/server.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Axum router for the local daemon web app. -use super::{assets, features, snapshot, tools}; +use super::{assets, features, live::Subscription, snapshot, tools}; use axum::Router; use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::extract::{Query, State}; @@ -35,6 +35,8 @@ enum ClientMessage { Subscribe { #[serde(default)] id: Option<String>, + #[serde(default)] + workspace: Option<String>, }, Unsubscribe, Ping { @@ -80,18 +82,19 @@ async fn ws( } async fn socket_loop(mut socket: WebSocket) { - let mut subscribed = false; - let mut tick = tokio::time::interval(std::time::Duration::from_secs(3)); + let mut subscription = Subscription::default(); + let mut tick = tokio::time::interval(std::time::Duration::from_millis(250)); loop { tokio::select! 
{ msg = socket.recv() => { let Some(Ok(Message::Text(text))) = msg else { break; }; - if !handle_text(&mut socket, &text, &mut subscribed).await { + if !handle_text(&mut socket, &text, &mut subscription).await { break; } } - _ = tick.tick(), if subscribed => { - if send(&mut socket, status_msg(None)).await.is_err() { + _ = tick.tick(), if subscription.is_active() => { + if let Some(value) = subscription.changed() + && send(&mut socket, value).await.is_err() { break; } } @@ -99,18 +102,25 @@ async fn socket_loop(mut socket: WebSocket) { } } -async fn handle_text(socket: &mut WebSocket, text: &str, subscribed: &mut bool) -> bool { +async fn handle_text(socket: &mut WebSocket, text: &str, subscription: &mut Subscription) -> bool { match serde_json::from_str::<ClientMessage>(text) { Ok(ClientMessage::Call { id, tool, args }) => { let value = call_msg(&id, &tool, args.unwrap_or_else(|| json!({}))).await; send(socket, value).await.is_ok() } - Ok(ClientMessage::Subscribe { id }) => { - *subscribed = true; + Ok(ClientMessage::Subscribe { id, workspace }) => { + if let Err(err) = subscription.set(workspace) { + return send( + socket, + json!({"type":"error","id":id,"error":err.to_string()}), + ) + .await + .is_ok(); + } send(socket, status_msg(id.as_deref())).await.is_ok() } Ok(ClientMessage::Unsubscribe) => { - *subscribed = false; + subscription.clear(); send( socket, json!({"type":"result","output":{"kind":"json","value":{"subscribed":false}}}), diff --git a/tests/machine_registry.rs b/tests/machine_registry.rs index 67dbebd..c670f1b 100644 --- a/tests/machine_registry.rs +++ b/tests/machine_registry.rs @@ -30,6 +30,32 @@ fn default_list_omits_missing_workspace_without_deleting_row() -> anyhow::Result Ok(()) } +#[test] +fn machine_scope_ignores_workspace_containing_kaizen_home() -> anyhow::Result<()> { + let _home = test_home::TestHome::new()?; + let tmp = tempfile::tempdir()?; + let workspace = tmp.path().join("repo"); + std::fs::create_dir(&workspace)?; + 
kaizen::core::machine_registry::upsert_from_resolve(&workspace)?; + insert_legacy_home_workspace()?; + + let roots = kaizen::core::workspace::machine_workspaces(Some(&workspace))?; + + assert_eq!(roots, vec![std::fs::canonicalize(workspace)?]); + Ok(()) +} + +fn insert_legacy_home_workspace() -> anyhow::Result<()> { + let home = std::fs::canonicalize(std::env::var("HOME")?)?; + let db = kaizen::core::machine_registry::db_path().expect("test home"); + let connection = rusqlite::Connection::open(db)?; + connection.execute( + "INSERT INTO projects(path, name, first_seen_ms, last_seen_ms) VALUES (?1, 'home', 0, 1)", + [home.to_string_lossy().as_ref()], + )?; + Ok(()) +} + fn assert_raw_row(workspace: std::path::PathBuf) -> anyhow::Result<()> { let rows = kaizen::core::machine_registry::list_paths_including_missing()?; assert_eq!(rows, vec![workspace]); diff --git a/tests/spec/visualization_report.rs b/tests/spec/visualization_report.rs index 99a73c6..319e018 100644 --- a/tests/spec/visualization_report.rs +++ b/tests/spec/visualization_report.rs @@ -47,6 +47,38 @@ fn report_skips_activity_when_disabled() -> anyhow::Result<()> { Ok(()) } +#[test] +fn active_now_excludes_stale_open_sessions() -> anyhow::Result<()> { + let tmp = tempfile::tempdir()?; + let store = Store::open(&tmp.path().join("k.db"))?; + store.upsert_session(&session("active", SessionStatus::Running))?; + store.upsert_session(&session("stale", SessionStatus::Running))?; + store.append_event(&event_at("active", 0, 999_500))?; + store.append_event(&event_at("stale", 0, 1))?; + let mut query = query(None); + query.now_ms = 1_000_000; + + let report = build_report(&store, query)?; + + assert_eq!(report.totals.running_count, 1); + Ok(()) +} + +#[test] +fn legacy_hook_spans_feed_tool_insights() -> anyhow::Result<()> { + let tmp = tempfile::tempdir()?; + let store = Store::open(&tmp.path().join("k.db"))?; + store.upsert_session(&session("legacy", SessionStatus::Done))?; + 
store.append_event(&legacy_hook("legacy", 0, "PreToolUse"))?; + store.append_event(&legacy_hook("legacy", 1, "PostToolUse"))?; + + let report = build_report(&store, query(None))?; + + assert_eq!(report.totals.tool_call_count, 1); + assert_eq!(report.sessions[0].top_tools, vec![("Read".into(), 1)]); + Ok(()) +} + #[test] fn activity_bins_preserve_time_boundaries() -> anyhow::Result<()> { let tmp = tempfile::tempdir()?; @@ -151,3 +183,13 @@ fn event_at(session_id: &str, seq: u64, ts_ms: u64) -> Event { payload: json!({"input": {"path": "src/lib.rs"}}), } } + +fn legacy_hook(session_id: &str, seq: u64, name: &str) -> Event { + let mut event = event_at(session_id, seq, 2_000 + seq); + event.kind = EventKind::Hook; + event.source = EventSource::Hook; + event.tool = None; + event.tool_call_id = Some("legacy-call".into()); + event.payload = json!({"hook_event_name":name,"tool_name":"Read"}); + event +} diff --git a/tests/web_live.rs b/tests/web_live.rs new file mode 100644 index 0000000..af3a331 --- /dev/null +++ b/tests/web_live.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use futures_util::{SinkExt, StreamExt}; +use kaizen::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus}; +use kaizen::store::Store; +use serde_json::{Value, json}; +use tokio::net::TcpListener; +use tokio::time::{Duration, timeout}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +#[tokio::test] +async fn subscribed_workspace_reports_database_changes_within_one_second() -> anyhow::Result<()> { + let fixture = Fixture::new()?; + let listener = TcpListener::bind("127.0.0.1:0").await?; + let (endpoint, _task) = kaizen::web::start_with_listener(listener).await?; + let (mut socket, _) = connect_async(fixture.ws_url(&endpoint)).await?; + socket + .send(Message::Text(fixture.subscribe().into())) + .await?; + assert_eq!(recv(&mut socket).await?["type"], "status"); + + fixture.append_event()?; + let changed = 
timeout(Duration::from_secs(1), recv(&mut socket)).await??; + + assert_eq!(changed["type"], "changed"); + assert_eq!(changed["workspace"], fixture.workspace_string()); + Ok(()) +} + +struct Fixture { + _temp: tempfile::TempDir, + workspace: std::path::PathBuf, + store: Store, +} + +impl Fixture { + fn new() -> anyhow::Result<Self> { + let temp = tempfile::tempdir()?; + let workspace = temp.path().join("repo"); + std::fs::create_dir_all(&workspace)?; + unsafe { + std::env::set_var("HOME", temp.path()); + std::env::set_var("KAIZEN_HOME", temp.path().join(".kaizen")); + std::env::set_var("KAIZEN_DAEMON", "0"); + } + let workspace = std::fs::canonicalize(workspace)?; + let store = Store::open(&kaizen::core::workspace::db_path(&workspace)?)?; + store.upsert_session(&session(&workspace))?; + Ok(Self { + _temp: temp, + workspace, + store, + }) + } + + fn ws_url(&self, endpoint: &kaizen::ipc::WebEndpoint) -> String { + format!("ws://{}/ws?token={}", endpoint.listen, endpoint.token) + } + + fn subscribe(&self) -> String { + json!({"type":"subscribe", "id":"live", "workspace":self.workspace}).to_string() + } + + fn append_event(&self) -> anyhow::Result<()> { + self.store.append_event(&event()) + } + + fn workspace_string(&self) -> String { + self.workspace.to_string_lossy().into_owned() + } +} + +fn session(workspace: &std::path::Path) -> SessionRecord { + SessionRecord { + id: "live-session".into(), + agent: "claude".into(), + model: None, + workspace: workspace.to_string_lossy().into_owned(), + started_at_ms: 1, + ended_at_ms: None, + status: SessionStatus::Running, + trace_path: String::new(), + start_commit: None, + end_commit: None, + branch: None, + dirty_start: None, + dirty_end: None, + repo_binding_source: None, + prompt_fingerprint: None, + parent_session_id: None, + agent_version: None, + os: None, + arch: None, + repo_file_count: None, + repo_total_loc: None, + } +} + +fn event() -> Event { + Event { + session_id: "live-session".into(), + seq: 0, + ts_ms: 2, + 
ts_exact: true, + kind: EventKind::ToolCall, + source: EventSource::Hook, + tool: Some("Read".into()), + tool_call_id: Some("call-1".into()), + tokens_in: None, + tokens_out: None, + reasoning_tokens: None, + cost_usd_e6: None, + stop_reason: None, + latency_ms: None, + ttft_ms: None, + retry_count: None, + context_used_tokens: None, + context_max_tokens: None, + cache_creation_tokens: None, + cache_read_tokens: None, + system_prompt_tokens: None, + payload: json!({}), + } +} + +async fn recv( + socket: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >, +) -> anyhow::Result<Value> { + let Some(Ok(Message::Text(text))) = socket.next().await else { + anyhow::bail!("missing WebSocket message"); + }; + Ok(serde_json::from_str(&text)?) +}