diff --git a/src/workers/continuum-core/src/airc/event_transport.rs b/src/workers/continuum-core/src/airc/event_transport.rs new file mode 100644 index 000000000..7362dd41a --- /dev/null +++ b/src/workers/continuum-core/src/airc/event_transport.rs @@ -0,0 +1,94 @@ +//! Typed event transport seam for Continuum realtime envelopes. +//! +//! Command modules and future bridge loops should depend on this trait, +//! not on a concrete store or a CLI command. The first implementation is +//! store-backed so tests and local runtime keep deterministic replay; +//! later implementations can publish to the AIRC SDK/daemon without +//! changing command surfaces. + +use std::sync::Arc; + +use crate::airc::realtime_store::{ + AircRealtimePublishParams, AircRealtimePublishResult, AircRealtimeReplayParams, + AircRealtimeReplayResult, AircRealtimeStore, +}; + +pub trait AircEventTransport: Send + Sync { + fn publish( + &self, + params: AircRealtimePublishParams, + ) -> Result; + + fn replay(&self, params: AircRealtimeReplayParams) -> Result; +} + +#[derive(Clone)] +pub struct StoreAircEventTransport { + store: Arc, +} + +impl StoreAircEventTransport { + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +impl AircEventTransport for StoreAircEventTransport { + fn publish( + &self, + params: AircRealtimePublishParams, + ) -> Result { + self.store.publish(params) + } + + fn replay(&self, params: AircRealtimeReplayParams) -> Result { + self.store.replay(params) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::airc::{ + AircRealtimeEnvelope, AircRealtimePayload, AircRealtimePayloadRef, AircRealtimeSchema, + InMemoryAircRealtimeStore, + }; + use serde_json::json; + + #[test] + fn store_transport_round_trips_without_cli_output_parsing() { + let transport = + StoreAircEventTransport::new(Arc::new(InMemoryAircRealtimeStore::default())); + let envelope = AircRealtimeEnvelope::new( + "evt-1".to_string(), + "general".to_string(), + "continuum".to_string(), + 100, + AircRealtimePayload::ExistingSchema { + payload: AircRealtimePayloadRef::inline( + AircRealtimeSchema::EventBridgePayload, + json!({"event": "persona.ready"}), + ), + }, + ); + + let publish = transport + .publish(AircRealtimePublishParams { envelope }) + .unwrap(); + assert!(publish.stored_for_replay); + + let replay = transport + .replay(AircRealtimeReplayParams { + room_id: "general".to_string(), + after_event_id: None, + limit: Some(10), + include_presence: None, + include_subscriptions: None, + now_ms: None, + }) + .unwrap(); + + assert_eq!(replay.events.len(), 1); + assert_eq!(replay.events[0].event_id, "evt-1"); + } +} diff --git a/src/workers/continuum-core/src/airc/mod.rs b/src/workers/continuum-core/src/airc/mod.rs index 51606f14b..6c2d8f166 100644 --- a/src/workers/continuum-core/src/airc/mod.rs +++ b/src/workers/continuum-core/src/airc/mod.rs @@ -5,12 +5,14 @@ //! ServiceModule wrappers stay thin and future AIRC commands reuse one path. pub mod client; +pub mod event_transport; pub mod process; pub mod realtime; pub mod realtime_store; pub mod types; pub use client::{AircQueueClient, CliAircQueueClient}; +pub use event_transport::{AircEventTransport, StoreAircEventTransport}; pub use process::{AircCommandRunner, AircInvocation, TokioAircCommandRunner}; pub use realtime::{ AircMediaControlEvent, AircPresenceEvent, AircPresenceState, AircRealtimeDelivery, diff --git a/src/workers/continuum-core/src/modules/airc.rs b/src/workers/continuum-core/src/modules/airc.rs index 7c271f006..202680d7b 100644 --- a/src/workers/continuum-core/src/modules/airc.rs +++ b/src/workers/continuum-core/src/modules/airc.rs @@ -1,9 +1,9 @@ //! ServiceModule adapter for Rust-native AIRC commands. use crate::airc::{ - AircQueueClient, AircQueueListRequest, AircQueueScanParams, AircRealtimePublishParams, - AircRealtimeReplayParams, AircRealtimeStore, CliAircQueueClient, InMemoryAircRealtimeStore, - TokioAircCommandRunner, + AircEventTransport, AircQueueClient, AircQueueListRequest, AircQueueScanParams, + AircRealtimePublishParams, AircRealtimeReplayParams, AircRealtimeStore, CliAircQueueClient, + InMemoryAircRealtimeStore, StoreAircEventTransport, TokioAircCommandRunner, }; use crate::runtime::{ CommandResult, CommandSchema, ModuleConfig, ModuleContext, ModulePriority, ParamSchema, @@ -16,21 +16,25 @@ use std::sync::Arc; pub struct AircModule { queue_client: Arc, - realtime_store: Arc, + event_transport: Arc, } impl AircModule { pub fn new() -> Self { Self { queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)), - realtime_store: Arc::new(InMemoryAircRealtimeStore::default()), + event_transport: Arc::new(StoreAircEventTransport::new(Arc::new( + InMemoryAircRealtimeStore::default(), + ))), } } pub fn with_queue_client(queue_client: Arc) -> Self { Self { queue_client, - realtime_store: Arc::new(InMemoryAircRealtimeStore::default()), + event_transport: Arc::new(StoreAircEventTransport::new(Arc::new( + InMemoryAircRealtimeStore::default(), + ))), } } @@ -40,7 +44,17 @@ impl AircModule { ) -> Self { Self { queue_client, - realtime_store, + event_transport: Arc::new(StoreAircEventTransport::new(realtime_store)), + } + } + + pub fn with_event_transport( + queue_client: Arc, + event_transport: Arc, + ) -> Self { + Self { + queue_client, + event_transport, } } } @@ -81,13 +95,13 @@ impl ServiceModule for AircModule { "airc/realtime-publish" => { let params: AircRealtimePublishParams = serde_json::from_value(params) .map_err(|e| format!("invalid airc/realtime-publish params: {e}"))?; - let result = self.realtime_store.publish(params)?; + let result = self.event_transport.publish(params)?; CommandResult::json(&result) } "airc/realtime-replay" => { let params: AircRealtimeReplayParams = serde_json::from_value(params) .map_err(|e| format!("invalid airc/realtime-replay params: {e}"))?; - let result = self.realtime_store.replay(params)?; + let result = self.event_transport.replay(params)?; CommandResult::json(&result) } _ => Err(format!("Unknown airc command: {command}")), @@ -190,9 +204,11 @@ impl ServiceModule for AircModule { mod tests { use super::*; use crate::airc::{ - AircPresenceEvent, AircPresenceState, AircQueueScanResult, AircRealtimeEnvelope, - AircRealtimePayload, + AircPresenceEvent, AircPresenceState, AircQueueScanResult, AircRealtimeDelivery, + AircRealtimeEnvelope, AircRealtimePayload, AircRealtimePublishResult, + AircRealtimeReplayResult, }; + use parking_lot::Mutex; use serde_json::json; struct FakeQueueClient; @@ -216,6 +232,51 @@ mod tests { } } + struct FakeEventTransport { + published: Mutex>, + } + + impl FakeEventTransport { + fn new() -> Self { + Self { + published: Mutex::new(Vec::new()), + } + } + } + + impl AircEventTransport for FakeEventTransport { + fn publish( + &self, + params: AircRealtimePublishParams, + ) -> Result { + self.published.lock().push(params.envelope.event_id.clone()); + Ok(AircRealtimePublishResult { + ok: true, + event_id: params.envelope.event_id, + room_id: params.envelope.room_id, + delivery: AircRealtimeDelivery::Durable, + stored_for_replay: true, + coalesced_presence_key: None, + replay_depth: 1, + active_presence_count: 0, + active_subscription_count: 0, + }) + } + + fn replay( + &self, + params: AircRealtimeReplayParams, + ) -> Result { + Ok(AircRealtimeReplayResult { + room_id: params.room_id, + events: Vec::new(), + cursor: None, + active_presence: Vec::new(), + active_subscriptions: Vec::new(), + }) + } + } + #[tokio::test] async fn queue_scan_command_uses_queue_client() { let module = AircModule::with_queue_client(Arc::new(FakeQueueClient)); @@ -287,4 +348,38 @@ mod tests { assert_eq!(replay_value["events"].as_array().unwrap().len(), 0); assert_eq!(replay_value["activePresence"].as_array().unwrap().len(), 1); } + + #[tokio::test] + async fn realtime_publish_uses_event_transport_seam() { + let transport = Arc::new(FakeEventTransport::new()); + let module = AircModule::with_event_transport(Arc::new(FakeQueueClient), transport.clone()); + let envelope = AircRealtimeEnvelope::new( + "evt-through-transport".to_string(), + "general".to_string(), + "persona-1".to_string(), + 100, + AircRealtimePayload::Presence { + event: AircPresenceEvent { + room_id: "general".to_string(), + subject_id: "persona-1".to_string(), + display_name: None, + state: AircPresenceState::Online, + started_at_ms: 100, + expires_at_ms: None, + call_id: None, + }, + }, + ); + + let result = module + .handle_command("airc/realtime-publish", json!({ "envelope": envelope })) + .await + .unwrap(); + + let CommandResult::Json(value) = result else { + panic!("expected JSON result"); + }; + assert_eq!(value["eventId"], "evt-through-transport"); + assert_eq!(transport.published.lock()[0], "evt-through-transport"); + } }