-
Notifications
You must be signed in to change notification settings - Fork 0
Migrate channel scoping to h tags #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,11 @@ use nostr::Event; | |
| use sprout_audit::{AuditAction, NewAuditEntry}; | ||
| use sprout_core::event::StoredEvent; | ||
| use sprout_core::kind::{ | ||
| event_kind_u32, is_ephemeral, is_workflow_execution_kind, KIND_AUTH, KIND_PRESENCE_UPDATE, | ||
| event_kind_u32, is_ephemeral, is_workflow_execution_kind, KIND_AUTH, KIND_CANVAS, | ||
| KIND_FORUM_COMMENT, KIND_FORUM_POST, KIND_FORUM_VOTE, KIND_PRESENCE_UPDATE, | ||
| KIND_STREAM_MESSAGE, KIND_STREAM_MESSAGE_BOOKMARKED, KIND_STREAM_MESSAGE_EDIT, | ||
| KIND_STREAM_MESSAGE_PINNED, KIND_STREAM_MESSAGE_SCHEDULED, KIND_STREAM_MESSAGE_V2, | ||
| KIND_STREAM_REMINDER, | ||
| }; | ||
| use sprout_core::verification::verify_event; | ||
|
|
||
|
|
@@ -19,6 +23,75 @@ use crate::connection::{AuthState, ConnectionState}; | |
| use crate::protocol::RelayMessage; | ||
| use crate::state::AppState; | ||
|
|
||
| /// Publish a stored event to subscribers and kick off async side effects. | ||
| pub(crate) async fn dispatch_persistent_event( | ||
| state: &Arc<AppState>, | ||
| stored_event: &StoredEvent, | ||
| kind_u32: u32, | ||
| actor_pubkey_hex: &str, | ||
| ) -> usize { | ||
| let event_id_hex = stored_event.event.id.to_hex(); | ||
|
|
||
| if let Some(ch_id) = stored_event.channel_id { | ||
| if let Err(e) = state.pubsub.publish_event(ch_id, &stored_event.event).await { | ||
| warn!(event_id = %event_id_hex, "Redis publish failed: {e}"); | ||
| } | ||
| } | ||
|
|
||
| let matches = state.sub_registry.fan_out(stored_event); | ||
| debug!( | ||
| event_id = %event_id_hex, | ||
| channel_id = ?stored_event.channel_id, | ||
| match_count = matches.len(), | ||
| "Fan-out" | ||
| ); | ||
|
|
||
| let event_json = serde_json::to_string(&stored_event.event) | ||
| .expect("nostr::Event serialization is infallible for well-formed events"); | ||
| for (target_conn_id, sub_id) in &matches { | ||
| let msg = format!(r#"["EVENT","{}",{}]"#, sub_id, event_json); | ||
| state.conn_manager.send_to(*target_conn_id, msg); | ||
| } | ||
|
|
||
| let search = Arc::clone(&state.search); | ||
| let stored_for_search = stored_event.clone(); | ||
| tokio::spawn(async move { | ||
| if let Err(e) = search.index_event(&stored_for_search).await { | ||
| error!(event_id = %stored_for_search.event.id.to_hex(), "Search index failed: {e}"); | ||
| } | ||
| }); | ||
|
|
||
| let audit = Arc::clone(&state.audit); | ||
| let audit_event_id = event_id_hex.clone(); | ||
| let audit_actor_pubkey = actor_pubkey_hex.to_string(); | ||
| let audit_channel_id = stored_event.channel_id; | ||
| tokio::spawn(async move { | ||
| let entry = NewAuditEntry { | ||
| event_id: audit_event_id.clone(), | ||
| event_kind: kind_u32, | ||
| actor_pubkey: audit_actor_pubkey, | ||
| action: AuditAction::EventCreated, | ||
| channel_id: audit_channel_id, | ||
| metadata: serde_json::Value::Null, | ||
| }; | ||
| if let Err(e) = audit.log(entry).await { | ||
| error!(event_id = %audit_event_id, "Audit log failed: {e}"); | ||
| } | ||
| }); | ||
|
|
||
| if !is_workflow_execution_kind(kind_u32) { | ||
| let workflow_engine = Arc::clone(&state.workflow_engine); | ||
| let workflow_event = stored_event.clone(); | ||
| tokio::spawn(async move { | ||
| if let Err(e) = workflow_engine.on_event(&workflow_event).await { | ||
| tracing::error!(event_id = ?workflow_event.event.id, "Workflow trigger failed: {e}"); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| matches.len() | ||
| } | ||
|
|
||
| /// Handle an EVENT message: authenticate, verify, store, fan-out, index, and audit the event. | ||
| pub async fn handle_event(event: Event, conn: Arc<ConnectionState>, state: Arc<AppState>) { | ||
| let event_id_hex = event.id.to_hex(); | ||
|
|
@@ -169,6 +242,15 @@ pub async fn handle_event(event: Event, conn: Arc<ConnectionState>, state: Arc<A | |
| extract_channel_id(&event) | ||
| }; | ||
|
|
||
| if requires_h_channel_scope(kind_u32) && channel_id.is_none() { | ||
| conn.send(RelayMessage::ok( | ||
| &event_id_hex, | ||
| false, | ||
| "invalid: channel-scoped events must include an h tag", | ||
| )); | ||
| return; | ||
| } | ||
|
|
||
| if let Some(ch_id) = channel_id { | ||
| if let Err(msg) = | ||
| check_channel_membership(&state, ch_id, &pubkey_bytes, conn_id, &event_id_hex).await | ||
|
|
@@ -248,70 +330,15 @@ pub async fn handle_event(event: Event, conn: Arc<ConnectionState>, state: Arc<A | |
| } | ||
| } | ||
|
|
||
| if let Some(ch_id) = channel_id { | ||
| if let Err(e) = state.pubsub.publish_event(ch_id, &event).await { | ||
| warn!(event_id = %event_id_hex, "Redis publish failed: {e}"); | ||
| } | ||
| } | ||
|
|
||
| let matches = state.sub_registry.fan_out(&stored_event); | ||
| debug!( | ||
| event_id = %event_id_hex, | ||
| channel_id = ?stored_event.channel_id, | ||
| match_count = matches.len(), | ||
| "Fan-out" | ||
| ); | ||
| let event_json = serde_json::to_string(&stored_event.event) | ||
| .expect("nostr::Event serialization is infallible for well-formed events"); | ||
| for (target_conn_id, sub_id) in &matches { | ||
| let msg = format!(r#"["EVENT","{}",{}]"#, sub_id, event_json); | ||
| state.conn_manager.send_to(*target_conn_id, msg); | ||
| } | ||
|
|
||
| let search = Arc::clone(&state.search); | ||
| let stored_for_search = stored_event.clone(); | ||
| tokio::spawn(async move { | ||
| if let Err(e) = search.index_event(&stored_for_search).await { | ||
| error!(event_id = %stored_for_search.event.id.to_hex(), "Search index failed: {e}"); | ||
| } | ||
| }); | ||
|
|
||
| let audit = Arc::clone(&state.audit); | ||
| let audit_event_id = event_id_hex.clone(); | ||
| let audit_pubkey = pubkey_hex.clone(); | ||
| tokio::spawn(async move { | ||
| let entry = NewAuditEntry { | ||
| event_id: audit_event_id.clone(), | ||
| event_kind: kind_u32, | ||
| actor_pubkey: audit_pubkey, | ||
| action: AuditAction::EventCreated, | ||
| channel_id, | ||
| metadata: serde_json::Value::Null, | ||
| }; | ||
| if let Err(e) = audit.log(entry).await { | ||
| error!(event_id = %audit_event_id, "Audit log failed: {e}"); | ||
| } | ||
| }); | ||
|
|
||
| // Don't trigger workflows for workflow execution events (prevents infinite loops). | ||
| let is_workflow_event = is_workflow_execution_kind(kind_u32); | ||
| if !is_workflow_event { | ||
| let wf = Arc::clone(&state.workflow_engine); | ||
| let ev = stored_event.clone(); | ||
| tokio::spawn(async move { | ||
| if let Err(e) = wf.on_event(&ev).await { | ||
| tracing::error!(event_id = ?ev.event.id, "Workflow trigger failed: {e}"); | ||
| } | ||
| }); | ||
| } | ||
| let fan_out = dispatch_persistent_event(&state, &stored_event, kind_u32, &pubkey_hex).await; | ||
|
|
||
| conn.send(RelayMessage::ok(&event_id_hex, true, "")); | ||
|
|
||
| info!( | ||
| event_id = %event_id_hex, | ||
| kind = kind_u32, | ||
| conn_id = %conn_id, | ||
| fan_out = matches.len(), | ||
| fan_out, | ||
| "Event ingested" | ||
| ); | ||
| } | ||
|
|
@@ -538,12 +565,12 @@ async fn derive_reaction_channel( | |
|
|
||
| /// Extract a channel UUID from event tags. | ||
| /// | ||
| /// Checks `"channel"` custom tags and `"h"` NIP-29 group tags for a channel UUID. | ||
| /// Checks the `"h"` NIP-29 group tag for a channel UUID. | ||
| /// The `"e"` tag is intentionally NOT checked — it is reserved for event references only. | ||
| fn extract_channel_id(event: &Event) -> Option<uuid::Uuid> { | ||
| for tag in event.tags.iter() { | ||
| let key = tag.kind().to_string(); | ||
| if key == "channel" || key == "h" { | ||
| if key == "h" { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This change makes Useful? React with 👍 / 👎. |
||
| if let Some(val) = tag.content() { | ||
| if let Ok(id) = val.parse::<uuid::Uuid>() { | ||
| return Some(id); | ||
|
|
@@ -553,3 +580,57 @@ fn extract_channel_id(event: &Event) -> Option<uuid::Uuid> { | |
| } | ||
| None | ||
| } | ||
|
|
||
| fn requires_h_channel_scope(kind: u32) -> bool { | ||
| matches!( | ||
| kind, | ||
| KIND_STREAM_MESSAGE | ||
| | KIND_STREAM_MESSAGE_V2 | ||
| | KIND_STREAM_MESSAGE_EDIT | ||
| | KIND_STREAM_MESSAGE_PINNED | ||
| | KIND_STREAM_MESSAGE_BOOKMARKED | ||
| | KIND_STREAM_MESSAGE_SCHEDULED | ||
| | KIND_STREAM_REMINDER | ||
| | KIND_CANVAS | ||
| | KIND_FORUM_POST | ||
| | KIND_FORUM_VOTE | ||
| | KIND_FORUM_COMMENT | ||
| ) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::requires_h_channel_scope; | ||
| use sprout_core::kind::{ | ||
| KIND_CANVAS, KIND_FORUM_COMMENT, KIND_FORUM_POST, KIND_FORUM_VOTE, KIND_PRESENCE_UPDATE, | ||
| KIND_STREAM_MESSAGE, | ||
| }; | ||
|
|
||
| #[test] | ||
| fn channel_scoped_content_kinds_require_h_tags() { | ||
| for kind in [ | ||
| KIND_STREAM_MESSAGE, | ||
| KIND_CANVAS, | ||
| KIND_FORUM_POST, | ||
| KIND_FORUM_VOTE, | ||
| KIND_FORUM_COMMENT, | ||
| ] { | ||
| assert!( | ||
| requires_h_channel_scope(kind), | ||
| "kind {kind} should require h" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn non_channel_kinds_do_not_require_h_tags() { | ||
| assert!( | ||
| !requires_h_channel_scope(nostr::Kind::Reaction.as_u16().into()), | ||
| "reactions derive channel from the target event" | ||
| ); | ||
| assert!( | ||
| !requires_h_channel_scope(KIND_PRESENCE_UPDATE), | ||
| "presence updates are global/ephemeral" | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_canvasnow queries only by#h, but older canvas events were written with the previous channel tag format. Since matching is tag-literal, pre-migration canvas documents will no longer be returned, causing MCP to report "No canvas set" even when a canvas already exists. Add a fallback path for legacy tags (or migrate existing canvas events) before removing old-read compatibility.Useful? React with 👍 / 👎.