diff --git a/crates/khive-db/src/stores/note.rs b/crates/khive-db/src/stores/note.rs index 0a46ad54..da08148b 100644 --- a/crates/khive-db/src/stores/note.rs +++ b/crates/khive-db/src/stores/note.rs @@ -199,6 +199,10 @@ fn json_extract_expr(path: &str) -> String { format!("json_extract(properties, '{path}')") } +fn json_type_expr(path: &str) -> String { + format!("json_type(properties, '{path}')") +} + fn sql_value_param(value: &SqlValue) -> Result, rusqlite::Error> { Ok(match value { SqlValue::Null => Box::new(Option::::None), @@ -232,26 +236,43 @@ fn build_note_filter_where( } for pf in &filter.property_filters { - let expr = json_extract_expr(&pf.json_path); - if matches!(pf.op, FilterOp::EqOrMissing) { - params.push(sql_value_param(&pf.value)?); - conditions.push(format!( - "({expr} = ?{n} OR {expr} IS NULL)", - n = params.len() - )); - continue; + match pf.op { + FilterOp::EqOrMissing => { + let expr = json_extract_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + conditions.push(format!( + "({expr} = ?{n} OR {expr} IS NULL)", + n = params.len() + )); + } + FilterOp::JsonTypeEq => { + let type_expr = json_type_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + conditions.push(format!("{type_expr} = ?{}", params.len())); + } + FilterOp::JsonTypeNeMissing => { + let type_expr = json_type_expr(&pf.json_path); + params.push(sql_value_param(&pf.value)?); + let n = params.len(); + conditions.push(format!("({type_expr} IS NULL OR {type_expr} != ?{n})")); + } + _ => { + let expr = json_extract_expr(&pf.json_path); + let op = match pf.op { + FilterOp::Eq => "=", + FilterOp::Ne => "!=", + FilterOp::Lt => "<", + FilterOp::Lte => "<=", + FilterOp::Gt => ">", + FilterOp::Gte => ">=", + FilterOp::EqOrMissing | FilterOp::JsonTypeEq | FilterOp::JsonTypeNeMissing => { + unreachable!() + } + }; + params.push(sql_value_param(&pf.value)?); + conditions.push(format!("{expr} {op} ?{}", params.len())); + } } - let op = match pf.op { - FilterOp::Eq => "=", - FilterOp::Ne => "!=", - FilterOp::Lt => "<", - FilterOp::Lte => "<=", - FilterOp::Gt => ">", - FilterOp::Gte => ">=", - FilterOp::EqOrMissing => unreachable!(), - }; - params.push(sql_value_param(&pf.value)?); - conditions.push(format!("{expr} {op} ?{}", params.len())); } Ok((format!(" WHERE {}", conditions.join(" AND ")), params)) diff --git a/crates/khive-pack-comm/src/handlers.rs b/crates/khive-pack-comm/src/handlers.rs index 22bbdaa7..44ab7fb9 100644 --- a/crates/khive-pack-comm/src/handlers.rs +++ b/crates/khive-pack-comm/src/handlers.rs @@ -323,6 +323,8 @@ pub(crate) async fn handle_inbox( }; // Push direction + read-status filters into SQL so idx_comm_message_direction is usable. + // Read filter uses json_type to match the old as_bool().unwrap_or(false) semantics: + // only JSON boolean `true` counts as read; missing/false/string/integer all count as unread. let mut property_filters = vec![PropertyFilter { json_path: "$.direction".to_string(), op: FilterOp::Eq, @@ -331,13 +333,13 @@ pub(crate) async fn handle_inbox( match status { "unread" => property_filters.push(PropertyFilter { json_path: "$.read".to_string(), - op: FilterOp::EqOrMissing, - value: SqlValue::Bool(false), + op: FilterOp::JsonTypeNeMissing, + value: SqlValue::Text("true".to_string()), }), "read" => property_filters.push(PropertyFilter { json_path: "$.read".to_string(), - op: FilterOp::Eq, - value: SqlValue::Bool(true), + op: FilterOp::JsonTypeEq, + value: SqlValue::Text("true".to_string()), }), _ => {} // "all" — no read-status filter } @@ -611,20 +613,17 @@ pub(crate) async fn handle_thread( } }; - let canonical_short = &canonical_thread_id[..8]; - - // Paginated scan over kind=message notes (SQL handles deleted_at IS NULL). - // Thread matching is preserved exactly in Rust to cover all 4 cases: - // (a) own UUID == canonical_thread_id (the root note itself) - // (b) properties.thread_id == canonical_thread_id (exact UUID reply) - // (c) properties.thread_id == canonical_short (legacy 8-char stored value) - // (d) properties.thread_id[..8] == canonical_short (legacy prefix) - // Sorting happens after collection because SQL orders by created_at DESC; - // thread display requires chronological ascending order. + // Push thread_id predicate into SQL so idx_comm_message_thread can be used. + // The root note always has properties.thread_id == own_uuid == canonical_thread_id + // (patched by dual_write_message), so it is captured by the same SQL filter as replies. let thread_store = runtime.notes(token)?; let thread_filter = NoteFilter { kind: Some("message".to_string()), - property_filters: vec![], + property_filters: vec![PropertyFilter { + json_path: "$.thread_id".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text(canonical_thread_id.clone()), + }], order_by: None, }; const PAGE_SIZE: u32 = 200; @@ -643,39 +642,9 @@ pub(crate) async fn handle_thread( ) .await?; let fetched = page.items.len() as u32; - for n in &page.items { - let props = n.properties.as_ref(); - - // A message belongs to this thread if: - // (a) its own UUID equals the canonical thread_id (it IS the root), or - // (b) its properties.thread_id equals the canonical thread_id (it is a reply). - let matches = { - let own_full = n.id.as_hyphenated().to_string(); - if own_full == canonical_thread_id { - true - } else { - match props - .and_then(|p| p.get("thread_id")) - .and_then(Value::as_str) - .filter(|s| !s.is_empty()) - { - Some(stored_thread) => { - stored_thread == canonical_thread_id - || stored_thread == canonical_short - || (stored_thread.len() >= 8 - && &stored_thread[..8] == canonical_short) - } - None => false, - } - } - }; - - if matches { - messages.push(note_to_message_json(n)); - } + messages.push(note_to_message_json(n)); } - if fetched < PAGE_SIZE { break; } diff --git a/crates/khive-pack-comm/src/lib.rs b/crates/khive-pack-comm/src/lib.rs index b23d5713..4b8181d2 100644 --- a/crates/khive-pack-comm/src/lib.rs +++ b/crates/khive-pack-comm/src/lib.rs @@ -10,18 +10,23 @@ use khive_types::{HandlerDef, Pack, ParamDef, Visibility}; /// Pack-auxiliary indexes for comm inbox and thread queries (ADR-040). /// -/// Both are partial indexes scoped to `kind = 'message'` notes. -/// The inbox index covers direction + read-status for efficient unread filtering. -/// The thread index covers thread_id for efficient thread retrieval. +/// Indexes use `WHERE deleted_at IS NULL` (not `WHERE kind = 'message'`) so that +/// SQLite's index planner can match them when queries contain the parameterized +/// `kind = ?N` predicate emitted by `build_note_filter_where`. A literal-value +/// partial index (`WHERE kind = 'message'`) cannot be used for a parameterized +/// comparison — the planner sees different predicates and falls back to a table scan. +/// `deleted_at IS NULL` is always present in filtered queries, so the partial +/// condition is always satisfied and the index is eligible. +/// `kind` is included as an indexed column so the `kind = ?N` predicate is covered. /// Statements are idempotent (`CREATE INDEX IF NOT EXISTS`). pub(crate) static COMM_SCHEMA_PLAN_STMTS: [&str; 2] = [ "CREATE INDEX IF NOT EXISTS idx_comm_message_direction \ - ON notes(namespace, json_extract(properties, '$.direction'), \ + ON notes(namespace, kind, json_extract(properties, '$.direction'), \ json_extract(properties, '$.read'), created_at DESC) \ - WHERE kind = 'message'", + WHERE deleted_at IS NULL", "CREATE INDEX IF NOT EXISTS idx_comm_message_thread \ - ON notes(namespace, json_extract(properties, '$.thread_id'), created_at DESC) \ - WHERE kind = 'message'", + ON notes(namespace, kind, json_extract(properties, '$.thread_id'), created_at DESC) \ + WHERE deleted_at IS NULL", ]; pub struct CommPack { diff --git a/crates/khive-pack-comm/tests/integration.rs b/crates/khive-pack-comm/tests/integration.rs index d15eb511..1bbaf412 100644 --- a/crates/khive-pack-comm/tests/integration.rs +++ b/crates/khive-pack-comm/tests/integration.rs @@ -1934,9 +1934,11 @@ async fn comm_pack_exposes_non_empty_schema_plan() { combined.contains("CREATE INDEX IF NOT EXISTS"), "schema plan DDL must be idempotent; got: {combined}" ); + // Indexes now use WHERE deleted_at IS NULL so the parameterized kind = ?N + // predicate can use the index (literal WHERE kind = 'message' blocks this). assert!( - combined.contains("'message'"), - "schema plan indexes must be scoped to kind='message'; got: {combined}" + combined.contains("deleted_at IS NULL"), + "schema plan indexes must use WHERE deleted_at IS NULL partial condition; got: {combined}" ); } @@ -1958,3 +1960,195 @@ async fn verb_registry_aggregates_comm_schema_plan() { "comm schema plan must have DDL statements" ); } + +/// thread isolation: comm.thread returns only messages belonging to the requested thread, +/// not messages from other threads in the same namespace. +#[tokio::test] +async fn test_thread_returns_only_requested_thread_messages() { + let (registry, _rt) = build_registry_for_ns("local"); + + // Send two independent root messages (thread A and thread B). + let msg_a = registry + .dispatch( + "comm.send", + serde_json::json!({ "to": "local", "content": "thread A root" }), + ) + .await + .expect("send thread A root"); + let thread_a_id = msg_a + .get("full_id") + .and_then(|v| v.as_str()) + .expect("full_id A"); + + registry + .dispatch( + "comm.send", + serde_json::json!({ "to": "local", "content": "thread B root" }), + ) + .await + .expect("send thread B root"); + + // Reply to thread A. + registry + .dispatch( + "comm.reply", + serde_json::json!({ "id": thread_a_id, "content": "reply to A" }), + ) + .await + .expect("reply to A"); + + // Fetch thread A — must contain exactly the root + 1 reply (the inbound copy of each). + // With self-send, each comm.send creates outbound + inbound, and reply creates outbound + inbound. + // SQL filter ensures only thread-A messages are returned. + let thread = registry + .dispatch("comm.thread", serde_json::json!({ "id": thread_a_id })) + .await + .expect("thread A fetch"); + + let messages = thread + .get("messages") + .and_then(|v| v.as_array()) + .expect("messages array"); + + // All returned messages must have thread_id == thread_a_id. + for msg in messages { + let props = msg.get("properties").expect("has properties"); + let stored_tid = props + .get("thread_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + assert_eq!( + stored_tid, thread_a_id, + "all thread messages must carry thread_id={thread_a_id}, got {stored_tid}" + ); + } + + // Must have at least 2 messages (root + reply, inbound copies). + assert!( + messages.len() >= 2, + "thread must contain at least root + reply; got {}", + messages.len() + ); +} + +/// read filter 5-case truth table: json_type-based filter matches old as_bool().unwrap_or(false). +/// Seeds messages with $.read set to: missing, bool false, bool true, string "true", integer 1. +/// Verifies that inbox(status=unread) and inbox(status=read) classify each case correctly. +#[tokio::test] +async fn test_inbox_read_filter_json_type_truth_table() { + use khive_storage::note::{FilterOp, Note, NoteFilter, PropertyFilter}; + use khive_storage::types::{PageRequest, SqlValue}; + + let (_registry, rt) = build_registry_for_ns("local"); + let token = rt + .authorize(khive_runtime::Namespace::parse("local").unwrap()) + .unwrap(); + let store = rt.notes(&token).expect("note store"); + + // Seed 5 inbound message notes directly (bypassing send) to control $.read exactly. + let make_msg = |read_val: serde_json::Value, label: &str| -> Note { + Note::new("local", "message", label).with_properties(serde_json::json!({ + "direction": "inbound", + "from": "local", + "to": "local", + "thread_id": null, + "read": read_val, + })) + }; + + // missing: don't set read at all in properties + let note_missing = Note::new("local", "message", "read=missing").with_properties( + serde_json::json!({ "direction": "inbound", "from": "local", "to": "local" }), + ); + let note_false = make_msg(serde_json::json!(false), "read=false"); + let note_true = make_msg(serde_json::json!(true), "read=true"); + let note_str_true = make_msg(serde_json::json!("true"), "read=string_true"); + let note_int_1 = make_msg(serde_json::json!(1), "read=int_1"); + + store.upsert_note(note_missing).await.unwrap(); + store.upsert_note(note_false).await.unwrap(); + store.upsert_note(note_true).await.unwrap(); + store.upsert_note(note_str_true).await.unwrap(); + store.upsert_note(note_int_1).await.unwrap(); + + // Query unread: missing, false, "true" (string), 1 (integer) → all count as unread. + let unread_filter = NoteFilter { + kind: Some("message".to_string()), + property_filters: vec![ + PropertyFilter { + json_path: "$.direction".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text("inbound".to_string()), + }, + PropertyFilter { + json_path: "$.read".to_string(), + op: FilterOp::JsonTypeNeMissing, + value: SqlValue::Text("true".to_string()), + }, + ], + order_by: None, + }; + let unread_page = store + .query_notes_filtered("local", &unread_filter, PageRequest::default()) + .await + .unwrap(); + let unread_contents: Vec<&str> = unread_page + .items + .iter() + .map(|n| n.content.as_str()) + .collect(); + + assert!( + unread_contents.contains(&"read=missing"), + "missing $.read must be unread; got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=false"), + "bool false must be unread; got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=string_true"), + "string 'true' must be unread (not JSON bool true); got {unread_contents:?}" + ); + assert!( + unread_contents.contains(&"read=int_1"), + "integer 1 must be unread (not JSON bool true); got {unread_contents:?}" + ); + assert!( + !unread_contents.contains(&"read=true"), + "JSON bool true must NOT be unread; got {unread_contents:?}" + ); + + // Query read: only JSON boolean true → exactly 1 result. + let read_filter = NoteFilter { + kind: Some("message".to_string()), + property_filters: vec![ + PropertyFilter { + json_path: "$.direction".to_string(), + op: FilterOp::Eq, + value: SqlValue::Text("inbound".to_string()), + }, + PropertyFilter { + json_path: "$.read".to_string(), + op: FilterOp::JsonTypeEq, + value: SqlValue::Text("true".to_string()), + }, + ], + order_by: None, + }; + let read_page = store + .query_notes_filtered("local", &read_filter, PageRequest::default()) + .await + .unwrap(); + assert_eq!( + read_page.items.len(), + 1, + "only JSON bool true must be in 'read'; got {:?}", + read_page + .items + .iter() + .map(|n| &n.content) + .collect::>() + ); + assert_eq!(read_page.items[0].content, "read=true"); +} diff --git a/crates/khive-pack-schedule/src/lib.rs b/crates/khive-pack-schedule/src/lib.rs index 9a23ebc3..34c49a1d 100644 --- a/crates/khive-pack-schedule/src/lib.rs +++ b/crates/khive-pack-schedule/src/lib.rs @@ -22,14 +22,17 @@ impl Pack for SchedulePack { /// ADR-040 §283-291: pack-auxiliary index for agenda() efficiency. /// -/// A partial index on `properties.trigger_at` (JSON-extracted) scoped to -/// `scheduled_event` notes enables efficient range scans in `agenda()`. -/// The statement is idempotent (`CREATE INDEX IF NOT EXISTS`) and is NOT +/// Uses `WHERE deleted_at IS NULL` instead of `WHERE kind = 'scheduled_event'` so +/// that the parameterized `kind = ?N` predicate in `build_note_filter_where` can +/// use this index. A literal-value partial condition (`WHERE kind = 'scheduled_event'`) +/// is invisible to the planner when the query uses a bound parameter for `kind`. +/// `namespace` and `kind` are included as indexed columns for efficient namespace+kind +/// range scans. The statement is idempotent (`CREATE INDEX IF NOT EXISTS`) and is NOT /// part of the core versioned migration chain (ADR-015). pub(crate) static SCHEDULE_SCHEMA_PLAN_STMTS: [&str; 1] = ["CREATE INDEX IF NOT EXISTS idx_schedule_trigger \ - ON notes(json_extract(properties, '$.trigger_at')) \ - WHERE kind = 'scheduled_event'"]; + ON notes(namespace, kind, json_extract(properties, '$.trigger_at')) \ + WHERE deleted_at IS NULL"]; static SCHEDULE_HANDLERS: [HandlerDef; 4] = [ HandlerDef { diff --git a/crates/khive-pack-schedule/tests/integration.rs b/crates/khive-pack-schedule/tests/integration.rs index 5abdbdfb..128c5332 100644 --- a/crates/khive-pack-schedule/tests/integration.rs +++ b/crates/khive-pack-schedule/tests/integration.rs @@ -865,9 +865,11 @@ async fn schedule_pack_exposes_non_empty_schema_plan() { combined.contains("CREATE INDEX IF NOT EXISTS"), "schema plan DDL must be idempotent (CREATE INDEX IF NOT EXISTS); got: {combined}" ); + // Index now uses WHERE deleted_at IS NULL so the parameterized kind = ?N + // predicate can use the index (literal WHERE kind = 'scheduled_event' blocks this). assert!( - combined.contains("scheduled_event"), - "schema plan index must be scoped to 'scheduled_event' kind; got: {combined}" + combined.contains("deleted_at IS NULL"), + "schema plan index must use WHERE deleted_at IS NULL partial condition; got: {combined}" ); } diff --git a/crates/khive-storage/src/note.rs b/crates/khive-storage/src/note.rs index 31c5c03f..12c9a3c2 100644 --- a/crates/khive-storage/src/note.rs +++ b/crates/khive-storage/src/note.rs @@ -91,6 +91,14 @@ pub enum FilterOp { Lte, Gt, Gte, + /// Matches rows where `json_type(properties, path) = value`. + /// Value must be a SQLite json_type string literal: 'true', 'false', 'integer', + /// 'real', 'text', 'array', 'object', or 'null'. + JsonTypeEq, + /// Matches rows where the json_type is absent (NULL) OR differs from value. + /// Equivalent to `json_type IS NULL OR json_type != value`. + /// Used for unread filter: matches any `$.read` that is NOT the JSON boolean true. + JsonTypeNeMissing, } /// A single `json_extract(properties, '$.field') op value` predicate. diff --git a/docs/adr/ADR-046-event-sourced-proposals.md b/docs/adr/ADR-046-event-sourced-proposals.md index 826d3579..e442af12 100644 --- a/docs/adr/ADR-046-event-sourced-proposals.md +++ b/docs/adr/ADR-046-event-sourced-proposals.md @@ -247,7 +247,7 @@ maintains as a fold over the four proposal events: ```sql CREATE TABLE proposals_open ( - proposal_id BLOB PRIMARY KEY, + proposal_id TEXT PRIMARY KEY, namespace TEXT NOT NULL, proposer TEXT NOT NULL, title TEXT NOT NULL, @@ -255,7 +255,7 @@ CREATE TABLE proposals_open ( created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, expiry INTEGER, - last_decision TEXT, -- JSON of the most recent ProposalReviewedPayload.decision + last_decision TEXT, -- bare decision string from the most recent ProposalReviewedPayload review_count INTEGER NOT NULL DEFAULT 0, approve_count INTEGER NOT NULL DEFAULT 0, reject_count INTEGER NOT NULL DEFAULT 0 @@ -287,8 +287,8 @@ the proposal is not permanently stuck. `'applying'` is never written to the event log — it is a purely transient projection state. Hard-state (status != 'open' | 'changes_requested') rows are retained for -audit; cleanup is operator-driven (`kkernel call kg proposal_cleanup ---older-than 90d`). +audit. A `proposal_cleanup` operator command is deferred; future work must +define the CLI surface, retention policy, and safe-delete semantics. **Review history retrieval (Fix 7):** The projection stores only aggregates (`review_count`, `approve_count`, `reject_count`). Individual `ProposalReviewed` @@ -375,68 +375,30 @@ feedback. The check fires only when `decision=approve`; rejecting one's own proposal is allowed (treated as withdrawal-via-reject). When `ProposalPolicy::allow_self_approve = true`, the check is skipped entirely. -This is configurable in pack manifest (`kg.proposals`): +The shipped v1 default is **one approve from any non-self actor, no recorded +reject**. The inline self-approval guard in the `review` handler is the only +shipped policy enforcement point. -```toml -[packs.kg.proposals] -approval_threshold = 1 # number of approve votes required -allow_self_approve = true # OSS default: single-developer deployments work out-of-box -require_listed_reviewer = false # if true, only the listed reviewers can approve -``` - -**Defaults for OSS happy-path (Fix 10):** - -- `approval_threshold = 1` -- `allow_self_approve = true` — single-developer deployments work out-of-box - without needing a second actor - -Multi-actor deployments override with -`ProposalPolicy { approval_threshold: 2, allow_self_approve: false, ... }`. -The OSS default optimizes for single-developer ergonomics; review-gated -deployments are explicit opt-in. +Configurable approval thresholds, pack manifest TOML configuration +(`[packs.kg.proposals]`), `ProposalPolicy` struct instantiation, and +`require_listed_reviewer` are deferred. Multi-actor deployments requiring +configurable thresholds or reviewer lists must await a future ADR amendment +before those controls are available. -More sophisticated policies (M-of-N, weighted reviewers, mandatory operator -approval for high-blast-radius changesets) are operator-config additions, not -v1 ADR scope. +### ProposalPolicy: pack-owned, gate-enforced (deferred) -### ProposalPolicy: pack-owned, gate-enforced - -`ProposalPolicy` is proposal-pack configuration (not ADR-018-owned). ADR-046 -defines the policy struct and registers a `PackGatePolicy` implementation with -the authorization gate (ADR-018). The gate invokes the registered policy for -proposal verbs. - -```rust -pub struct ProposalPolicy { - pub allow_self_approve: bool, - pub approval_threshold: u32, - pub require_listed_reviewer: bool, -} - -pub struct ProposalGatePolicy { - policy: ProposalPolicy, -} - -impl PackGatePolicy for ProposalGatePolicy { - fn evaluate(&self, input: &GateRequest, ctx: &GateContext) -> GateDecision { - if input.verb == VerbName::new("review") - && !self.policy.allow_self_approve - && proposer_of(&input, ctx) == input.actor.id - { - return GateDecision::Deny { - reason: "self-approve forbidden by ProposalPolicy".to_string(), - }; - } - GateDecision::Allow { obligations: vec![] } - } -} -``` +`ProposalPolicy`, `ProposalGatePolicy`, and `PackGatePolicy` are deferred. +The shipped v1 enforcement is the inline self-approval guard in `handle_review`: +the handler reads `proposals_open.proposer` and rejects with +`RuntimeError::SelfApprovalForbidden { proposal_id, actor_id }` when +`decision=approve` and `actor.id == proposer`. This check fires before any +event is emitted, giving immediate feedback. -The `ProposalGatePolicy` is registered with the gate via -`VerbRegistryBuilder::with_pack_policy` during KG pack initialization. The gate -remains the authoritative trust boundary; the handler also defensively checks -`proposals_open.proposer == actor.id` to give callers immediate feedback before -emitting events (defense-in-depth, not sole enforcement). +The full configurable policy struct, gate registration, and +`VerbRegistryBuilder::with_pack_policy` wiring are future work. When shipped, +`ProposalGatePolicy` will register with the ADR-018 authorization gate as the +authoritative trust boundary; the handler's inline check will remain as a +defense-in-depth layer but not the sole enforcement point. ### 7. Brain integration @@ -465,10 +427,12 @@ verbs and the apply worker each have policy hooks: who need restrictions add a rego rule. - `review`: policy decides whether `actor` can review proposals in `namespace`. Default: any actor. Operators may restrict to specific roles. -- `propose-apply` worker: policy decides whether the worker's `system:propose-apply` - actor can write the target namespace's records. Default: yes for the - pack-owned namespace, denied for foreign namespaces. This is the security - boundary that prevents cross-namespace proposal injection. +- `propose-apply` worker: the worker emits `ActorRef { kind: "system", id: "propose-apply" }` + as its actor identity. The authorization gate evaluates this identity against + the active policy; with the OSS default (AllowAllGate) it is permitted + transparently. A dedicated `system:propose-apply` policy rule is future work; + production deployments requiring explicit cross-namespace injection prevention + should add a rego rule for this actor class when configuring ADR-018. ### 9. Failure modes @@ -478,20 +442,20 @@ verbs and the apply worker each have policy hooks: | Apply fails (validation, network, etc.) | `ProposalApplied { Failed }` emitted; status is reverted from `'applying'` back to `'approved'` (best-effort CAS) so the proposal is not permanently stuck. Apply retry is deferred to a follow-up ADR. v1 behavior: failed applies return to `'approved'`; operators may issue a new `propose` (with `parent_id` referencing the failed proposal) to retry. Direct re-emission of `apply` events is not supported in v1. | | Apply policy denied | Same as Apply fails with `error = "denied by policy"`. Operator adjusts policy and issues a new `propose` (with `parent_id`) to retry; direct `apply` re-emission is not supported in v1. | | Reviewer reverses Approve to Reject | Each review is its own event; the worker uses the latest decision per reviewer. If a previously-approved proposal hits Reject before Apply fires, status moves to 'rejected'. | -| Two reviewers race (both Approve simultaneously) | Each emits its own `ProposalReviewed` event; the apply worker is single-threaded per process — it sees them in event order and fires apply on the first one that crosses threshold. Idempotency on the worker side: it checks `proposals_open.status` before applying; if already applied, no-op. | +| Two reviewers race (both Approve simultaneously) | Each `review(approve)` call invokes the apply worker synchronously after `reviewed_and_emit`. The `reviewed_and_emit` CAS serializes concurrent reviews at the projection layer; the apply worker’s `approved → applying` CAS ensures only one invocation executes the changeset. The worker checks `proposals_open.status` before applying; if already `applied` or `applying`, it returns without re-executing. | | Proposal expires | A background sweep (TBD: cron-style, not v1) emits `ProposalWithdrawn` with `by = "system:expiry"` on proposals past their `expiry` timestamp. | | Stale-target conflict (Fix 6) | An `UpdateEntity` or `MergeEntities` proposal targets a specific entity ID. Between propose-time and apply-time the entity may be independently modified. v1 default: **last-writer-wins** — the proposal applies its patch unconditionally. Optional: proposals may include `expected_version: Option` in the payload (if entity versioning is introduced via ADR-014 amendment). The apply worker would then check `current_version == expected_version` and emit `ProposalApplied { Failed { error: "stale: target was modified since proposal", current_version, expected_version } }` on mismatch. v1 does NOT introduce entity versioning; this knob is gated on a future ADR-014 amendment. | ### 10. CLI / MCP surface summary -| Surface | Action | How | -| -------------------------------------------------------------- | --------------------------------------------------- | -------------------------------------- | -| MCP `propose(...)` | Create a proposal | Verb | -| MCP `review(proposal_id, decision, comment?)` | Cast a review | Verb | -| MCP `withdraw(proposal_id, rationale?)` | Withdraw a proposal (proposer-only) | Verb | -| MCP `list(kind=proposal, status="open")` | Browse open proposals | Lists from `proposals_open` projection | -| MCP `get(id=)` | Fetch a single proposal's `ProposalCreated` payload | Resolves to the event payload | -| CLI `kkernel call kg proposal_cleanup --older-than ` | Archive resolved proposals | Operator housekeeping | +| Surface | Action | How | +| -------------------------------------------------------------- | --------------------------------------------------------- | -------------------------------------- | +| MCP `propose(...)` | Create a proposal | Verb | +| MCP `review(proposal_id, decision, comment?)` | Cast a review | Verb | +| MCP `withdraw(proposal_id, rationale?)` | Withdraw a proposal (proposer-only) | Verb | +| MCP `list(kind=proposal, status="open")` | Browse open proposals | Lists from `proposals_open` projection | +| MCP `get(id=)` | Fetch a single proposal's `ProposalCreated` payload | Resolves to the event payload | +| CLI `kkernel call kg proposal_cleanup --older-than ` | Archive resolved proposals (deferred — not shipped in v1) | Future operator housekeeping | `list(kind=proposal)` dispatches to a new `kg.list_proposals` handler under the kg pack — it queries `proposals_open` directly, supports the standard @@ -622,24 +586,22 @@ supersede, compound). Future arms add to the enum via additive semver bumps. ### Crate placement -- Verb handlers: `khive-pack-kg::proposals` -- Apply worker: `khive-pack-kg::proposals::apply_worker` -- Projection table + projection worker: `khive-pack-kg::proposals::projection_worker` +- Verb handlers: `crates/khive-pack-kg/src/handlers.rs` +- Apply worker: `crates/khive-pack-kg/src/apply_worker.rs` +- Projection table + projection worker: `crates/khive-pack-kg/src/projection_worker.rs` - Payload types: `khive-types::events::proposal_payloads` ### Migration -The current latest migration in `khive-db` is V4 (`dedupe_graph_edge_triples`). -ADR-043 (Embedding Model Migration) takes V5. This ADR's migration is **V6**. -**Ordering rule:** ADR-043 migration (V5) MUST be applied before this ADR's -migration (V6) — the version sequence must remain contiguous and ADR-043 was -specified first. `proposals_open` has no schema dependency on ADR-043's tables. +The `proposals_open` projection table was created in migration V15 in +`crates/khive-db/src/migrations.rs`. The `applying` transient status and +its CAS invariants were added in V18. -A new `VersionedMigration` in `crates/khive-db/src/migrations.rs`: +A `VersionedMigration` in `crates/khive-db/src/migrations.rs`: ```rust VersionedMigration { - version: 6, + version: 15, name: "proposals_open", up: PROPOSALS_OPEN_DDL, } @@ -674,25 +636,25 @@ form (ADR-017 §pack handler trait shape; `VerbDef/VERBS` is deprecated): pub const HANDLERS: &[HandlerDef] = &[ // ... existing handlers ... HandlerDef { - name: "propose", - visibility: Visibility::Verb, - speech_act: SpeechAct::Commissive, - handler: Handler::Proposals(ProposalsHandler::Propose), - presentation_policy: VerbPresentationPolicy::Standard, + name: "propose", + description: "Create a proposal for a KG change.", + visibility: Visibility::Verb, + category: Category::Proposals, + params: &PROPOSE_PARAMS, }, HandlerDef { - name: "review", - visibility: Visibility::Verb, - speech_act: SpeechAct::Declaration, - handler: Handler::Proposals(ProposalsHandler::Review), - presentation_policy: VerbPresentationPolicy::Standard, + name: "review", + description: "Approve, reject, comment, or request changes on a proposal.", + visibility: Visibility::Verb, + category: Category::Proposals, + params: &REVIEW_PARAMS, }, HandlerDef { - name: "withdraw", - visibility: Visibility::Verb, - speech_act: SpeechAct::Commissive, - handler: Handler::Proposals(ProposalsHandler::Withdraw), - presentation_policy: VerbPresentationPolicy::Standard, + name: "withdraw", + description: "Rescind a proposal (proposer only).", + visibility: Visibility::Verb, + category: Category::Proposals, + params: &WITHDRAW_PARAMS, }, ]; ``` @@ -743,11 +705,12 @@ for v1, the JSON path is sufficient. Lookup wire shape: - `get(id=)` resolves to the specific event record by event UUID. -- For aggregate-ID lookup (all events for a proposal), use the events query - surface with `EventFilter { kinds: vec![EventKind::ProposalCreated], ... }` - and a payload predicate on `proposal_id`. Do NOT overload bare `get(id=...)` - to silently resolve event UUIDs OR aggregate IDs — that creates ambiguous - collision policy. +- `get(id=)` resolves raw proposal IDs and short prefixes via + `proposals_open` and returns the `ProposalCreated` event payload from the + event log. +- For full review history, use the events query surface with + `EventFilter { kinds: vec![EventKind::ProposalReviewed], ... }` and a + payload predicate on `proposal_id`. ## References diff --git a/docs/adr/ADR-048-knowledge-section-profiles.md b/docs/adr/ADR-048-knowledge-section-profiles.md index 91919962..07be18d9 100644 --- a/docs/adr/ADR-048-knowledge-section-profiles.md +++ b/docs/adr/ADR-048-knowledge-section-profiles.md @@ -22,6 +22,36 @@ compose/suggest, hooks, lint, export, and observability phases. | Resource entity dual-write | deferred | `knowledge.upsert_atoms` and `knowledge.upsert_domains` write corpus tables; domain mirror is into `knowledge_atoms` for FTS, not graph `entities`. | | `knowledge.lint`, `knowledge.lint_config`, `knowledge.export` | deferred | These verbs are not registered in the shipped knowledge pack. | +## Governance for Shipped Knowledge Sections and Profiles + +ADR-048 treats knowledge sections as shipped corpus storage and lifecycle state, while +profile persistence remains owned by the brain pack. The knowledge pack does not ship a +`knowledge_profiles` table or knowledge-local profile verbs. Shipped profile persistence +is the V20 brain profile snapshot/event-log model, and section posterior learning is +driven through brain profile state and feedback events. + +`knowledge_sections` is the authoritative table for atom sections. Section edits are +keyed by `(atom_id, section_type)`: `knowledge.edit` upserts only the specified sections, +preserves sibling sections, clears stale section embeddings, and downgrades edited verified +sections to reviewed. `knowledge.import` supports `atlas_md` file/directory import and, +with the default section chunk strategy, parses section headings into section rows. + +Section lifecycle governance is explicit. `knowledge.challenge` marks an eligible section +as disputed and increments the atom dispute counter. `knowledge.adjudicate` requires a +disputed section; accept marks the section verified, reject returns it to reviewed, and the +atom dispute counter is decremented. + +`knowledge.suggest` is a base domain-discovery verb. It accepts query/role/limit, searches +domains, may fuse ANN results when the index is warm, reranks with embeddings, and returns +domain IDs, names, and scores. It does not implicitly resolve a brain profile or apply +profile-weighted section scoring in the shipped implementation. + +`knowledge.compose` is a base explicit-composition verb. It requires explicit domain IDs +and/or atom IDs plus query context, resolves those records, reranks atom text, and returns +markdown with atom/domain metadata. It does not emit implicit feedback, does not call +`brain.resolve`, and does not perform section-manifest weighting in the shipped +implementation. + ## Context Knowledge atoms in the corpus tier ([ADR-047](ADR-047-knowledge-pack.md)) store content as @@ -84,13 +114,15 @@ CREATE TABLE IF NOT EXISTS knowledge_sections ( embedding BLOB, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'draft', FOREIGN KEY (atom_id) REFERENCES knowledge_atoms(id), UNIQUE(atom_id, section_type) ); ``` -V21 also creates indexes on `atom_id`, `(namespace, section_type)`, and `(namespace, atom_id)`, +V21 creates indexes on `atom_id`, `(namespace, section_type)`, and `(namespace, atom_id)`, plus an external-content FTS5 table `fts_sections` with insert/delete/update triggers. +V22 adds the `status` column and `idx_knowledge_sections_status`. Section_type is a closed enum matching the atlas schema v1: `overview`, `core_model`, `boundary_conditions`, `formalism`, `operational_guidance`, `examples`, `failure_modes`, @@ -1045,6 +1077,41 @@ Every phase ships with benchmarks that gate merge: --- +## Shipped Schema Reference + +### `knowledge_sections` (V21 + V22) + +V21 creates `knowledge_sections`; V22 adds the `status` lifecycle column. The shipped +columns, constraints, and indexes are: + +- `id TEXT PRIMARY KEY` +- `atom_id TEXT NOT NULL` +- `namespace TEXT NOT NULL` +- `section_type TEXT NOT NULL` +- `heading TEXT NOT NULL DEFAULT ''` +- `content TEXT NOT NULL DEFAULT ''` +- `tokens INTEGER NOT NULL DEFAULT 0` +- `sort_order INTEGER NOT NULL DEFAULT 0` +- `embedding BLOB` (nullable) +- `created_at INTEGER NOT NULL` +- `updated_at INTEGER NOT NULL` +- `status TEXT NOT NULL DEFAULT 'draft'` (added by V22) +- `FOREIGN KEY (atom_id) REFERENCES knowledge_atoms(id)` +- `UNIQUE(atom_id, section_type)` + +Indexes: `idx_knowledge_sections_atom`, `idx_knowledge_sections_ns_type`, +`idx_knowledge_sections_ns_atom`, `idx_knowledge_sections_status`. + +Full-text search: `fts_sections`, an external-content FTS5 table over `heading` and +`content`, with `id`, `namespace`, `atom_id`, and `section_type` as unindexed metadata. +FTS insert, delete, and update triggers maintain the FTS table on section changes. + +### Profile Persistence (brain-owned, not knowledge-local) + +ADR-048 does not ship a `knowledge_profiles` table. Profile persistence is brain-owned; +the authoritative shipped tables are `brain_profile_snapshots` and `brain_event_log` +(see V20 DDL note in the Amendment section below and ADR-032). + ## Amendment: Research-Informed Design Corrections (2026-05-27) **Authors**: Ocean, lambda:khive @@ -1121,9 +1188,26 @@ order produces different posteriors. This breaks event-sourced snapshot recovery ```markdown V20 brain persistence DDL in this ADR is superseded by ADR-032 and ADR-015 V20. The -authoritative shipped tables are `brain_profile_snapshots(profile_id, namespace, -snapshot_json, updated_at)` and `brain_event_log(id, profile_id, namespace, event_kind, -payload, created_at)`, plus `idx_brain_events_profile`. +authoritative shipped tables are defined in the brain pack. There is no shipped +`knowledge_profiles` table; profile persistence is brain-owned. + +`brain_profile_snapshots`: + +- `profile_id TEXT NOT NULL` +- `namespace TEXT NOT NULL DEFAULT 'default'` +- `snapshot_json TEXT NOT NULL` +- `updated_at INTEGER NOT NULL` +- `PRIMARY KEY (profile_id, namespace)` + +`brain_event_log`: + +- `id INTEGER PRIMARY KEY AUTOINCREMENT` +- `profile_id TEXT NOT NULL` +- `namespace TEXT NOT NULL DEFAULT 'default'` +- `event_kind TEXT NOT NULL` +- `payload TEXT NOT NULL` +- `created_at INTEGER NOT NULL` +- `idx_brain_events_profile` on `(profile_id, namespace, created_at)` ``` ### Correction 3: Filtered ANN — StitchedVamana, not ACORN