Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/workers/continuum-core/src/airc/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum DiscoveryError {
EndpointCommandFailed(String),
#[error("`airc ipc-endpoint` returned an empty path — airc binary may be from before #1095 (add the command or upgrade airc)")]
EmptyPath,
#[error("`airc room` failed: {0}")]
RoomCommandFailed(String),
#[error("`airc room` output did not contain a parseable `channel: <uuid>` line: {0}")]
UnparseableChannel(String),
}

/// Discover the airc daemon socket path. See module docs for resolution
Expand Down Expand Up @@ -125,6 +129,81 @@ async fn query_airc_endpoint() -> Result<PathBuf, DiscoveryError> {
Ok(PathBuf::from(path))
}

/// Discover the airc scope's current room channel UUID. The owner-core
/// model requires `AttachRequest.channel` be set explicitly (per-channel
/// router subscriptions, no global fan-out) — so the inbound attach
/// path needs a specific channel before it can stream events.
///
/// Resolution order:
/// 1. `$AIRC_DEFAULT_CHANNEL` env override — explicit UUID for tests
/// or operators with multi-room scopes who want to pin the first
/// attach.
/// 2. Parse `airc room` output for the `channel: <uuid>` line — that's
/// the scope's current default room, the one `airc msg`/`airc send`
/// publish to.
///
/// Future work: when airc adds `airc room --print-channel` (mirroring
/// the `airc ipc-endpoint` decoupling pattern), switch to that flag for
/// stability — the current parser is robust to whitespace but coupled
/// to airc's human-prose stdout format.
pub async fn discover_default_channel() -> Result<uuid::Uuid, DiscoveryError> {
const AIRC_DEFAULT_CHANNEL_ENV: &str = "AIRC_DEFAULT_CHANNEL";
if let Some(raw) = std::env::var_os(AIRC_DEFAULT_CHANNEL_ENV) {
let raw = raw.to_string_lossy().trim().to_string();
return raw.parse::<uuid::Uuid>().map_err(|e| {
DiscoveryError::UnparseableChannel(format!(
"{AIRC_DEFAULT_CHANNEL_ENV}={raw:?} is not a valid UUID: {e}"
))
});
}
let out = TokioCommand::new("airc")
.arg("room")
.output()
.await
.map_err(|e| DiscoveryError::RoomCommandFailed(e.to_string()))?;
if !out.status.success() {
return Err(DiscoveryError::RoomCommandFailed(format!(
"exit {}: {}",
out.status,
String::from_utf8_lossy(&out.stderr).trim()
)));
}
parse_channel_from_room_output(&String::from_utf8_lossy(&out.stdout))
}

/// Extract the `channel: <uuid>` line from `airc room` stdout.
///
/// Output today (from airc rust-rewrite branch, as of this PR):
/// ```text
/// room: continuum
/// wire: /Users/joel/.airc/wires/continuum
/// channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa
/// ```
///
/// We match the literal `channel:` label (case-insensitive) followed by
/// whitespace and a UUID — robust to alignment changes but coupled to
/// the label name. If airc renames this field, the parser fails loudly
/// (UnparseableChannel error) rather than silently misreading.
fn parse_channel_from_room_output(stdout: &str) -> Result<uuid::Uuid, DiscoveryError> {
for line in stdout.lines() {
let trimmed = line.trim();
let Some(rest) = trimmed
.strip_prefix("channel:")
.or_else(|| trimmed.strip_prefix("Channel:"))
.or_else(|| trimmed.strip_prefix("CHANNEL:"))
else {
continue;
};
let candidate = rest.trim();
if let Ok(uuid) = candidate.parse::<uuid::Uuid>() {
return Ok(uuid);
}
}
Err(DiscoveryError::UnparseableChannel(format!(
"no `channel: <uuid>` line in stdout: {stdout:?}"
)))
}

async fn auto_install_airc() -> Result<(), DiscoveryError> {
// `curl -fsSL <URL> | bash` keeps the bootstrap one-shot and matches
// airc's own published install instructions (top of `install.sh`,
Expand Down Expand Up @@ -188,4 +267,47 @@ mod tests {
assert!(msg.contains(AIRC_INSTALL_URL));
assert!(msg.contains(AIRC_DISABLE_AUTOINSTALL));
}

#[test]
fn parses_channel_from_typical_airc_room_output() {
let stdout = "\
room: continuum
wire: /Users/joel/.airc/wires/continuum
channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa
";
let uuid = parse_channel_from_room_output(stdout).expect("parse channel");
assert_eq!(
uuid,
"11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa"
.parse::<uuid::Uuid>()
.unwrap()
);
}

#[test]
fn parses_channel_with_alternate_capitalization_and_whitespace() {
let stdout = " Channel: 11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa\n";
let uuid = parse_channel_from_room_output(stdout).expect("parse channel");
assert_eq!(
uuid,
"11c1a7ac-cb85-5ca0-a5b4-2847280ea3fa"
.parse::<uuid::Uuid>()
.unwrap()
);
}

#[test]
fn parser_fails_loud_when_channel_line_absent() {
let stdout = "room: continuum\nwire: /tmp/x\n";
let err = parse_channel_from_room_output(stdout).expect_err("must fail");
assert!(matches!(err, DiscoveryError::UnparseableChannel(_)));
assert!(err.to_string().contains("no `channel:"));
}

#[test]
fn parser_fails_loud_on_non_uuid_after_label() {
let stdout = "channel: not-a-uuid\n";
let err = parse_channel_from_room_output(stdout).expect_err("must fail");
assert!(matches!(err, DiscoveryError::UnparseableChannel(_)));
}
}
24 changes: 21 additions & 3 deletions src/workers/continuum-core/src/airc/inbound_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::path::PathBuf;
use std::sync::Arc;

