diff --git a/src/commands/flush_metrics_db.rs b/src/commands/flush_metrics_db.rs index 13fad65019..5e0b8316fd 100644 --- a/src/commands/flush_metrics_db.rs +++ b/src/commands/flush_metrics_db.rs @@ -1,6 +1,6 @@ //! Handle flush-metrics-db command (kept for manual human use). //! -//! Drains the metrics database queue by uploading batches to the API. +//! Uploads pending metrics database rows to the API. use crate::api::{ApiClient, ApiContext, upload_metrics_with_retry}; use crate::metrics::db::MetricsDatabase; @@ -69,9 +69,10 @@ pub fn handle_flush_metrics_db(_args: &[String]) { record_ids.push(record.id); } else { total_invalid += 1; - // Invalid JSON - delete the record + // Invalid JSON cannot upload successfully. Mark it delivered so + // future flushes can continue past the malformed historical row. if let Ok(mut db_lock) = db.lock() { - let _ = db_lock.delete_records(&[record.id]); + let _ = db_lock.mark_records_delivered(&[record.id], current_unix_ts()); } } } @@ -92,10 +93,10 @@ pub fn handle_flush_metrics_db(_args: &[String]) { " ✓ batch {} - uploaded {} events", total_batches, event_count ); - // Success - delete ALL records from this batch - // Validation errors are logged to Sentry and won't succeed on retry + // Success - keep rows as history and mark them delivered. + // Validation errors are logged to Sentry and won't succeed on retry. if let Ok(mut db_lock) = db.lock() { - let _ = db_lock.delete_records(&record_ids); + let _ = db_lock.mark_records_delivered(&record_ids, current_unix_ts()); } } Err(e) => { @@ -111,7 +112,7 @@ pub fn handle_flush_metrics_db(_args: &[String]) { if total_invalid > 0 { eprintln!( - "flush-metrics-db: discarded {} invalid record(s)", + "flush-metrics-db: marked {} invalid record(s) delivered", total_invalid ); } @@ -121,3 +122,10 @@ pub fn handle_flush_metrics_db(_args: &[String]) { total_uploaded, total_batches ); } + +fn current_unix_ts() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} diff --git a/src/commands/git_ai_handlers.rs b/src/commands/git_ai_handlers.rs index 8b24c0282c..cd03f3f9f3 100644 --- a/src/commands/git_ai_handlers.rs +++ b/src/commands/git_ai_handlers.rs @@ -53,6 +53,7 @@ pub fn handle_git_ai(args: &[String]) { | "install-hooks" | "install" | "uninstall-hooks" + | "usage" ); if needs_daemon { use crate::daemon::telemetry_handle::{ @@ -108,6 +109,9 @@ pub fn handle_git_ai(args: &[String]) { } handle_stats(&args[1..]); } + "usage" => { + commands::usage::handle_usage(&args[1..]); + } "status" => { commands::status::handle_status(&args[1..]); } @@ -335,6 +339,9 @@ fn print_help() { ); eprintln!(" stats [commit] Show AI authorship statistics for a commit"); eprintln!(" --json Output in JSON format"); + eprintln!(" usage Show local AI usage statistics"); + eprintln!(" --period <1d|3d|7d|30d> Time window (default: 30d)"); + eprintln!(" --json Output in JSON format"); eprintln!(" status Show uncommitted AI authorship status (debug)"); eprintln!(" --json Output in JSON format"); eprintln!(" show Display authorship logs for a revision or range"); diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 8bf97f32a1..90bbd22ba6 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -23,4 +23,5 @@ pub mod show_prompt; pub mod squash_authorship; pub mod status; pub mod upgrade; +pub mod usage; pub mod whoami; diff --git a/src/commands/usage.rs b/src/commands/usage.rs new file mode 100644 index 0000000000..6ebb630a29 --- /dev/null +++ b/src/commands/usage.rs @@ -0,0 +1,615 @@ +//! `git-ai usage` — local statistics from persisted metric events. + +use crate::metrics::local_stats::{ + BucketGranularity, LocalActivityStats, RepoActivitySummary, compute_all, +}; +use std::collections::HashSet; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub fn handle_usage(args: &[String]) { + let mut json = false; + let mut period = "30d".to_string(); + let mut repo_filter: Option = None; + + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--json" => json = true, + "--period" if i + 1 < args.len() => { + period = args[i + 1].clone(); + i += 1; + } + "--repo" if i + 1 < args.len() => { + // Normalize: strip protocol prefix so both "https://github.com/org/repo" + // and "github.com/org/repo" resolve to the same substring match. + repo_filter = Some(strip_protocol(args[i + 1].as_str()).to_string()); + i += 1; + } + "--help" | "-h" => { + print_help(); + return; + } + other => { + eprintln!("Unknown argument: {}", other); + eprintln!("Run 'git-ai usage --help' for usage."); + std::process::exit(1); + } + } + i += 1; + } + + let (since_ts, period_label, granularity) = match period.as_str() { + "1d" => ( + days_ago(1), + "last 1 day".to_string(), + BucketGranularity::Daily, + ), + "3d" => ( + days_ago(3), + "last 3 days".to_string(), + BucketGranularity::Daily, + ), + "7d" => ( + days_ago(7), + "last 7 days".to_string(), + BucketGranularity::Daily, + ), + "30d" => ( + days_ago(30), + "last 30 days".to_string(), + BucketGranularity::Weekly, + ), + other => { + eprintln!("Unknown period '{}'. Use 1d, 3d, 7d, or 30d.", other); + std::process::exit(1); + } + }; + + // Fetch events once and derive both views from the same snapshot so the + // per-repo breakdown totals are always consistent with the headline stats. + let (stats, repos) = + match compute_all(since_ts, period_label, granularity, repo_filter.as_deref()) { + Ok(pair) => pair, + Err(e) => { + eprintln!("error: {}", e); + std::process::exit(1); + } + }; + + // When filtering by repo, bail out early if nothing matched. + // Include human_lines/diff_added_lines so human-only periods aren't + // falsely reported as empty (commits.total only counts AI-involved commits). + // Also include checkpoint lines so checkpoint-only activity isn't missed. + let no_data = stats.commits.total == 0 + && stats.commits.human_lines == 0 + && stats.commits.diff_added_lines == 0 + && stats.sessions.total == 0 + && stats.checkpoints.ai_lines_added == 0 + && stats.checkpoints.human_lines_added == 0 + && stats.tokens.input + + stats.tokens.output + + stats.tokens.cache_read + + stats.tokens.cache_creation + == 0; + if no_data { + if let Some(ref filter) = repo_filter { + eprintln!( + "No data found for '{}' in the {} window.", + filter, stats.period_label + ); + eprintln!("Try --period 30d or a different substring."); + } else { + eprintln!( + "No activity data found for the {} window.", + stats.period_label + ); + } + std::process::exit(1); + } + + if json { + match serde_json::to_string_pretty(&stats) { + Ok(s) => println!("{}", s), + Err(e) => { + eprintln!("error serializing JSON: {}", e); + std::process::exit(1); + } + } + } else { + print_terminal(&stats, &repos, repo_filter.as_deref()); + } +} + +fn days_ago(days: u64) -> u32 { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now.saturating_sub(days * 24 * 3600).min(u32::MAX as u64) as u32 +} + +fn print_help() { + eprintln!("git-ai usage - Show local activity statistics"); + eprintln!(); + eprintln!("Usage: git-ai usage [options]"); + eprintln!(); + eprintln!("Options:"); + eprintln!(" --period <1d|3d|7d|30d> Time window (default: 30d)"); + eprintln!( + " --repo Filter to a repository (substring match, https:// optional)" + ); + eprintln!(" --json Output as JSON"); + eprintln!(" --help Show this help"); + eprintln!(); + eprintln!("Statistics are sourced from locally recorded metric events."); + eprintln!("Metric rows older than approximately 45 days are pruned locally."); +} + +fn print_terminal( + stats: &LocalActivityStats, + repos: &[RepoActivitySummary], + repo_filter: Option<&str>, +) { + const GRAY: &str = "\x1b[90m"; + const BOLD: &str = "\x1b[1m"; + const RESET: &str = "\x1b[0m"; + const BAR_WIDTH: u32 = 20; + + if let Some(repo) = repo_filter { + let display = strip_protocol(repo); + if repos.len() > 1 { + println!( + "{BOLD}git-ai usage{RESET} {GRAY}— {} repos matching '{}' · {}{RESET}", + repos.len(), + display, + stats.period_label + ); + } else { + // Single match: show the full matched URL, not just the search term. + let matched = repos + .first() + .map(|r| strip_protocol(&r.repo_url)) + .unwrap_or(display); + println!( + "{BOLD}git-ai usage{RESET} {GRAY}— {} · {}{RESET}", + matched, stats.period_label + ); + } + } else { + println!( + "{BOLD}git-ai usage{RESET} {GRAY}— {}{RESET}", + stats.period_label + ); + } + + // --- Top bar: AI vs Human split --- + println!(); + let total_lines = stats.commits.ai_lines + stats.commits.human_lines; + if let Some(ai_pct) = (stats.commits.ai_lines as u64 * 100) + .checked_div(total_lines as u64) + .map(|p| p as u32) + { + let human_pct = 100 - ai_pct; + println!( + " {} {BOLD}AI{RESET} {:>3}% · {BOLD}Human{RESET} {:>3}%", + bar(ai_pct, 40), + ai_pct, + human_pct, + ); + } + + // --- Per-repo breakdown --- + // Only shown when there are multiple repos — a single-row table adds nothing. + if repos.len() > 1 { + println!(); + println!(" {BOLD}Repositories{RESET}"); + + // Pre-compute display strings for column alignment. + let names: Vec<&str> = repos + .iter() + .map(|r| { + let d = strip_protocol(&r.repo_url); + if d.is_empty() { "unknown" } else { d } + }) + .collect(); + let lines_strs: Vec = repos.iter().map(|r| format_num(r.ai_lines)).collect(); + let commit_strs: Vec = repos.iter().map(|r| format_num(r.commits)).collect(); + let session_strs: Vec = repos.iter().map(|r| format_num(r.sessions)).collect(); + + let max_name_w = names.iter().map(|n| n.len()).max().unwrap_or(0); + let max_lines_w = lines_strs.iter().map(|s| s.len()).max().unwrap_or(0); + let max_commits_w = commit_strs.iter().map(|s| s.len()).max().unwrap_or(0); + let max_sessions_w = session_strs.iter().map(|s| s.len()).max().unwrap_or(0); + + for (i, r) in repos.iter().enumerate() { + let name_col = format!("{:width$}", lines_strs[i], width = max_lines_w); + let commits_col = format!("{:>width$}", commit_strs[i], width = max_commits_w); + let sessions_col = format!("{:>width$}", session_strs[i], width = max_sessions_w); + // Pad singular labels to match the width of the plural so columns stay aligned. + let commit_label = if r.commits == 1 { "commit " } else { "commits" }; + let session_label = if r.sessions == 1 { + "session " + } else { + "sessions" + }; + let cost_str = if r.estimated_cost_usd > 0.0 { + format!(" {GRAY}{}{RESET}", format_cost(r.estimated_cost_usd)) + } else { + String::new() + }; + println!( + " {GRAY}{} {} lines {} {} {} {}{}{RESET}", + name_col, + lines_col, + commits_col, + commit_label, + sessions_col, + session_label, + cost_str, + ); + } + } + + // --- AI section --- + println!(); + println!(" {BOLD}AI{RESET}"); + let yield_total = stats.sessions.yield_stats.shipped + stats.sessions.yield_stats.abandoned; + if let Some(shipped_pct) = (stats.sessions.yield_stats.shipped * 100).checked_div(yield_total) { + println!( + " Sessions {:>6} {GRAY}({} shipped · {} abandoned · {}% yield){RESET}", + format_num(stats.sessions.total), + format_num(stats.sessions.yield_stats.shipped), + format_num(stats.sessions.yield_stats.abandoned), + shipped_pct, + ); + } else { + println!( + " Sessions {:>6}", + format_num(stats.sessions.total) + ); + } + println!( + " Commits {:>6}", + format_num(stats.commits.total) + ); + println!( + " Lines committed {:>6}", + format_num(stats.commits.ai_lines) + ); + println!( + " Edits {:>6}", + format_num(stats.checkpoints.ai_lines_added) + ); + // Show acceptance rate: range when multiple tools have valid data, single value otherwise. + let valid_tool_rates: Vec = stats + .commits + .acceptance_by_tool + .iter() + .filter(|(_, pct)| *pct <= 100) + .map(|(_, pct)| *pct) + .collect(); + if valid_tool_rates.len() >= 2 { + let min_r = *valid_tool_rates.iter().min().unwrap(); + let max_r = *valid_tool_rates.iter().max().unwrap(); + if min_r == max_r { + println!(" Acceptance rate {:>5}%", min_r); + } else { + println!(" Acceptance rate {GRAY}{min_r}–{max_r}%{RESET}"); + } + } else if let Some(acceptance_pct) = (stats.commits.ai_lines as u64 * 100) + .checked_div(stats.checkpoints.ai_lines_added as u64) + .map(|p| p as u32) + { + if acceptance_pct <= 100 { + println!(" Acceptance rate {:>5}%", acceptance_pct); + } else { + // >100% means checkpoint data is incomplete (pre-backfill events). + println!(" Acceptance rate {GRAY}>100% (incomplete checkpoint data){RESET}"); + } + } + // Track which tools have already had their acceptance rate shown so we + // don't repeat the same tool-level rate on every model variant line. + let mut shown_accept: HashSet<&str> = HashSet::new(); + // Pre-compute column widths for aligned tool breakdown. + let max_tool_w = stats + .commits + .by_tool + .iter() + .map(|(t, _)| t.len()) + .max() + .unwrap_or(0); + let max_tool_count_w = stats + .commits + .by_tool + .iter() + .map(|(_, c)| format_num(*c).len()) + .max() + .unwrap_or(0); + for (tool, count) in &stats.commits.by_tool { + let tool_name = tool.split(" · ").next().unwrap_or(tool.as_str()); + let accept_str = if shown_accept.insert(tool_name) { + // First line for this tool — show the acceptance rate once. + stats + .commits + .acceptance_by_tool + .iter() + .find(|(t, _)| t == tool_name) + .map(|(_, pct)| { + if *pct <= 100 { + format!(" {GRAY}({pct}% accept){RESET}") + } else { + format!(" {GRAY}(>100% accept — incomplete checkpoint data){RESET}") + } + }) + .unwrap_or_default() + } else { + String::new() + }; + println!( + " {GRAY}{:count_w$} lines{RESET}{}", + tool, + format_num(*count), + accept_str, + tool_w = max_tool_w, + count_w = max_tool_count_w, + ); + } + + // --- Human section --- + println!(); + println!(" {BOLD}Human{RESET}"); + println!( + " Lines committed {:>6}", + format_num(stats.commits.human_lines) + ); + println!( + " Edits {:>6}", + format_num(stats.checkpoints.human_lines_added) + ); + + // --- Tokens section --- + let t = &stats.tokens; + if t.input + t.output + t.cache_read + t.cache_creation > 0 { + println!(); + println!(" {BOLD}Tokens{RESET} {GRAY}(estimated cost){RESET}"); + println!(" Input {:>12}", format_num_u64(t.input)); + println!(" Output {:>12}", format_num_u64(t.output)); + println!(" Cache read {:>12}", format_num_u64(t.cache_read)); + println!( + " Cache write {:>12}", + format_num_u64(t.cache_creation) + ); + if t.estimated_cost_usd > 0.0 { + println!( + " {BOLD}Est. cost{RESET} {:>12}", + format_cost(t.estimated_cost_usd) + ); + } + if let Some(wow) = &t.wow_spend { + // When last week had no spend, "new this week" is redundant — skip the label. + let change_str = match (wow.new_this_week, wow.change_pct) { + (true, _) => String::new(), + (_, Some(change_pct)) if change_pct > 0.0 => { + format!("↑ {:.0}% vs last week", change_pct) + } + (_, Some(change_pct)) if change_pct < 0.0 => { + format!("↓ {:.0}% vs last week", change_pct.abs()) + } + _ => "no change vs last week".to_string(), + }; + // Avoid printing "$-0.00" when last week rounds to zero. + let last_week_str = if wow.last_week_usd.abs() < 0.005 { + "$0".to_string() + } else { + format_cost(wow.last_week_usd) + }; + let trail = if change_str.is_empty() { + String::new() + } else { + format!(" {change_str}") + }; + println!( + " {GRAY}This week {} · Last week {}{}{RESET}", + format_cost(wow.this_week_usd), + last_week_str, + trail, + ); + } + // Pre-compute column widths for aligned model breakdown. + let max_model_w = t.by_model.iter().map(|m| m.model.len()).max().unwrap_or(0); + let max_tokens_w = t + .by_model + .iter() + .map(|m| format_num_u64(m.input + m.output + m.cache_read + m.cache_creation).len()) + .max() + .unwrap_or(0); + let max_cost_w = t + .by_model + .iter() + .map(|m| { + m.estimated_cost_usd + .map(|c| format_cost(c).len()) + .unwrap_or(0) + }) + .max() + .unwrap_or(0); + for m in &t.by_model { + let total = m.input + m.output + m.cache_read + m.cache_creation; + let cost_str = m + .estimated_cost_usd + .map(|c| format!("{:>width$}", format_cost(c), width = max_cost_w)) + .unwrap_or_else(|| " ".repeat(max_cost_w)); + let cache = m + .cache_hit_ratio + .map(|r| format!(" cache {:.0}% hit", r * 100.0)) + .unwrap_or_default(); + println!( + " {GRAY}{:tokens_w$} tokens {}{}{RESET}", + m.model, + format_num_u64(total), + cost_str, + cache, + model_w = max_model_w, + tokens_w = max_tokens_w, + ); + } + } + + // --- Activity over time --- + if !stats.buckets.is_empty() { + println!(); + println!(" {BOLD}Activity over time{RESET}"); + let max_ai = stats + .buckets + .iter() + .map(|b| b.ai_lines) + .max() + .unwrap_or(1) + .max(1); + for bucket in &stats.buckets { + let bar_str = ratio_bar(bucket.ai_lines, max_ai, BAR_WIDTH); + if bucket.ai_lines > 0 { + // Coverage for this bucket: attributed / total diff additions. + let coverage = (bucket.attributed_lines as u64 * 100) + .checked_div(bucket.diff_added_lines as u64) + .map(|pct| format!(" · {}% attributed", pct)) + .unwrap_or_default(); + println!( + " {GRAY}{}{RESET} {} {GRAY}{} lines · {} commits{}{RESET}", + bucket.label, + bar_str, + format_num(bucket.ai_lines), + bucket.commit_count, + coverage, + ); + } else { + println!(" {GRAY}{} {}{RESET}", bucket.label, bar_str); + } + } + } + + // --- Time of day heatmap --- + if stats.hourly.iter().any(|&v| v > 0) { + println!(); + println!(" {BOLD}Time of day{RESET} {GRAY}(AI lines committed){RESET}"); + let max_hour = stats.hourly.iter().copied().max().unwrap_or(1).max(1); + + // Each slot is 3 chars: spark char + 2 spaces. Labels are left-padded to 3. + let spark: String = stats + .hourly + .iter() + .map(|&v| spark_char(v, max_hour)) + .collect::>() + .join(" "); + println!(" {}", spark); + + let labels: Vec = (0..24) + .map(|h| match h { + 0 => "am".to_string(), + 12 => "pm".to_string(), + h if h < 12 => format!("{h}"), + h => format!("{}", h - 12), + }) + .collect(); + let label_row: String = labels + .iter() + .map(|l| format!("{:<3}", l)) + .collect::>() + .join(""); + println!(" {GRAY}{}{RESET}", label_row.trim_end()); + } + + // --- Day of week heatmap --- + if stats.daily.iter().any(|&v| v > 0) { + println!(); + println!(" {BOLD}Day of week{RESET} {GRAY}(AI lines committed){RESET}"); + let max_day = stats.daily.iter().copied().max().unwrap_or(1).max(1); + let spark: String = stats + .daily + .iter() + .map(|&v| spark_char(v, max_day)) + .collect::>() + .join(" "); + println!(" {}", spark); + let label_row = "Mon Tue Wed Thu Fri Sat Sun"; + println!(" {GRAY}{}{RESET}", label_row); + } + + println!(); + if repo_filter.is_none() { + println!(" {GRAY}Tip: use --repo to filter by repository{RESET}"); + } + println!( + " {GRAY}Local data only · See full history and team insights at https://usegitai.com/dashboard{RESET}" + ); + println!(); +} + +fn spark_char(value: u32, max: u32) -> &'static str { + if value == 0 { + return "·"; + } + let pct = (value as u64 * 8 / max as u64) as u32; + match pct { + 0 => "▁", + 1 => "▂", + 2 => "▃", + 3 => "▄", + 4 => "▅", + 5 => "▆", + 6 => "▇", + _ => "█", + } +} + +/// Strip `https://` or `http://` from a URL for display purposes. +fn strip_protocol(url: &str) -> &str { + url.trim_start_matches("https://") + .trim_start_matches("http://") +} + +/// Render a block bar where `value` out of `max` determines the fill ratio. +fn ratio_bar(value: u32, max: u32, width: u32) -> String { + let filled = if max > 0 { + ((value as u64 * width as u64 / max as u64).min(width as u64)) as u32 + } else { + 0 + }; + let empty = width - filled; + format!( + "{}{}", + "█".repeat(filled as usize), + "░".repeat(empty as usize) + ) +} + +fn bar(pct: u32, width: u32) -> String { + ratio_bar(pct, 100, width) +} + +/// Format a USD cost estimate. Rounds to whole dollars for amounts >= $10 +/// (estimates don't warrant cent-level precision at that scale); shows cents otherwise. +fn format_cost(usd: f64) -> String { + if usd >= 10.0 { + format!("~${:.0}", usd) + } else { + format!("~${:.2}", usd) + } +} + +fn format_num(n: u32) -> String { + format_num_u64(n as u64) +} + +fn format_num_u64(n: u64) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} diff --git a/src/daemon.rs b/src/daemon.rs index 51fa9eff54..50f2bd5ef9 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -7602,9 +7602,15 @@ impl ActorDaemonCoordinator { }), ControlRequest::SubmitTelemetry { envelopes } => { if let Some(worker) = &self.telemetry_worker { - worker.submit_telemetry(envelopes).await; + match worker.submit_telemetry(envelopes).await { + Ok(()) => Ok(ControlResponse::ok(None, None)), + Err(e) => Ok(ControlResponse::err(format!( + "telemetry worker failed: {e}" + ))), + } + } else { + Ok(ControlResponse::err("telemetry worker unavailable")) } - Ok(ControlResponse::ok(None, None)) } ControlRequest::SubmitCas { records } => { if let Some(worker) = &self.telemetry_worker { diff --git a/src/daemon/sentry_layer.rs b/src/daemon/sentry_layer.rs index e2a75800d3..14b1cad035 100644 --- a/src/daemon/sentry_layer.rs +++ b/src/daemon/sentry_layer.rs @@ -88,6 +88,6 @@ impl Layer for SentryLayer { context, }; - crate::daemon::telemetry_worker::submit_daemon_internal_telemetry(vec![envelope]); + let _ = crate::daemon::telemetry_worker::submit_daemon_internal_telemetry(vec![envelope]); } } diff --git a/src/daemon/telemetry_handle.rs b/src/daemon/telemetry_handle.rs index 6d54da60a1..4b3781b00f 100644 --- a/src/daemon/telemetry_handle.rs +++ b/src/daemon/telemetry_handle.rs @@ -223,14 +223,21 @@ pub fn send_via_daemon(request: &ControlRequest) -> Result) { +/// Returns the original envelopes when the daemon send failed so metric callers +/// can persist to SQLite locally instead of losing the event. +pub fn submit_telemetry(envelopes: Vec) -> Result<(), Vec> { if envelopes.is_empty() { - return; + return Ok(()); } let request = ControlRequest::SubmitTelemetry { envelopes }; - let _ = send_via_daemon(&request); + if send_via_daemon(&request).is_ok_and(|response| response.ok) { + Ok(()) + } else { + let ControlRequest::SubmitTelemetry { envelopes } = request else { + unreachable!("request was constructed as telemetry") + }; + Err(envelopes) + } } /// Submit CAS sync records to the daemon over the control socket. diff --git a/src/daemon/telemetry_worker.rs b/src/daemon/telemetry_worker.rs index f2fffa1944..eab205520e 100644 --- a/src/daemon/telemetry_worker.rs +++ b/src/daemon/telemetry_worker.rs @@ -6,7 +6,8 @@ use crate::api::{ApiClient, ApiContext, CasObject, CasUploadRequest}; use crate::config::{Config, get_or_create_distinct_id}; use crate::daemon::control_api::{CasSyncPayload, TelemetryEnvelope}; -use crate::metrics::db::MetricsDatabase; +use crate::error::GitAiError; +use crate::metrics::db::{MetricRecord, MetricsDatabase}; use crate::metrics::{MetricEvent, MetricsBatch}; use crate::observability::MAX_METRICS_PER_ENVELOPE; use serde_json::{Value, json}; @@ -145,8 +146,25 @@ impl DaemonTelemetryWorkerHandle { } /// Submit telemetry envelopes for batched processing. - pub async fn submit_telemetry(&self, envelopes: Vec) { - self.buffer.lock().await.ingest_envelopes(envelopes); + pub async fn submit_telemetry( + &self, + envelopes: Vec, + ) -> Result<(), GitAiError> { + let (buffered_envelopes, metric_events) = split_metric_envelopes(envelopes); + if !buffered_envelopes.is_empty() { + self.buffer + .lock() + .await + .ingest_envelopes(buffered_envelopes); + } + + if !metric_events.is_empty() { + tokio::task::spawn_blocking(move || store_metrics_in_db(&metric_events).map(|_| ())) + .await + .map_err(|e| GitAiError::Generic(format!("metrics DB task failed: {e}")))??; + } + + Ok(()) } /// Submit CAS records for batched upload. @@ -154,17 +172,30 @@ impl DaemonTelemetryWorkerHandle { self.buffer.lock().await.ingest_cas(records); } - /// Returns the current number of buffered metric events. + /// Returns the current number of metrics waiting for upload. /// - /// Used by the transcript worker for backpressure: if the buffer is - /// above a threshold, the worker yields to let the flush loop drain it. - /// Returns `usize::MAX` when the lock is contended, so callers default - /// to "wait" rather than "push more". + /// Used by the transcript worker for backpressure: if SQLite pending rows + /// or the legacy in-memory buffer are above a threshold, the worker yields + /// to let the flush loop drain them. Returns `usize::MAX` when the buffer + /// lock is contended, so callers default to "wait" rather than "push more". pub fn metrics_buffer_len(&self) -> usize { - self.buffer + let buffered = self + .buffer .try_lock() .map(|buf| buf.metrics.len()) - .unwrap_or(usize::MAX) + .unwrap_or(usize::MAX); + if buffered == usize::MAX { + return usize::MAX; + } + + let pending = match MetricsDatabase::global() { + Ok(db) => match db.try_lock() { + Ok(db) => db.count().unwrap_or(usize::MAX), + Err(_) => usize::MAX, + }, + Err(_) => 0, + }; + buffered.saturating_add(pending) } /// Submit telemetry envelopes synchronously (best-effort, non-blocking). @@ -172,10 +203,28 @@ impl DaemonTelemetryWorkerHandle { /// Used by the daemon process's own `observability::log_*()` calls which /// cannot go through the control socket (the daemon can't connect to itself). /// Uses `try_lock()` to avoid blocking the caller if the buffer is contested. - pub fn submit_telemetry_sync(&self, envelopes: Vec) { - if let Ok(mut buf) = self.buffer.try_lock() { - buf.ingest_envelopes(envelopes); + pub fn submit_telemetry_sync( + &self, + envelopes: Vec, + ) -> Result<(), Vec> { + let (mut buffered_envelopes, metric_events) = split_metric_envelopes(envelopes); + if !buffered_envelopes.is_empty() + && let Ok(mut buf) = self.buffer.try_lock() + { + buf.ingest_envelopes(std::mem::take(&mut buffered_envelopes)); + } + + if !metric_events.is_empty() + && let Err(e) = store_metrics_in_db(&metric_events) + { + tracing::warn!(%e, "telemetry: failed to persist daemon metrics locally"); + return Err(rebuild_telemetry_envelopes( + buffered_envelopes, + metric_events, + )); } + + Ok(()) } /// Submit CAS records synchronously (best-effort, non-blocking). @@ -204,21 +253,44 @@ pub fn set_daemon_internal_telemetry(handle: DaemonTelemetryWorkerHandle) { } /// Submit telemetry from within the daemon process. -/// Returns true if the handle was available and envelopes were submitted. -pub fn submit_daemon_internal_telemetry(envelopes: Vec) -> bool { +/// Returns the original envelopes when metrics were not persisted through the +/// in-process handle, so metric callers can fall back to SQLite directly. +pub fn submit_daemon_internal_telemetry( + envelopes: Vec, +) -> Result<(), Vec> { if let Some(handle) = DAEMON_INTERNAL_TELEMETRY.get() { - if let Ok(runtime) = tokio::runtime::Handle::try_current() { - let handle = handle.clone(); - runtime.spawn(async move { - handle.submit_telemetry(envelopes).await; - }); - } else { - handle.submit_telemetry_sync(envelopes); - } - true + handle.submit_telemetry_sync(envelopes) } else { - false + Err(envelopes) + } +} + +fn split_metric_envelopes( + envelopes: Vec, +) -> (Vec, Vec) { + let mut buffered_envelopes = Vec::new(); + let mut metric_events = Vec::new(); + + for envelope in envelopes { + match envelope { + TelemetryEnvelope::Metrics { events } => metric_events.extend(events), + other => buffered_envelopes.push(other), + } + } + + (buffered_envelopes, metric_events) +} + +fn rebuild_telemetry_envelopes( + mut buffered_envelopes: Vec, + metric_events: Vec, +) -> Vec { + if !metric_events.is_empty() { + buffered_envelopes.push(TelemetryEnvelope::Metrics { + events: metric_events, + }); } + buffered_envelopes } /// Submit CAS records from within the daemon process (sync, best-effort). @@ -267,14 +339,18 @@ async fn telemetry_flush_loop(buffer: Arc>) { let snapshot = { let mut buf = buffer.lock().await; if buf.is_empty() { - continue; + None + } else { + Some(buf.take()) } - buf.take() }; // Flush in a blocking task since the underlying HTTP clients are synchronous. tokio::task::spawn_blocking(move || { - flush_telemetry_batch(snapshot); + if let Some(snapshot) = snapshot { + flush_telemetry_batch(snapshot); + } + flush_pending_metrics(); }) .await .unwrap_or_else(|e| { @@ -327,36 +403,158 @@ fn flush_metrics(events: &[MetricEvent]) { let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); for chunk in events.chunks(MAX_METRICS_PER_ENVELOPE) { + if let Err(e) = store_metrics_in_db(chunk) { + tracing::warn!(%e, "telemetry: failed to persist metrics before upload"); + continue; + } + if should_upload && !upload_failed && std::time::Instant::now() < deadline { - let batch = MetricsBatch::new(chunk.to_vec()); - if client.upload_metrics(&batch).is_ok() { - continue; + match flush_pending_metrics_from_db(&client, deadline) { + Ok(_) => {} + Err(e) => { + tracing::warn!(%e, "telemetry: failed to upload pending metrics"); + upload_failed = true; + } } - upload_failed = true; } - store_metrics_in_db(chunk); } } -fn store_metrics_in_db(events: &[MetricEvent]) { - if events.is_empty() { +fn flush_pending_metrics() { + let context = ApiContext::new(None); + let api_base_url = context.base_url.clone(); + let client = ApiClient::new(context); + + let using_default_api = api_base_url == crate::config::DEFAULT_API_BASE_URL; + if using_default_api && !client.is_logged_in() && !client.has_api_key() { return; } + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + if let Err(e) = flush_pending_metrics_from_db(&client, deadline) { + tracing::warn!(%e, "telemetry: failed to upload pending metrics"); + } +} + +fn store_metrics_in_db(events: &[MetricEvent]) -> Result, GitAiError> { + if events.is_empty() { + return Ok(Vec::new()); + } + let event_jsons: Vec = events .iter() - .filter_map(|e| serde_json::to_string(e).ok()) - .collect(); + .map(serde_json::to_string) + .collect::>()?; if event_jsons.is_empty() { - return; + return Ok(Vec::new()); } - if let Ok(db) = MetricsDatabase::global() - && let Ok(mut db_lock) = db.lock() - { - let _ = db_lock.insert_events(&event_jsons); + let db = MetricsDatabase::global()?; + let mut db_lock = db + .lock() + .map_err(|_| GitAiError::Generic("metrics DB lock poisoned".to_string()))?; + db_lock.insert_events(&event_jsons) +} + +#[derive(Debug, Default, PartialEq, Eq)] +struct PendingMetricsFlushResult { + uploaded_events: usize, + uploaded_batches: usize, + invalid_records: usize, +} + +fn flush_pending_metrics_from_db( + client: &ApiClient, + deadline: std::time::Instant, +) -> Result { + flush_pending_metric_records_with( + read_pending_metrics_batch, + mark_metric_records_delivered, + |batch| client.upload_metrics(batch).map(|_| ()), + deadline, + MAX_METRICS_PER_ENVELOPE, + ) +} + +fn read_pending_metrics_batch(limit: usize) -> Result, GitAiError> { + let db = MetricsDatabase::global()?; + let db_lock = db + .lock() + .map_err(|_| GitAiError::Generic("metrics DB lock poisoned".to_string()))?; + db_lock.get_batch(limit) +} + +fn mark_metric_records_delivered(ids: &[i64]) -> Result<(), GitAiError> { + let db = MetricsDatabase::global()?; + let mut db_lock = db + .lock() + .map_err(|_| GitAiError::Generic("metrics DB lock poisoned".to_string()))?; + db_lock.mark_records_delivered(ids, current_unix_ts()) +} + +fn flush_pending_metric_records_with( + mut get_batch: GetBatch, + mut mark_delivered: MarkDelivered, + mut upload_batch: UploadBatch, + deadline: std::time::Instant, + max_batch_size: usize, +) -> Result +where + GetBatch: FnMut(usize) -> Result, GitAiError>, + MarkDelivered: FnMut(&[i64]) -> Result<(), GitAiError>, + UploadBatch: FnMut(&MetricsBatch) -> Result<(), GitAiError>, +{ + let mut result = PendingMetricsFlushResult::default(); + + while std::time::Instant::now() < deadline { + let batch = get_batch(max_batch_size)?; + if batch.is_empty() { + break; + } + + let mut events = Vec::new(); + let mut record_ids = Vec::new(); + let mut invalid_ids = Vec::new(); + + for record in &batch { + match serde_json::from_str::(&record.event_json) { + Ok(event) => { + events.push(event); + record_ids.push(record.id); + } + Err(_) => { + invalid_ids.push(record.id); + } + } + } + + if !invalid_ids.is_empty() { + result.invalid_records += invalid_ids.len(); + mark_delivered(&invalid_ids)?; + } + + if events.is_empty() { + continue; + } + + let event_count = events.len(); + let metrics_batch = MetricsBatch::new(events); + upload_batch(&metrics_batch)?; + mark_delivered(&record_ids)?; + + result.uploaded_events += event_count; + result.uploaded_batches += 1; } + + Ok(result) +} + +fn current_unix_ts() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() } fn flush_sentry_and_posthog( @@ -766,3 +964,153 @@ impl SentryClient { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::RefCell; + use std::rc::Rc; + + fn event_json(ts: u32) -> String { + format!(r#"{{"t":{ts},"e":1,"v":{{}},"a":{{}}}}"#) + } + + fn unix_now() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + + fn now_ts() -> u32 { + unix_now().min(u32::MAX as u64) as u32 + } + + #[test] + fn flush_pending_metric_records_uploads_from_db_and_marks_delivered() { + let db = Rc::new(RefCell::new( + MetricsDatabase::new_in_memory_for_tests().unwrap(), + )); + let ts1 = now_ts().saturating_sub(2); + let ts2 = now_ts().saturating_sub(1); + db.borrow_mut() + .insert_events(&[event_json(ts1), event_json(ts2)]) + .unwrap(); + + let uploaded = Rc::new(RefCell::new(Vec::>::new())); + let result = flush_pending_metric_records_with( + { + let db = Rc::clone(&db); + move |limit| db.borrow().get_batch(limit) + }, + { + let db = Rc::clone(&db); + move |ids| db.borrow_mut().mark_records_delivered(ids, unix_now()) + }, + { + let uploaded = Rc::clone(&uploaded); + move |batch| { + uploaded + .borrow_mut() + .push(batch.events.iter().map(|event| event.timestamp).collect()); + Ok(()) + } + }, + std::time::Instant::now() + std::time::Duration::from_secs(60), + 1, + ) + .unwrap(); + + assert_eq!( + result, + PendingMetricsFlushResult { + uploaded_events: 2, + uploaded_batches: 2, + invalid_records: 0, + } + ); + assert_eq!(*uploaded.borrow(), vec![vec![ts1], vec![ts2]]); + assert_eq!(db.borrow().count().unwrap(), 0); + assert_eq!( + db.borrow().get_metric_history(0, None, &[1]).unwrap().len(), + 2 + ); + } + + #[test] + fn flush_pending_metric_records_marks_invalid_rows_delivered() { + let db = Rc::new(RefCell::new( + MetricsDatabase::new_in_memory_for_tests().unwrap(), + )); + let ts = now_ts(); + db.borrow_mut() + .insert_events(&["not-json".to_string(), event_json(ts)]) + .unwrap(); + + let uploaded = Rc::new(RefCell::new(Vec::::new())); + let result = flush_pending_metric_records_with( + { + let db = Rc::clone(&db); + move |limit| db.borrow().get_batch(limit) + }, + { + let db = Rc::clone(&db); + move |ids| db.borrow_mut().mark_records_delivered(ids, unix_now()) + }, + { + let uploaded = Rc::clone(&uploaded); + move |batch| { + uploaded + .borrow_mut() + .extend(batch.events.iter().map(|event| event.timestamp)); + Ok(()) + } + }, + std::time::Instant::now() + std::time::Duration::from_secs(60), + 10, + ) + .unwrap(); + + assert_eq!( + result, + PendingMetricsFlushResult { + uploaded_events: 1, + uploaded_batches: 1, + invalid_records: 1, + } + ); + assert_eq!(*uploaded.borrow(), vec![ts]); + assert_eq!(db.borrow().count().unwrap(), 0); + assert_eq!( + db.borrow().get_metric_history(0, None, &[1]).unwrap().len(), + 1 + ); + } + + #[test] + fn flush_pending_metric_records_keeps_rows_pending_after_upload_failure() { + let db = Rc::new(RefCell::new( + MetricsDatabase::new_in_memory_for_tests().unwrap(), + )); + let ts = now_ts(); + db.borrow_mut().insert_events(&[event_json(ts)]).unwrap(); + + let result = flush_pending_metric_records_with( + { + let db = Rc::clone(&db); + move |limit| db.borrow().get_batch(limit) + }, + { + let db = Rc::clone(&db); + move |ids| db.borrow_mut().mark_records_delivered(ids, unix_now()) + }, + |_batch| Err(GitAiError::Generic("upload failed".to_string())), + std::time::Instant::now() + std::time::Duration::from_secs(60), + 10, + ); + + assert!(result.is_err()); + assert_eq!(db.borrow().count().unwrap(), 1); + assert_eq!(db.borrow().get_batch(10).unwrap().len(), 1); + } +} diff --git a/src/daemon/test_sync.rs b/src/daemon/test_sync.rs index e9fc678258..b8a1a166e7 100644 --- a/src/daemon/test_sync.rs +++ b/src/daemon/test_sync.rs @@ -233,10 +233,55 @@ fn parse_alias_tokens(value: &str) -> Option> { mod tests { use super::*; use std::process::Command; + use std::sync::OnceLock; + + fn real_git_for_test() -> &'static str { + static REAL_GIT: OnceLock = OnceLock::new(); + REAL_GIT + .get_or_init(|| { + #[cfg(not(windows))] + { + for candidate in [ + "/opt/homebrew/bin/git", + "/usr/local/bin/git", + "/usr/bin/git", + "/bin/git", + ] { + if crate::config::is_real_git_candidate(Path::new(candidate)) { + return candidate.to_string(); + } + } + } + + if let Some(path) = std::env::var_os("PATH") { + for dir in std::env::split_paths(&path) { + for name in git_binary_names_for_path_lookup() { + let candidate = dir.join(name); + if crate::config::is_real_git_candidate(&candidate) { + return candidate.to_string_lossy().to_string(); + } + } + } + } + + "git".to_string() + }) + .as_str() + } + + #[cfg(windows)] + fn git_binary_names_for_path_lookup() -> &'static [&'static str] { + &["git.exe", "git.cmd", "git.bat", "git"] + } + + #[cfg(not(windows))] + fn git_binary_names_for_path_lookup() -> &'static [&'static str] { + &["git"] + } fn init_repo() -> tempfile::TempDir { let temp = tempfile::tempdir().expect("create tempdir"); - let status = Command::new("git") + let status = Command::new(real_git_for_test()) .arg("init") .env("GIT_TRACE2", "0") .env("GIT_TRACE2_EVENT", "0") @@ -248,7 +293,7 @@ mod tests { } fn git_config(repo: &Path, key: &str, value: &str) { - let status = Command::new("git") + let status = Command::new(real_git_for_test()) .args(["config", key, value]) .env("GIT_TRACE2", "0") .env("GIT_TRACE2_EVENT", "0") diff --git a/src/git/repository.rs b/src/git/repository.rs index 964efad162..513044a664 100644 --- a/src/git/repository.rs +++ b/src/git/repository.rs @@ -16,6 +16,7 @@ use gix_index::entry::Stage; use regex::Regex; use std::cell::Cell; use std::collections::{HashMap, HashSet}; +use std::ffi::OsString; use std::path::{Path, PathBuf}; use std::process::{Command, Output}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -2216,9 +2217,6 @@ fn git_config_file_for_repo_paths( git_dir: &Path, git_common_dir: &Path, ) -> Result, GitAiError> { - let mut config = - gix_config::File::from_globals().map_err(|e| GitAiError::GixError(e.to_string()))?; - let home = dirs::home_dir(); let options = gix_config::file::init::Options { includes: gix_config::file::includes::Options::follow( @@ -2234,6 +2232,13 @@ fn git_config_file_for_repo_paths( ..Default::default() }; + let mut config = gix_config::File::default(); + for (path, source) in no_exec_global_config_paths() { + let file = gix_config::File::from_path_no_includes(path, source) + .map_err(|e| GitAiError::GixError(e.to_string()))?; + config.append(file); + } + config .resolve_includes(options) .map_err(|e| GitAiError::GixError(e.to_string()))?; @@ -2275,6 +2280,107 @@ fn git_config_file_for_repo_paths( Ok(config) } +fn no_exec_global_config_paths() -> Vec<(PathBuf, gix_config::Source)> { + let mut paths = Vec::new(); + let nosystem = + std::env::var_os("GIT_CONFIG_NOSYSTEM").is_some_and(|value| !git_env_bool_is_false(&value)); + + if !nosystem { + if let Some(path) = std::env::var_os("GIT_CONFIG_SYSTEM").map(PathBuf::from) { + push_existing_config_path(&mut paths, path, gix_config::Source::System); + } else { + #[cfg(unix)] + push_existing_config_path( + &mut paths, + PathBuf::from("/etc/gitconfig"), + gix_config::Source::System, + ); + } + } + + if let Some(path) = std::env::var_os("GIT_CONFIG_GLOBAL").map(PathBuf::from) { + push_existing_config_path(&mut paths, path, gix_config::Source::Git); + } else { + let xdg_config_home = std::env::var_os("XDG_CONFIG_HOME") + .filter(|value| !value.is_empty()) + .map(PathBuf::from) + .or_else(|| home_dir_from_env().map(|home| home.join(".config"))); + if let Some(xdg_config_home) = xdg_config_home { + push_existing_config_path( + &mut paths, + xdg_config_home.join("git").join("config"), + gix_config::Source::Git, + ); + } + + if let Some(home) = home_dir_from_env() { + push_existing_config_path( + &mut paths, + home.join(".gitconfig"), + gix_config::Source::User, + ); + } + } + + paths +} + +fn git_env_bool_is_false(value: &std::ffi::OsStr) -> bool { + let value = value.to_string_lossy(); + matches!(value.as_ref(), "" | "0") + || value.eq_ignore_ascii_case("false") + || value.eq_ignore_ascii_case("no") + || value.eq_ignore_ascii_case("off") +} + +fn home_dir_from_env() -> Option { + env_path_from_value(std::env::var_os("HOME")).or_else(windows_home_dir_from_env) +} + +fn env_path_from_value(value: Option) -> Option { + value.filter(|value| !value.is_empty()).map(PathBuf::from) +} + +#[cfg(windows)] +fn windows_home_dir_from_env() -> Option { + windows_home_dir_from_values( + std::env::var_os("HOMEDRIVE"), + std::env::var_os("HOMEPATH"), + std::env::var_os("USERPROFILE"), + ) +} + +#[cfg(not(windows))] +fn windows_home_dir_from_env() -> Option { + None +} + +#[cfg(windows)] +fn windows_home_dir_from_values( + homedrive: Option, + homepath: Option, + userprofile: Option, +) -> Option { + if let (Some(homedrive), Some(homepath)) = ( + env_path_from_value(homedrive), + env_path_from_value(homepath), + ) { + return Some(homedrive.join(homepath)); + } + + env_path_from_value(userprofile) +} + +fn push_existing_config_path( + paths: &mut Vec<(PathBuf, gix_config::Source)>, + path: PathBuf, + source: gix_config::Source, +) { + if path.is_file() { + paths.push((path, source)); + } +} + pub fn config_get_str_for_path_no_git_exec( path: &Path, key: &str, @@ -3001,6 +3107,52 @@ mod tests { assert!(forwarded[1].starts_with("core.hooksPath=")); } + #[test] + fn git_env_bool_false_values_match_git_parsing() { + for value in [ + "", "0", "false", "FALSE", "False", "no", "NO", "No", "off", "OFF", "Off", + ] { + assert!( + git_env_bool_is_false(&OsString::from(value)), + "{value:?} should parse as false" + ); + } + } + + #[test] + fn git_env_bool_non_false_values_parse_as_true() { + for value in ["1", "true", "yes", "on", "anything-else"] { + assert!( + !git_env_bool_is_false(&OsString::from(value)), + "{value:?} should not parse as false" + ); + } + } + + #[cfg(windows)] + #[test] + fn windows_home_dir_values_prefer_homedrive_homepath() { + let home = windows_home_dir_from_values( + Some(OsString::from("C:")), + Some(OsString::from(r"\Users\git-ai")), + Some(OsString::from(r"D:\Users\fallback")), + ); + + assert_eq!(home, Some(PathBuf::from(r"C:\Users\git-ai"))); + } + + #[cfg(windows)] + #[test] + fn windows_home_dir_values_fall_back_to_userprofile() { + let home = windows_home_dir_from_values( + Some(OsString::from("")), + Some(OsString::from(r"\Users\git-ai")), + Some(OsString::from(r"D:\Users\fallback")), + ); + + assert_eq!(home, Some(PathBuf::from(r"D:\Users\fallback"))); + } + #[test] fn patch_profile_applies_canonical_machine_parse_flags() { let args = vec!["diff".to_string(), "HEAD^".to_string(), "HEAD".to_string()]; diff --git a/src/metrics/db.rs b/src/metrics/db.rs index d0f2586f40..5e54396775 100644 --- a/src/metrics/db.rs +++ b/src/metrics/db.rs @@ -1,15 +1,20 @@ -//! Simple metrics storage for offline buffering. +//! Metrics storage for local history and offline buffering. //! -//! Events are stored here when API conditions aren't met. -//! Server handles idempotency - no retry/queue logic needed. +//! Every metric event is stored here. `delivered_ts IS NULL` means the row is +//! still pending upload; delivered rows are retained as the local history. +//! Server handles idempotency. use crate::error::GitAiError; +use crate::metrics::attrs::attr_pos; +use crate::metrics::pos_encoded::sparse_get_string; +use crate::metrics::types::MetricEvent; use rusqlite::{Connection, OptionalExtension, params}; +use serde::Deserialize; use std::path::PathBuf; use std::sync::{Mutex, OnceLock}; /// Current schema version (must match MIGRATIONS.len()) -const SCHEMA_VERSION: usize = 2; +const SCHEMA_VERSION: usize = 6; /// Database migrations - each migration upgrades the schema by one version const MIGRATIONS: &[&str] = &[ @@ -27,6 +32,24 @@ const MIGRATIONS: &[&str] = &[ last_sent_ts INTEGER NOT NULL ); "#, + // Migration 2 -> 3: Reserved for a removed local_events design. + r#" + "#, + // Migration 3 -> 4: Reserved for a removed local_events repo_url migration. + r#" + "#, + // Migration 4 -> 5: Keep delivered metrics in the authoritative metrics table. + r#" + CREATE TABLE IF NOT EXISTS metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_json TEXT NOT NULL + ); + ALTER TABLE metrics ADD COLUMN delivered_ts INTEGER; + "#, + // Migration 5 -> 6: Speed pending queue scans. + r#" + CREATE INDEX IF NOT EXISTS metrics_delivered_ts_id ON metrics (delivered_ts, id); + "#, ]; /// Global database singleton @@ -39,12 +62,26 @@ pub struct MetricRecord { pub event_json: String, } +/// Record returned for local usage aggregation from the metrics table. +#[derive(Debug, Clone)] +pub struct MetricHistoryRecord { + pub event_id: u16, + pub ts: u32, + pub repo_url: Option, + pub event: MetricEvent, +} + /// Database wrapper for metrics storage pub struct MetricsDatabase { conn: Connection, } impl MetricsDatabase { + /// How long metric rows are retained for local history/offline retry (45 days). + const METRICS_RETENTION_SECS: u64 = 45 * 24 * 3600; + /// Minimum interval between prune passes (24 hours). + const METRICS_PRUNE_INTERVAL_SECS: u64 = 24 * 3600; + /// Get or initialize the global database pub fn global() -> Result<&'static Mutex, GitAiError> { let db_mutex = METRICS_DB.get_or_init(|| { @@ -89,6 +126,22 @@ impl MetricsDatabase { Ok(db) } + #[cfg(test)] + pub(crate) fn new_in_memory_for_tests() -> Result { + let conn = Connection::open_in_memory()?; + conn.execute_batch( + r#" + PRAGMA journal_mode=WAL; + PRAGMA synchronous=NORMAL; + "#, + )?; + + let mut db = Self { conn }; + db.initialize_schema()?; + + Ok(db) + } + /// Get database path: ~/.git-ai/internal/metrics-db fn database_path() -> Result { // Allow test override via environment variable @@ -186,37 +239,63 @@ impl MetricsDatabase { let migration_sql = MIGRATIONS[from_version]; let tx = self.conn.transaction()?; - tx.execute_batch(migration_sql)?; + match tx.execute_batch(migration_sql) { + Ok(()) => {} + Err(e) if e.to_string().contains("duplicate column name") => { + // Another process already applied this ALTER TABLE concurrently. + } + Err(e) => return Err(e.into()), + } tx.commit()?; Ok(()) } - /// Insert events as JSON strings - pub fn insert_events(&mut self, events: &[String]) -> Result<(), GitAiError> { + /// Insert undelivered events as JSON strings. + pub fn insert_events(&mut self, events: &[String]) -> Result, GitAiError> { + self.insert_events_with_delivered_ts(events, None) + } + + /// Insert events as JSON strings, optionally marking them delivered immediately. + pub fn insert_events_with_delivered_ts( + &mut self, + events: &[String], + delivered_ts: Option, + ) -> Result, GitAiError> { if events.is_empty() { - return Ok(()); + return Ok(Vec::new()); } let tx = self.conn.transaction()?; + let mut ids = Vec::with_capacity(events.len()); { let mut stmt = tx.prepare_cached("INSERT INTO metrics (event_json) VALUES (?1)")?; + let mut delivered_stmt = tx + .prepare_cached("INSERT INTO metrics (event_json, delivered_ts) VALUES (?1, ?2)")?; for event_json in events { - stmt.execute(params![event_json])?; + if let Some(ts) = delivered_ts { + delivered_stmt.execute(params![event_json, ts as i64])?; + } else { + stmt.execute(params![event_json])?; + } + ids.push(tx.last_insert_rowid()); } } tx.commit()?; - Ok(()) + self.prune_old_metrics_if_due()?; + Ok(ids) } - /// Get batch of events (oldest first) + /// Get a batch of undelivered events (oldest first). pub fn get_batch(&self, limit: usize) -> Result, GitAiError> { - let mut stmt = self - .conn - .prepare("SELECT id, event_json FROM metrics ORDER BY id ASC LIMIT ?1")?; + let mut stmt = self.conn.prepare( + "SELECT id, event_json FROM metrics \ + WHERE delivered_ts IS NULL \ + ORDER BY id ASC LIMIT ?1", + )?; let rows = stmt.query_map(params![limit], |row| { Ok(MetricRecord { @@ -233,8 +312,12 @@ impl MetricsDatabase { Ok(records) } - /// Delete records by ID (after successful upload) - pub fn delete_records(&mut self, ids: &[i64]) -> Result<(), GitAiError> { + /// Mark records as delivered after a successful upload. + pub fn mark_records_delivered( + &mut self, + ids: &[i64], + delivered_ts: u64, + ) -> Result<(), GitAiError> { if ids.is_empty() { return Ok(()); } @@ -242,25 +325,146 @@ impl MetricsDatabase { let tx = self.conn.transaction()?; { - let mut stmt = tx.prepare_cached("DELETE FROM metrics WHERE id = ?1")?; + let mut stmt = tx.prepare_cached( + "UPDATE metrics SET delivered_ts = ?1 WHERE id = ?2 AND delivered_ts IS NULL", + )?; for id in ids { - stmt.execute(params![id])?; + stmt.execute(params![delivered_ts as i64, id])?; } } tx.commit()?; + self.prune_old_metrics_if_due()?; Ok(()) } - /// Get count of pending metrics - pub fn count(&self) -> Result { - let count: i64 = self + /// Delete metric rows outside the local retention window. + /// + /// Valid rows are pruned by event timestamp, regardless of delivery state. Malformed + /// rows cannot be aged by event timestamp, so delivered malformed rows fall back to + /// `delivered_ts`. + fn prune_old_metrics_if_due(&mut self) -> Result<(), GitAiError> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let last_prune: Option = self .conn - .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0))?; + .query_row( + "SELECT value FROM schema_metadata WHERE key = 'metrics_last_prune_ts'", + [], + |row| row.get(0), + ) + .optional()? + .and_then(|v: String| v.parse().ok()); + + if let Some(last) = last_prune + && now.saturating_sub(last as u64) < Self::METRICS_PRUNE_INTERVAL_SECS + { + return Ok(()); + } + + let cutoff = now.saturating_sub(Self::METRICS_RETENTION_SECS); + let rows_to_prune = self.old_metric_row_ids(cutoff)?; + let tx = self.conn.transaction()?; + tx.execute( + "INSERT OR REPLACE INTO schema_metadata (key, value) VALUES ('metrics_last_prune_ts', ?1)", + params![now.to_string()], + )?; + { + let mut stmt = tx.prepare_cached("DELETE FROM metrics WHERE id = ?1")?; + for id in rows_to_prune { + stmt.execute(params![id])?; + } + } + tx.commit()?; + + Ok(()) + } + + fn old_metric_row_ids(&self, cutoff: u64) -> Result, GitAiError> { + let mut stmt = self + .conn + .prepare("SELECT id, event_json, delivered_ts FROM metrics ORDER BY id ASC")?; + let rows = stmt.query_map([], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + })?; + + let mut ids = Vec::new(); + for row in rows { + let (id, event_json, delivered_ts) = row?; + if metric_row_is_older_than_cutoff(&event_json, delivered_ts, cutoff) { + ids.push(id); + } + } + + Ok(ids) + } + + /// Get count of pending metrics. + pub fn count(&self) -> Result { + let count: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM metrics WHERE delivered_ts IS NULL", + [], + |row| row.get(0), + )?; Ok(count as usize) } + /// Query persisted metric rows since `since_ts` (Unix seconds). + /// + /// When `repo_filter` is `Some(url)`, only events matching that repo_url are returned. + /// An empty string `""` is a sentinel meaning "events with no repo_url (NULL)". + /// When `None`, all events are returned regardless of repo. + pub fn get_metric_history( + &self, + since_ts: u32, + repo_filter: Option<&str>, + event_ids: &[u16], + ) -> Result, GitAiError> { + let mut stmt = self + .conn + .prepare("SELECT event_json FROM metrics ORDER BY id ASC")?; + let rows = stmt.query_map([], |row| row.get::<_, String>(0))?; + + let mut records = Vec::new(); + for row in rows { + let event_json = row?; + let Ok(event) = serde_json::from_str::(&event_json) else { + continue; + }; + + if event.timestamp < since_ts || !event_ids.contains(&event.event_id) { + continue; + } + + let repo_url = sparse_get_string(&event.attrs, attr_pos::REPO_URL).flatten(); + let repo_matches = match repo_filter { + None => true, + Some("") => repo_url.is_none(), + Some(filter) => repo_url.as_deref().is_some_and(|url| url.contains(filter)), + }; + if !repo_matches { + continue; + } + + records.push(MetricHistoryRecord { + event_id: event.event_id, + ts: event.timestamp, + repo_url, + event, + }); + } + + Ok(records) + } + /// Returns whether an `agent_usage` event should be emitted for this prompt_id. /// /// If emitted, this method also updates the prompt's last-sent timestamp. @@ -303,6 +507,24 @@ impl MetricsDatabase { } } +fn metric_row_is_older_than_cutoff( + event_json: &str, + delivered_ts: Option, + cutoff: u64, +) -> bool { + if let Ok(event) = serde_json::from_str::(event_json) { + return u64::from(event.timestamp) < cutoff; + } + + delivered_ts.is_some_and(|ts| ts >= 0 && (ts as u64) < cutoff) +} + +#[derive(Deserialize)] +struct MetricTimestampOnly { + #[serde(rename = "t")] + timestamp: u32, +} + #[cfg(test)] mod tests { use super::*; @@ -321,6 +543,27 @@ mod tests { (db, temp_dir) } + fn unix_now() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + + fn days_ago(days: u64) -> u32 { + unix_now() + .saturating_sub(days * 24 * 3600) + .min(u32::MAX as u64) as u32 + } + + fn event_json(ts: u32) -> String { + format!(r#"{{"t":{ts},"e":1,"v":{{}},"a":{{}}}}"#) + } + + fn event_json_with_repo(ts: u32, event_id: u16, repo: &str) -> String { + format!(r#"{{"t":{ts},"e":{event_id},"v":{{}},"a":{{"1":"{repo}"}}}}"#) + } + #[test] fn test_initialize_schema() { let (db, _temp_dir) = create_test_db(); @@ -345,7 +588,18 @@ mod tests { |row| row.get(0), ) .unwrap(); - assert_eq!(version, "2"); + assert_eq!(version, "6"); + + // Verify delivered_ts exists on the authoritative metrics table. + let delivered_ts_columns: i64 = db + .conn + .query_row( + "SELECT COUNT(*) FROM pragma_table_info('metrics') WHERE name = 'delivered_ts'", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(delivered_ts_columns, 1); } #[test] @@ -383,33 +637,35 @@ mod tests { |row| row.get(0), ) .unwrap(); - assert_eq!(version, "2"); + assert_eq!(version, "6"); } #[test] fn test_insert_events() { let (mut db, _temp_dir) = create_test_db(); + let ts1 = days_ago(2); + let ts2 = days_ago(1); let events = vec![ - r#"{"t":1234567890,"e":1,"v":{"0":"abc123"},"a":{"0":"1.0.0"}}"#.to_string(), - r#"{"t":1234567891,"e":1,"v":{"0":"def456"},"a":{"0":"1.0.0"}}"#.to_string(), + format!(r#"{{"t":{ts1},"e":1,"v":{{"0":"abc123"}},"a":{{"0":"1.0.0"}}}}"#), + format!(r#"{{"t":{ts2},"e":1,"v":{{"0":"def456"}},"a":{{"0":"1.0.0"}}}}"#), ]; - db.insert_events(&events).unwrap(); + let ids = db.insert_events(&events).unwrap(); let count = db.count().unwrap(); assert_eq!(count, 2); + assert_eq!(ids.len(), 2); } #[test] fn test_get_batch() { let (mut db, _temp_dir) = create_test_db(); + let ts1 = days_ago(3); + let ts2 = days_ago(2); + let ts3 = days_ago(1); - let events = vec![ - r#"{"t":1,"e":1,"v":{},"a":{}}"#.to_string(), - r#"{"t":2,"e":1,"v":{},"a":{}}"#.to_string(), - r#"{"t":3,"e":1,"v":{},"a":{}}"#.to_string(), - ]; + let events = vec![event_json(ts1), event_json(ts2), event_json(ts3)]; db.insert_events(&events).unwrap(); @@ -419,36 +675,175 @@ mod tests { // Verify order (oldest first) assert!(batch[0].id < batch[1].id); - assert!(batch[0].event_json.contains("\"t\":1")); - assert!(batch[1].event_json.contains("\"t\":2")); + assert!(batch[0].event_json.contains(&format!("\"t\":{ts1}"))); + assert!(batch[1].event_json.contains(&format!("\"t\":{ts2}"))); } #[test] - fn test_delete_records() { + fn test_mark_records_delivered() { let (mut db, _temp_dir) = create_test_db(); + let ts1 = days_ago(3); + let ts2 = days_ago(2); + let ts3 = days_ago(1); - let events = vec![ - r#"{"t":1,"e":1,"v":{},"a":{}}"#.to_string(), - r#"{"t":2,"e":1,"v":{},"a":{}}"#.to_string(), - r#"{"t":3,"e":1,"v":{},"a":{}}"#.to_string(), - ]; + let events = vec![event_json(ts1), event_json(ts2), event_json(ts3)]; db.insert_events(&events).unwrap(); - // Get batch and delete first two + // Get batch and mark first two delivered. let batch = db.get_batch(2).unwrap(); let ids: Vec = batch.iter().map(|r| r.id).collect(); - db.delete_records(&ids).unwrap(); + db.mark_records_delivered(&ids, unix_now()).unwrap(); - // Verify only one remains + // Verify only one remains pending. let count = db.count().unwrap(); assert_eq!(count, 1); - // Verify remaining is the third one + // Verify remaining pending row is the third one. let remaining = db.get_batch(10).unwrap(); assert_eq!(remaining.len(), 1); - assert!(remaining[0].event_json.contains("\"t\":3")); + assert!(remaining[0].event_json.contains(&format!("\"t\":{ts3}"))); + + // Verify delivered rows are retained. + let total: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0)) + .unwrap(); + assert_eq!(total, 3); + } + + #[test] + fn test_insert_events_with_delivered_ts_skips_batch() { + let (mut db, _temp_dir) = create_test_db(); + + let delivered_ts = unix_now(); + let delivered_event_ts = days_ago(2); + let pending_event_ts = days_ago(1); + let delivered = vec![event_json(delivered_event_ts)]; + let pending = vec![event_json(pending_event_ts)]; + + db.insert_events_with_delivered_ts(&delivered, Some(delivered_ts)) + .unwrap(); + db.insert_events(&pending).unwrap(); + + let batch = db.get_batch(10).unwrap(); + assert_eq!(batch.len(), 1); + assert!( + batch[0] + .event_json + .contains(&format!("\"t\":{pending_event_ts}")) + ); + assert_eq!(db.count().unwrap(), 1); + + let total: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0)) + .unwrap(); + assert_eq!(total, 2); + } + + #[test] + fn test_get_metric_history_reads_authoritative_metrics_table() { + let (mut db, _temp_dir) = create_test_db(); + + let delivered_ts = unix_now(); + let ts1 = days_ago(4); + let ts2 = days_ago(3); + let ts3 = days_ago(2); + let ts4 = days_ago(1); + let delivered = vec![event_json_with_repo( + ts1, + 1, + "https://github.com/acme/project", + )]; + let pending = vec![ + event_json_with_repo(ts2, 4, "https://github.com/acme/project"), + event_json_with_repo(ts3, 2, "https://github.com/acme/project"), + event_json_with_repo(ts4, 5, "https://github.com/other/repo"), + ]; + + db.insert_events_with_delivered_ts(&delivered, Some(delivered_ts)) + .unwrap(); + db.insert_events(&pending).unwrap(); + + let records = db + .get_metric_history(0, Some("acme/project"), &[1, 4, 5]) + .unwrap(); + assert_eq!(records.len(), 2); + assert_eq!(records[0].event_id, 1); + assert_eq!(records[0].ts, ts1); + assert_eq!(records[1].event_id, 4); + assert_eq!(records[1].ts, ts2); + + // Delivered rows are retained for history, but only undelivered rows flush. + let batch = db.get_batch(10).unwrap(); + assert_eq!(batch.len(), 3); + } + + #[test] + fn test_prunes_metric_rows_older_than_retention_by_event_timestamp() { + let (mut db, _temp_dir) = create_test_db(); + + let delivered_ts = unix_now(); + let old_event_ts = days_ago(46); + let recent_event_ts = days_ago(44); + let events = vec![event_json(old_event_ts), event_json(recent_event_ts)]; + + db.insert_events_with_delivered_ts(&events, Some(delivered_ts)) + .unwrap(); + + let total_after_prune: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0)) + .unwrap(); + assert_eq!(total_after_prune, 1); + + let records = db.get_metric_history(0, None, &[1]).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].ts, recent_event_ts); + } + + #[test] + fn test_prunes_old_pending_metric_rows() { + let (mut db, _temp_dir) = create_test_db(); + + let old_event_ts = days_ago(46); + let recent_event_ts = days_ago(1); + let pending = vec![event_json(old_event_ts), event_json(recent_event_ts)]; + + db.insert_events(&pending).unwrap(); + + let total: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0)) + .unwrap(); + assert_eq!(total, 1); + assert_eq!(db.count().unwrap(), 1); + + let batch = db.get_batch(10).unwrap(); + assert_eq!(batch.len(), 1); + assert!( + batch[0] + .event_json + .contains(&format!("\"t\":{recent_event_ts}")) + ); + } + + #[test] + fn test_prunes_malformed_delivered_rows_by_delivered_timestamp() { + let (mut db, _temp_dir) = create_test_db(); + + let old_delivered_ts = + unix_now().saturating_sub(MetricsDatabase::METRICS_RETENTION_SECS + 1); + db.insert_events_with_delivered_ts(&["not-json".to_string()], Some(old_delivered_ts)) + .unwrap(); + + let total: i64 = db + .conn + .query_row("SELECT COUNT(*) FROM metrics", [], |row| row.get(0)) + .unwrap(); + assert_eq!(total, 0); } #[test] @@ -462,8 +857,8 @@ mod tests { let batch = db.get_batch(10).unwrap(); assert!(batch.is_empty()); - // Delete empty should succeed - db.delete_records(&[]).unwrap(); + // Marking an empty set delivered should succeed. + db.mark_records_delivered(&[], 1_700_000_000).unwrap(); // Count empty should return 0 let count = db.count().unwrap(); diff --git a/src/metrics/local_stats.rs b/src/metrics/local_stats.rs new file mode 100644 index 0000000000..52b2941079 --- /dev/null +++ b/src/metrics/local_stats.rs @@ -0,0 +1,1285 @@ +//! In-memory aggregation of persisted metric events for `git-ai usage`. + +/// How long after a session's last message a subsequent commit is attributed +/// to that session for yield and ai_lines_committed calculations. +const YIELD_WINDOW_SECS: u32 = 4 * 3600; + +use crate::error::GitAiError; +use crate::metrics::attrs::attr_pos; +use crate::metrics::db::{MetricHistoryRecord, MetricsDatabase}; +use crate::metrics::events::{checkpoint_pos, committed_pos, session_event_pos}; +use crate::metrics::pos_encoded::{ + sparse_get_string, sparse_get_u32, sparse_get_vec_string, sparse_get_vec_u32, +}; +use crate::metrics::types::MetricEvent; +use chrono::{DateTime, Datelike, Local, NaiveDate, TimeZone, Timelike}; +use serde::Serialize; +use std::cmp::Reverse; +use std::collections::{HashMap, HashSet}; + +#[derive(Debug, Serialize)] +pub struct LocalActivityStats { + pub period_label: String, + pub commits: CommitSummary, + pub checkpoints: CheckpointSummary, + pub sessions: SessionSummary, + pub tokens: TokenSummary, + /// Activity bucketed by day/week/month depending on period. + pub buckets: Vec, + /// AI lines committed per hour of day (local time), 24 elements. + pub hourly: Vec, + /// AI lines committed per day of week (local time), 7 elements: Mon=0 … Sun=6. + pub daily: Vec, +} + +#[derive(Debug, Default, Serialize)] +pub struct TokenSummary { + pub input: u64, + pub output: u64, + pub cache_read: u64, + pub cache_creation: u64, + /// Estimated cost in USD, summed across models with known pricing. + pub estimated_cost_usd: f64, + /// Per-model breakdown, sorted by total tokens descending. + pub by_model: Vec, + /// Week-over-week spend comparison (current 7 days vs previous 7 days). + /// None when either week has no cost data (e.g. viewing a period < 14 days + /// or when pricing is unavailable for all models). + pub wow_spend: Option, +} + +/// Week-over-week spend comparison. +#[derive(Debug, Serialize)] +pub struct WowSpend { + pub this_week_usd: f64, + pub last_week_usd: f64, + /// Percentage change: positive = up, negative = down. None when last week + /// was zero and this week has spend. + pub change_pct: Option, + pub new_this_week: bool, +} + +#[derive(Debug, Default, Serialize)] +pub struct TokenModelStat { + pub model: String, + pub sessions: u32, + pub input: u64, + pub output: u64, + pub cache_read: u64, + pub cache_creation: u64, + /// Estimated cost in USD; None if the model has no pricing entry. + pub estimated_cost_usd: Option, + /// Cache hit ratio: cache_read / (cache_read + cache_creation), 0.0–1.0. + /// None when neither cache_read nor cache_creation is non-zero (model + /// doesn't use prompt caching, e.g. codex). + pub cache_hit_ratio: Option, +} + +#[derive(Debug, Serialize)] +pub struct BucketStats { + pub label: String, + pub ai_lines: u32, + pub commit_count: u32, + /// Total git diff additions in this bucket (across all commits). + pub diff_added_lines: u32, + /// Lines attributed to AI or known-human in this bucket. + pub attributed_lines: u32, +} + +#[derive(Debug, Serialize)] +pub struct CommitSummary { + /// Commits that include at least one AI-attributed line. Human-only commits + /// are not counted here; use the diff/human stats for full commit coverage. + pub total: u32, + pub ai_lines: u32, + pub human_lines: u32, + /// Total lines added across all commits (git diff additions), used to + /// measure attribution coverage: lines not attributed to AI or known-human + /// are "untracked" holes in the data. + pub diff_added_lines: u32, + /// Per-tool AI line counts (tool · model label), sorted descending. + pub by_tool: Vec<(String, u32)>, + /// Per-tool acceptance rate: committed AI lines / checkpoint AI lines, as a + /// percentage. Values >100 indicate incomplete checkpoint data (e.g. events + /// recorded before the repo_url backfill). Sorted by tool name. + pub acceptance_by_tool: Vec<(String, u32)>, +} + +#[derive(Debug, Serialize)] +pub struct CheckpointSummary { + pub total: u32, + pub ai_lines_added: u32, + pub human_lines_added: u32, + pub files_edited: u32, +} + +#[derive(Debug, Serialize)] +pub struct SessionSummary { + pub total: u32, + pub by_tool: Vec<(String, u32)>, + pub yield_stats: YieldStats, +} + +/// Classifies sessions by whether they were followed by a commit within +/// a short window — a proxy for "did this AI session actually ship work?" +#[derive(Debug, Default, Serialize)] +pub struct YieldStats { + /// Sessions followed by at least one commit within `YIELD_WINDOW_SECS`. + pub shipped: u32, + /// Sessions with no commit found within the window. + pub abandoned: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum BucketGranularity { + Daily, + Weekly, + Monthly, +} + +/// Event types useful for local usage stats. +const USAGE_EVENT_IDS: &[u16] = &[ + 1, // Committed + 4, // Checkpoint + 5, // SessionEvent +]; +const SESSION_RAW_JSON_KEY: &str = "0"; + +/// Acquire the global DB lock and fetch metric history for the given window. +fn fetch_metric_history( + since_ts: u32, + repo_filter: Option<&str>, +) -> Result, GitAiError> { + let db = MetricsDatabase::global()?; + let db_lock = db + .lock() + .map_err(|_| GitAiError::Generic("metrics DB lock poisoned".to_string()))?; + db_lock.get_metric_history(since_ts, repo_filter, USAGE_EVENT_IDS) +} + +/// Aggregate metric history since `since_ts` (Unix seconds) into activity stats. +/// +/// When `repo_filter` is `Some(url)`, only events from that repository are +/// aggregated. When `None`, events from all repositories are included. +pub fn compute_activity( + since_ts: u32, + period_label: String, + granularity: BucketGranularity, + repo_filter: Option<&str>, +) -> Result { + let records = fetch_metric_history(since_ts, repo_filter)?; + let refs: Vec<&MetricHistoryRecord> = records.iter().collect(); + compute_activity_from_records(&refs, since_ts, period_label, granularity) +} + +/// Aggregate a pre-fetched slice of `MetricHistoryRecord`s into activity stats. +/// +/// Separated from `compute_activity` so callers that already hold all events +/// (e.g. `compute_repo_summaries`) can avoid re-fetching from the DB per repo. +fn compute_activity_from_records( + records: &[&MetricHistoryRecord], + since_ts: u32, + period_label: String, + granularity: BucketGranularity, +) -> Result { + let mut total_commits = 0u32; + let mut total_ai_lines = 0u32; + let mut total_human_lines = 0u32; + let mut total_diff_added = 0u32; + let mut commit_tool_counts: HashMap = HashMap::new(); + + let mut total_checkpoints = 0u32; + let mut ai_lines_added = 0u32; + let mut human_lines_added = 0u32; + let mut files_edited: HashSet = HashSet::new(); + // Checkpoint AI lines keyed by plain tool name, for per-tool acceptance rate. + let mut checkpoint_ai_by_tool: HashMap = HashMap::new(); + // Committed AI lines keyed by plain tool name (extracted from tool::model pairs). + let mut committed_ai_by_plain_tool: HashMap = HashMap::new(); + + let mut session_ids: HashSet = HashSet::new(); + let mut session_tool_counts: HashMap = HashMap::new(); + + // Claude-shaped token usage keyed by assistant message id. Value is + // (model, accum, record_ts, session_id). `record_ts` is the Unix timestamp of the + // first event that introduced this message id — used for WoW bucketing. + let mut message_usage: HashMap = HashMap::new(); + + // Codex-shaped token usage keyed by session id. Codex reports cumulative + // session totals (total_token_usage) on each token_count event, so we keep + // the per-session max rather than summing. + let mut codex_sessions: HashMap = HashMap::new(); + + // bucket_key -> accumulated stats + let mut bucket_map: HashMap = HashMap::new(); + // bucket_key -> sort key (for ordering) + let mut bucket_order: HashMap = HashMap::new(); + + let mut hourly: Vec = vec![0u32; 24]; + let mut daily: Vec = vec![0u32; 7]; + + // Yield classification: track the latest timestamp seen per session, and + // all commit timestamps, then correlate after the loop. + let mut session_last_ts: HashMap = HashMap::new(); + let mut commit_timestamps: Vec = Vec::new(); + + for record in records { + let event = &record.event; + + match record.event_id { + 1 => { + commit_timestamps.push(record.ts); + let c = aggregate_committed( + event, + &mut total_commits, + &mut total_ai_lines, + &mut total_human_lines, + &mut total_diff_added, + &mut commit_tool_counts, + &mut committed_ai_by_plain_tool, + ); + + // Bucket every commit that added lines so coverage spans all + // committed code, not just AI commits. + if c.diff_added > 0 { + let local_dt = ts_to_local(record.ts); + if c.ai_lines > 0 { + hourly[local_dt.hour() as usize] += c.ai_lines; + // Weekday: Mon=0 … Sun=6 (chrono's num_days_from_monday). + daily[local_dt.weekday().num_days_from_monday() as usize] += c.ai_lines; + } + + let (key, order_key) = bucket_key(&local_dt, granularity); + let entry = bucket_map.entry(key.clone()).or_default(); + entry.ai_lines += c.ai_lines; + // Count AI commits only, to match the AI-lines bar. + if c.ai_lines > 0 { + entry.commit_count += 1; + } + entry.diff_added += c.diff_added; + entry.attributed += c.ai_lines + c.human_lines; + bucket_order.entry(key).or_insert(order_key); + } + } + 4 => aggregate_checkpoint( + event, + &mut total_checkpoints, + &mut ai_lines_added, + &mut human_lines_added, + &mut files_edited, + &mut checkpoint_ai_by_tool, + ), + 5 => { + aggregate_session(event, &mut session_ids, &mut session_tool_counts); + + // Track last-seen timestamp per session for yield classification. + if let Some(sid) = sparse_get_string(&event.attrs, attr_pos::SESSION_ID).flatten() { + let entry = session_last_ts.entry(sid).or_insert(0); + *entry = (*entry).max(record.ts); + } + let tool = sparse_get_string(&event.attrs, attr_pos::TOOL) + .flatten() + .unwrap_or_default(); + if tool == "codex" { + aggregate_codex_tokens(event, record.ts, &mut codex_sessions); + } else { + let sid = sparse_get_string(&event.attrs, attr_pos::SESSION_ID) + .flatten() + .unwrap_or_default(); + aggregate_session_tokens(event, record.ts, sid, &mut message_usage); + } + } + _ => {} + } + } + + // Yield classification: for each unique session, check if a commit landed + // within 4 hours of the session's last observed event. + // + // Limitation: the all-repos view aggregates activity globally, so a commit + // in repo-A can incorrectly "claim" a nearby session from repo-B. The + // per-repo view avoids this by grouping on repo_url before aggregation. + + commit_timestamps.sort_unstable(); + let mut yield_shipped = 0u32; + let mut yield_abandoned = 0u32; + for last_ts in session_last_ts.values() { + let window_end = last_ts.saturating_add(YIELD_WINDOW_SECS); + // Find the first commit at or after this session's last event. + let pos = commit_timestamps.partition_point(|&t| t < *last_ts); + if commit_timestamps.get(pos).is_some_and(|&t| t <= window_end) { + yield_shipped += 1; + } else { + yield_abandoned += 1; + } + } + + // Per-tool acceptance rate: committed AI lines / checkpoint AI lines. + // Values >100 indicate incomplete checkpoint data (e.g. checkpoint events + // aged out of the window while committed events remain). u32::MAX is the + // sentinel for "no checkpoint events at all" — same display path as >100. + let mut acceptance_by_tool: Vec<(String, u32)> = committed_ai_by_plain_tool + .iter() + .map(|(tool, &committed)| { + let pct = match checkpoint_ai_by_tool.get(tool).copied() { + Some(checkpoint) if checkpoint > 0 => (committed as u64 * 100) + .checked_div(checkpoint as u64) + .map(|p| p as u32) + .unwrap_or(u32::MAX), + _ => u32::MAX, + }; + (tool.clone(), pct) + }) + .collect(); + acceptance_by_tool.sort_by(|(a, _), (b, _)| a.cmp(b)); + + let mut commit_by_tool: Vec<(String, u32)> = commit_tool_counts.into_iter().collect(); + commit_by_tool.sort_by_key(|&(_, count)| Reverse(count)); + + let mut session_by_tool: Vec<(String, u32)> = session_tool_counts.into_iter().collect(); + session_by_tool.sort_by_key(|&(_, count)| Reverse(count)); + + let now_ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32; + let tokens = build_token_summary(message_usage, codex_sessions, now_ts, since_ts); + + // Map by order key for fill_buckets to look up real data. + let bucket_by_order: HashMap = bucket_map + .into_iter() + .map(|(label, accum)| (bucket_order[&label], accum)) + .collect(); + + // Fill in empty buckets between since_ts and now so the chart has no gaps. + let filled = fill_buckets(bucket_by_order, since_ts, granularity); + + Ok(LocalActivityStats { + period_label, + commits: CommitSummary { + total: total_commits, + ai_lines: total_ai_lines, + human_lines: total_human_lines, + diff_added_lines: total_diff_added, + by_tool: commit_by_tool, + acceptance_by_tool, + }, + checkpoints: CheckpointSummary { + total: total_checkpoints, + ai_lines_added, + human_lines_added, + files_edited: files_edited.len() as u32, + }, + sessions: SessionSummary { + total: session_ids.len() as u32, + by_tool: session_by_tool, + yield_stats: YieldStats { + shipped: yield_shipped, + abandoned: yield_abandoned, + }, + }, + tokens, + buckets: filled, + hourly, + daily, + }) +} + +/// Per-model token accumulator. +#[derive(Debug, Default, Clone)] +struct TokenAccum { + input: u64, + output: u64, + cache_read: u64, + cache_creation: u64, +} + +/// Per-session codex accumulator. Codex reports *cumulative* session totals on +/// each `token_count` event, so we track the max of each raw field. The model +/// name arrives on a separate event (`payload.model`), captured when seen. +#[derive(Debug, Default, Clone)] +struct CodexSessionAccum { + model: Option, + /// Unix timestamp of the latest token-usage event seen for this session + /// (WoW bucketing). + last_usage_ts: u32, + /// Cumulative input tokens (includes cached). + input_tokens: u64, + /// Cumulative cached input tokens (subset of input_tokens). + cached_input_tokens: u64, + /// Cumulative output tokens (includes reasoning). + output_tokens: u64, +} + +impl CodexSessionAccum { + /// Map codex token fields onto the shared `TokenAccum` schema. + /// + /// Codex `input_tokens` *includes* cached tokens, so non-cached input is + /// the difference. Codex has no cache-creation concept. + fn to_token_accum(&self) -> TokenAccum { + TokenAccum { + input: self.input_tokens.saturating_sub(self.cached_input_tokens), + output: self.output_tokens, + cache_read: self.cached_input_tokens, + cache_creation: 0, + } + } +} + +/// Per-million-token pricing for a model (USD). +struct ModelPricing { + input: f64, + output: f64, + cache_write: f64, + cache_read: f64, +} + +/// Built-in pricing estimate, matched by substring of the model id. +/// Rates are public Anthropic list prices (USD per million tokens) and are +/// only an estimate — they go stale as pricing changes. +fn pricing_for(model: &str) -> Option { + let m = model.to_lowercase(); + if m.contains("opus") { + Some(ModelPricing { + input: 15.0, + output: 75.0, + cache_write: 18.75, + cache_read: 1.5, + }) + } else if m.contains("sonnet") { + Some(ModelPricing { + input: 3.0, + output: 15.0, + cache_write: 3.75, + cache_read: 0.3, + }) + } else if m.contains("haiku") { + Some(ModelPricing { + input: 0.8, + output: 4.0, + cache_write: 1.0, + cache_read: 0.08, + }) + } else if m.contains("gpt") { + // OpenAI GPT-5 family estimate; cache_write unused (codex reports no + // cache-creation tokens). + Some(ModelPricing { + input: 1.25, + output: 10.0, + cache_write: 1.25, + cache_read: 0.125, + }) + } else { + None + } +} + +fn estimate_cost(acc: &TokenAccum, pricing: &ModelPricing) -> f64 { + (acc.input as f64 * pricing.input + + acc.output as f64 * pricing.output + + acc.cache_creation as f64 * pricing.cache_write + + acc.cache_read as f64 * pricing.cache_read) + / 1_000_000.0 +} + +/// Shorten a model id for display: strip a trailing "-YYYYMMDD" date snapshot +/// (e.g. "claude-haiku-4-5-20251001" -> "claude-haiku-4-5"). +fn shorten_model(model: &str) -> String { + match model.rsplit_once('-') { + Some((head, tail)) if tail.len() == 8 && tail.chars().all(|c| c.is_ascii_digit()) => { + head.to_string() + } + _ => model.to_string(), + } +} + +/// Fold a set of message-usage entries into a per-model cost estimate (USD). +/// Used to compute each WoW half independently. +fn cost_for_message_slice(entries: impl Iterator) -> f64 { + let mut model_totals: HashMap = HashMap::new(); + for (model, acc) in entries { + let e = model_totals.entry(model).or_default(); + e.input += acc.input; + e.output += acc.output; + e.cache_read += acc.cache_read; + e.cache_creation += acc.cache_creation; + } + model_totals + .iter() + .filter_map(|(model, acc)| pricing_for(model).map(|p| estimate_cost(acc, &p))) + .sum() +} + +fn build_token_summary( + message_usage: HashMap, + codex_sessions: HashMap, + now_ts: u32, + since_ts: u32, +) -> TokenSummary { + // Week-over-week split: "this week" = last 7 days, "last week" = 7–14 days ago. + // Only meaningful when the query window covers at least 14 days; otherwise + // last-week events were never fetched and last_week_cost would be 0 by + // omission rather than by fact. + let this_week_start = now_ts.saturating_sub(7 * 24 * 3600); + let last_week_start = now_ts.saturating_sub(14 * 24 * 3600); + let wow_eligible = since_ts <= last_week_start; + + let mut this_week_msgs: Vec<(String, TokenAccum)> = Vec::new(); + let mut last_week_msgs: Vec<(String, TokenAccum)> = Vec::new(); + + // Fold per-message (deduped, max) usage into per-model totals. + // Key by shorten_model() so date-snapshot variants (e.g. claude-sonnet-4-6-20250101 + // and claude-sonnet-4-6-20250201) are folded into a single display row. + let mut model_tokens: HashMap = HashMap::new(); + let mut model_session_ids: HashMap> = HashMap::new(); + for (_id, (model, acc, ts, sid)) in message_usage { + let short = shorten_model(&model); + let entry = model_tokens.entry(short.clone()).or_default(); + entry.input += acc.input; + entry.output += acc.output; + entry.cache_read += acc.cache_read; + entry.cache_creation += acc.cache_creation; + + if !sid.is_empty() { + model_session_ids + .entry(short.clone()) + .or_default() + .insert(sid); + } + + if ts >= this_week_start { + this_week_msgs.push((short, acc)); + } else if ts >= last_week_start { + last_week_msgs.push((short, acc)); + } + } + + // Fold per-session codex totals into per-model totals, mapping codex's + // field semantics onto ours: codex input_tokens *includes* cached, so the + // non-cached input is the difference; cached maps to cache_read; codex has + // no cache-creation concept. + let mut this_week_codex: Vec<(String, TokenAccum)> = Vec::new(); + let mut last_week_codex: Vec<(String, TokenAccum)> = Vec::new(); + + for (sid, acc) in codex_sessions { + let model = acc.model.clone().unwrap_or_else(|| "codex".to_string()); + let short = shorten_model(&model); + let mapped = acc.to_token_accum(); + let entry = model_tokens.entry(short.clone()).or_default(); + entry.input += mapped.input; + entry.output += mapped.output; + entry.cache_read += mapped.cache_read; + model_session_ids + .entry(short.clone()) + .or_default() + .insert(sid); + + if acc.last_usage_ts >= this_week_start { + this_week_codex.push((short, mapped)); + } else if acc.last_usage_ts >= last_week_start { + last_week_codex.push((short, mapped)); + } + } + + // Compute WoW spend from the two half-slices. + let this_week_cost = cost_for_message_slice(this_week_msgs.into_iter().chain(this_week_codex)); + let last_week_cost = cost_for_message_slice(last_week_msgs.into_iter().chain(last_week_codex)); + + let wow_spend = if wow_eligible && (this_week_cost > 0.0 || last_week_cost > 0.0) { + let (change_pct, new_this_week) = if last_week_cost > 0.0 { + ( + Some((this_week_cost - last_week_cost) / last_week_cost * 100.0), + false, + ) + } else { + (None, true) + }; + Some(WowSpend { + this_week_usd: this_week_cost, + last_week_usd: last_week_cost, + change_pct, + new_this_week, + }) + } else { + None + }; + + let mut summary = TokenSummary::default(); + let mut by_model: Vec = Vec::new(); + + for (model, acc) in model_tokens { + // Skip placeholder/synthetic entries that carried no real token counts. + if acc.input == 0 && acc.output == 0 && acc.cache_read == 0 && acc.cache_creation == 0 { + continue; + } + + summary.input += acc.input; + summary.output += acc.output; + summary.cache_read += acc.cache_read; + summary.cache_creation += acc.cache_creation; + + let cost = pricing_for(&model).map(|p| estimate_cost(&acc, &p)); + if let Some(c) = cost { + summary.estimated_cost_usd += c; + } + + let cache_total = acc.cache_read + acc.cache_creation; + let cache_hit_ratio = if cache_total > 0 { + Some(acc.cache_read as f64 / cache_total as f64) + } else { + None + }; + + let sessions = model_session_ids + .get(&model) + .map(|s| s.len() as u32) + .unwrap_or(0); + by_model.push(TokenModelStat { + model, // already shortened at insertion into model_tokens + sessions, + input: acc.input, + output: acc.output, + cache_read: acc.cache_read, + cache_creation: acc.cache_creation, + estimated_cost_usd: cost, + cache_hit_ratio, + }); + } + + by_model.sort_by_key(|m| Reverse(m.input + m.output + m.cache_read + m.cache_creation)); + summary.by_model = by_model; + summary.wow_spend = wow_spend; + summary +} + +fn ts_to_local(ts: u32) -> DateTime { + Local + .timestamp_opt(ts as i64, 0) + .single() + .unwrap_or_else(Local::now) +} + +/// Produce the display label for a bucket whose "anchor" date (Monday for +/// weekly, 1st for monthly, the day itself for daily) is `date`. +fn bucket_label(date: NaiveDate, granularity: BucketGranularity) -> String { + match granularity { + BucketGranularity::Daily => date.format("%b %d").to_string(), + BucketGranularity::Weekly => { + let sunday = date + chrono::Duration::days(6); + format!("{} – {}", date.format("%b %d"), sunday.format("%b %d")) + } + BucketGranularity::Monthly => date.format("%b %Y").to_string(), + } +} + +fn bucket_key(dt: &DateTime, granularity: BucketGranularity) -> (String, i64) { + match granularity { + BucketGranularity::Daily => { + let date = dt.date_naive(); + let order = date.num_days_from_ce() as i64; + (bucket_label(date, granularity), order) + } + BucketGranularity::Weekly => { + // ISO week: key on Monday of the week. + let weekday = dt.weekday().num_days_from_monday() as i64; + let monday = dt.date_naive() - chrono::Duration::days(weekday); + let order = monday.num_days_from_ce() as i64; + (bucket_label(monday, granularity), order) + } + BucketGranularity::Monthly => { + let order = dt.year() as i64 * 12 + dt.month0() as i64; + (bucket_label(dt.date_naive(), granularity), order) + } + } +} + +/// Fill gaps between `since_ts` and today so charts have contiguous buckets. +fn fill_buckets( + mut data_map: HashMap, + since_ts: u32, + granularity: BucketGranularity, +) -> Vec { + let now = Local::now(); + if since_ts == 0 && data_map.is_empty() { + return Vec::new(); + } + let since_date = if since_ts == 0 { + let earliest_order = data_map.keys().copied().min(); + earliest_order + .and_then(|order| bucket_start_date(order, granularity)) + .unwrap_or_else(|| now.date_naive()) + } else { + ts_to_local(since_ts).date_naive() + }; + + let make = |label: String, accum: BucketAccum| BucketStats { + label, + ai_lines: accum.ai_lines, + commit_count: accum.commit_count, + diff_added_lines: accum.diff_added, + attributed_lines: accum.attributed, + }; + + // Generate all expected bucket keys between since and now. + let mut result = Vec::new(); + match granularity { + BucketGranularity::Daily => { + let mut day = since_date; + let today = now.date_naive(); + while day <= today { + let order = day.num_days_from_ce() as i64; + result.push(make( + bucket_label(day, granularity), + data_map.remove(&order).unwrap_or_default(), + )); + day = day.succ_opt().unwrap_or(today); + } + } + BucketGranularity::Weekly => { + let weekday = since_date.weekday().num_days_from_monday() as i64; + let mut monday: NaiveDate = since_date - chrono::Duration::days(weekday); + let today = now.date_naive(); + while monday <= today { + let order = monday.num_days_from_ce() as i64; + result.push(make( + bucket_label(monday, granularity), + data_map.remove(&order).unwrap_or_default(), + )); + monday = monday + .checked_add_signed(chrono::Duration::weeks(1)) + .unwrap_or(today); + } + } + BucketGranularity::Monthly => { + let mut year = since_date.year(); + let mut month = since_date.month(); + let now_year = now.year(); + let now_month = now.month(); + loop { + let order = year as i64 * 12 + (month - 1) as i64; + let Some(date) = NaiveDate::from_ymd_opt(year, month, 1) else { + break; + }; + let label = bucket_label(date, granularity); + result.push(make(label, data_map.remove(&order).unwrap_or_default())); + if year == now_year && month == now_month { + break; + } + month += 1; + if month > 12 { + month = 1; + year += 1; + } + } + } + } + + result +} + +fn bucket_start_date(order: i64, granularity: BucketGranularity) -> Option { + match granularity { + BucketGranularity::Daily | BucketGranularity::Weekly => { + NaiveDate::from_num_days_from_ce_opt(order.try_into().ok()?) + } + BucketGranularity::Monthly => { + let year = order.div_euclid(12); + let month0 = order.rem_euclid(12); + NaiveDate::from_ymd_opt(year.try_into().ok()?, (month0 + 1).try_into().ok()?, 1) + } + } +} + +/// Per-bucket accumulator for the activity-over-time chart. +#[derive(Debug, Default, Clone)] +struct BucketAccum { + ai_lines: u32, + commit_count: u32, + diff_added: u32, + attributed: u32, +} + +/// Per-commit contribution returned by `aggregate_committed` for bucketing. +struct CommitContribution { + ai_lines: u32, + human_lines: u32, + diff_added: u32, +} + +fn aggregate_committed( + event: &MetricEvent, + total_commits: &mut u32, + total_ai_lines: &mut u32, + total_human_lines: &mut u32, + total_diff_added: &mut u32, + commit_tool_counts: &mut HashMap, + committed_ai_by_plain_tool: &mut HashMap, +) -> CommitContribution { + let human = sparse_get_u32(&event.values, committed_pos::HUMAN_ADDITIONS) + .flatten() + .unwrap_or(0); + let diff_added = sparse_get_u32(&event.values, committed_pos::GIT_DIFF_ADDED_LINES) + .flatten() + .unwrap_or(0); + let ai_vecs = sparse_get_vec_u32(&event.values, committed_pos::AI_ADDITIONS) + .flatten() + .unwrap_or_default(); + let total_ai = ai_vecs.first().copied().unwrap_or(0); + + // Always accumulate human lines and total diff additions regardless of + // whether the commit has AI lines (coverage spans all committed code). + *total_human_lines += human; + *total_diff_added += diff_added; + + let contribution = CommitContribution { + ai_lines: total_ai, + human_lines: human, + diff_added, + }; + + // Only count the commit toward the AI-commits total when AI was involved. + // Human-only commits still contribute to human_lines and diff_added above. + if total_ai == 0 { + return contribution; + } + + *total_commits += 1; + *total_ai_lines += total_ai; + + // Per-tool breakdown: index 0 = "all" aggregate, 1+ = per tool::model. + // Parse pairs once and use them for both the display label map and the + // plain-tool map used for acceptance rate — no second parse needed. + let pairs = sparse_get_vec_string(&event.values, committed_pos::TOOL_MODEL_PAIRS) + .flatten() + .unwrap_or_default(); + for (i, pair) in pairs.iter().enumerate().skip(1) { + let ai_for_tool = ai_vecs.get(i).copied().unwrap_or(0); + if ai_for_tool > 0 { + *commit_tool_counts + .entry(format_tool_model(pair)) + .or_insert(0) += ai_for_tool; + let plain_tool = pair.split_once("::").map(|(t, _)| t).unwrap_or(pair); + *committed_ai_by_plain_tool + .entry(plain_tool.to_string()) + .or_insert(0) += ai_for_tool; + } + } + + contribution +} + +/// Format a "tool::model" pair into a readable "tool · model" label, +/// trimming a redundant tool prefix from the model (e.g. "claude::claude-sonnet-4-6" +/// becomes "claude · sonnet-4-6"). +fn format_tool_model(pair: &str) -> String { + match pair.split_once("::") { + Some((tool, model)) if !model.is_empty() => { + let prefix = format!("{tool}-"); + let model = model.strip_prefix(&prefix).unwrap_or(model); + format!("{tool} · {model}") + } + _ => pair.to_string(), + } +} + +fn aggregate_checkpoint( + event: &MetricEvent, + total_checkpoints: &mut u32, + ai_lines_added: &mut u32, + human_lines_added: &mut u32, + files_edited: &mut HashSet, + checkpoint_ai_by_tool: &mut HashMap, +) { + *total_checkpoints += 1; + + let kind = sparse_get_string(&event.values, checkpoint_pos::KIND) + .flatten() + .unwrap_or_default(); + let file_path = sparse_get_string(&event.values, checkpoint_pos::FILE_PATH) + .flatten() + .unwrap_or_default(); + let lines_added = sparse_get_u32(&event.values, checkpoint_pos::LINES_ADDED) + .flatten() + .unwrap_or(0); + + if !file_path.is_empty() { + files_edited.insert(file_path); + } + + match kind.as_str() { + "ai_agent" | "ai_tab" => { + *ai_lines_added += lines_added; + if lines_added > 0 { + let tool = sparse_get_string(&event.attrs, attr_pos::TOOL) + .flatten() + .unwrap_or_else(|| "unknown".to_string()); + *checkpoint_ai_by_tool.entry(tool).or_insert(0) += lines_added; + } + } + "known_human" => *human_lines_added += lines_added, + _ => {} + } +} + +fn aggregate_session( + event: &MetricEvent, + session_ids: &mut HashSet, + session_tool_counts: &mut HashMap, +) { + let session_id = sparse_get_string(&event.attrs, attr_pos::SESSION_ID).flatten(); + let tool = sparse_get_string(&event.attrs, attr_pos::TOOL) + .flatten() + .unwrap_or_else(|| "unknown".to_string()); + + if let Some(sid) = session_id + && session_ids.insert(sid) + { + *session_tool_counts.entry(tool).or_insert(0) += 1; + } +} + +/// Extract token usage from a session event's raw transcript JSON (position 0). +/// Only assistant messages carry usage. Keyed by message id, keeping the +/// field-wise max across re-emitted copies (streaming partials report lower +/// counts than the final message). `record_ts` is stored on first insertion +/// for week-over-week bucketing. +fn aggregate_session_tokens( + event: &MetricEvent, + record_ts: u32, + session_id: String, + message_usage: &mut HashMap, +) { + debug_assert_eq!(session_event_pos::RAW_JSON, 0); + let Some(raw) = event.values.get(SESSION_RAW_JSON_KEY) else { + return; + }; + let Some(message) = raw.get("message") else { + return; + }; + if message.get("role").and_then(|r| r.as_str()) != Some("assistant") { + return; + } + let Some(usage) = message.get("usage") else { + return; + }; + let Some(id) = message.get("id").and_then(|i| i.as_str()) else { + return; + }; + + let model = message + .get("model") + .and_then(|m| m.as_str()) + .unwrap_or("unknown") + .to_string(); + + let get = |key: &str| usage.get(key).and_then(|v| v.as_u64()).unwrap_or(0); + + let (stored_model, acc, _ts, stored_sid) = + message_usage.entry(id.to_string()).or_insert_with(|| { + ( + model.clone(), + TokenAccum::default(), + record_ts, + session_id.clone(), + ) + }); + // If the entry was created with an "unknown" placeholder model (e.g. from a + // streaming partial that arrived before the final event), upgrade it now. + if stored_model == "unknown" && model != "unknown" { + *stored_model = model; + } + // Similarly, upgrade an empty session_id once a real one is available. + if stored_sid.is_empty() && !session_id.is_empty() { + *stored_sid = session_id; + } + // Field-wise max: input/cache are fixed per message; output grows while + // streaming, so the final (largest) value is authoritative. + acc.input = acc.input.max(get("input_tokens")); + acc.output = acc.output.max(get("output_tokens")); + acc.cache_read = acc.cache_read.max(get("cache_read_input_tokens")); + acc.cache_creation = acc.cache_creation.max(get("cache_creation_input_tokens")); +} + +/// Extract token usage from a codex session event. Codex emits `token_count` +/// events carrying cumulative `payload.info.total_token_usage`, and reports its +/// model on a separate event via `payload.model`. Both are keyed by session id; +/// cumulative totals are tracked as a per-session max. +fn aggregate_codex_tokens( + event: &MetricEvent, + record_ts: u32, + codex_sessions: &mut HashMap, +) { + let Some(session_id) = sparse_get_string(&event.attrs, attr_pos::SESSION_ID).flatten() else { + return; + }; + debug_assert_eq!(session_event_pos::RAW_JSON, 0); + let Some(raw) = event.values.get(SESSION_RAW_JSON_KEY) else { + return; + }; + let Some(payload) = raw.get("payload") else { + return; + }; + + let entry = codex_sessions.entry(session_id).or_default(); + + // Capture the model name when it appears (not on token_count events). + if let Some(model) = payload.get("model").and_then(|m| m.as_str()) + && entry.model.is_none() + { + entry.model = Some(model.to_string()); + } + + // Cumulative session totals; keep the running max. + if let Some(usage) = payload.get("info").and_then(|i| i.get("total_token_usage")) { + let get = |key: &str| usage.get(key).and_then(|v| v.as_u64()).unwrap_or(0); + entry.last_usage_ts = entry.last_usage_ts.max(record_ts); + entry.input_tokens = entry.input_tokens.max(get("input_tokens")); + entry.cached_input_tokens = entry.cached_input_tokens.max(get("cached_input_tokens")); + entry.output_tokens = entry.output_tokens.max(get("output_tokens")); + } +} + +// ─── Per-repository breakdown ───────────────────────────────────────────────── + +/// Summary of activity for a single repository. +#[derive(Debug, Serialize)] +pub struct RepoActivitySummary { + /// Normalised repository URL (e.g. `github.com/org/repo`). + pub repo_url: String, + pub ai_lines: u32, + pub commits: u32, + pub sessions: u32, + pub estimated_cost_usd: f64, +} + +/// Aggregate a pre-fetched slice of events into a per-repository breakdown. +fn repo_summaries_from_records( + all_records: &[MetricHistoryRecord], + since_ts: u32, + granularity: BucketGranularity, +) -> Result, GitAiError> { + // Group records by repo_url, skipping events with no repo (NULL) — these + // predate repo_url emission and have no meaningful identity to display. + let mut by_repo: HashMap<&str, Vec<&MetricHistoryRecord>> = HashMap::new(); + for record in all_records { + if let Some(ref url) = record.repo_url { + by_repo.entry(url.as_str()).or_default().push(record); + } + } + + let mut summaries: Vec = by_repo + .into_iter() + .filter_map(|(url, records)| { + let stats = + compute_activity_from_records(&records, since_ts, String::new(), granularity) + .ok()?; + Some(RepoActivitySummary { + repo_url: url.to_string(), + ai_lines: stats.commits.ai_lines, + commits: stats.commits.total, + sessions: stats.sessions.total, + estimated_cost_usd: stats.tokens.estimated_cost_usd, + }) + }) + .collect(); + + summaries.sort_by_key(|s| std::cmp::Reverse(s.ai_lines)); + Ok(summaries) +} + +/// Fetch events once and compute overall activity stats and the per-repo +/// breakdown from the same snapshot, ensuring the two views are consistent. +pub fn compute_all( + since_ts: u32, + period_label: String, + granularity: BucketGranularity, + repo_filter: Option<&str>, +) -> Result<(LocalActivityStats, Vec), GitAiError> { + let records = fetch_metric_history(since_ts, repo_filter)?; + let refs: Vec<&MetricHistoryRecord> = records.iter().collect(); + let stats = compute_activity_from_records(&refs, since_ts, period_label, granularity)?; + let repos = repo_summaries_from_records(&records, since_ts, granularity)?; + Ok((stats, repos)) +} + +/// Compute a per-repository breakdown for the given time window. +/// +/// Fetches all matching events in a single DB query, groups them in memory by +/// `repo_url`, and aggregates each group — O(n) instead of O(n × repos). +/// Sorted by `ai_lines` descending. +pub fn compute_repo_summaries( + since_ts: u32, + granularity: BucketGranularity, + repo_filter: Option<&str>, +) -> Result, GitAiError> { + let all_records = fetch_metric_history(since_ts, repo_filter)?; + repo_summaries_from_records(&all_records, since_ts, granularity) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::attrs::EventAttributes; + use crate::metrics::events::{CheckpointValues, CommittedValues, SessionEventValues}; + use crate::metrics::pos_encoded::{PosEncoded, sparse_get_string}; + use serde_json::json; + + fn now_ts() -> u32 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as u32 + } + + fn attrs( + repo_url: Option<&str>, + tool: &str, + session_id: Option<&str>, + ) -> crate::metrics::types::SparseArray { + let mut attrs = EventAttributes::with_version("test").tool(tool); + if let Some(repo_url) = repo_url { + attrs = attrs.repo_url(repo_url); + } + if let Some(session_id) = session_id { + attrs = attrs.session_id(session_id); + } + attrs.to_sparse() + } + + fn record(event: MetricEvent) -> MetricHistoryRecord { + let repo_url = sparse_get_string(&event.attrs, attr_pos::REPO_URL).flatten(); + MetricHistoryRecord { + event_id: event.event_id, + ts: event.timestamp, + repo_url, + event, + } + } + + fn committed( + ts: u32, + repo_url: &str, + ai: u32, + human: u32, + diff_added: u32, + ) -> MetricHistoryRecord { + let values = CommittedValues::new() + .human_additions(human) + .git_diff_added_lines(diff_added) + .tool_model_pairs(vec![ + "all".to_string(), + "claude::claude-sonnet-4-6".to_string(), + ]) + .ai_additions(vec![ai, ai]); + record(MetricEvent::with_timestamp( + ts, + &values, + attrs(Some(repo_url), "claude", None), + )) + } + + fn checkpoint(ts: u32, repo_url: &str, lines_added: u32) -> MetricHistoryRecord { + let values = CheckpointValues::new() + .kind("ai_agent") + .file_path("src/main.rs") + .lines_added(lines_added); + record(MetricEvent::with_timestamp( + ts, + &values, + attrs(Some(repo_url), "claude", Some("session-1")), + )) + } + + fn claude_session(ts: u32, repo_url: Option<&str>, session_id: &str) -> MetricHistoryRecord { + let values = SessionEventValues::new(json!({ + "message": { + "id": "msg-1", + "role": "assistant", + "model": "claude-sonnet-4-6-20250101", + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "cache_read_input_tokens": 20, + "cache_creation_input_tokens": 10 + } + } + })); + record(MetricEvent::with_timestamp( + ts, + &values, + attrs(repo_url, "claude", Some(session_id)), + )) + } + + #[test] + fn compute_activity_aggregates_commits_checkpoints_sessions_and_tokens() { + let now = now_ts(); + let repo = "github.com/acme/project"; + let session_ts = now.saturating_sub(600); + let commit_ts = now.saturating_sub(300); + let records = [ + claude_session(session_ts, Some(repo), "session-1"), + checkpoint(session_ts + 10, repo, 12), + committed(commit_ts, repo, 10, 2, 12), + ]; + let refs: Vec<&MetricHistoryRecord> = records.iter().collect(); + + let stats = compute_activity_from_records( + &refs, + now.saturating_sub(24 * 3600), + "last 1 day".to_string(), + BucketGranularity::Daily, + ) + .unwrap(); + + assert_eq!(stats.commits.total, 1); + assert_eq!(stats.commits.ai_lines, 10); + assert_eq!(stats.commits.human_lines, 2); + assert_eq!(stats.commits.diff_added_lines, 12); + assert_eq!( + stats.commits.by_tool, + vec![("claude · sonnet-4-6".to_string(), 10)] + ); + assert_eq!( + stats.commits.acceptance_by_tool, + vec![("claude".to_string(), 83)] + ); + assert_eq!(stats.checkpoints.total, 1); + assert_eq!(stats.checkpoints.ai_lines_added, 12); + assert_eq!(stats.checkpoints.files_edited, 1); + assert_eq!(stats.sessions.total, 1); + assert_eq!(stats.sessions.yield_stats.shipped, 1); + assert_eq!(stats.sessions.yield_stats.abandoned, 0); + assert_eq!(stats.tokens.input, 100); + assert_eq!(stats.tokens.output, 50); + assert_eq!(stats.tokens.cache_read, 20); + assert_eq!(stats.tokens.cache_creation, 10); + assert_eq!(stats.tokens.by_model[0].model, "claude-sonnet-4-6"); + assert!(stats.buckets.iter().any(|bucket| bucket.ai_lines == 10)); + } + + #[test] + fn repo_summaries_group_records_by_repo_and_skip_unknown_repo() { + let now = now_ts(); + let repo = "github.com/acme/project"; + let records = [ + committed(now.saturating_sub(300), repo, 8, 0, 8), + claude_session(now.saturating_sub(200), Some(repo), "session-1"), + claude_session(now.saturating_sub(100), None, "session-unknown"), + ]; + + let summaries = repo_summaries_from_records( + &records, + now.saturating_sub(24 * 3600), + BucketGranularity::Daily, + ) + .unwrap(); + + assert_eq!(summaries.len(), 1); + assert_eq!(summaries[0].repo_url, repo); + assert_eq!(summaries[0].ai_lines, 8); + assert_eq!(summaries[0].commits, 1); + assert_eq!(summaries[0].sessions, 1); + assert!(summaries[0].estimated_cost_usd > 0.0); + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index a925d1ff1c..6b6bf9a4c4 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -8,6 +8,7 @@ pub mod attrs; pub mod db; pub mod events; +pub mod local_stats; pub mod pos_encoded; pub mod types; diff --git a/src/observability/mod.rs b/src/observability/mod.rs index fbf9aab5f3..d890f072fc 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use std::time::Duration; +use crate::error::GitAiError; use crate::metrics::MetricEvent; +use crate::metrics::db::MetricsDatabase; pub mod performance_targets; @@ -11,15 +13,67 @@ pub const MAX_METRICS_PER_ENVELOPE: usize = 1000; /// Submit telemetry envelopes via the best available path: /// 1. External daemon control socket (wrapper processes) /// 2. In-process daemon telemetry worker (daemon process itself) -/// 3. Silently drop if neither is available +/// 3. Local SQLite storage for metric events if neither daemon path is available fn submit_telemetry_envelope(envelopes: Vec) { - if crate::daemon::telemetry_handle::daemon_telemetry_available() { - crate::daemon::telemetry_handle::submit_telemetry(envelopes); - } else if crate::daemon::daemon_process_active() { - crate::daemon::telemetry_worker::submit_daemon_internal_telemetry(envelopes); + let envelopes = if crate::daemon::telemetry_handle::daemon_telemetry_available() { + match crate::daemon::telemetry_handle::submit_telemetry(envelopes) { + Ok(()) => return, + Err(envelopes) => envelopes, + } + } else { + envelopes + }; + + let envelopes = if crate::daemon::daemon_process_active() { + match crate::daemon::telemetry_worker::submit_daemon_internal_telemetry(envelopes) { + Ok(()) => return, + Err(envelopes) => envelopes, + } + } else { + envelopes + }; + + if let Err(e) = store_metrics_envelopes_locally(envelopes) { + tracing::warn!(%e, "telemetry: failed to persist metrics locally"); } } +fn store_metrics_envelopes_locally( + envelopes: Vec, +) -> Result<(), GitAiError> { + let mut events = Vec::new(); + for envelope in envelopes { + if let crate::daemon::TelemetryEnvelope::Metrics { + events: metric_events, + } = envelope + { + events.extend(metric_events); + } + } + + if events.is_empty() { + return Ok(()); + } + + for chunk in events.chunks(MAX_METRICS_PER_ENVELOPE) { + let event_jsons: Vec = chunk + .iter() + .map(serde_json::to_string) + .collect::>()?; + if event_jsons.is_empty() { + continue; + } + + let db = MetricsDatabase::global()?; + let mut db_lock = db + .lock() + .map_err(|_| GitAiError::Generic("metrics DB lock poisoned".to_string()))?; + db_lock.insert_events(&event_jsons)?; + } + + Ok(()) +} + /// Log an error to Sentry (via daemon telemetry worker) pub fn log_error(error: &dyn std::error::Error, context: Option) { let envelope = crate::daemon::TelemetryEnvelope::Error {