diff --git a/src/workers/continuum-core/src/airc/discovery.rs b/src/workers/continuum-core/src/airc/discovery.rs index bab4c294d..4320d960f 100644 --- a/src/workers/continuum-core/src/airc/discovery.rs +++ b/src/workers/continuum-core/src/airc/discovery.rs @@ -59,6 +59,10 @@ pub enum DiscoveryError { EndpointCommandFailed(String), #[error("`airc ipc-endpoint` returned an empty path — airc binary may be from before #1095 (add the command or upgrade airc)")] EmptyPath, + #[error("`airc room` failed: {0}")] + RoomCommandFailed(String), + #[error("`airc room` output did not contain a parseable `channel: ` line: {0}")] + UnparseableChannel(String), } /// Discover the airc daemon socket path. See module docs for resolution @@ -125,6 +129,81 @@ async fn query_airc_endpoint() -> Result { Ok(PathBuf::from(path)) } +/// Discover the airc scope's current room channel UUID. The owner-core +/// model requires `AttachRequest.channel` be set explicitly (per-channel +/// router subscriptions, no global fan-out) — so the inbound attach +/// path needs a specific channel before it can stream events. +/// +/// Resolution order: +/// 1. `$AIRC_DEFAULT_CHANNEL` env override — explicit UUID for tests +/// or operators with multi-room scopes who want to pin the first +/// attach. +/// 2. Parse `airc room` output for the `channel: ` line — that's +/// the scope's current default room, the one `airc msg`/`airc send` +/// publish to. +/// +/// Future work: when airc adds `airc room --print-channel` (mirroring +/// the `airc ipc-endpoint` decoupling pattern), switch to that flag for +/// stability — the current parser is robust to whitespace but coupled +/// to airc's human-prose stdout format. +pub async fn discover_default_channel() -> Result { + const AIRC_DEFAULT_CHANNEL_ENV: &str = "AIRC_DEFAULT_CHANNEL"; + if let Some(raw) = std::env::var_os(AIRC_DEFAULT_CHANNEL_ENV) { + let raw = raw.to_string_lossy().trim().to_string(); + return raw.parse::().map_err(|e| { + DiscoveryError::UnparseableChannel(format!( + "{AIRC_DEFAULT_CHANNEL_ENV}={raw:?} is not a valid UUID: {e}" + )) + }); + } + let out = TokioCommand::new("airc") + .arg("room") + .output() + .await + .map_err(|e| DiscoveryError::RoomCommandFailed(e.to_string()))?; + if !out.status.success() { + return Err(DiscoveryError::RoomCommandFailed(format!( + "exit {}: {}", + out.status, + String::from_utf8_lossy(&out.stderr).trim() + ))); + } + parse_channel_from_room_output(&String::from_utf8_lossy(&out.stdout)) +} + +/// Extract the `channel: ` line from `airc room` stdout. +/// +/// Output today (from airc rust-rewrite branch, as of this PR): +/// ```text +/// room: continuum +/// wire: /Users/joel/.airc/wires/continuum +/// channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa +/// ``` +/// +/// We match the literal `channel:` label (case-insensitive) followed by +/// whitespace and a UUID — robust to alignment changes but coupled to +/// the label name. If airc renames this field, the parser fails loudly +/// (UnparseableChannel error) rather than silently misreading. +fn parse_channel_from_room_output(stdout: &str) -> Result { + for line in stdout.lines() { + let trimmed = line.trim(); + let Some(rest) = trimmed + .strip_prefix("channel:") + .or_else(|| trimmed.strip_prefix("Channel:")) + .or_else(|| trimmed.strip_prefix("CHANNEL:")) + else { + continue; + }; + let candidate = rest.trim(); + if let Ok(uuid) = candidate.parse::() { + return Ok(uuid); + } + } + Err(DiscoveryError::UnparseableChannel(format!( + "no `channel: ` line in stdout: {stdout:?}" + ))) +} + async fn auto_install_airc() -> Result<(), DiscoveryError> { // `curl -fsSL | bash` keeps the bootstrap one-shot and matches // airc's own published install instructions (top of `install.sh`, @@ -188,4 +267,47 @@ mod tests { assert!(msg.contains(AIRC_INSTALL_URL)); assert!(msg.contains(AIRC_DISABLE_AUTOINSTALL)); } + + #[test] + fn parses_channel_from_typical_airc_room_output() { + let stdout = "\ +room: continuum +wire: /Users/joel/.airc/wires/continuum +channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa +"; + let uuid = parse_channel_from_room_output(stdout).expect("parse channel"); + assert_eq!( + uuid, + "11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa" + .parse::() + .unwrap() + ); + } + + #[test] + fn parses_channel_with_alternate_capitalization_and_whitespace() { + let stdout = " Channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa\n"; + let uuid = parse_channel_from_room_output(stdout).expect("parse channel"); + assert_eq!( + uuid, + "11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa" + .parse::() + .unwrap() + ); + } + + #[test] + fn parser_fails_loud_when_channel_line_absent() { + let stdout = "room: continuum\nwire: /tmp/x\n"; + let err = parse_channel_from_room_output(stdout).expect_err("must fail"); + assert!(matches!(err, DiscoveryError::UnparseableChannel(_))); + assert!(err.to_string().contains("no `channel:")); + } + + #[test] + fn parser_fails_loud_on_non_uuid_after_label() { + let stdout = "channel: not-a-uuid\n"; + let err = parse_channel_from_room_output(stdout).expect_err("must fail"); + assert!(matches!(err, DiscoveryError::UnparseableChannel(_))); + } } diff --git a/src/workers/continuum-core/src/airc/inbound_attach.rs b/src/workers/continuum-core/src/airc/inbound_attach.rs index 95b904bcf..31700828d 100644 --- a/src/workers/continuum-core/src/airc/inbound_attach.rs +++ b/src/workers/continuum-core/src/airc/inbound_attach.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::sync::Arc; +use airc_core::RoomId; use airc_ipc::{codec::read_frame, AttachRequest, DaemonClient, Response}; use tracing::warn; @@ -15,20 +16,37 @@ use crate::runtime::MessageBus; pub fn spawn_daemon_attach( socket_path: PathBuf, + channel: RoomId, bus: Arc, runtime: &tokio::runtime::Handle, ) { runtime.spawn(async move { - if let Err(error) = run_daemon_attach(socket_path, bus).await { + if let Err(error) = run_daemon_attach(socket_path, channel, bus).await { warn!("AIRC daemon attach stream stopped: {error}"); } }); } -pub async fn run_daemon_attach(socket_path: PathBuf, bus: Arc) -> Result<(), String> { +pub async fn run_daemon_attach( + socket_path: PathBuf, + channel: RoomId, + bus: Arc, +) -> Result<(), String> { let client = DaemonClient::new(socket_path); + // Owner-core model (airc-daemon/src/server.rs:274): the router + // subscribes per channel — no global fan-out table. AttachRequest + // MUST carry `channel: Some(_)` or the daemon responds + // `attach requires a channel in the owner-core model`. continuum + // discovers the scope's default channel at boot via + // `crate::airc::discover_default_channel` (parses `airc room`). + // Multi-room scopes will spawn one daemon_attach task per channel + // they care about — single-attach today, per-room fan-out as a + // follow-up when continuum rooms become first-class. let mut stream = client - .attach(AttachRequest::default()) + .attach(AttachRequest { + channel: Some(channel), + ..AttachRequest::default() + }) .await .map_err(|error| format!("failed to attach to airc daemon: {error}"))?; diff --git a/src/workers/continuum-core/src/airc/mod.rs b/src/workers/continuum-core/src/airc/mod.rs index d3f877918..661c6dcf5 100644 --- a/src/workers/continuum-core/src/airc/mod.rs +++ b/src/workers/continuum-core/src/airc/mod.rs @@ -19,7 +19,7 @@ pub mod types; pub use client::{AircQueueClient, CliAircQueueClient}; #[allow(deprecated)] pub use daemon_endpoint::default_socket_path_in; -pub use discovery::{discover_airc_socket, DiscoveryError}; +pub use discovery::{discover_airc_socket, discover_default_channel, DiscoveryError}; pub use daemon_transport::{AircDaemonClient, DaemonAircEventTransport}; pub use event_transport::{AircEventTransport, StoreAircEventTransport}; pub use inbound_attach::spawn_daemon_attach; diff --git a/src/workers/continuum-core/src/modules/airc.rs b/src/workers/continuum-core/src/modules/airc.rs index 88aa8e863..825401ff6 100644 --- a/src/workers/continuum-core/src/modules/airc.rs +++ b/src/workers/continuum-core/src/modules/airc.rs @@ -1,15 +1,16 @@ //! ServiceModule adapter for Rust-native AIRC commands. use crate::airc::{ - discover_airc_socket, spawn_daemon_attach, AircEventTransport, AircQueueClient, - AircQueueListRequest, AircQueueScanParams, AircRealtimePublishParams, AircRealtimeReplayParams, - AircRealtimeStore, CliAircQueueClient, DaemonAircEventTransport, InMemoryAircRealtimeStore, - StoreAircEventTransport, TokioAircCommandRunner, + discover_airc_socket, discover_default_channel, spawn_daemon_attach, AircEventTransport, + AircQueueClient, AircQueueListRequest, AircQueueScanParams, AircRealtimePublishParams, + AircRealtimeReplayParams, AircRealtimeStore, CliAircQueueClient, DaemonAircEventTransport, + InMemoryAircRealtimeStore, StoreAircEventTransport, TokioAircCommandRunner, }; // `default_socket_path_in` retained for back-compat callers; deprecated, // see `crate::airc::daemon_endpoint` module docs. #[allow(deprecated)] use crate::airc::default_socket_path_in; +use airc_core::RoomId; use crate::runtime::{ CommandResult, CommandSchema, ModuleConfig, ModuleContext, ModulePriority, ParamSchema, ServiceModule, @@ -23,6 +24,12 @@ pub struct AircModule { queue_client: Arc, event_transport: Arc, attach_socket_path: Option, + /// Channel (room) to attach to at `initialize()`. Required by airc's + /// owner-core router model (`airc-daemon/src/server.rs:274`); without + /// a channel the daemon rejects attach with "attach requires a + /// channel in the owner-core model". Discovered via + /// [`discover_default_channel`] alongside the socket path. + attach_channel: Option, } impl AircModule { @@ -40,23 +47,23 @@ impl AircModule { } /// Discover the airc daemon socket via [`discover_airc_socket`] (asks - /// `airc ipc-endpoint` per airc#1095; auto-installs airc if missing). - /// On discovery failure, returns a degraded module that responds to - /// `airc/*` commands via the in-memory store but performs no daemon - /// attach — so the rest of continuum-core boots even when airc is - /// unreachable (e.g. CI without network for auto-install). + /// `airc ipc-endpoint` per airc#1095; auto-installs airc if missing) + /// AND the default channel via [`discover_default_channel`] (parses + /// `airc room` for the scope's current room channel — required by + /// airc's owner-core router model). On any discovery failure, returns + /// a degraded module that responds to `airc/*` commands via the + /// in-memory store but performs no daemon attach — so the rest of + /// continuum-core boots even when airc is unreachable (e.g. CI + /// without network for auto-install) or the scope has no current + /// room (fresh install before `airc room `). pub async fn discover_and_construct() -> Self { - match discover_airc_socket().await { - Ok(socket_path) => { + let socket_path = match discover_airc_socket().await { + Ok(path) => { tracing::info!( - ?socket_path, + socket_path = ?path, "Discovered airc daemon socket via `airc ipc-endpoint`" ); - Self { - queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)), - event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())), - attach_socket_path: Some(socket_path), - } + path } Err(error) => { tracing::warn!( @@ -66,8 +73,42 @@ impl AircModule { Resolve: install airc manually or set AIRC_DAEMON_SOCKET; see error \ above for the suggested remedy." ); - Self::with_queue_client(Arc::new(CliAircQueueClient::new(TokioAircCommandRunner))) + return Self::with_queue_client(Arc::new(CliAircQueueClient::new( + TokioAircCommandRunner, + ))); } + }; + + let attach_channel = match discover_default_channel().await { + Ok(uuid) => { + tracing::info!( + channel = %uuid, + "Discovered airc default channel via `airc room`" + ); + Some(RoomId::from_uuid(uuid)) + } + Err(error) => { + // Socket reachable but no channel — boot continues with + // queue + realtime commands, just no inbound attach. The + // common case is "fresh install, scope not yet subscribed + // to any room"; the operator runs `airc room ` and + // restarts to wire up the attach. + tracing::warn!( + %error, + "airc default-channel discovery failed — AIRC inbound attach disabled. \ + Resolve: run `airc room ` to subscribe the scope to a room, \ + or set AIRC_DEFAULT_CHANNEL= to pin a channel explicitly, then \ + restart continuum-core." + ); + None + } + }; + + Self { + queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)), + event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())), + attach_socket_path: Some(socket_path), + attach_channel, } } @@ -78,6 +119,7 @@ impl AircModule { queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)), event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())), attach_socket_path: Some(socket_path), + attach_channel: None, } } @@ -88,6 +130,7 @@ impl AircModule { InMemoryAircRealtimeStore::default(), ))), attach_socket_path: None, + attach_channel: None, } } @@ -99,6 +142,7 @@ impl AircModule { queue_client, event_transport: Arc::new(StoreAircEventTransport::new(realtime_store)), attach_socket_path: None, + attach_channel: None, } } @@ -110,6 +154,7 @@ impl AircModule { queue_client, event_transport, attach_socket_path: None, + attach_channel: None, } } } @@ -135,8 +180,23 @@ impl ServiceModule for AircModule { } async fn initialize(&self, ctx: &ModuleContext) -> Result<(), String> { - if let Some(socket_path) = self.attach_socket_path.clone() { - spawn_daemon_attach(socket_path, ctx.bus.clone(), &ctx.runtime); + // Inbound attach requires BOTH a socket (where to connect) AND a + // channel (what to subscribe to under airc's owner-core model). + // Either being None disables the attach but lets the rest of + // the module + the broader continuum-core boot — the operator + // sees one of the warnings from `discover_and_construct` so the + // remedy path is obvious. + match ( + self.attach_socket_path.clone(), + self.attach_channel, + ) { + (Some(socket_path), Some(channel)) => { + spawn_daemon_attach(socket_path, channel, ctx.bus.clone(), &ctx.runtime); + } + (Some(_), None) | (None, Some(_)) | (None, None) => { + // Already warned during construction; stay silent here + // to avoid duplicate noise on every boot. + } } Ok(()) }