diff --git a/VISION.md b/VISION.md index 37538d2..df6d2d4 100644 --- a/VISION.md +++ b/VISION.md @@ -218,6 +218,7 @@ Sprout is designed as a complete platform, not a collection of independent micro | ✅ | Nostr proxy (`sprout-proxy`) — guest client compatibility | | ✅ | Huddle (`sprout-huddle`) — LiveKit integration | | ✅ | Admin CLI (`sprout-admin`) | +| ✅ | Channel features — messaging, threads, DMs, reactions, NIP-29 group management, soft-delete | | 🚧 | Web client (Tauri) — Stream, Forum, DM, Search | | ✅ | Workflow engine (`sprout-workflow`) — YAML-as-code, 4 trigger types, 7 action types, approval gates, execution traces | | ✅ | Home Feed (`/api/feed`) — @mentions, needs-action, channel activity, agent activity | diff --git a/crates/sprout-db/src/channel.rs b/crates/sprout-db/src/channel.rs index 17be3de..73a343a 100644 --- a/crates/sprout-db/src/channel.rs +++ b/crates/sprout-db/src/channel.rs @@ -511,8 +511,9 @@ pub async fn remove_member( pub async fn is_member(pool: &MySqlPool, channel_id: Uuid, pubkey: &[u8]) -> Result { let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); let row = sqlx::query( - "SELECT COUNT(*) as cnt FROM channel_members \ - WHERE channel_id = ? AND pubkey = ? AND removed_at IS NULL", + "SELECT COUNT(*) as cnt FROM channel_members cm \ + JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL \ + WHERE cm.channel_id = ? AND cm.pubkey = ? AND cm.removed_at IS NULL", ) .bind(&channel_id_bytes) .bind(pubkey) @@ -523,14 +524,17 @@ pub async fn is_member(pool: &MySqlPool, channel_id: Uuid, pubkey: &[u8]) -> Res } /// Returns all active members of the given channel. +/// +/// Returns an empty list if the channel has been soft-deleted. pub async fn get_members(pool: &MySqlPool, channel_id: Uuid) -> Result> { let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); let rows = sqlx::query( r#" - SELECT channel_id, pubkey, role, joined_at, invited_by, removed_at - FROM channel_members - WHERE channel_id = ? AND removed_at IS NULL - ORDER BY joined_at ASC + SELECT cm.channel_id, cm.pubkey, cm.role, cm.joined_at, cm.invited_by, cm.removed_at + FROM channel_members cm + JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL + WHERE cm.channel_id = ? AND cm.removed_at IS NULL + ORDER BY cm.joined_at ASC LIMIT 1000 "#, ) @@ -547,9 +551,10 @@ pub async fn get_members(pool: &MySqlPool, channel_id: Uuid) -> Result Result> { let rows = sqlx::query( r#" - SELECT channel_id - FROM channel_members - WHERE pubkey = ? AND removed_at IS NULL + SELECT cm.channel_id + FROM channel_members cm + JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL + WHERE cm.pubkey = ? AND cm.removed_at IS NULL UNION SELECT id AS channel_id FROM channels @@ -1008,6 +1013,21 @@ pub async fn unarchive_channel(pool: &MySqlPool, channel_id: Uuid) -> Result<()> Ok(()) } +/// Soft-delete a channel by setting `deleted_at = NOW(6)`. +/// +/// Returns `Ok(true)` if the channel was deleted, `Ok(false)` if already +/// deleted or not found. +pub async fn soft_delete_channel(pool: &MySqlPool, channel_id: Uuid) -> Result { + let id_bytes = channel_id.as_bytes().as_slice().to_vec(); + let result = + sqlx::query("UPDATE channels SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL") + .bind(&id_bytes) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + /// Returns the count of active (non-removed) members in a channel. pub async fn get_member_count(pool: &MySqlPool, channel_id: Uuid) -> Result { let id_bytes = channel_id.as_bytes().as_slice().to_vec(); @@ -1020,6 +1040,42 @@ pub async fn get_member_count(pool: &MySqlPool, channel_id: Uuid) -> Result Ok(row.try_get("cnt")?) } +/// Bulk-fetch member counts for a set of channel IDs. +/// +/// Returns a map of `channel_id → count`. Channels with zero members are omitted. +/// Single query regardless of input size. +pub async fn get_member_counts_bulk( + pool: &MySqlPool, + channel_ids: &[Uuid], +) -> Result> { + use crate::event::uuid_from_bytes; + + if channel_ids.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + let mut qb: sqlx::QueryBuilder = sqlx::QueryBuilder::new( + "SELECT channel_id, COUNT(*) as cnt FROM channel_members \ + WHERE removed_at IS NULL AND channel_id IN (", + ); + let mut sep = qb.separated(", "); + for id in channel_ids { + sep.push_bind(id.as_bytes().to_vec()); + } + qb.push(") GROUP BY channel_id"); + + let rows = qb.build().fetch_all(pool).await?; + + let mut map = std::collections::HashMap::with_capacity(rows.len()); + for row in rows { + let id_bytes: Vec = row.try_get("channel_id")?; + let id = uuid_from_bytes(&id_bytes)?; + let cnt: i64 = row.try_get("cnt")?; + map.insert(id, cnt); + } + Ok(map) +} + /// Get the active role of a pubkey in a channel. /// /// Returns `None` if the pubkey is not an active member. @@ -1030,7 +1086,9 @@ pub async fn get_member_role( ) -> Result> { let channel_id_bytes = channel_id.as_bytes().as_slice().to_vec(); let row = sqlx::query( - "SELECT role FROM channel_members WHERE channel_id = ? AND pubkey = ? AND removed_at IS NULL", + "SELECT cm.role FROM channel_members cm \ + JOIN channels c ON cm.channel_id = c.id AND c.deleted_at IS NULL \ + WHERE cm.channel_id = ? AND cm.pubkey = ? AND cm.removed_at IS NULL", ) .bind(&channel_id_bytes) .bind(pubkey) diff --git a/crates/sprout-db/src/event.rs b/crates/sprout-db/src/event.rs index b5594fd..2614030 100644 --- a/crates/sprout-db/src/event.rs +++ b/crates/sprout-db/src/event.rs @@ -99,7 +99,7 @@ pub async fn query_events(pool: &MySqlPool, q: &EventQuery) -> Result = QueryBuilder::new( "SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \ - FROM events WHERE 1=1", + FROM events WHERE deleted_at IS NULL", ); if let Some(ch) = q.channel_id { @@ -185,8 +185,154 @@ pub(crate) fn row_to_stored_event(row: sqlx::mysql::MySqlRow) -> Result Result { + let result = + sqlx::query("UPDATE events SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL") + .bind(event_id) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Atomically soft-delete an event and decrement thread reply counters. +/// +/// Wraps the delete + counter update in a single transaction so a crash between +/// them cannot leave counters permanently inflated. Returns `Ok(true)` if the +/// event was deleted this call. +pub async fn soft_delete_event_and_update_thread( + pool: &MySqlPool, + event_id: &[u8], + parent_event_id: Option<&[u8]>, + root_event_id: Option<&[u8]>, +) -> Result { + let mut tx = pool.begin().await?; + + let result = + sqlx::query("UPDATE events SET deleted_at = NOW(6) WHERE id = ? AND deleted_at IS NULL") + .bind(event_id) + .execute(&mut *tx) + .await?; + + let deleted = result.rows_affected() > 0; + + if deleted { + if let Some(pid) = parent_event_id { + sqlx::query( + "UPDATE thread_metadata \ + SET reply_count = GREATEST(reply_count - 1, 0) \ + WHERE event_id = ?", + ) + .bind(pid) + .execute(&mut *tx) + .await?; + + if let Some(root_id) = root_event_id { + sqlx::query( + "UPDATE thread_metadata \ + SET descendant_count = GREATEST(descendant_count - 1, 0) \ + WHERE event_id = ?", + ) + .bind(root_id) + .execute(&mut *tx) + .await?; + } + } + } + + tx.commit().await?; + Ok(deleted) +} + +/// Returns the `created_at` timestamp of the most recent non-deleted event in a channel. +pub async fn get_last_message_at( + pool: &MySqlPool, + channel_id: uuid::Uuid, +) -> Result>> { + let id_bytes = channel_id.as_bytes().as_slice().to_vec(); + let row = sqlx::query( + "SELECT created_at FROM events \ + WHERE channel_id = ? AND deleted_at IS NULL \ + ORDER BY created_at DESC LIMIT 1", + ) + .bind(&id_bytes) + .fetch_optional(pool) + .await?; + + match row { + Some(r) => Ok(Some(r.try_get("created_at")?)), + None => Ok(None), + } +} + +/// Bulk-fetch the most recent `created_at` for a set of channel IDs. +/// +/// Returns a map of `channel_id → last_message_at`. Channels with no events are omitted. +/// Single query regardless of input size. +pub async fn get_last_message_at_bulk( + pool: &MySqlPool, + channel_ids: &[uuid::Uuid], +) -> Result>> { + if channel_ids.is_empty() { + return Ok(std::collections::HashMap::new()); + } + + let mut qb: QueryBuilder = QueryBuilder::new( + "SELECT channel_id, MAX(created_at) as last_at FROM events \ + WHERE deleted_at IS NULL AND channel_id IN (", + ); + let mut sep = qb.separated(", "); + for id in channel_ids { + sep.push_bind(id.as_bytes().to_vec()); + } + qb.push(") GROUP BY channel_id"); + + let rows = qb.build().fetch_all(pool).await?; + + let mut map = std::collections::HashMap::with_capacity(rows.len()); + for row in rows { + let id_bytes: Vec = row.try_get("channel_id")?; + let id = uuid_from_bytes(&id_bytes)?; + let last_at: DateTime = row.try_get("last_at")?; + map.insert(id, last_at); + } + Ok(map) +} + +/// Fetches a single non-deleted event by its raw 32-byte ID. +/// +/// Returns `None` if the event does not exist or has been soft-deleted. +/// Use [`get_event_by_id_including_deleted`] when you need to inspect +/// tombstoned rows (e.g. audit, undelete). pub async fn get_event_by_id(pool: &MySqlPool, id_bytes: &[u8]) -> Result> { + let row = sqlx::query( + "SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \ + FROM events WHERE id = ? AND deleted_at IS NULL ORDER BY created_at DESC LIMIT 1", + ) + .bind(id_bytes) + .fetch_optional(pool) + .await?; + + match row { + Some(r) => row_to_stored_event(r), + None => Ok(None), + } +} + +/// Fetches a single event by its raw 32-byte ID, **including soft-deleted rows**. +/// +/// Most callers should use [`get_event_by_id`] instead. This variant is needed +/// when the caller must distinguish "never existed" from "was deleted" (e.g. +/// audit trails, compliance queries). +pub async fn get_event_by_id_including_deleted( + pool: &MySqlPool, + id_bytes: &[u8], +) -> Result> { let row = sqlx::query( "SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \ FROM events WHERE id = ? ORDER BY created_at DESC LIMIT 1", @@ -201,6 +347,154 @@ pub async fn get_event_by_id(pool: &MySqlPool, id_bytes: &[u8]) -> Result