Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions crates/khive-db/src/stores/note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ fn json_extract_expr(path: &str) -> String {
format!("json_extract(properties, '{path}')")
}

fn json_type_expr(path: &str) -> String {
format!("json_type(properties, '{path}')")
}

fn sql_value_param(value: &SqlValue) -> Result<Box<dyn rusqlite::types::ToSql>, rusqlite::Error> {
Ok(match value {
SqlValue::Null => Box::new(Option::<String>::None),
Expand Down Expand Up @@ -232,26 +236,43 @@ fn build_note_filter_where(
}

for pf in &filter.property_filters {
let expr = json_extract_expr(&pf.json_path);
if matches!(pf.op, FilterOp::EqOrMissing) {
params.push(sql_value_param(&pf.value)?);
conditions.push(format!(
"({expr} = ?{n} OR {expr} IS NULL)",
n = params.len()
));
continue;
match pf.op {
FilterOp::EqOrMissing => {
let expr = json_extract_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
conditions.push(format!(
"({expr} = ?{n} OR {expr} IS NULL)",
n = params.len()
));
}
FilterOp::JsonTypeEq => {
let type_expr = json_type_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
conditions.push(format!("{type_expr} = ?{}", params.len()));
}
FilterOp::JsonTypeNeMissing => {
let type_expr = json_type_expr(&pf.json_path);
params.push(sql_value_param(&pf.value)?);
let n = params.len();
conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})"));
}
_ => {
let expr = json_extract_expr(&pf.json_path);
let op = match pf.op {
FilterOp::Eq => "=",
FilterOp::Ne => "!=",
FilterOp::Lt => "<",
FilterOp::Lte => "<=",
FilterOp::Gt => ">",
FilterOp::Gte => ">=",
FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => {
unreachable!()
}
};
params.push(sql_value_param(&pf.value)?);
conditions.push(format!("{expr} {op} ?{}", params.len()));
}
}
let op = match pf.op {
FilterOp::Eq => "=",
FilterOp::Ne => "!=",
FilterOp::Lt => "<",
FilterOp::Lte => "<=",
FilterOp::Gt => ">",
FilterOp::Gte => ">=",
FilterOp::EqOrMissing => unreachable!(),
};
params.push(sql_value_param(&pf.value)?);
conditions.push(format!("{expr} {op} ?{}", params.len()));
}

Ok((format!(" WHERE {}", conditions.join(" AND ")), params))
Expand Down
61 changes: 15 additions & 46 deletions crates/khive-pack-comm/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ pub(crate) async fn handle_inbox(
};

// Push direction + read-status filters into SQL so idx_comm_message_direction is usable.
// Read filter uses json_type to match the old as_bool().unwrap_or(false) semantics:
// only JSON boolean `true` counts as read; missing/false/string/integer all count as unread.
let mut property_filters = vec![PropertyFilter {
json_path: "$.direction".to_string(),
op: FilterOp::Eq,
Expand All @@ -331,13 +333,13 @@ pub(crate) async fn handle_inbox(
match status {
"unread" => property_filters.push(PropertyFilter {
json_path: "$.read".to_string(),
op: FilterOp::EqOrMissing,
value: SqlValue::Bool(false),
op: FilterOp::JsonTypeNeMissing,
value: SqlValue::Text("true".to_string()),
}),
"read" => property_filters.push(PropertyFilter {
json_path: "$.read".to_string(),
op: FilterOp::Eq,
value: SqlValue::Bool(true),
op: FilterOp::JsonTypeEq,
value: SqlValue::Text("true".to_string()),
}),
_ => {} // "all" — no read-status filter
}
Expand Down Expand Up @@ -611,20 +613,17 @@ pub(crate) async fn handle_thread(
}
};

let canonical_short = &canonical_thread_id[..8];

// Paginated scan over kind=message notes (SQL handles deleted_at IS NULL).
// Thread matching is preserved exactly in Rust to cover all 4 cases:
// (a) own UUID == canonical_thread_id (the root note itself)
// (b) properties.thread_id == canonical_thread_id (exact UUID reply)
// (c) properties.thread_id == canonical_short (legacy 8-char stored value)
// (d) properties.thread_id[..8] == canonical_short (legacy prefix)
// Sorting happens after collection because SQL orders by created_at DESC;
// thread display requires chronological ascending order.
// Push thread_id predicate into SQL so idx_comm_message_thread can be used.
// The root note always has properties.thread_id == own_uuid == canonical_thread_id
// (patched by dual_write_message), so it is captured by the same SQL filter as replies.
let thread_store = runtime.notes(token)?;
let thread_filter = NoteFilter {
kind: Some("message".to_string()),
property_filters: vec![],
property_filters: vec![PropertyFilter {
json_path: "$.thread_id".to_string(),
op: FilterOp::Eq,
value: SqlValue::Text(canonical_thread_id.clone()),
}],
order_by: None,
};
const PAGE_SIZE: u32 = 200;
Expand All @@ -643,39 +642,9 @@ pub(crate) async fn handle_thread(
)
.await?;
let fetched = page.items.len() as u32;

for n in &page.items {
let props = n.properties.as_ref();

// A message belongs to this thread if:
// (a) its own UUID equals the canonical thread_id (it IS the root), or
// (b) its properties.thread_id equals the canonical thread_id (it is a reply).
let matches = {
let own_full = n.id.as_hyphenated().to_string();
if own_full == canonical_thread_id {
true
} else {
match props
.and_then(|p| p.get("thread_id"))
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
{
Some(stored_thread) => {
stored_thread == canonical_thread_id
|| stored_thread == canonical_short
|| (stored_thread.len() >= 8
&& &stored_thread[..8] == canonical_short)
}
None => false,
}
}
};

if matches {
messages.push(note_to_message_json(n));
}
messages.push(note_to_message_json(n));
}

if fetched < PAGE_SIZE {
break;
}
Expand Down
19 changes: 12 additions & 7 deletions crates/khive-pack-comm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@ use khive_types::{HandlerDef, Pack, ParamDef, Visibility};

/// Pack-auxiliary indexes for comm inbox and thread queries (ADR-040).
///
/// Both are partial indexes scoped to `kind = 'message'` notes.
/// The inbox index covers direction + read-status for efficient unread filtering.
/// The thread index covers thread_id for efficient thread retrieval.
/// Indexes use `WHERE deleted_at IS NULL` (not `WHERE kind = 'message'`) so that
/// SQLite's index planner can match them when queries contain the parameterized
/// `kind = ?N` predicate emitted by `build_note_filter_where`. A literal-value
/// partial index (`WHERE kind = 'message'`) cannot be used for a parameterized
/// comparison — the planner sees different predicates and falls back to a table scan.
/// `deleted_at IS NULL` is always present in filtered queries, so the partial
/// condition is always satisfied and the index is eligible.
/// `kind` is included as an indexed column so the `kind = ?N` predicate is covered.
/// Statements are idempotent (`CREATE INDEX IF NOT EXISTS`).
pub(crate) static COMM_SCHEMA_PLAN_STMTS: [&str; 2] = [
"CREATE INDEX IF NOT EXISTS idx_comm_message_direction \
ON notes(namespace, json_extract(properties, '$.direction'), \
ON notes(namespace, kind, json_extract(properties, '$.direction'), \
json_extract(properties, '$.read'), created_at DESC) \
WHERE kind = 'message'",
WHERE deleted_at IS NULL",
"CREATE INDEX IF NOT EXISTS idx_comm_message_thread \
ON notes(namespace, json_extract(properties, '$.thread_id'), created_at DESC) \
WHERE kind = 'message'",
ON notes(namespace, kind, json_extract(properties, '$.thread_id'), created_at DESC) \
WHERE deleted_at IS NULL",
];

pub struct CommPack {
Expand Down
Loading
Loading