Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ anyhow = "1"
memchr = "2"
tempfile = "3"
tar = "0.4"
rusqlite = { version = "0.40", features = ["bundled"] }
rusqlite = { version = "0.40", features = ["bundled", "functions"] }
caseless = "0.2.2"
rand = { version = "0.10", features = ["std_rng"] }
tokio = { version = "1", features = ["full"] }
notify = "8"
Expand Down
5 changes: 5 additions & 0 deletions docs/daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Kaizen can run a local daemon so one process owns store writes.
| `kaizen daemon start --background` | Spawn daemon, wait until ready, print pid/socket/log/web, exit |
| `kaizen daemon status` | Print `status: running` plus pid/uptime/queue/error/capture/web, or `status: stopped` plus socket path |
| `kaizen daemon stop` | Request graceful daemon shutdown |
| `kaizen daemon restart` | Gracefully stop when running, then start in background and restore registered workspace capture |
| `--no-daemon` or `KAIZEN_DAEMON=0` | Use direct SQLite mode |

Runtime files live under `$KAIZEN_HOME` or `~/.kaizen`:
Expand Down Expand Up @@ -58,3 +59,7 @@ remains compiled and supported for CI, smoke tests, and debugging.
workspace scanner loop and records capture health for `daemon status`. `kaizen
init --deep` also asks for provider proxy endpoints; unsupported agent config
rewrites stay fail-open and are reported as partial deep capture.

Daemon startup restores basic capture for every existing registered workspace.
Use `kaizen daemon restart` after daemon upgrades; existing SQLite sessions stay
available while transcript scanning resumes immediately.
19 changes: 14 additions & 5 deletions docs/web.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@ kaizen open --no-browser

The dashboard provides:

- automatic selection of the most recently active valid project, plus a manual
local-path fallback;
- project selection, manual local-path fallback, and refresh controls in the
responsive top navigation;
- session, active-session, error, and cost totals;
- project-level tool, attention, and telemetry-coverage insights; Tool Pattern
lists the selected session's three most frequent recent shell commands;
- the latest 30 sessions for the selected project;
- 30 sessions per page, with Previous and Next controls;
- one session search field that ranks prompt matches first, then matches session
ID, agent, model, status, branch, and tool;
- selected-session prompt, facts, recent events with bounded command details,
nested tool spans, touched files, and top tools;
- the exact bounded report under **Developer details**.

Selected-session detail is capped at 40 events, 40 spans, and 40 files. Those
Search accepts up to 256 characters. Search results and page controls use the
filtered count, while summary cards continue to show project-wide totals.
Changing the search starts at the first page. Automatic and manual refreshes
preserve the active search and page; if data changes and removes that page,
Kaizen returns to the last available page.

Selected-session detail is capped at 40 events, 40 spans, and 40 files. These
limits keep refresh latency and memory use predictable. The server watches the
selected project's SQLite database and WAL; a committed change requests a new
snapshot within one second. **Refresh now** remains available for manual checks.
snapshot within one second. **Refresh now** remains available in the top
navigation for manual checks.

`No completion` means Kaizen received activity but no final session event for
at least 30 minutes. It does not mean the work failed. Models and prompts remain
Expand Down
2 changes: 2 additions & 0 deletions src/bin_kaizen/args/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) enum DaemonCommand {
},
/// Gracefully stop daemon.
Stop,
/// Gracefully restart daemon in background.
Restart,
/// Show daemon pid, uptime, queue depth, and last error.
Status,
}
Expand Down
9 changes: 9 additions & 0 deletions src/bin_kaizen/dispatch/operate_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(super) fn daemon(cmd: DaemonCommand) -> anyhow::Result<()> {
println!("{}", kaizen::daemon::stop()?);
Ok(())
}
DaemonCommand::Restart => daemon_restart(),
DaemonCommand::Status => daemon_status(),
}
}
Expand Down Expand Up @@ -72,6 +73,14 @@ fn daemon_start(background: bool) -> anyhow::Result<()> {
Ok(())
}

fn daemon_restart() -> anyhow::Result<()> {
let started = kaizen::daemon::restart_background()?;
for line in background_start_lines(&started) {
println!("{line}");
}
Ok(())
}

