From 0ebf93a4469d1176ed969e13d1bd280f0c7d035d Mon Sep 17 00:00:00 2001 From: OceanLi <122793010+ohdearquant@users.noreply.github.com> Date: Sun, 31 May 2026 13:47:48 -0400 Subject: [PATCH] fix(comm+schedule): SQL pushdown for thread, indexes, read filter (#535) Co-Authored-By: Claude Sonnet 4.6 --- crates/khive-db/src/stores/note.rs | 59 ++++-- crates/khive-pack-comm/src/handlers.rs | 61 ++---- crates/khive-pack-comm/src/lib.rs | 19 +- crates/khive-pack-comm/tests/integration.rs | 198 +++++++++++++++++- crates/khive-pack-schedule/src/lib.rs | 13 +- .../khive-pack-schedule/tests/integration.rs | 6 +- crates/khive-storage/src/note.rs | 8 + 7 files changed, 283 insertions(+), 81 deletions(-) diff --git a/crates/khive-db/src/stores/note.rs b/crates/khive-db/src/stores/note.rs index 0a46ad54..da08148b 100644 --- a/crates/khive-db/src/stores/note.rs +++ b/crates/khive-db/src/stores/note.rs @@ -199,6 +199,10 @@ fn json_extract_expr(path: &str) -> String { format!("json_extract(properties, '{path}')") } +fn json_type_expr(path: &str) -> String { + format!("json_type(properties, '{path}')") +} + fn sql_value_param(value: &SqlValue) -> Result, rusqlite::Error> { Ok(match value { SqlValue::Null => Box::new(Option::::None), @@ -232,26 +236,43 @@ fn build_note_filter_where( } for pf in &filter.property_filters { - let expr = json_extract_expr(&pf.json_path); - if matches!(pf.op, FilterOp::EqOrMissing) { - params.push(sql_value_param(&pf.value)?); - conditions.push(format!( - "({expr} = ?{n} OR {expr} IS NULL)", - n = params.len() - )); - continue; + match pf.op { + FilterOp::EqOrMissing => { + let expr = json_extract_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + conditions.push(format!( + "({expr} = ?{n} OR {expr} IS NULL)", + n = params.len() + )); + } + FilterOp::JsonTypeEq => { + let type_expr = json_type_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + conditions.push(format!("{type_expr} = ?{}", params.len())); + } + FilterOp::JsonTypeNeMissing => { + let type_expr = json_type_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + let n = params.len(); + conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})")); + } + _ => { + let expr = json_extract_expr(&pf.json_path); + let op = match pf.op { + FilterOp::Eq => "=", + FilterOp::Ne => "!=", + FilterOp::Lt => "<", + FilterOp::Lte => "<=", + FilterOp::Gt => ">", + FilterOp::Gte => ">=", + FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => { + unreachable!() + } + }; + params.push(sql_value_param(&pf.value)?); + conditions.push(format!("{expr} {op} ?{}", params.len())); + } } - let op = match pf.op { - FilterOp::Eq => "=", - FilterOp::Ne => "!=", - FilterOp::Lt => "<", - FilterOp::Lte => "<=", - FilterOp::Gt => ">", - FilterOp::Gte => ">=", - FilterOp::EqOrMissing => unreachable!(), - }; - params.push(sql_value_param(&pf.value)?); - conditions.push(format!("{expr} {op} ?{}", params.len())); } Ok((format!(" WHERE {}", conditions.join(" AND ")), params)) diff --git a/crates/khive-pack-comm/src/handlers.rs b/crates/khive-pack-comm/src/handlers.rs index 22bbdaa7..44ab7fb9 100644 --- a/crates/khive-pack-comm/src/handlers.rs +++ b/crates/khive-pack-comm/src/handlers.rs @@ -323,6 +323,8 @@ pub(crate) async fn handle_inbox( }; // Push direction + read-status filters into SQL so idx_comm_message_direction is usable. + // Read filter uses json_type to match the old as_bool().unwrap_or(false) semantics: + // only JSON boolean `true` counts as read; missing/false/string/integer all count as unread. let mut property_filters = vec![PropertyFilter { json_path: "$.direction".to_string(), op: FilterOp::Eq, @@ -331,13 +333,13 @@ pub(crate) async fn handle_inbox( match status { "unread" => property_filters.push(PropertyFilter { json_path: "$.read".to_string(), - op: FilterOp::EqOrMissing, - value: SqlValue::Bool(false), + op: FilterOp::JsonTypeNeMissing, + value: SqlValue::Text("true".to_string()), }), "read" => property_filters.push(PropertyFilter { json_path: "$.read".to_string(), - op: FilterOp::Eq, - value: SqlValue::Bool(true), + op: FilterOp::JsonTypeEq, + value: SqlValue::Text("true".to_string()), }), _ => {} // "all" — no read-status filter } @@ -611,20 +613,17 @@ pub(crate) async fn handle_thread( } }; - let canonical_short = &canonical_thread_id[..8]; - - // Paginated scan over kind=message notes (SQL handles deleted_at IS NULL). - // Thread matching is preserved exactly in Rust to cover all 4 cases: - // (a) own UUID == canonical_thread_id (the root note itself) - // (b) properties.thread_id == canonical_thread_id (exact UUID reply) - // (c) properties.thread_id == canonical_short (legacy 8-char stored value) - // (d) properties.thread_id[..8] == canonical_short (legacy prefix) - // Sorting happens after collection because SQL orders by created_at DESC; - // thread display requires chronological ascending order. + // Push thread_id predicate into SQL so idx_comm_message_thread can be used. + // The root note always has properties.thread_id == own_uuid == canonical_thread_id + // (patched by dual_write_message), so it is captured by the same SQL filter as replies. let thread_store = runtime.notes(token)?; let thread_filter = NoteFilter { kind: Some("message".to_string()), - property_filters: vec![], + property_filters: vec![PropertyFilter { + json_path: "$.thread_id".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text(canonical_thread_id.clone()), + }], order_by: None, }; const PAGE_SIZE: u32 = 200; @@ -643,39 +642,9 @@ pub(crate) async fn handle_thread( ) .await?; let fetched = page.items.len() as u32; - for n in &page.items { - let props = n.properties.as_ref(); - - // A message belongs to this thread if: - // (a) its own UUID equals the canonical thread_id (it IS the root), or - // (b) its properties.thread_id equals the canonical thread_id (it is a reply). - let matches = { - let own_full = n.id.as_hyphenated().to_string(); - if own_full == canonical_thread_id { - true - } else { - match props - .and_then(|p| p.get("thread_id")) - .and_then(Value::as_str) - .filter(|s| !s.is_empty()) - { - Some(stored_thread) => { - stored_thread == canonical_thread_id - || stored_thread == canonical_short - || (stored_thread.len() >= 8 - && &stored_thread[..8] == canonical_short) - } - None => false, - } - } - }; - - if matches { - messages.push(note_to_message_json(n)); - } + messages.push(note_to_message_json(n)); } - if fetched < PAGE_SIZE { break; } diff --git a/crates/khive-pack-comm/src/lib.rs b/crates/khive-pack-comm/src/lib.rs index b23d5713..4b8181d2 100644 --- a/crates/khive-pack-comm/src/lib.rs +++ b/crates/khive-pack-comm/src/lib.rs @@ -10,18 +10,23 @@ use khive_types::{HandlerDef, Pack, ParamDef, Visibility}; /// Pack-auxiliary indexes for comm inbox and thread queries (ADR-040). /// -/// Both are partial indexes scoped to `kind = 'message'` notes. -/// The inbox index covers direction + read-status for efficient unread filtering. -/// The thread index covers thread_id for efficient thread retrieval. +/// Indexes use `WHERE deleted_at IS NULL` (not `WHERE kind = 'message'`) so that +/// SQLite's index planner can match them when queries contain the parameterized +/// `kind = ?N` predicate emitted by `build_note_filter_where`. A literal-value +/// partial index (`WHERE kind = 'message'`) cannot be used for a parameterized +/// comparison — the planner sees different predicates and falls back to a table scan. +/// `deleted_at IS NULL` is always present in filtered queries, so the partial +/// condition is always satisfied and the index is eligible. +/// `kind` is included as an indexed column so the `kind = ?N` predicate is covered. /// Statements are idempotent (`CREATE INDEX IF NOT EXISTS`). pub(crate) static COMM_SCHEMA_PLAN_STMTS: [&str; 2] = [ "CREATE INDEX IF NOT EXISTS idx_comm_message_direction \ - ON notes(namespace, json_extract(properties, '$.direction'), \ + ON notes(namespace, kind, json_extract(properties, '$.direction'), \ json_extract(properties, '$.read'), created_at DESC) \ - WHERE kind = 'message'", + WHERE deleted_at IS NULL", "CREATE INDEX IF NOT EXISTS idx_comm_message_thread \ - ON notes(namespace, json_extract(properties, '$.thread_id'), created_at DESC) \ - WHERE kind = 'message'", + ON notes(namespace, kind, json_extract(properties, '$.thread_id'), created_at DESC) \ + WHERE deleted_at IS NULL", ]; pub struct CommPack { diff --git a/crates/khive-pack-comm/tests/integration.rs b/crates/khive-pack-comm/tests/integration.rs index d15eb511..1bbaf412 100644 --- a/crates/khive-pack-comm/tests/integration.rs +++ b/crates/khive-pack-comm/tests/integration.rs @@ -1934,9 +1934,11 @@ async fn comm_pack_exposes_non_empty_schema_plan() { combined.contains("CREATE INDEX IF NOT EXISTS"), "schema plan DDL must be idempotent; got: {combined}" ); + // Indexes now use WHERE deleted_at IS NULL so the parameterized kind = ?N + // predicate can use the index (literal WHERE kind = 'message' blocks this). assert!( - combined.contains("'message'"), - "schema plan indexes must be scoped to kind='message'; got: {combined}" + combined.contains("deleted_at IS NULL"), + "schema plan indexes must use WHERE deleted_at IS NULL partial condition; got: {combined}" ); } @@ -1958,3 +1960,195 @@ async fn verb_registry_aggregates_comm_schema_plan() { "comm schema plan must have DDL statements" ); } + +/// thread isolation: comm.thread returns only messages belonging to the requested thread, +/// not messages from other threads in the same namespace. +#[tokio::test] +async fn test_thread_returns_only_requested_thread_messages() { + let (registry, _rt) = build_registry_for_ns("local"); + + // Send two independent root messages (thread A and thread B). + let msg_a = registry + .dispatch( + "comm.send", + serde_json::json!({ "to": "local", "content": "thread A root" }), + ) + .await + .expect("send thread A root"); + let thread_a_id = msg_a + .get("full_id") + .and_then(|v| v.as_str()) + .expect("full_id A"); + + registry + .dispatch( + "comm.send", + serde_json::json!({ "to": "local", "content": "thread B root" }), + ) + .await + .expect("send thread B root"); + + // Reply to thread A. + registry + .dispatch( + "comm.reply", + serde_json::json!({ "id": thread_a_id, "content": "reply to A" }), + ) + .await + .expect("reply to A"); + + // Fetch thread A — must contain exactly the root + 1 reply (the inbound copy of each). + // With self-send, each comm.send creates outbound + inbound, and reply creates outbound + inbound. + // SQL filter ensures only thread-A messages are returned. + let thread = registry + .dispatch("comm.thread", serde_json::json!({ "id": thread_a_id })) + .await + .expect("thread A fetch"); + + let messages = thread + .get("messages") + .and_then(|v| v.as_array()) + .expect("messages array"); + + // All returned messages must have thread_id == thread_a_id. + for msg in messages { + let props = msg.get("properties").expect("has properties"); + let stored_tid = props + .get("thread_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + assert_eq!( + stored_tid, thread_a_id, + "all thread messages must carry thread_id={thread_a_id}, got {stored_tid}" + ); + } + + // Must have at least 2 messages (root + reply, inbound copies). + assert!( + messages.len() >= 2, + "thread must contain at least root + reply; got {}", + messages.len() + ); +} + +/// read filter 5-case truth table: json_type-based filter matches old as_bool().unwrap_or(false). +/// Seeds messages with $.read set to: missing, bool false, bool true, string "true", integer 1. +/// Verifies that inbox(status=unread) and inbox(status=read) classify each case correctly. +#[tokio::test] +async fn test_inbox_read_filter_json_type_truth_table() { + use khive_storage::note::{FilterOp, Note, NoteFilter, PropertyFilter}; + use khive_storage::types::{PageRequest, SqlValue}; + + let (_registry, rt) = build_registry_for_ns("local"); + let token = rt + .authorize(khive_runtime::Namespace::parse("local").unwrap()) + .unwrap(); + let store = rt.notes(&token).expect("note store"); + + // Seed 5 inbound message notes directly (bypassing send) to control $.read exactly. + let make_msg = |read_val: serde_json::Value, label: &str| -> Note { + Note::new("local", "message", label).with_properties(serde_json::json!({ + "direction": "inbound", + "from": "local", + "to": "local", + "thread_id": null, + "read": read_val, + })) + }; + + // missing: don't set read at all in properties + let note_missing = Note::new("local", "message", "read=missing").with_properties( + serde_json::json!({ "direction": "inbound", "from": "local", "to": "local" }), + ); + let note_false = make_msg(serde_json::json!(false), "read=false"); + let note_true = make_msg(serde_json::json!(true), "read=true"); + let note_str_true = make_msg(serde_json::json!("true"), "read=string_true"); + let note_int_1 = make_msg(serde_json::json!(1), "read=int_1"); + + store.upsert_note(note_missing).await.unwrap(); + store.upsert_note(note_false).await.unwrap(); + store.upsert_note(note_true).await.unwrap(); + store.upsert_note(note_str_true).await.unwrap(); + store.upsert_note(note_int_1).await.unwrap(); + + // Query unread: missing, false, "true" (string), 1 (integer) → all count as unread. + let unread_filter = NoteFilter { + kind: Some("message".to_string()), + property_filters: vec![ + PropertyFilter { + json_path: "$.direction".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text("inbound".to_string()), + }, + PropertyFilter { + json_path: "$.read".to_string(), + op: FilterOp::JsonTypeNeMissing, + value: SqlValue::Text("true".to_string()), + }, + ], + order_by: None, + }; + let unread_page = store + .query_notes_filtered("local", &unread_filter, PageRequest::default()) + .await + .unwrap(); + let unread_contents: Vec<&str> = unread_page + .items + .iter() + .map(|n| n.content.as_str()) + .collect(); + + assert!( + unread_contents.contains(&"read=missing"), + "missing $.read must be unread; got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=false"), + "bool false must be unread; got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=string_true"), + "string 'true' must be unread (not JSON bool true); got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=int_1"), + "integer 1 must be unread (not JSON bool true); got {unread_contents:?}" + ); + assert!( + !unread_contents.contains(&"read=true"), + "JSON bool true must NOT be unread; got {unread_contents:?}" + ); + + // Query read: only JSON boolean true → exactly 1 result. + let read_filter = NoteFilter { + kind: Some("message".to_string()), + property_filters: vec![ + PropertyFilter { + json_path: "$.direction".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text("inbound".to_string()), + }, + PropertyFilter { + json_path: "$.read".to_string(), + op: FilterOp::JsonTypeEq, + value: SqlValue::Text("true".to_string()), + }, + ], + order_by: None, + }; + let read_page = store + .query_notes_filtered("local", &read_filter, PageRequest::default()) + .await + .unwrap(); + assert_eq!( + read_page.items.len(), + 1, + "only JSON bool true must be in 'read'; got {:?}", + read_page + .items + .iter() + .map(|n| &n.content) + .collect::>() + ); + assert_eq!(read_page.items[0].content, "read=true"); +} diff --git a/crates/khive-pack-schedule/src/lib.rs b/crates/khive-pack-schedule/src/lib.rs index 9a23ebc3..34c49a1d 100644 --- a/crates/khive-pack-schedule/src/lib.rs +++ b/crates/khive-pack-schedule/src/lib.rs @@ -22,14 +22,17 @@ impl Pack for SchedulePack { /// ADR-040 §283-291: pack-auxiliary index for agenda() efficiency. /// -/// A partial index on `properties.trigger_at` (JSON-extracted) scoped to -/// `scheduled_event` notes enables efficient range scans in `agenda()`. -/// The statement is idempotent (`CREATE INDEX IF NOT EXISTS`) and is NOT +/// Uses `WHERE deleted_at IS NULL` instead of `WHERE kind = 'scheduled_event'` so +/// that the parameterized `kind = ?N` predicate in `build_note_filter_where` can +/// use this index. A literal-value partial condition (`WHERE kind = 'scheduled_event'`) +/// is invisible to the planner when the query uses a bound parameter for `kind`. +/// `namespace` and `kind` are included as indexed columns for efficient namespace+kind +/// range scans. The statement is idempotent (`CREATE INDEX IF NOT EXISTS`) and is NOT /// part of the core versioned migration chain (ADR-015). pub(crate) static SCHEDULE_SCHEMA_PLAN_STMTS: [&str; 1] = ["CREATE INDEX IF NOT EXISTS idx_schedule_trigger \ - ON notes(json_extract(properties, '$.trigger_at')) \ - WHERE kind = 'scheduled_event'"]; + ON notes(namespace, kind, json_extract(properties, '$.trigger_at')) \ + WHERE deleted_at IS NULL"]; static SCHEDULE_HANDLERS: [HandlerDef; 4] = [ HandlerDef { diff --git a/crates/khive-pack-schedule/tests/integration.rs b/crates/khive-pack-schedule/tests/integration.rs index 5abdbdfb..128c5332 100644 --- a/crates/khive-pack-schedule/tests/integration.rs +++ b/crates/khive-pack-schedule/tests/integration.rs @@ -865,9 +865,11 @@ async fn schedule_pack_exposes_non_empty_schema_plan() { combined.contains("CREATE INDEX IF NOT EXISTS"), "schema plan DDL must be idempotent (CREATE INDEX IF NOT EXISTS); got: {combined}" ); + // Index now uses WHERE deleted_at IS NULL so the parameterized kind = ?N + // predicate can use the index (literal WHERE kind = 'scheduled_event' blocks this). assert!( - combined.contains("scheduled_event"), - "schema plan index must be scoped to 'scheduled_event' kind; got: {combined}" + combined.contains("deleted_at IS NULL"), + "schema plan index must use WHERE deleted_at IS NULL partial condition; got: {combined}" ); } diff --git a/crates/khive-storage/src/note.rs b/crates/khive-storage/src/note.rs index 31c5c03f..12c9a3c2 100644 --- a/crates/khive-storage/src/note.rs +++ b/crates/khive-storage/src/note.rs @@ -91,6 +91,14 @@ pub enum FilterOp { Lte, Gt, Gte, + /// Matches rows where `json_type(properties, path) = value`. + /// Value must be a SQLite json_type string literal: 'true', 'false', 'integer', + /// 'real', 'text', 'array', 'object', or 'null'. + JsonTypeEq, + /// Matches rows where the json_type is absent (NULL) OR differs from value. + /// Equivalent to `json_type IS NULL OR json_type != value`. + /// Used for unread filter: matches any `$.read` that is NOT the JSON boolean true. + JsonTypeNeMissing, } /// A single `json_extract(properties, '$.field') op value` predicate.