use airc_core::RoomId;
use airc_ipc::{codec::read_frame, AttachRequest, DaemonClient, Response};
use tracing::warn;

Expand All @@ -15,20 +16,37 @@ use crate::runtime::MessageBus;

pub fn spawn_daemon_attach(
socket_path: PathBuf,
channel: RoomId,
bus: Arc<MessageBus>,
runtime: &tokio::runtime::Handle,
) {
runtime.spawn(async move {
if let Err(error) = run_daemon_attach(socket_path, bus).await {
if let Err(error) = run_daemon_attach(socket_path, channel, bus).await {
warn!("AIRC daemon attach stream stopped: {error}");
}
});
}

pub async fn run_daemon_attach(socket_path: PathBuf, bus: Arc<MessageBus>) -> Result<(), String> {
pub async fn run_daemon_attach(
socket_path: PathBuf,
channel: RoomId,
bus: Arc<MessageBus>,
) -> Result<(), String> {
let client = DaemonClient::new(socket_path);
// Owner-core model (airc-daemon/src/server.rs:274): the router
// subscribes per channel — no global fan-out table. AttachRequest
// MUST carry `channel: Some(_)` or the daemon responds
// `attach requires a channel in the owner-core model`. continuum
// discovers the scope's default channel at boot via
// `crate::airc::discover_default_channel` (parses `airc room`).
// Multi-room scopes will spawn one daemon_attach task per channel
// they care about — single-attach today, per-room fan-out as a
// follow-up when continuum rooms become first-class.
let mut stream = client
.attach(AttachRequest::default())
.attach(AttachRequest {
channel: Some(channel),
..AttachRequest::default()
})
.await
.map_err(|error| format!("failed to attach to airc daemon: {error}"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/workers/continuum-core/src/airc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod types;
pub use client::{AircQueueClient, CliAircQueueClient};
#[allow(deprecated)]
pub use daemon_endpoint::default_socket_path_in;
pub use discovery::{discover_airc_socket, DiscoveryError};
pub use discovery::{discover_airc_socket, discover_default_channel, DiscoveryError};
pub use daemon_transport::{AircDaemonClient, DaemonAircEventTransport};
pub use event_transport::{AircEventTransport, StoreAircEventTransport};
pub use inbound_attach::spawn_daemon_attach;
Expand Down
100 changes: 80 additions & 20 deletions src/workers/continuum-core/src/modules/airc.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! ServiceModule adapter for Rust-native AIRC commands.

use crate::airc::{
discover_airc_socket, spawn_daemon_attach, AircEventTransport, AircQueueClient,
AircQueueListRequest, AircQueueScanParams, AircRealtimePublishParams, AircRealtimeReplayParams,
AircRealtimeStore, CliAircQueueClient, DaemonAircEventTransport, InMemoryAircRealtimeStore,
StoreAircEventTransport, TokioAircCommandRunner,
discover_airc_socket, discover_default_channel, spawn_daemon_attach, AircEventTransport,
AircQueueClient, AircQueueListRequest, AircQueueScanParams, AircRealtimePublishParams,
AircRealtimeReplayParams, AircRealtimeStore, CliAircQueueClient, DaemonAircEventTransport,
InMemoryAircRealtimeStore, StoreAircEventTransport, TokioAircCommandRunner,
};
// `default_socket_path_in` retained for back-compat callers; deprecated,
// see `crate::airc::daemon_endpoint` module docs.
#[allow(deprecated)]
use crate::airc::default_socket_path_in;
use airc_core::RoomId;
use crate::runtime::{
CommandResult, CommandSchema, ModuleConfig, ModuleContext, ModulePriority, ParamSchema,
ServiceModule,
Expand All @@ -23,6 +24,12 @@ pub struct AircModule {
queue_client: Arc<dyn AircQueueClient>,
event_transport: Arc<dyn AircEventTransport>,
attach_socket_path: Option<std::path::PathBuf>,
/// Channel (room) to attach to at `initialize()`. Required by airc's
/// owner-core router model (`airc-daemon/src/server.rs:274`); without
/// a channel the daemon rejects attach with "attach requires a
/// channel in the owner-core model". Discovered via
/// [`discover_default_channel`] alongside the socket path.
attach_channel: Option<RoomId>,
}

impl AircModule {
Expand All @@ -40,23 +47,23 @@ impl AircModule {
}

/// Discover the airc daemon socket via [`discover_airc_socket`] (asks
/// `airc ipc-endpoint` per airc#1095; auto-installs airc if missing).
/// On discovery failure, returns a degraded module that responds to
/// `airc/*` commands via the in-memory store but performs no daemon
/// attach — so the rest of continuum-core boots even when airc is
/// unreachable (e.g. CI without network for auto-install).
/// `airc ipc-endpoint` per airc#1095; auto-installs airc if missing)
/// AND the default channel via [`discover_default_channel`] (parses
/// `airc room` for the scope's current room channel — required by
/// airc's owner-core router model). On any discovery failure, returns
/// a degraded module that responds to `airc/*` commands via the
/// in-memory store but performs no daemon attach — so the rest of
/// continuum-core boots even when airc is unreachable (e.g. CI
/// without network for auto-install) or the scope has no current
/// room (fresh install before `airc room <name>`).
pub async fn discover_and_construct() -> Self {
match discover_airc_socket().await {
Ok(socket_path) => {
let socket_path = match discover_airc_socket().await {
Ok(path) => {
tracing::info!(
?socket_path,
socket_path = ?path,
"Discovered airc daemon socket via `airc ipc-endpoint`"
);
Self {
queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)),
event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())),
attach_socket_path: Some(socket_path),
}
path
}
Err(error) => {
tracing::warn!(
Expand All @@ -66,8 +73,42 @@ impl AircModule {
Resolve: install airc manually or set AIRC_DAEMON_SOCKET; see error \
above for the suggested remedy."
);
Self::with_queue_client(Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)))
return Self::with_queue_client(Arc::new(CliAircQueueClient::new(
TokioAircCommandRunner,
)));
}
};

let attach_channel = match discover_default_channel().await {
Ok(uuid) => {
tracing::info!(
channel = %uuid,
"Discovered airc default channel via `airc room`"
);
Some(RoomId::from_uuid(uuid))
}
Err(error) => {
// Socket reachable but no channel — boot continues with
// queue + realtime commands, just no inbound attach. The
// common case is "fresh install, scope not yet subscribed
// to any room"; the operator runs `airc room <name>` and
// restarts to wire up the attach.
tracing::warn!(
%error,
"airc default-channel discovery failed — AIRC inbound attach disabled. \
Resolve: run `airc room <name>` to subscribe the scope to a room, \
or set AIRC_DEFAULT_CHANNEL=<uuid> to pin a channel explicitly, then \
restart continuum-core."
);
None
}
};

Self {
queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)),
event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())),
attach_socket_path: Some(socket_path),
attach_channel,
}
}