fn background_start_lines(started: &kaizen::daemon::BackgroundStart) -> Vec<String> {
let lines = [
format!("daemon {}", background_state(started)),
Expand Down
1 change: 1 addition & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod legacy_import;
pub mod machine_registry;
pub mod paths;
pub mod project_identity;
pub(crate) mod prompt_text;
pub mod repo;
pub mod safe_fs;
pub mod session;
Expand Down
63 changes: 63 additions & 0 deletions src/core/prompt_text.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use serde_json::Value;

const MAX_PROMPT_CHARS: usize = 8_000;

pub(crate) fn from_value(value: &Value) -> Option<String> {
direct(value)
.or_else(|| user_message(value))
.map(|text| compact(&text))
}

fn direct(value: &Value) -> Option<String> {
["/prompt", "/user_prompt"]
.iter()
.find_map(|path| value.pointer(path)?.as_str().and_then(clean))
}

fn user_message(value: &Value) -> Option<String> {
let message = value.get("message").unwrap_or(value);
(message.get("role")?.as_str()? == "user")
.then(|| content(message.get("content")?))
.flatten()
}

fn content(value: &Value) -> Option<String> {
value.as_str().and_then(clean).or_else(|| {
value
.as_array()?
.iter()
.rev()
.find_map(|part| part.get("text")?.as_str().and_then(clean))
})
}

fn clean(raw: &str) -> Option<String> {
let value = objective(raw).unwrap_or(raw).trim();
(!value.is_empty() && !ignored(value)).then(|| value.to_string())
}

fn objective(raw: &str) -> Option<&str> {
tagged(raw, "<objective>", "</objective>")
.or_else(|| tagged(raw, "<untrusted_objective>", "</untrusted_objective>"))
}

fn tagged<'a>(raw: &'a str, open: &str, close: &str) -> Option<&'a str> {
raw.split_once(open)?.1.split_once(close).map(|pair| pair.0)
}

fn ignored(value: &str) -> bool {
value.starts_with("<environment_context>") || value.starts_with("# AGENTS.md instructions")
}

fn compact(value: &str) -> String {
let normalized = value.split_whitespace().collect::<Vec<_>>().join(" ");
let truncated = normalized
.chars()
.take(MAX_PROMPT_CHARS)
.collect::<String>();
if normalized.chars().count() > MAX_PROMPT_CHARS {
format!("{truncated}...")
} else {
truncated
}
}
53 changes: 43 additions & 10 deletions src/daemon/background.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Background daemon process startup.

use super::lifecycle::{RuntimePaths, runtime_paths, runtime_paths_for, try_status};
use super::lifecycle::{
DaemonStatusOutcome, RuntimePaths, runtime_paths, runtime_paths_for, status_outcome,
};
use crate::ipc::{DaemonStatus, WebEndpoint};
use anyhow::{Context, Result, anyhow};
use std::path::Path;
use std::process::{Child, Command, Stdio};
use std::time::{Duration, Instant};

const START_WAIT_MS: u64 = 2_000;
// Restore runs migrations, backfills, first scans before readiness. Keep slow startup bounded.
const START_WAIT_MS: u64 = 30_000;
const STOP_WAIT_MS: u64 = 2_000;

