diff --git a/Cargo.lock b/Cargo.lock index d963369..b03201f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,6 +416,15 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593" +[[package]] +name = "caseless" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6fd507454086c8edfd769ca6ada439193cdb209c7681712ef6275cccbfe5d8" +dependencies = [ + "unicode-normalization", +] + [[package]] name = "castaway" version = "0.2.4" @@ -1857,6 +1866,7 @@ dependencies = [ "arc-swap", "axum", "blake3", + "caseless", "clap", "clap_complete", "crossterm", @@ -2702,9 +2712,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.14" +version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +checksum = "4fcb935c5bec503c2f0e306bdd3e58bb9029dcb14fa8d9ac76e3a5256ac0763e" dependencies = [ "aws-lc-rs", "bytes", @@ -4520,6 +4530,15 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.13.3" diff --git a/Cargo.toml b/Cargo.toml index a05e6ea..ef48e08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -357,7 +357,8 @@ anyhow = "1" memchr = "2" tempfile = "3" tar = "0.4" -rusqlite = { version = "0.40", features = ["bundled"] } +rusqlite = { version = "0.40", features = ["bundled", "functions"] } +caseless = "0.2.2" rand = { version = "0.10", features = ["std_rng"] } tokio = { version = "1", features = ["full"] } notify = "8" diff --git a/docs/daemon.md b/docs/daemon.md index 66adc13..784e190 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -12,6 +12,7 @@ Kaizen can run a local daemon so one process owns store writes. | `kaizen daemon start --background` | Spawn daemon, wait until ready, print pid/socket/log/web, exit | | `kaizen daemon status` | Print `status: running` plus pid/uptime/queue/error/capture/web, or `status: stopped` plus socket path | | `kaizen daemon stop` | Request graceful daemon shutdown | +| `kaizen daemon restart` | Gracefully stop when running, then start in background and restore registered workspace capture | | `--no-daemon` or `KAIZEN_DAEMON=0` | Use direct SQLite mode | Runtime files live under `$KAIZEN_HOME` or `~/.kaizen`: @@ -58,3 +59,7 @@ remains compiled and supported for CI, smoke tests, and debugging. workspace scanner loop and records capture health for `daemon status`. `kaizen init --deep` also asks for provider proxy endpoints; unsupported agent config rewrites stay fail-open and are reported as partial deep capture. + +Daemon startup restores basic capture for every existing registered workspace. +Use `kaizen daemon restart` after daemon upgrades; existing SQLite sessions stay +available while transcript scanning resumes immediately. diff --git a/docs/web.md b/docs/web.md index cedc487..0b2eeac 100644 --- a/docs/web.md +++ b/docs/web.md @@ -23,20 +23,29 @@ kaizen open --no-browser The dashboard provides: -- automatic selection of the most recently active valid project, plus a manual - local-path fallback; +- project selection, manual local-path fallback, and refresh controls in the + responsive top navigation; - session, active-session, error, and cost totals; - project-level tool, attention, and telemetry-coverage insights; Tool Pattern lists the selected session's three most frequent recent shell commands; -- the latest 30 sessions for the selected project; +- 30 sessions per page, with Previous and Next controls; +- one session search field that ranks prompt matches first, then matches session + ID, agent, model, status, branch, and tool; - selected-session prompt, facts, recent events with bounded command details, nested tool spans, touched files, and top tools; - the exact bounded report under **Developer details**. -Selected-session detail is capped at 40 events, 40 spans, and 40 files. Those +Search accepts up to 256 characters. Search results and page controls use the +filtered count, while summary cards continue to show project-wide totals. +Changing the search starts at the first page. Automatic and manual refreshes +preserve the active search and page; if data changes and removes that page, +Kaizen returns to the last available page. + +Selected-session detail is capped at 40 events, 40 spans, and 40 files. These limits keep refresh latency and memory use predictable. The server watches the selected project's SQLite database and WAL; a committed change requests a new -snapshot within one second. **Refresh now** remains available for manual checks. +snapshot within one second. **Refresh now** remains available in the top +navigation for manual checks. `No completion` means Kaizen received activity but no final session event for at least 30 minutes. It does not mean the work failed. Models and prompts remain diff --git a/src/bin_kaizen/args/operate.rs b/src/bin_kaizen/args/operate.rs index ff53271..b2e8b83 100644 --- a/src/bin_kaizen/args/operate.rs +++ b/src/bin_kaizen/args/operate.rs @@ -35,6 +35,8 @@ pub(crate) enum DaemonCommand { }, /// Gracefully stop daemon. Stop, + /// Gracefully restart daemon in background. + Restart, /// Show daemon pid, uptime, queue depth, and last error. Status, } diff --git a/src/bin_kaizen/dispatch/operate_daemon.rs b/src/bin_kaizen/dispatch/operate_daemon.rs index 3ac761f..5f39c8a 100644 --- a/src/bin_kaizen/dispatch/operate_daemon.rs +++ b/src/bin_kaizen/dispatch/operate_daemon.rs @@ -9,6 +9,7 @@ pub(super) fn daemon(cmd: DaemonCommand) -> anyhow::Result<()> { println!("{}", kaizen::daemon::stop()?); Ok(()) } + DaemonCommand::Restart => daemon_restart(), DaemonCommand::Status => daemon_status(), } } @@ -72,6 +73,14 @@ fn daemon_start(background: bool) -> anyhow::Result<()> { Ok(()) } +fn daemon_restart() -> anyhow::Result<()> { + let started = kaizen::daemon::restart_background()?; + for line in background_start_lines(&started) { + println!("{line}"); + } + Ok(()) +} + fn background_start_lines(started: &kaizen::daemon::BackgroundStart) -> Vec { let lines = [ format!("daemon {}", background_state(started)), diff --git a/src/core/mod.rs b/src/core/mod.rs index 2f52b3b..82df464 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -13,6 +13,7 @@ pub mod legacy_import; pub mod machine_registry; pub mod paths; pub mod project_identity; +pub(crate) mod prompt_text; pub mod repo; pub mod safe_fs; pub mod session; diff --git a/src/core/prompt_text.rs b/src/core/prompt_text.rs new file mode 100644 index 0000000..d74fd6d --- /dev/null +++ b/src/core/prompt_text.rs @@ -0,0 +1,63 @@ +use serde_json::Value; + +const MAX_PROMPT_CHARS: usize = 8_000; + +pub(crate) fn from_value(value: &Value) -> Option { + direct(value) + .or_else(|| user_message(value)) + .map(|text| compact(&text)) +} + +fn direct(value: &Value) -> Option { + ["/prompt", "/user_prompt"] + .iter() + .find_map(|path| value.pointer(path)?.as_str().and_then(clean)) +} + +fn user_message(value: &Value) -> Option { + let message = value.get("message").unwrap_or(value); + (message.get("role")?.as_str()? == "user") + .then(|| content(message.get("content")?)) + .flatten() +} + +fn content(value: &Value) -> Option { + value.as_str().and_then(clean).or_else(|| { + value + .as_array()? + .iter() + .rev() + .find_map(|part| part.get("text")?.as_str().and_then(clean)) + }) +} + +fn clean(raw: &str) -> Option { + let value = objective(raw).unwrap_or(raw).trim(); + (!value.is_empty() && !ignored(value)).then(|| value.to_string()) +} + +fn objective(raw: &str) -> Option<&str> { + tagged(raw, "", "") + .or_else(|| tagged(raw, "", "")) +} + +fn tagged<'a>(raw: &'a str, open: &str, close: &str) -> Option<&'a str> { + raw.split_once(open)?.1.split_once(close).map(|pair| pair.0) +} + +fn ignored(value: &str) -> bool { + value.starts_with("") || value.starts_with("# AGENTS.md instructions") +} + +fn compact(value: &str) -> String { + let normalized = value.split_whitespace().collect::>().join(" "); + let truncated = normalized + .chars() + .take(MAX_PROMPT_CHARS) + .collect::(); + if normalized.chars().count() > MAX_PROMPT_CHARS { + format!("{truncated}...") + } else { + truncated + } +} diff --git a/src/daemon/background.rs b/src/daemon/background.rs index 89f6074..3be50fc 100644 --- a/src/daemon/background.rs +++ b/src/daemon/background.rs @@ -1,14 +1,18 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Background daemon process startup. -use super::lifecycle::{RuntimePaths, runtime_paths, runtime_paths_for, try_status}; +use super::lifecycle::{ + DaemonStatusOutcome, RuntimePaths, runtime_paths, runtime_paths_for, status_outcome, +}; use crate::ipc::{DaemonStatus, WebEndpoint}; use anyhow::{Context, Result, anyhow}; use std::path::Path; use std::process::{Child, Command, Stdio}; use std::time::{Duration, Instant}; -const START_WAIT_MS: u64 = 2_000; +// Restore runs migrations, backfills, first scans before readiness. Keep slow startup bounded. +const START_WAIT_MS: u64 = 30_000; +const STOP_WAIT_MS: u64 = 2_000; #[derive(Debug, Clone)] pub struct BackgroundStart { @@ -26,8 +30,34 @@ pub fn start_background_for(workspace: &Path) -> Result { start_background_at(runtime_paths_for(workspace)?) } +pub fn restart_background() -> Result { + if !daemon_stopped()? { + super::stop()?; + wait_until_stopped()?; + } + start_background() +} + +fn wait_until_stopped() -> Result<()> { + let deadline = Instant::now() + Duration::from_millis(STOP_WAIT_MS); + while Instant::now() < deadline { + if daemon_stopped()? { + return Ok(()); + } + std::thread::sleep(Duration::from_millis(25)); + } + Err(anyhow!("daemon did not stop within {STOP_WAIT_MS}ms")) +} + +fn daemon_stopped() -> Result { + Ok(matches!( + super::status_outcome()?, + super::DaemonStatusOutcome::Stopped { .. } + )) +} + fn start_background_at(paths: RuntimePaths) -> Result { - if let Some(start) = running_start(&paths) { + if let Some(start) = status_start(&paths, true)? { return Ok(start); } std::fs::create_dir_all(&paths.dir)?; @@ -35,10 +65,15 @@ fn start_background_at(paths: RuntimePaths) -> Result { wait_until_ready(paths, &mut child) } -fn running_start(paths: &RuntimePaths) -> Option { - try_status() - .ok() - .map(|status| background_start(status, paths.clone(), true)) +fn status_start(paths: &RuntimePaths, already_running: bool) -> Result> { + match status_outcome()? { + DaemonStatusOutcome::Running(status) => Ok(Some(background_start( + status, + paths.clone(), + already_running, + ))), + DaemonStatusOutcome::Stopped { .. } => Ok(None), + } } fn spawn_background(paths: &RuntimePaths) -> Result { @@ -69,9 +104,7 @@ fn poll_start(child: &mut Child, paths: &RuntimePaths) -> Result Result { super::ensure_running()?; tokio::runtime::Runtime::new()?.block_on(request_async(request)) @@ -83,3 +86,10 @@ pub(super) async fn request_async(request: DaemonRequest) -> Result Result { + let timeout = Duration::from_millis(LIFECYCLE_TIMEOUT_MS); + tokio::time::timeout(timeout, request_async(request)) + .await + .map_err(|_| anyhow!("daemon IPC timed out after {LIFECYCLE_TIMEOUT_MS}ms"))? +} diff --git a/src/daemon/lifecycle.rs b/src/daemon/lifecycle.rs index d29cb46..75b6766 100644 --- a/src/daemon/lifecycle.rs +++ b/src/daemon/lifecycle.rs @@ -68,8 +68,9 @@ pub fn ensure_running_for(workspace: &Path) -> Result<()> { } pub fn try_status() -> Result { - let response = tokio::runtime::Runtime::new()? - .block_on(super::client::request_async(DaemonRequest::Status))?; + let response = tokio::runtime::Runtime::new()?.block_on( + super::client::request_lifecycle_async(DaemonRequest::Status), + )?; match response { DaemonResponse::Status(status) => Ok(status), DaemonResponse::Error { message, .. } => Err(anyhow!(message)), @@ -114,7 +115,7 @@ pub fn start_foreground() -> Result<()> { pub fn stop() -> Result { let response = tokio::runtime::Runtime::new()? - .block_on(super::client::request_async(DaemonRequest::Stop))?; + .block_on(super::client::request_lifecycle_async(DaemonRequest::Stop))?; match response { DaemonResponse::Ack { message } => Ok(message), DaemonResponse::Error { message, .. } => Err(anyhow!(message)), diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index e0fd162..fb01ae8 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -6,12 +6,13 @@ mod capture_status; mod client; mod lifecycle; mod proxy_task; +mod scanner_health; mod scanner_task; mod server; mod supervisor; mod worker; -pub use background::{BackgroundStart, start_background, start_background_for}; +pub use background::{BackgroundStart, restart_background, start_background, start_background_for}; pub use client::{ begin_observed_session_blocking, ensure_capture_blocking, ensure_proxy_blocking, hello_blocking, request_blocking, diff --git a/src/daemon/scanner_health.rs b/src/daemon/scanner_health.rs new file mode 100644 index 0000000..0101d9a --- /dev/null +++ b/src/daemon/scanner_health.rs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! Transcript scanner health projection. + +use super::capture_status::component; +use crate::ipc::{CaptureComponent, CaptureComponentStatus, CaptureStatus}; + +const SCANNER: &str = "transcript-scanner"; +const ERROR_PREFIX: &str = "transcript-scanner:"; + +pub(super) fn pending() -> CaptureComponent { + component( + SCANNER, + CaptureComponentStatus::Partial, + Some("initial scan pending".into()), + ) +} + +pub(super) fn update(capture: &mut CaptureStatus, error: Option) { + capture.watchers.retain(|watcher| watcher.name != SCANNER); + capture.watchers.push(health_component(error.as_deref())); + capture + .errors + .retain(|message| !message.starts_with(ERROR_PREFIX)); + if let Some(message) = error { + capture.errors.push(format!("{ERROR_PREFIX} {message}")); + } +} + +fn health_component(error: Option<&str>) -> CaptureComponent { + match error { + Some(message) => component(SCANNER, CaptureComponentStatus::Error, Some(message.into())), + None => component(SCANNER, CaptureComponentStatus::Ready, None), + } +} diff --git a/src/daemon/scanner_task.rs b/src/daemon/scanner_task.rs index 59a323c..6605796 100644 --- a/src/daemon/scanner_task.rs +++ b/src/daemon/scanner_task.rs @@ -1,21 +1,27 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Periodic transcript scanner owned by the daemon runtime. +use super::supervisor::Supervisor; use crate::store::Store; -use anyhow::Result; +use anyhow::{Result, anyhow}; use std::path::{Path, PathBuf}; use std::sync::{Mutex, OnceLock}; -pub(super) async fn scanner_loop(ws: PathBuf) { +pub(super) async fn scanner_loop(ws: PathBuf, supervisor: Supervisor) { loop { tokio::time::sleep(scan_interval(&ws)).await; - scan_once(ws.clone()).await; + let result = scan_once(ws.clone()).await; + supervisor.record_scan_result(&ws, result); } } -async fn scan_once(ws: PathBuf) { - if let Err(err) = tokio::task::spawn_blocking(move || scan_workspace(&ws)).await { - tracing::warn!(%err, "daemon scanner task join failed"); +pub(super) async fn scan_once(ws: PathBuf) -> Result<()> { + match tokio::task::spawn_blocking(move || scan_workspace(&ws)).await { + Ok(result) => result, + Err(err) => { + tracing::warn!(%err, "daemon scanner task join failed"); + Err(anyhow!("scanner task join failed: {err}")) + } } } diff --git a/src/daemon/server.rs b/src/daemon/server.rs index cf4a708..7ee5aa4 100644 --- a/src/daemon/server.rs +++ b/src/daemon/server.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Daemon server loop and single request worker. +mod startup; + use super::supervisor::Supervisor; use super::worker::{Job, spawn_worker}; use super::{RuntimePaths, runtime_paths}; @@ -30,28 +32,16 @@ struct ServerState { pub async fn run_server() -> Result<()> { let paths = runtime_paths()?; - std::fs::create_dir_all(&paths.dir)?; - let _pid_lock = lock_pid(&paths)?; - remove_stale_socket(&paths.sock)?; - let listener = UnixListener::bind(&paths.sock) - .with_context(|| format!("bind daemon socket: {}", paths.sock.display()))?; - set_socket_private(&paths.sock)?; - let (web, _web_task) = crate::web::start(&paths.token).await?; - let (tx, rx) = mpsc::channel(128); - let state = ServerState { - started: Instant::now(), - queue_depth: Arc::new(AtomicUsize::new(0)), - last_error: Arc::new(Mutex::new(None)), - supervisor: Supervisor::default(), - tx, - web, - }; - spawn_worker(rx, state.queue_depth.clone(), state.last_error.clone()); + let (_pid_lock, listener, state) = startup::start(&paths).await?; + serve(listener, state).await +} + +async fn serve(listener: UnixListener, state: ServerState) -> Result<()> { loop { let (stream, _) = listener.accept().await?; - let state = state.clone(); + let client_state = state.clone(); tokio::spawn(async move { - if let Err(err) = handle_client(stream, state).await { + if let Err(err) = handle_client(stream, client_state).await { tracing::warn!(%err, "daemon client failed"); } }); diff --git a/src/daemon/server/startup.rs b/src/daemon/server/startup.rs new file mode 100644 index 0000000..d0b2ec6 --- /dev/null +++ b/src/daemon/server/startup.rs @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! Daemon startup ordering: restore capture before socket and Web readiness. + +use super::*; + +pub(super) async fn start(paths: &RuntimePaths) -> Result<(File, UnixListener, ServerState)> { + let (pid_lock, supervisor) = prepare_runtime(paths).await?; + let (listener, state) = bind_services(paths, supervisor).await?; + Ok((pid_lock, listener, state)) +} + +async fn prepare_runtime(paths: &RuntimePaths) -> Result<(File, Supervisor)> { + std::fs::create_dir_all(&paths.dir)?; + let pid_lock = lock_pid(paths)?; + remove_stale_socket(&paths.sock)?; + let supervisor = Supervisor::default(); + supervisor.restore_registered().await?; + Ok((pid_lock, supervisor)) +} + +async fn bind_services( + paths: &RuntimePaths, + supervisor: Supervisor, +) -> Result<(UnixListener, ServerState)> { + let listener = bind_listener(paths)?; + let (web, _web_task) = crate::web::start(&paths.token).await?; + let (state, rx) = new_state(supervisor, web); + spawn_worker(rx, state.queue_depth.clone(), state.last_error.clone()); + Ok((listener, state)) +} + +fn bind_listener(paths: &RuntimePaths) -> Result { + let listener = UnixListener::bind(&paths.sock) + .with_context(|| format!("bind daemon socket: {}", paths.sock.display()))?; + set_socket_private(&paths.sock)?; + Ok(listener) +} + +fn new_state(supervisor: Supervisor, web: WebEndpoint) -> (ServerState, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(128); + let state = ServerState { + started: Instant::now(), + queue_depth: Arc::new(AtomicUsize::new(0)), + last_error: Arc::new(Mutex::new(None)), + supervisor, + tx, + web, + }; + (state, rx) +} diff --git a/src/daemon/supervisor.rs b/src/daemon/supervisor.rs index a6297b5..0976734 100644 --- a/src/daemon/supervisor.rs +++ b/src/daemon/supervisor.rs @@ -1,12 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Long-lived daemon capture tasks: transcript scanner loops and LLM proxy tasks. -use super::capture_status::{base_status, component}; +mod scanning; + +use super::capture_status::component; use super::proxy_task::{normalize_provider, providers_for_agent, start_proxy}; -use super::scanner_task::scanner_loop; -use crate::ipc::{ - CaptureComponent, CaptureComponentStatus, CaptureStatus, ObservedSession, ProxyEndpoint, -}; +use crate::ipc::{CaptureComponentStatus, CaptureStatus, ObservedSession, ProxyEndpoint}; use anyhow::Result; use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::{Path, PathBuf}; @@ -45,17 +44,6 @@ impl Supervisor { .unwrap_or_default() } - pub(super) async fn ensure_capture(&self, workspace: String, deep: bool) -> CaptureStatus { - let ws = workspace_path(&workspace); - let mut status = base_status(&ws, deep); - status.watchers.push(self.ensure_scanner(ws.clone())); - if deep { - self.add_deep_capture(&ws, &mut status).await; - } - self.remember_capture(status.clone()); - status - } - pub(super) async fn ensure_proxy( &self, workspace: String, @@ -110,19 +98,6 @@ impl Supervisor { )); } - fn ensure_scanner(&self, ws: PathBuf) -> CaptureComponent { - let key = ws.to_string_lossy().to_string(); - if self - .inner - .lock() - .map(|mut st| st.scanners.insert(key)) - .unwrap_or(false) - { - tokio::spawn(scanner_loop(ws)); - } - component("transcript-scanner", CaptureComponentStatus::Ready, None) - } - fn existing_proxy(&self, key: &ProxyKey) -> Option { let mut st = self.inner.lock().ok()?; if st.proxies.get(key).is_some_and(|h| h.task.is_finished()) { diff --git a/src/daemon/supervisor/scanning.rs b/src/daemon/supervisor/scanning.rs new file mode 100644 index 0000000..df6c2b6 --- /dev/null +++ b/src/daemon/supervisor/scanning.rs @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! Daemon scanner lifecycle and health projection. + +use super::{Supervisor, workspace_path}; +use crate::daemon::capture_status::base_status; +use crate::daemon::scanner_health; +use crate::daemon::scanner_task::{scan_once, scanner_loop}; +use crate::ipc::CaptureStatus; +use anyhow::Result; +use std::path::{Path, PathBuf}; + +impl Supervisor { + pub(in crate::daemon) async fn restore_registered(&self) -> Result<()> { + for workspace in crate::core::machine_registry::list_paths()? { + self.ensure_capture(workspace.to_string_lossy().to_string(), false) + .await; + } + Ok(()) + } + + pub(in crate::daemon) async fn ensure_capture( + &self, + workspace: String, + deep: bool, + ) -> CaptureStatus { + let ws = workspace_path(&workspace); + let status = self.pending_capture(&ws, deep).await; + let key = status.workspace.clone(); + self.remember_capture(status.clone()); + self.ensure_scanner(ws).await; + self.capture(&key).unwrap_or(status) + } + + async fn pending_capture(&self, ws: &Path, deep: bool) -> CaptureStatus { + let mut status = base_status(ws, deep); + status.watchers.push(scanner_health::pending()); + if deep { + self.add_deep_capture(ws, &mut status).await; + } + status + } + + async fn ensure_scanner(&self, ws: PathBuf) { + let spawn = self.reserve_scanner(&ws); + self.record_scan_result(&ws, scan_once(ws.clone()).await); + if spawn { + tokio::spawn(scanner_loop(ws, self.clone())); + } + } + + pub(in crate::daemon) fn record_scan_result(&self, ws: &Path, result: Result<()>) { + let error = result.err().map(|err| format!("{err:#}")); + log_error(ws, error.as_deref()); + self.update_scan_health(ws, error); + } + + fn update_scan_health(&self, ws: &Path, error: Option) { + let key = ws.to_string_lossy(); + if let Ok(mut state) = self.inner.lock() + && let Some(capture) = state.captures.get_mut(key.as_ref()) + { + scanner_health::update(capture, error); + } + } + + fn reserve_scanner(&self, ws: &Path) -> bool { + let key = ws.to_string_lossy().to_string(); + self.inner + .lock() + .map(|mut state| state.scanners.insert(key)) + .unwrap_or(false) + } + + fn capture(&self, workspace: &str) -> Option { + self.inner.lock().ok()?.captures.get(workspace).cloned() + } +} + +fn log_error(ws: &Path, error: Option<&str>) { + if let Some(error) = error { + tracing::warn!(workspace = %ws.display(), %error, "daemon scanner failed"); + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index c80d8dd..ed42215 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -23,5 +23,6 @@ pub use sqlite::SessionFilter; pub use sqlite::SessionOutcomeRow; pub use sqlite::SessionPage; pub use sqlite::SessionSampleAgg; +pub(crate) use sqlite::SessionSearchQuery; pub use sqlite::Store; pub use sqlite::SummaryStats; diff --git a/src/store/sqlite/connection_functions.rs b/src/store/sqlite/connection_functions.rs new file mode 100644 index 0000000..c68e9da --- /dev/null +++ b/src/store/sqlite/connection_functions.rs @@ -0,0 +1,12 @@ +use anyhow::Result; +use rusqlite::Connection; +use rusqlite::functions::FunctionFlags; + +pub(super) fn register(conn: &Connection) -> Result<()> { + let flags = FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC; + conn.create_scalar_function("kaizen_casefold", 1, flags, |context| { + let value = context.get::(0)?; + Ok(caseless::default_case_fold_str(&value)) + })?; + Ok(()) +} diff --git a/src/store/sqlite/event_write.rs b/src/store/sqlite/event_write.rs index a61b16f..bff7833 100644 --- a/src/store/sqlite/event_write.rs +++ b/src/store/sqlite/event_write.rs @@ -85,6 +85,7 @@ impl Store { if self.conn.changes() == 0 { return Ok(()); } + super::session_search_projection::project(&self.conn, e)?; if projector_legacy_mode() { index_event_derived(&self.conn, e)?; rebuild_tool_spans_for_session(&self.conn, &e.session_id)?; diff --git a/src/store/sqlite/maintenance.rs b/src/store/sqlite/maintenance.rs index 6f816f5..3bed523 100644 --- a/src/store/sqlite/maintenance.rs +++ b/src/store/sqlite/maintenance.rs @@ -42,6 +42,10 @@ impl Store { &format!("DELETE FROM events WHERE session_id IN ({sub_old_sessions})"), params![cutoff_ms], )?; + tx.execute( + &format!("DELETE FROM session_search_prompts WHERE session_id IN ({sub_old_sessions})"), + params![cutoff_ms], + )?; tx.execute( &format!("DELETE FROM files_touched WHERE session_id IN ({sub_old_sessions})"), params![cutoff_ms], diff --git a/src/store/sqlite/mod.rs b/src/store/sqlite/mod.rs index b72c267..a29bc3a 100644 --- a/src/store/sqlite/mod.rs +++ b/src/store/sqlite/mod.rs @@ -36,6 +36,7 @@ pub use contracts::{ SummaryStats, SyncStatusSnapshot, ToolSpanSyncRow, }; pub(super) use sql::{PAIN_HOTSPOTS_SQL, SESSION_SELECT, TOOL_RANK_ROWS_SQL}; +pub(crate) use visualization::SessionSearchQuery; #[derive(Clone)] struct SpanTreeCacheEntry { @@ -53,6 +54,7 @@ pub struct Store { } mod artifact_windows; +mod connection_functions; mod constants; mod contracts; mod evals; @@ -78,6 +80,7 @@ mod samples; mod schema; mod session_identity; mod session_read; +mod session_search_projection; mod session_window; mod sessions; mod sql; @@ -130,12 +133,14 @@ impl Store { } .with_context(|| format!("open db: {}", path.display()))?; schema::apply_pragmas(&conn, mode)?; + connection_functions::register(&conn)?; if mode == StoreOpenMode::ReadWrite { for sql in schema::MIGRATIONS { conn.execute_batch(sql)?; } schema::ensure_schema_columns(&conn)?; session_identity::backfill(&conn)?; + session_search_projection::backfill(&conn)?; outbox_migration::migrate(&conn, path.parent().unwrap_or_else(|| Path::new(".")))?; } let root = path @@ -152,6 +157,7 @@ impl Store { fn initialize_empty(conn: &Connection) -> Result<()> { schema::apply_pragmas(conn, StoreOpenMode::ReadWrite)?; + connection_functions::register(conn)?; schema::MIGRATIONS .iter() .try_for_each(|statement| conn.execute_batch(statement))?; diff --git a/src/store/sqlite/schema.rs b/src/store/sqlite/schema.rs index dbded21..d53c8c0 100644 --- a/src/store/sqlite/schema.rs +++ b/src/store/sqlite/schema.rs @@ -24,6 +24,11 @@ pub(super) const MIGRATIONS: &[&str] = &[ cost_usd_e6 INTEGER, payload TEXT NOT NULL )", + "CREATE TABLE IF NOT EXISTS session_search_prompts ( + session_id TEXT PRIMARY KEY, + event_seq INTEGER NOT NULL, + prompt TEXT NOT NULL + )", "CREATE INDEX IF NOT EXISTS events_session_idx ON events(session_id)", "CREATE TABLE IF NOT EXISTS files_touched ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/store/sqlite/session_search_projection.rs b/src/store/sqlite/session_search_projection.rs new file mode 100644 index 0000000..d06657c --- /dev/null +++ b/src/store/sqlite/session_search_projection.rs @@ -0,0 +1,62 @@ +use super::*; + +const UPSERT: &str = "INSERT INTO session_search_prompts (session_id, event_seq, prompt) + VALUES (?1, ?2, ?3) ON CONFLICT(session_id) DO UPDATE SET + event_seq = excluded.event_seq, prompt = excluded.prompt + WHERE excluded.event_seq < session_search_prompts.event_seq"; + +const BACKFILL: &str = "SELECT e.session_id, e.seq, e.payload FROM events e + LEFT JOIN session_search_prompts p ON p.session_id = e.session_id + WHERE p.session_id IS NULL ORDER BY e.session_id, e.seq"; + +pub(super) fn backfill(conn: &Connection) -> Result<()> { + backfill_prompts(conn)?; + fill_empty(conn) +} + +fn backfill_prompts(conn: &Connection) -> Result<()> { + let mut statement = conn.prepare(BACKFILL)?; + let mut rows = statement.query([])?; + project_rows(conn, &mut rows) +} + +fn project_rows(conn: &Connection, rows: &mut rusqlite::Rows<'_>) -> Result<()> { + while let Some(row) = rows.next()? { + let payload: String = row.get(2)?; + project_raw(conn, row.get(0)?, row.get(1)?, &payload)?; + } + Ok(()) +} + +pub(super) fn project(conn: &Connection, event: &Event) -> Result<()> { + let Some(prompt) = crate::core::prompt_text::from_value(&event.payload) else { + return Ok(()); + }; + upsert(conn, &event.session_id, event.seq as i64, &prompt) +} + +fn project_raw(conn: &Connection, id: String, seq: i64, raw: &str) -> Result<()> { + let Some(prompt) = serde_json::from_str(raw) + .ok() + .as_ref() + .and_then(crate::core::prompt_text::from_value) + else { + return Ok(()); + }; + upsert(conn, &id, seq, &prompt) +} + +fn upsert(conn: &Connection, id: &str, seq: i64, prompt: &str) -> Result<()> { + conn.execute(UPSERT, params![id, seq, prompt])?; + Ok(()) +} + +fn fill_empty(conn: &Connection) -> Result<()> { + conn.execute( + "INSERT INTO session_search_prompts (session_id, event_seq, prompt) + SELECT s.id, ?1, '' FROM sessions s LEFT JOIN session_search_prompts p + ON p.session_id = s.id WHERE p.session_id IS NULL", + [i64::MAX], + )?; + Ok(()) +} diff --git a/src/store/sqlite/visualization.rs b/src/store/sqlite/visualization.rs index a99a87d..2d80be1 100644 --- a/src/store/sqlite/visualization.rs +++ b/src/store/sqlite/visualization.rs @@ -4,8 +4,12 @@ use super::*; mod activity; mod aggregates; +mod session_search; +mod session_search_sql; mod sessions; +pub(crate) use session_search::SessionSearchQuery; + #[derive(Clone, Copy, Debug, Default)] pub(crate) struct TokenRead { pub(crate) input: u64, diff --git a/src/store/sqlite/visualization/session_search.rs b/src/store/sqlite/visualization/session_search.rs new file mode 100644 index 0000000..a034ea2 --- /dev/null +++ b/src/store/sqlite/visualization/session_search.rs @@ -0,0 +1,140 @@ +use super::SessionSummaryRead; +use super::session_search_sql; +use crate::store::Store; +use anyhow::{Result, ensure}; +use rusqlite::named_params; + +pub(crate) struct SessionSearchQuery { + workspace: String, + text: String, + offset: i64, + limit: i64, + now_ms: i64, + with_status: bool, +} + +pub(super) struct SessionRowsPage { + pub(super) rows: Vec, + pub(super) filtered_total: usize, + pub(super) offset: usize, + pub(super) limit: usize, +} + +impl SessionSearchQuery { + pub(crate) fn new( + workspace: &str, + text: &str, + offset: usize, + limit: usize, + now_ms: u64, + ) -> Result { + validate(offset, limit, now_ms)?; + Ok(valid_query(workspace, text, offset, limit, now_ms)) + } +} + +pub(super) fn read(store: &Store, query: &SessionSearchQuery) -> Result { + let filtered_total = count(store, query)?; + Ok(SessionRowsPage { + rows: rows(store, query)?, + filtered_total, + offset: query.offset as usize, + limit: query.limit as usize, + }) +} + +fn count(store: &Store, query: &SessionSearchQuery) -> Result { + let count = if query.with_status { + count_status(store, query)? + } else { + count_ordinary(store, query)? + }; + Ok(count as usize) +} + +fn rows(store: &Store, query: &SessionSearchQuery) -> Result> { + if query.with_status { + rows_status(store, query) + } else { + rows_ordinary(store, query) + } +} + +fn count_status(store: &Store, query: &SessionSearchQuery) -> Result { + let sql = session_search_sql::count(true); + let values = named_params! {":workspace": query.workspace, ":text": query.text, + ":now_ms": query.now_ms}; + Ok(store.conn().query_row(&sql, values, |row| row.get(0))?) +} + +fn count_ordinary(store: &Store, query: &SessionSearchQuery) -> Result { + let sql = session_search_sql::count(false); + let values = named_params! {":workspace": query.workspace, ":text": query.text}; + Ok(store.conn().query_row(&sql, values, |row| row.get(0))?) +} + +fn rows_status(store: &Store, query: &SessionSearchQuery) -> Result> { + let mut statement = store.conn().prepare(&session_search_sql::page(true))?; + let values = named_params! {":workspace": query.workspace, ":text": query.text, + ":now_ms": query.now_ms, ":limit": query.limit, ":offset": query.offset}; + let rows = statement.query_map(values, super::sessions::summary_row)?; + Ok(rows.collect::>>()?) +} + +fn rows_ordinary(store: &Store, query: &SessionSearchQuery) -> Result> { + let mut statement = store.conn().prepare(&session_search_sql::page(false))?; + let values = named_params! {":workspace": query.workspace, ":text": query.text, + ":limit": query.limit, ":offset": query.offset}; + let rows = statement.query_map(values, super::sessions::summary_row)?; + Ok(rows.collect::>>()?) +} + +fn searches_status(text: &str) -> bool { + let text = caseless::default_case_fold_str(text); + !text.is_empty() + && ["active", "errored", "orphaned", "idle", "done"] + .iter() + .any(|status| status.contains(&text)) +} + +fn valid_query( + workspace: &str, + text: &str, + offset: usize, + limit: usize, + now_ms: u64, +) -> SessionSearchQuery { + SessionSearchQuery { + workspace: workspace.into(), + text: text.into(), + offset: offset as i64, + limit: limit as i64, + now_ms: now_ms as i64, + with_status: searches_status(text), + } +} + +fn validate(offset: usize, limit: usize, now_ms: u64) -> Result<()> { + ensure!(limit > 0, "session page limit must be positive"); + ensure!(offset <= i64::MAX as usize, "session page offset too large"); + ensure!(limit <= i64::MAX as usize, "session page limit too large"); + ensure!( + now_ms <= i64::MAX as u64, + "session search timestamp too large" + ); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::searches_status; + + #[test] + fn status_path_only_for_visible_status_matches() { + assert!(!searches_status("")); + assert!(!searches_status("needle")); + assert!(!searches_status("running")); + assert!(searches_status("ctiv")); + assert!(searches_status("IDLE")); + } +} diff --git a/src/store/sqlite/visualization/session_search_sql.rs b/src/store/sqlite/visualization/session_search_sql.rs new file mode 100644 index 0000000..b67a885 --- /dev/null +++ b/src/store/sqlite/visualization/session_search_sql.rs @@ -0,0 +1,137 @@ +const ORDINARY_CANDIDATES: &str = "WITH candidates AS MATERIALIZED ( + SELECT s.id, s.agent, s.model, s.branch, s.started_at_ms, + COALESCE(p.prompt, '') prompt + FROM sessions s + LEFT JOIN session_search_prompts p ON p.session_id = s.id + WHERE s.workspace = :workspace +)"; + +const STATUS_CANDIDATES: &str = "WITH status_rollup AS MATERIALIZED ( + SELECT e.session_id, MAX(e.ts_ms) last_event_ms, + SUM(e.kind = 'Error') error_count + FROM events e JOIN sessions w ON w.id = e.session_id + WHERE w.workspace = :workspace GROUP BY e.session_id +), candidates AS MATERIALIZED ( + SELECT s.id, s.agent, s.model, s.branch, s.started_at_ms, + COALESCE(p.prompt, '') prompt, + CASE + WHEN COALESCE(r.error_count, 0) > 0 THEN 'errored' + WHEN s.status = 'Done' OR s.ended_at_ms IS NOT NULL THEN 'done' + WHEN r.last_event_ms IS NULL THEN 'idle' + WHEN :now_ms - r.last_event_ms <= 300000 THEN 'active' + WHEN :now_ms - r.last_event_ms >= 1800000 THEN 'orphaned' + ELSE 'idle' + END derived_status + FROM sessions s + LEFT JOIN session_search_prompts p ON p.session_id = s.id + LEFT JOIN status_rollup r ON r.session_id = s.id + WHERE s.workspace = :workspace +)"; + +const MATCH_PREFIX: &str = "(:text = '' + OR instr(kaizen_casefold(c.prompt), kaizen_casefold(:text)) > 0 + OR instr(kaizen_casefold(c.id), kaizen_casefold(:text)) > 0 + OR instr(kaizen_casefold(c.agent), kaizen_casefold(:text)) > 0 + OR instr(kaizen_casefold(COALESCE(c.model, '')), kaizen_casefold(:text)) > 0 + OR instr(kaizen_casefold(COALESCE(c.branch, '')), kaizen_casefold(:text)) > 0"; + +const TOOL_MATCHES: &str = " + OR EXISTS (SELECT 1 FROM events e WHERE e.session_id = c.id + AND instr(kaizen_casefold(COALESCE(e.tool, '')), kaizen_casefold(:text)) > 0) + OR EXISTS (SELECT 1 FROM tool_spans t WHERE t.session_id = c.id + AND instr(kaizen_casefold(COALESCE(t.tool, '')), kaizen_casefold(:text)) > 0)"; + +const PAGE_SUFFIX: &str = ", matched AS MATERIALIZED ( + SELECT c.id, CASE WHEN instr(kaizen_casefold(c.prompt), kaizen_casefold(:text)) > 0 + THEN 0 ELSE 1 END match_tier + FROM candidates c WHERE "; + +const SUMMARY_SUFFIX: &str = " + ORDER BY match_tier, c.started_at_ms DESC, c.id ASC LIMIT :limit OFFSET :offset +), rollup AS ( + SELECT e.session_id, MAX(e.ts_ms) last_event_ms, COUNT(*) event_count, + SUM(e.kind = 'Error') error_count, SUM(e.kind = 'ToolCall') tool_call_count, + COALESCE(SUM(e.cost_usd_e6), 0) cost_usd_e6, + COALESCE(SUM(e.tokens_in), 0) tokens_in, COALESCE(SUM(e.tokens_out), 0) tokens_out, + COALESCE(SUM(e.reasoning_tokens), 0) reasoning_tokens, + COALESCE(SUM(e.cache_read_tokens), 0) cache_read_tokens, + COALESCE(SUM(e.cache_creation_tokens), 0) cache_creation_tokens + FROM events e JOIN matched m ON m.id = e.session_id GROUP BY e.session_id +) +SELECT s.id, s.agent, s.model, s.workspace, s.started_at_ms, s.ended_at_ms, + s.status, s.trace_path, s.start_commit, s.end_commit, s.branch, s.dirty_start, s.dirty_end, + s.repo_binding_source, s.prompt_fingerprint, s.parent_session_id, s.agent_version, s.os, s.arch, + s.repo_file_count, s.repo_total_loc, a.last_event_ms, COALESCE(a.event_count, 0), + COALESCE(a.error_count, 0), COALESCE(a.tool_call_count, 0), COALESCE(a.cost_usd_e6, 0), + COALESCE(a.tokens_in, 0), COALESCE(a.tokens_out, 0), COALESCE(a.reasoning_tokens, 0), + COALESCE(a.cache_read_tokens, 0), COALESCE(a.cache_creation_tokens, 0) +FROM matched m JOIN sessions s ON s.id = m.id LEFT JOIN rollup a ON a.session_id = s.id +ORDER BY m.match_tier, s.started_at_ms DESC, s.id ASC"; + +pub(super) fn count(with_status: bool) -> String { + let candidates = candidates(with_status); + let matches = matches(with_status); + format!("{candidates} SELECT COUNT(*) FROM candidates c WHERE {matches}") +} + +pub(super) fn page(with_status: bool) -> String { + let candidates = candidates(with_status); + let matches = matches(with_status); + format!("{candidates}{PAGE_SUFFIX}{matches}{SUMMARY_SUFFIX}") +} + +fn candidates(with_status: bool) -> &'static str { + if with_status { + STATUS_CANDIDATES + } else { + ORDINARY_CANDIDATES + } +} + +fn matches(with_status: bool) -> String { + let status = if with_status { + "\n OR instr(kaizen_casefold(c.derived_status), kaizen_casefold(:text)) > 0" + } else { + "" + }; + format!("{MATCH_PREFIX}{status}{TOOL_MATCHES})") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::Store; + use rusqlite::named_params; + + #[test] + fn ordinary_sql_omits_status_rollup() { + for sql in [count(false), page(false)] { + assert!(!sql.contains("status_rollup"), "{sql}"); + } + } + + #[test] + fn visible_status_sql_includes_status_rollup() { + for sql in [count(true), page(true)] { + assert!(sql.contains("status_rollup"), "{sql}"); + } + } + + #[test] + fn ordinary_page_plan_omits_status_rollup() -> anyhow::Result<()> { + let temp = tempfile::tempdir()?; + let store = Store::open(&temp.path().join("kaizen.db"))?; + let plan = explain(&store, &page(false))?; + assert!(plan.iter().all(|step| !step.contains("status_rollup"))); + Ok(()) + } + + fn explain(store: &Store, sql: &str) -> anyhow::Result> { + let sql = format!("EXPLAIN QUERY PLAN {sql}"); + let mut statement = store.conn().prepare(&sql)?; + let values = named_params! {":workspace": "/ws", ":text": "needle", + ":limit": 30_i64, ":offset": 0_i64}; + let rows = statement.query_map(values, |row| row.get(3))?; + Ok(rows.collect::>>()?) + } +} diff --git a/src/store/sqlite/visualization/sessions.rs b/src/store/sqlite/visualization/sessions.rs index 2cf22d7..234b1bb 100644 --- a/src/store/sqlite/visualization/sessions.rs +++ b/src/store/sqlite/visualization/sessions.rs @@ -1,39 +1,12 @@ +use super::session_search::{self, SessionRowsPage, SessionSearchQuery}; use super::{SessionSummaryRead, TokenRead}; use crate::store::Store; -use crate::visualization::{TokenTotals, TraceSummary, derive_status}; +use crate::visualization::{SessionPageMeta, TokenTotals, TraceSummary, derive_status}; use anyhow::Result; -use rusqlite::params; +use rusqlite::params_from_iter; use std::collections::HashMap; -const SUMMARIES_SQL: &str = " -WITH recent AS MATERIALIZED ( - SELECT id, agent, model, workspace, started_at_ms, ended_at_ms, - status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end, - repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch, - repo_file_count, repo_total_loc FROM sessions WHERE workspace = ?1 - ORDER BY started_at_ms DESC, id ASC LIMIT ?2 -), rollup AS ( - SELECT e.session_id, MAX(e.ts_ms) last_event_ms, COUNT(*) event_count, - SUM(e.kind = 'Error') error_count, SUM(e.kind = 'ToolCall') tool_call_count, - COALESCE(SUM(e.cost_usd_e6), 0) cost_usd_e6, - COALESCE(SUM(e.tokens_in), 0) tokens_in, COALESCE(SUM(e.tokens_out), 0) tokens_out, - COALESCE(SUM(e.reasoning_tokens), 0) reasoning_tokens, - COALESCE(SUM(e.cache_read_tokens), 0) cache_read_tokens, - COALESCE(SUM(e.cache_creation_tokens), 0) cache_creation_tokens - FROM events e JOIN recent r ON r.id = e.session_id GROUP BY e.session_id -) -SELECT r.*, a.last_event_ms, COALESCE(a.event_count, 0), COALESCE(a.error_count, 0), - COALESCE(a.tool_call_count, 0), COALESCE(a.cost_usd_e6, 0), COALESCE(a.tokens_in, 0), - COALESCE(a.tokens_out, 0), COALESCE(a.reasoning_tokens, 0), - COALESCE(a.cache_read_tokens, 0), COALESCE(a.cache_creation_tokens, 0) -FROM recent r LEFT JOIN rollup a ON a.session_id = r.id -ORDER BY r.started_at_ms DESC, r.id ASC"; - -const TOP_TOOLS_SQL: &str = " -WITH recent AS MATERIALIZED (SELECT id FROM sessions WHERE workspace = ?1 - ORDER BY started_at_ms DESC, id ASC LIMIT ?2), counts AS ( - SELECT t.session_id, t.tool, COUNT(*) count FROM tool_spans t - JOIN recent r ON r.id = t.session_id WHERE t.tool <> '' +const TOP_TOOLS_SUFFIX: &str = ") AND t.tool <> '' GROUP BY t.session_id, t.tool ), ranked AS ( SELECT session_id, tool, count, ROW_NUMBER() OVER ( @@ -44,35 +17,63 @@ SELECT session_id, tool, count FROM ranked WHERE rank <= 5 ORDER BY session_id, impl Store { pub(crate) fn visualization_sessions( &self, - workspace: &str, - limit: usize, + query: &SessionSearchQuery, now_ms: u64, - ) -> Result> { - let mut rows = summary_rows(self, workspace, limit)?; - let mut tools = top_tools(self, workspace, limit)?; - rows.iter_mut() - .for_each(|row| row.top_tools = tools.remove(&row.session.id).unwrap_or_default()); - Ok(rows.into_iter().map(|row| summary(row, now_ms)).collect()) + ) -> Result<(Vec, SessionPageMeta)> { + let mut page = session_search::read(self, query)?; + attach_tools(self, &mut page.rows)?; + let meta = page_meta(&page); + Ok(( + page.rows + .into_iter() + .map(|row| summary(row, now_ms)) + .collect(), + meta, + )) } } -fn summary_rows(store: &Store, workspace: &str, limit: usize) -> Result> { - let mut statement = store.conn().prepare(SUMMARIES_SQL)?; - let rows = statement.query_map(params![workspace, sql_limit(limit)], summary_row)?; - rows.map(|row| row.map_err(Into::into)).collect() +fn attach_tools(store: &Store, rows: &mut [SessionSummaryRead]) -> Result<()> { + let ids = rows + .iter() + .map(|row| row.session.id.as_str()) + .collect::>(); + let mut tools = top_tools(store, &ids)?; + rows.iter_mut() + .for_each(|row| row.top_tools = tools.remove(&row.session.id).unwrap_or_default()); + Ok(()) } -fn top_tools( - store: &Store, - workspace: &str, - limit: usize, -) -> Result>> { - let mut statement = store.conn().prepare(TOP_TOOLS_SQL)?; - let rows = statement.query_map(params![workspace, sql_limit(limit)], tool_row)?; +fn top_tools(store: &Store, ids: &[&str]) -> Result>> { + if ids.is_empty() { + return Ok(HashMap::new()); + } + let mut statement = store.conn().prepare(&top_tools_sql(ids.len()))?; + let rows = statement.query_map(params_from_iter(ids), tool_row)?; let rows = rows.collect::>>()?; Ok(rows.into_iter().fold(HashMap::new(), add_tool)) } +fn top_tools_sql(count: usize) -> String { + let slots = (1..=count) + .map(|n| format!("?{n}")) + .collect::>() + .join(","); + format!( + "WITH counts AS (SELECT t.session_id, t.tool, COUNT(*) count FROM tool_spans t WHERE t.session_id IN ({slots}{TOP_TOOLS_SUFFIX}" + ) +} + +fn page_meta(page: &SessionRowsPage) -> SessionPageMeta { + let shown = page.offset.saturating_add(page.rows.len()); + SessionPageMeta { + filtered_total: page.filtered_total, + offset: page.offset, + limit: page.limit, + next_offset: (shown < page.filtered_total).then_some(shown), + } +} + fn add_tool( mut tools: HashMap>, row: (String, String, u64), @@ -85,7 +86,7 @@ fn tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, String, u64)> Ok((row.get(0)?, row.get(1)?, row.get::<_, i64>(2)? as u64)) } -fn summary_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { +pub(super) fn summary_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { Ok(SessionSummaryRead { session: super::super::rows::session_row(row)?, last_event_ms: optional(row, 21)?, @@ -117,10 +118,6 @@ fn value(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result { row.get::<_, i64>(index).map(|value| value as u64) } -fn sql_limit(limit: usize) -> i64 { - limit.min(i64::MAX as usize) as i64 -} - fn summary(row: SessionSummaryRead, now_ms: u64) -> TraceSummary { let (status, status_reason) = derive_status(&row.session, row.last_event_ms, row.error_count, now_ms); diff --git a/src/ui/tui/background.rs b/src/ui/tui/background.rs index fc11186..2c19dba 100644 --- a/src/ui/tui/background.rs +++ b/src/ui/tui/background.rs @@ -90,6 +90,7 @@ fn visualization_query(workspace: String) -> VisualizationQuery { now_ms: now_ms(), include_activity: true, select_latest: false, + session_search: Default::default(), limits: TUI_VISUALIZATION_LIMITS, } } diff --git a/src/visualization/build.rs b/src/visualization/build.rs index 0058f74..0cb4e37 100644 --- a/src/visualization/build.rs +++ b/src/visualization/build.rs @@ -2,13 +2,12 @@ use super::activity::activity; use super::types::*; -use crate::core::event::{SessionRecord, SessionStatus}; -use crate::store::Store; +use crate::core::event::SessionRecord; +use crate::store::{SessionSearchQuery, Store}; use anyhow::{Result, ensure}; use serde::{Deserialize, Serialize}; const ACTIVE_TTL_MS: u64 = 5 * 60_000; -const ORPHAN_TTL_MS: u64 = 30 * 60_000; #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct VisualizationLimits { @@ -31,6 +30,8 @@ pub struct VisualizationQuery { pub include_activity: bool, /// Fall back to latest session when requested selection is absent or invalid. pub select_latest: bool, + #[serde(default)] + pub session_search: SessionSearchInput, pub limits: VisualizationLimits, } @@ -58,13 +59,27 @@ pub(crate) fn build_report_observed( validate(&query.limits)?; let active_since_ms = query.now_ms.saturating_sub(ACTIVE_TTL_MS); let (totals, quality) = store.visualization_totals(&query.workspace, active_since_ms)?; - let sessions = - store.visualization_sessions(&query.workspace, query.limits.sessions, query.now_ms)?; + let search = SessionSearchQuery::new( + &query.workspace, + &query.session_search.q, + query.session_search.offset, + query.limits.sessions, + query.now_ms, + )?; + let (sessions, session_page) = store.visualization_sessions(&search, query.now_ms)?; let selected = selected_detail(store, &query, sessions.first())?; let activity = activity_report(store, &query)?; let materialized = counts(&sessions, &selected); Ok(BuiltReport { - report: report(query, totals, quality, sessions, selected, activity), + report: report( + query, + totals, + quality, + sessions, + session_page, + selected, + activity, + ), materialized, }) } @@ -157,6 +172,7 @@ fn report( totals: VisualizationTotals, quality: DataQuality, sessions: Vec, + session_page: SessionPageMeta, selected: Option, activity: ActivityReport, ) -> VisualizationReport { @@ -166,31 +182,8 @@ fn report( totals, activity, sessions, + session_page, selected, quality, } } - -pub(crate) fn derive_status( - session: &SessionRecord, - last_event_ms: Option, - error_count: u64, - now_ms: u64, -) -> (DerivedStatus, String) { - if error_count > 0 { - return (DerivedStatus::Errored, "error event".into()); - } - if session.status == SessionStatus::Done || session.ended_at_ms.is_some() { - return (DerivedStatus::Done, "session ended".into()); - } - match last_event_ms { - Some(ts) if now_ms.saturating_sub(ts) <= ACTIVE_TTL_MS => { - (DerivedStatus::Active, "recent event".into()) - } - Some(ts) if now_ms.saturating_sub(ts) >= ORPHAN_TTL_MS => { - (DerivedStatus::Orphaned, "stale open session".into()) - } - Some(_) => (DerivedStatus::Idle, "no recent event".into()), - None => (DerivedStatus::Idle, "no events".into()), - } -} diff --git a/src/visualization/mod.rs b/src/visualization/mod.rs index a914230..6abe261 100644 --- a/src/visualization/mod.rs +++ b/src/visualization/mod.rs @@ -3,8 +3,10 @@ mod activity; mod build; +mod status; mod types; -pub(crate) use build::{BuiltReport, build_report_observed, derive_status}; +pub(crate) use build::{BuiltReport, build_report_observed}; pub use build::{VisualizationLimits, VisualizationQuery, build_report}; +pub(crate) use status::derive_status; pub use types::*; diff --git a/src/visualization/status.rs b/src/visualization/status.rs new file mode 100644 index 0000000..04651b2 --- /dev/null +++ b/src/visualization/status.rs @@ -0,0 +1,33 @@ +use super::DerivedStatus; +use crate::core::event::{SessionRecord, SessionStatus}; + +const ACTIVE_TTL_MS: u64 = 5 * 60_000; +const ORPHAN_TTL_MS: u64 = 30 * 60_000; + +pub(crate) fn derive_status( + session: &SessionRecord, + last_event_ms: Option, + error_count: u64, + now_ms: u64, +) -> (DerivedStatus, String) { + if error_count > 0 { + return (DerivedStatus::Errored, "error event".into()); + } + if session.status == SessionStatus::Done || session.ended_at_ms.is_some() { + return (DerivedStatus::Done, "session ended".into()); + } + stale_status(last_event_ms, now_ms) +} + +fn stale_status(last_event_ms: Option, now_ms: u64) -> (DerivedStatus, String) { + match last_event_ms { + Some(ts) if now_ms.saturating_sub(ts) <= ACTIVE_TTL_MS => { + (DerivedStatus::Active, "recent event".into()) + } + Some(ts) if now_ms.saturating_sub(ts) >= ORPHAN_TTL_MS => { + (DerivedStatus::Orphaned, "stale open session".into()) + } + Some(_) => (DerivedStatus::Idle, "no recent event".into()), + None => (DerivedStatus::Idle, "no events".into()), + } +} diff --git a/src/visualization/types.rs b/src/visualization/types.rs index 94ca6e0..ceae641 100644 --- a/src/visualization/types.rs +++ b/src/visualization/types.rs @@ -11,10 +11,26 @@ pub struct VisualizationReport { pub totals: VisualizationTotals, pub activity: ActivityReport, pub sessions: Vec, + #[serde(default)] + pub session_page: SessionPageMeta, pub selected: Option, pub quality: DataQuality, } +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct SessionSearchInput { + pub q: String, + pub offset: usize, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct SessionPageMeta { + pub filtered_total: usize, + pub offset: usize, + pub limit: usize, + pub next_offset: Option, +} + #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct VisualizationTotals { pub session_count: u64, diff --git a/src/web/assets.rs b/src/web/assets.rs index 726243b..2def1a3 100644 --- a/src/web/assets.rs +++ b/src/web/assets.rs @@ -1,8 +1,12 @@ // SPDX-License-Identifier: AGPL-3.0-or-later //! Embedded web assets. +pub mod brand; + +use axum::Router; use axum::http::header::{CACHE_CONTROL, CONTENT_TYPE}; use axum::response::IntoResponse; +use axum::routing::get; pub const INDEX: &str = include_str!("assets/index.html"); pub const TOKENS: &str = include_str!("assets/kaizen-tokens.css"); @@ -14,6 +18,23 @@ pub const RENDER_JS: &str = include_str!("assets/kaizen-render.js"); pub const RAW_JS: &str = include_str!("assets/kaizen-raw.js"); pub const DETAIL_JS: &str = include_str!("assets/kaizen-detail.js"); pub const FORMAT_JS: &str = include_str!("assets/kaizen-format.js"); +pub const SESSIONS_JS: &str = include_str!("assets/kaizen-sessions.js"); +pub const SESSION_CONTROLS_JS: &str = include_str!("assets/kaizen-session-controls.js"); +pub const SNAPSHOT_STATE_JS: &str = include_str!("assets/kaizen-snapshot-state.js"); + +pub fn session_router() -> Router +where + S: Clone + Send + Sync + 'static, +{ + Router::new() + .route("/assets/kaizen-format.js", get(format_js)) + .route("/assets/kaizen-sessions.js", get(sessions_js)) + .route( + "/assets/kaizen-session-controls.js", + get(session_controls_js), + ) + .route("/assets/kaizen-snapshot-state.js", get(snapshot_state_js)) +} pub async fn index() -> impl IntoResponse { content("text/html; charset=utf-8", INDEX) @@ -55,143 +76,23 @@ pub async fn format_js() -> impl IntoResponse { content("application/javascript", FORMAT_JS) } +pub async fn sessions_js() -> impl IntoResponse { + content("application/javascript", SESSIONS_JS) +} + +pub async fn session_controls_js() -> impl IntoResponse { + content("application/javascript", SESSION_CONTROLS_JS) +} + +pub async fn snapshot_state_js() -> impl IntoResponse { + content("application/javascript", SNAPSHOT_STATE_JS) +} + fn content(kind: &'static str, body: &'static str) -> impl IntoResponse { ([(CONTENT_TYPE, kind), (CACHE_CONTROL, "no-store")], body) } #[cfg(test)] -mod tests { - use super::{CSS, DETAIL_JS, FORMAT_JS, INDEX, JS, RAW_JS, RENDER_JS, TOKENS}; - - #[test] - fn web_assets_do_not_seed_fixture_values() { - let forbidden = [ - "web-smoke", - "tool:bash", - "web-rule", - "web review", - ">40<", - "Capture pipeline", - "Transcript tail -> events", - "screen-heading", - "screen-title", - "screen-copy", - ]; - for needle in forbidden { - assert!(!INDEX.contains(needle), "index contains {needle}"); - assert!(!JS.contains(needle), "js contains {needle}"); - } - } - - #[test] - fn web_assets_expose_read_only_visualization_screen() { - for needle in [ - ">kaizen_", - ">mcp/", - "onclick=", - "role=\"button\"", - "data-tool", - "data-feature", - "href=\"/session-detail\"", - "href=\"/experiments\"", - "href=\"/settings\"", - ] { - assert!(!INDEX.contains(needle), "index contains {needle}"); - } - assert!(INDEX.contains("= 4.5); - assert!(contrast(muted, [0xff, 0xfa, 0xf0]) >= 4.5); - } - - fn contrast(a: [u8; 3], b: [u8; 3]) -> f64 { - let (bright, dark) = match luminance(a) > luminance(b) { - true => (luminance(a), luminance(b)), - false => (luminance(b), luminance(a)), - }; - (bright + 0.05) / (dark + 0.05) - } - - fn luminance(rgb: [u8; 3]) -> f64 { - let [red, green, blue] = rgb.map(channel); - 0.2126 * red + 0.7152 * green + 0.0722 * blue - } - - fn channel(value: u8) -> f64 { - let value = f64::from(value) / 255.0; - match value <= 0.04045 { - true => value / 12.92, - false => ((value + 0.055) / 1.055).powf(2.4), - } - } -} +mod contract_tests; +#[cfg(test)] +mod tests; diff --git a/src/web/assets/brand.rs b/src/web/assets/brand.rs new file mode 100644 index 0000000..694b9c2 --- /dev/null +++ b/src/web/assets/brand.rs @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +//! Kaizen brand assets. + +use axum::http::header::{CACHE_CONTROL, CONTENT_TYPE}; +use axum::response::IntoResponse; +use axum::{Router, routing::get}; + +const KANJI: &[u8] = include_bytes!("kaizen-kanji.png"); + +pub fn router() -> Router +where + S: Clone + Send + Sync + 'static, +{ + Router::new().route("/assets/kaizen-kanji.png", get(kanji)) +} + +pub async fn kanji() -> impl IntoResponse { + ( + [(CONTENT_TYPE, "image/png"), (CACHE_CONTROL, "no-store")], + KANJI, + ) +} diff --git a/src/web/assets/contract_tests.rs b/src/web/assets/contract_tests.rs new file mode 100644 index 0000000..9649fd6 --- /dev/null +++ b/src/web/assets/contract_tests.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +use super::{INDEX, JS, SESSION_CONTROLS_JS, SNAPSHOT_STATE_JS}; + +#[test] +fn project_controls_live_in_header_navigation() { + let header = INDEX.find("
").unwrap(); + let controls = INDEX.find("class=\"project-controls\"").unwrap(); + assert!(header < controls && controls < header_end); + assert!(INDEX.contains("