Expand All @@ -78,6 +119,7 @@ impl AircModule {
queue_client: Arc::new(CliAircQueueClient::new(TokioAircCommandRunner)),
event_transport: Arc::new(DaemonAircEventTransport::new(socket_path.clone())),
attach_socket_path: Some(socket_path),
attach_channel: None,
}
}

Expand All @@ -88,6 +130,7 @@ impl AircModule {
InMemoryAircRealtimeStore::default(),
))),
attach_socket_path: None,
attach_channel: None,
}
}

Expand All @@ -99,6 +142,7 @@ impl AircModule {
queue_client,
event_transport: Arc::new(StoreAircEventTransport::new(realtime_store)),
attach_socket_path: None,
attach_channel: None,
}
}

Expand All @@ -110,6 +154,7 @@ impl AircModule {
queue_client,
event_transport,
attach_socket_path: None,
attach_channel: None,
}
}
}
Expand All @@ -135,8 +180,23 @@ impl ServiceModule for AircModule {
}

async fn initialize(&self, ctx: &ModuleContext) -> Result<(), String> {
if let Some(socket_path) = self.attach_socket_path.clone() {
spawn_daemon_attach(socket_path, ctx.bus.clone(), &ctx.runtime);
// Inbound attach requires BOTH a socket (where to connect) AND a
// channel (what to subscribe to under airc's owner-core model).
// Either being None disables the attach but lets the rest of
// the module + the broader continuum-core boot — the operator
// sees one of the warnings from `discover_and_construct` so the
// remedy path is obvious.
match (
self.attach_socket_path.clone(),
self.attach_channel,
) {
(Some(socket_path), Some(channel)) => {
spawn_daemon_attach(socket_path, channel, ctx.bus.clone(), &ctx.runtime);
}
(Some(_), None) | (None, Some(_)) | (None, None) => {
// Already warned during construction; stay silent here
// to avoid duplicate noise on every boot.
}
}
Ok(())
}
Expand Down
Loading