#[derive(Debug, Clone)]
pub struct BackgroundStart {
Expand All @@ -26,19 +30,50 @@ pub fn start_background_for(workspace: &Path) -> Result<BackgroundStart> {
start_background_at(runtime_paths_for(workspace)?)
}

pub fn restart_background() -> Result<BackgroundStart> {
if !daemon_stopped()? {
super::stop()?;
wait_until_stopped()?;
}
start_background()
}

fn wait_until_stopped() -> Result<()> {
let deadline = Instant::now() + Duration::from_millis(STOP_WAIT_MS);
while Instant::now() < deadline {
if daemon_stopped()? {
return Ok(());
}
std::thread::sleep(Duration::from_millis(25));
}
Err(anyhow!("daemon did not stop within {STOP_WAIT_MS}ms"))
}

fn daemon_stopped() -> Result<bool> {
Ok(matches!(
super::status_outcome()?,
super::DaemonStatusOutcome::Stopped { .. }
))
}

fn start_background_at(paths: RuntimePaths) -> Result<BackgroundStart> {
if let Some(start) = running_start(&paths) {
if let Some(start) = status_start(&paths, true)? {
return Ok(start);
}
std::fs::create_dir_all(&paths.dir)?;
let mut child = spawn_background(&paths)?;
wait_until_ready(paths, &mut child)
}

fn running_start(paths: &RuntimePaths) -> Option<BackgroundStart> {
try_status()
.ok()
.map(|status| background_start(status, paths.clone(), true))
fn status_start(paths: &RuntimePaths, already_running: bool) -> Result<Option<BackgroundStart>> {
match status_outcome()? {
DaemonStatusOutcome::Running(status) => Ok(Some(background_start(
status,
paths.clone(),
already_running,
))),
DaemonStatusOutcome::Stopped { .. } => Ok(None),
}
}

fn spawn_background(paths: &RuntimePaths) -> Result<Child> {
Expand Down Expand Up @@ -69,9 +104,7 @@ fn poll_start(child: &mut Child, paths: &RuntimePaths) -> Result<Option<Backgrou
if let Some(status) = child.try_wait().context("poll daemon child")? {
return Err(early_exit(status, paths));
}
Ok(try_status()
.ok()
.map(|status| background_start(status, paths.clone(), false)))
status_start(paths, false)
}

fn background_start(
Expand Down
10 changes: 10 additions & 0 deletions src/daemon/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ use crate::ipc::{
};
use anyhow::{Context, Result, anyhow};
use std::path::Path;
use std::time::Duration;
use tokio::net::UnixStream;

const LIFECYCLE_TIMEOUT_MS: u64 = 500;

pub fn request_blocking(request: DaemonRequest) -> Result<DaemonResponse> {
super::ensure_running()?;
tokio::runtime::Runtime::new()?.block_on(request_async(request))
Expand Down Expand Up @@ -83,3 +86,10 @@ pub(super) async fn request_async(request: DaemonRequest) -> Result<DaemonRespon
write_frame(&mut stream, &request).await?;
read_frame(&mut stream).await
}

pub(super) async fn request_lifecycle_async(request: DaemonRequest) -> Result<DaemonResponse> {
let timeout = Duration::from_millis(LIFECYCLE_TIMEOUT_MS);
tokio::time::timeout(timeout, request_async(request))
.await
.map_err(|_| anyhow!("daemon IPC timed out after {LIFECYCLE_TIMEOUT_MS}ms"))?
}
7 changes: 4 additions & 3 deletions src/daemon/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ pub fn ensure_running_for(workspace: &Path) -> Result<()> {
}

pub fn try_status() -> Result<DaemonStatus> {
let response = tokio::runtime::Runtime::new()?
.block_on(super::client::request_async(DaemonRequest::Status))?;
let response = tokio::runtime::Runtime::new()?.block_on(
super::client::request_lifecycle_async(DaemonRequest::Status),
)?;
match response {
DaemonResponse::Status(status) => Ok(status),
DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
Expand Down Expand Up @@ -114,7 +115,7 @@ pub fn start_foreground() -> Result<()> {

pub fn stop() -> Result<String> {
let response = tokio::runtime::Runtime::new()?
.block_on(super::client::request_async(DaemonRequest::Stop))?;
.block_on(super::client::request_lifecycle_async(DaemonRequest::Stop))?;
match response {
DaemonResponse::Ack { message } => Ok(message),
DaemonResponse::Error { message, .. } => Err(anyhow!(message)),
Expand Down
3 changes: 2 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ mod capture_status;
mod client;
mod lifecycle;
mod proxy_task;
mod scanner_health;
mod scanner_task;
mod server;
mod supervisor;
mod worker;

pub use background::{BackgroundStart, start_background, start_background_for};
pub use background::{BackgroundStart, restart_background, start_background, start_background_for};
pub use client::{
begin_observed_session_blocking, ensure_capture_blocking, ensure_proxy_blocking,
hello_blocking, request_blocking,
Expand Down
34 changes: 34 additions & 0 deletions src/daemon/scanner_health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Transcript scanner health projection.

use super::capture_status::component;
use crate::ipc::{CaptureComponent, CaptureComponentStatus, CaptureStatus};

const SCANNER: &str = "transcript-scanner";
const ERROR_PREFIX: &str = "transcript-scanner:";

pub(super) fn pending() -> CaptureComponent {
component(
SCANNER,
CaptureComponentStatus::Partial,
Some("initial scan pending".into()),
)
}

pub(super) fn update(capture: &mut CaptureStatus, error: Option<String>) {
capture.watchers.retain(|watcher| watcher.name != SCANNER);
capture.watchers.push(health_component(error.as_deref()));
capture
.errors
.retain(|message| !message.starts_with(ERROR_PREFIX));
if let Some(message) = error {
capture.errors.push(format!("{ERROR_PREFIX} {message}"));
}
}

fn health_component(error: Option<&str>) -> CaptureComponent {
match error {
Some(message) => component(SCANNER, CaptureComponentStatus::Error, Some(message.into())),
None => component(SCANNER, CaptureComponentStatus::Ready, None),
}
}
Loading
Loading