From f1ae29926b5edc10a5cf7c8d327bc151bffed34f Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Mon, 4 May 2026 10:47:41 -0400 Subject: [PATCH] fix(tui/remote-inbox): remove legacy Code Everywhere HTTP path --- .github/github-repo-workflow.json | 2 +- code-rs/core/src/config.rs | 173 ++- code-rs/core/src/config/builder.rs | 4 +- code-rs/tui/src/remote_inbox/client.rs | 1514 +----------------------- 4 files changed, 204 insertions(+), 1489 deletions(-) diff --git a/.github/github-repo-workflow.json b/.github/github-repo-workflow.json index aa7797368a59..c0e95ea512fa 100644 --- a/.github/github-repo-workflow.json +++ b/.github/github-repo-workflow.json @@ -37,7 +37,7 @@ "sandbox or execution behavior changes", "release behavior changes", "local overlay behavior changes", - "Code Everywhere behavior changes", + "Every Code remote inbox behavior changes", "upstream mirror behavior changes" ] }, diff --git a/code-rs/core/src/config.rs b/code-rs/core/src/config.rs index 979fc5bddfc2..2ba8acd2bef6 100644 --- a/code-rs/core/src/config.rs +++ b/code-rs/core/src/config.rs @@ -155,17 +155,61 @@ pub(crate) const CONFIG_TOML_FILE: &str = "config.toml"; const DEFAULT_RESPONSES_ORIGINATOR_HEADER: &str = "code_cli_rs"; -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct RemoteInboxConfig { - #[serde(default)] pub enabled: bool, pub bridge_url: Option, - pub code_everywhere_url: Option, pub token: Option, pub host_id: Option, pub host_label: Option, } +#[derive(Deserialize, Default)] +struct RemoteInboxConfigToml { + #[serde(default)] + enabled: bool, + bridge_url: Option, + code_everywhere_url: Option, + token: Option, + host_id: Option, + host_label: Option, +} + +impl<'de> Deserialize<'de> for RemoteInboxConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let raw = RemoteInboxConfigToml::deserialize(deserializer)?; + let bridge_url = raw + .bridge_url + .or_else(|| raw.code_everywhere_url.and_then(migrate_legacy_remote_inbox_url)); + + Ok(Self { + enabled: raw.enabled, + bridge_url, + token: raw.token, + host_id: raw.host_id, + host_label: raw.host_label, + }) + } +} + +fn migrate_legacy_remote_inbox_url(url: String) -> Option { + let trimmed = url.trim(); + if trimmed.starts_with("ws://") || trimmed.starts_with("wss://") { + tracing::warn!( + "remote_inbox.code_everywhere_url is deprecated; use remote_inbox.bridge_url" + ); + return Some(url); + } + + tracing::warn!( + "remote_inbox.code_everywhere_url HTTP polling is no longer supported; configure remote_inbox.bridge_url with the Every Code WebSocket bridge URL" + ); + None +} + fn normalize_auto_drive_routing_reasoning_levels( levels: &[ReasoningEffort], ) -> Vec { @@ -590,6 +634,7 @@ impl Config { let mut root_value = TomlValue::Table(Default::default()); let cli_paths: Vec = cli_overrides.iter().map(|(path, _)| path.clone()).collect(); for (path, value) in cli_overrides { + let (path, value) = normalize_cli_override(path, value); validation::apply_toml_override(&mut root_value, &path, value); } @@ -598,6 +643,19 @@ impl Config { } } +fn normalize_cli_override(path: String, value: TomlValue) -> (String, TomlValue) { + if path == "remote_inbox.code_everywhere_url" + && matches!( + value.as_str().map(str::trim), + Some(url) if url.starts_with("ws://") || url.starts_with("wss://") + ) + { + return ("remote_inbox.bridge_url".to_string(), value); + } + + (path, value) +} + pub fn load_config_as_toml_with_cli_overrides( code_home: &Path, cli_overrides: Vec<(String, TomlValue)>, @@ -1956,7 +2014,6 @@ persistence = "none" [remote_inbox] enabled = true bridge_url = "ws://127.0.0.1:8787/every-code/connect" -code_everywhere_url = "http://127.0.0.1:4789" token = "shared-secret" host_id = "host-mac-studio" host_label = "Mac Studio" @@ -1969,7 +2026,6 @@ host_label = "Mac Studio" Some(RemoteInboxConfig { enabled: true, bridge_url: Some("ws://127.0.0.1:8787/every-code/connect".to_string()), - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), token: Some("shared-secret".to_string()), host_id: Some("host-mac-studio".to_string()), host_label: Some("Mac Studio".to_string()), @@ -1987,10 +2043,6 @@ host_label = "Mac Studio" resolved.remote_inbox.bridge_url.as_deref(), Some("ws://127.0.0.1:8787/every-code/connect") ); - assert_eq!( - resolved.remote_inbox.code_everywhere_url.as_deref(), - Some("http://127.0.0.1:4789") - ); assert_eq!(resolved.remote_inbox.token.as_deref(), Some("shared-secret")); assert_eq!(resolved.remote_inbox.host_id.as_deref(), Some("host-mac-studio")); assert_eq!(resolved.remote_inbox.host_label.as_deref(), Some("Mac Studio")); @@ -2004,6 +2056,109 @@ host_label = "Mac Studio" Ok(()) } + #[test] + fn remote_inbox_config_accepts_legacy_bridge_url_key() { + let cfg = toml::from_str::( + r#" +[remote_inbox] +enabled = true +code_everywhere_url = "ws://127.0.0.1:8787/every-code/connect" +"#, + ) + .expect("legacy remote inbox key should deserialize"); + + assert_eq!( + cfg.remote_inbox, + Some(RemoteInboxConfig { + enabled: true, + bridge_url: Some("ws://127.0.0.1:8787/every-code/connect".to_string()), + token: None, + host_id: None, + host_label: None, + }) + ); + } + + #[test] + fn remote_inbox_config_prefers_bridge_url_over_legacy_key() { + let cfg = toml::from_str::( + r#" +[remote_inbox] +enabled = true +bridge_url = "ws://127.0.0.1:8787/every-code/connect" +code_everywhere_url = "ws://127.0.0.1:9999/legacy" +"#, + ) + .expect("remote inbox config with both keys should deserialize"); + + assert_eq!( + cfg.remote_inbox + .expect("remote inbox config") + .bridge_url + .as_deref(), + Some("ws://127.0.0.1:8787/every-code/connect") + ); + } + + #[test] + fn remote_inbox_config_rejects_legacy_http_polling_url() { + let cfg = toml::from_str::( + r#" +[remote_inbox] +enabled = true +code_everywhere_url = "http://127.0.0.1:4789" +"#, + ) + .expect("legacy remote inbox HTTP key should deserialize"); + + assert_eq!( + cfg.remote_inbox + .expect("remote inbox config") + .bridge_url + .as_deref(), + None + ); + } + + #[test] + fn remote_inbox_cli_override_accepts_legacy_bridge_url_key() -> std::io::Result<()> { + let code_home = TempDir::new()?; + let config = ConfigBuilder::new() + .with_code_home(code_home.path().to_path_buf()) + .with_cli_overrides(vec![ + ("remote_inbox.enabled".to_string(), TomlValue::Boolean(true)), + ( + "remote_inbox.code_everywhere_url".to_string(), + TomlValue::String("ws://127.0.0.1:8787/every-code/connect".to_string()), + ), + ]) + .load()?; + + assert_eq!( + config.remote_inbox.bridge_url.as_deref(), + Some("ws://127.0.0.1:8787/every-code/connect") + ); + Ok(()) + } + + #[test] + fn remote_inbox_cli_override_does_not_migrate_legacy_http_url() -> std::io::Result<()> { + let code_home = TempDir::new()?; + let config = ConfigBuilder::new() + .with_code_home(code_home.path().to_path_buf()) + .with_cli_overrides(vec![ + ("remote_inbox.enabled".to_string(), TomlValue::Boolean(true)), + ( + "remote_inbox.code_everywhere_url".to_string(), + TomlValue::String("http://127.0.0.1:4789".to_string()), + ), + ]) + .load()?; + + assert_eq!(config.remote_inbox.bridge_url.as_deref(), None); + Ok(()) + } + #[test] fn auto_upgrade_enabled_accepts_string_boolean() { let cfg_true = r#"auto_upgrade_enabled = "true""#; diff --git a/code-rs/core/src/config/builder.rs b/code-rs/core/src/config/builder.rs index 247f80ab776e..8990a18978f6 100644 --- a/code-rs/core/src/config/builder.rs +++ b/code-rs/core/src/config/builder.rs @@ -4,7 +4,7 @@ use toml::Value as TomlValue; use super::sources; use super::validation::{apply_toml_override, deserialize_config_toml_with_cli_warnings}; -use super::{Config, ConfigOverrides, ConfigToml}; +use super::{Config, ConfigOverrides, ConfigToml, normalize_cli_override}; #[derive(Default, Debug, Clone)] pub struct ConfigBuilder { @@ -42,6 +42,7 @@ impl ConfigBuilder { let mut root_value = sources::load_config_as_toml(&code_home)?; let cli_paths: Vec = self.cli_overrides.iter().map(|(path, _)| path.clone()).collect(); for (path, value) in self.cli_overrides.into_iter() { + let (path, value) = normalize_cli_override(path, value); apply_toml_override(&mut root_value, &path, value); } @@ -71,6 +72,7 @@ impl ConfigBuilder { let mut root_value = sources::load_config_as_toml(&code_home)?; let cli_paths: Vec = self.cli_overrides.iter().map(|(path, _)| path.clone()).collect(); for (path, value) in self.cli_overrides.into_iter() { + let (path, value) = normalize_cli_override(path, value); apply_toml_override(&mut root_value, &path, value); } diff --git a/code-rs/tui/src/remote_inbox/client.rs b/code-rs/tui/src/remote_inbox/client.rs index fe210dd58feb..18360056e7d9 100644 --- a/code-rs/tui/src/remote_inbox/client.rs +++ b/code-rs/tui/src/remote_inbox/client.rs @@ -22,17 +22,12 @@ use code_core::protocol::McpToolCallBeginEvent; use code_core::protocol::McpToolCallEndEvent; use code_core::protocol::PatchApplyBeginEvent; use code_core::protocol::PatchApplyEndEvent; -use code_core::protocol::ReviewDecision; use code_core::protocol::TurnDiffEvent; -use code_protocol::request_user_input::RequestUserInputAnswer; use code_protocol::request_user_input::RequestUserInputEvent; -use code_protocol::request_user_input::RequestUserInputResponse; use futures::SinkExt; use futures::StreamExt; use futures::stream::FuturesUnordered; -use serde::Deserialize; use serde_json::Value; -use serde_json::json; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -58,7 +53,6 @@ use crate::remote_inbox::protocol::SessionStatusEvent; use crate::remote_inbox::protocol::RemoteTurnStep; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); -const CODE_EVERYWHERE_POLL_INTERVAL: Duration = Duration::from_secs(2); const RECONNECT_DELAY: Duration = Duration::from_secs(5); const COMMAND_ACCEPT_TIMEOUT: Duration = Duration::from_secs(30); const APPROVAL_DECISION_TIMEOUT: Duration = Duration::from_secs(30); @@ -66,7 +60,6 @@ const MAX_PROCESSED_COMMAND_IDS: usize = 1024; type PendingCommandFuture = Pin + Send>>; type LatestStatusSnapshot = Arc>>; -type SessionPresenceCheck = tokio::task::JoinHandle>; #[derive(Debug, Clone)] pub(crate) struct RemoteInboxSession { @@ -84,13 +77,13 @@ pub(crate) struct RemoteInboxClientHandle { session_id: String, session_epoch: String, latest_status_snapshot: LatestStatusSnapshot, - code_everywhere_timeline_enabled: bool, + timeline_enabled: bool, } struct RemoteInboxClientSink { handle: tokio::task::JoinHandle<()>, status_tx: tokio::sync::mpsc::UnboundedSender, - code_everywhere_timeline_enabled: bool, + timeline_enabled: bool, } impl Drop for RemoteInboxClientHandle { @@ -111,11 +104,11 @@ impl RemoteInboxClientHandle { let mut handles = Vec::with_capacity(sinks.len()); let mut status_txs = Vec::with_capacity(sinks.len()); let mut timeline_status_txs = Vec::new(); - let mut code_everywhere_timeline_enabled = false; + let mut timeline_enabled = false; for sink in sinks { - if sink.code_everywhere_timeline_enabled { - code_everywhere_timeline_enabled = true; + if sink.timeline_enabled { + timeline_enabled = true; timeline_status_txs.push(sink.status_tx.clone()); } handles.push(sink.handle); @@ -129,7 +122,7 @@ impl RemoteInboxClientHandle { session_id, session_epoch, latest_status_snapshot, - code_everywhere_timeline_enabled, + timeline_enabled, } } @@ -170,7 +163,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_exec_command_begin(&self, turn_id: &str, event: &ExecCommandBeginEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -192,7 +185,7 @@ impl RemoteInboxClientHandle { command: &[String], event: &ExecCommandEndEvent, ) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -207,7 +200,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_mcp_tool_begin(&self, turn_id: &str, event: &McpToolCallBeginEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -222,7 +215,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_mcp_tool_end(&self, turn_id: &str, event: &McpToolCallEndEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -241,7 +234,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_custom_tool_begin(&self, turn_id: &str, event: &CustomToolCallBeginEvent) { - if !self.code_everywhere_timeline_enabled || !should_mirror_custom_tool(&event.tool_name) { + if !self.timeline_enabled || !should_mirror_custom_tool(&event.tool_name) { return; } @@ -256,7 +249,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_custom_tool_end(&self, turn_id: &str, event: &CustomToolCallEndEvent) { - if !self.code_everywhere_timeline_enabled || !should_mirror_custom_tool(&event.tool_name) { + if !self.timeline_enabled || !should_mirror_custom_tool(&event.tool_name) { return; } @@ -275,7 +268,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_patch_apply_begin(&self, turn_id: &str, event: &PatchApplyBeginEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -290,7 +283,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_patch_apply_end(&self, turn_id: &str, event: &PatchApplyEndEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -305,7 +298,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_turn_diff(&self, turn_id: &str, event: &TurnDiffEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -320,7 +313,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_image_generation_begin(&self, turn_id: &str, event: &ImageGenerationBeginEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -335,7 +328,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_image_generation_end(&self, turn_id: &str, event: &ImageGenerationEndEvent) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -350,7 +343,7 @@ impl RemoteInboxClientHandle { } pub(crate) fn send_turn_error(&self, turn_id: &str, message: &str) { - if !self.code_everywhere_timeline_enabled { + if !self.timeline_enabled { return; } @@ -502,7 +495,7 @@ pub(crate) fn test_remote_inbox_client_handle( session_id: "session-1".to_string(), session_epoch: "epoch-1".to_string(), latest_status_snapshot, - code_everywhere_timeline_enabled: true, + timeline_enabled: true, }, status_rx, ) @@ -536,24 +529,8 @@ pub(crate) fn spawn_remote_inbox_client( )); } - if let Some(code_everywhere_url) = config - .code_everywhere_url - .clone() - .filter(|url| !url.trim().is_empty()) - { - sinks.push(spawn_code_everywhere_http_client( - code_everywhere_url, - config, - session, - app_event_tx, - latest_status_snapshot.clone(), - )); - } - if sinks.is_empty() { - tracing::warn!( - "remote inbox is enabled but neither bridge_url nor code_everywhere_url is configured" - ); + tracing::warn!("remote inbox is enabled but bridge_url is not configured"); return None; } @@ -596,7 +573,7 @@ fn spawn_websocket_remote_inbox_client( RemoteInboxClientSink { handle, status_tx, - code_everywhere_timeline_enabled: false, + timeline_enabled: true, } } @@ -617,149 +594,6 @@ impl RemoteInboxSession { } } -fn spawn_code_everywhere_http_client( - code_everywhere_url: String, - config: RemoteInboxConfig, - session: RemoteInboxSession, - app_event_tx: AppEventSender, - latest_status_snapshot: LatestStatusSnapshot, -) -> RemoteInboxClientSink { - let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel(); - let client_latest_status_snapshot = latest_status_snapshot.clone(); - let handle = tokio::spawn(async move { - let http = CodeEverywhereHttpClient::new(code_everywhere_url); - let mut hello_published = false; - if let Err(err) = http - .publish_client_message(&session, &config, ClientMessage::Hello(session.hello(&config))) - .await - { - tracing::warn!("failed to publish Code Everywhere session hello: {err}"); - } else { - hello_published = true; - } - let mut session_presence_check: Option = None; - let mut poll = tokio::time::interval(CODE_EVERYWHERE_POLL_INTERVAL); - loop { - tokio::select! { - maybe_status = status_rx.recv() => { - let Some(status) = maybe_status else { - break; - }; - if is_replayable_status_snapshot(&status) { - match client_latest_status_snapshot.lock() { - Ok(mut latest) => *latest = Some(status.clone()), - Err(err) => tracing::warn!("failed to store Code Everywhere status snapshot: {err}"), - } - } - let result = if hello_published { - http.publish_client_message(&session, &config, status).await - } else { - http.publish_client_messages( - &session, - &config, - [ClientMessage::Hello(session.hello(&config)), status], - ) - .await - }; - if let Err(err) = result { - tracing::warn!("failed to publish Code Everywhere status event: {err}"); - hello_published = false; - } else { - hello_published = true; - } - } - _ = poll.tick() => { - match http.claim_commands(&session, &config).await { - Ok(commands) => { - for command in commands { - let queue_new_session = matches!(command, CodeEverywhereCommand::NewSession { .. }); - let outcome = if queue_new_session { - CodeEverywhereCommandOutcome { - command_id: command.id().to_string(), - command_kind: command.kind(), - status: "accepted", - reason: None, - accepted_resolution: None, - } - } else { - dispatch_code_everywhere_command(command, &session, &app_event_tx).await - }; - let new_session_command_id = if queue_new_session { - Some(outcome.command_id.clone()) - } else { - None - }; - if let Err(err) = http - .publish_command_outcome(&session, &config, outcome) - .await - { - tracing::warn!("failed to publish Code Everywhere command outcome: {err}"); - hello_published = false; - } else if let Some(command_id) = new_session_command_id { - queue_remote_new_session( - &app_event_tx, - command_id, - Some("Code Everywhere".to_string()), - ); - } - } - } - Err(err) => tracing::debug!("failed to claim Code Everywhere commands: {err}"), - } - if session_presence_check - .as_ref() - .is_some_and(tokio::task::JoinHandle::is_finished) - { - match session_presence_check.take().expect("finished check").await { - Ok(Ok(true)) => {} - Ok(Ok(false)) => { - if let Err(err) = http.publish_client_messages( - &session, - &config, - session_republish_messages( - &session, - &config, - &client_latest_status_snapshot, - ), - ) - .await - { - tracing::debug!("failed to republish Code Everywhere session hello: {err}"); - hello_published = false; - } else { - hello_published = true; - } - } - Ok(Err(err)) => { - tracing::debug!("failed to check Code Everywhere session snapshot: {err}"); - hello_published = false; - } - Err(err) => { - tracing::debug!("Code Everywhere session snapshot check task failed: {err}"); - hello_published = false; - } - } - } - if session_presence_check.is_none() { - let check_config = config.clone(); - let check_http = http.clone(); - let check_session = session.clone(); - session_presence_check = Some(tokio::spawn(async move { - check_http.session_is_present(&check_session, &check_config).await - })); - } - } - } - } - }); - - RemoteInboxClientSink { - handle, - status_tx, - code_everywhere_timeline_enabled: true, - } -} - impl RemoteInboxSession { fn hello(&self, config: &RemoteInboxConfig) -> SessionHello { SessionHello { @@ -778,430 +612,6 @@ impl RemoteInboxSession { } } -fn session_republish_messages( - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - latest_status_snapshot: &LatestStatusSnapshot, -) -> Vec { - let mut messages = vec![ClientMessage::Hello(session.hello(config))]; - if let Some(latest_status) = latest_status_snapshot_message(latest_status_snapshot) { - messages.push(latest_status); - } - messages -} - -fn latest_status_snapshot_message( - latest_status_snapshot: &LatestStatusSnapshot, -) -> Option { - match latest_status_snapshot.lock() { - Ok(latest) => latest.clone(), - Err(err) => { - tracing::warn!("failed to read remote inbox status snapshot: {err}"); - None - } - } -} - -#[derive(Clone)] -struct CodeEverywhereHttpClient { - base_url: String, - client: reqwest::Client, -} - -impl CodeEverywhereHttpClient { - fn new(base_url: String) -> Self { - Self { - base_url, - client: reqwest::Client::new(), - } - } - - async fn publish_client_message( - &self, - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - message: ClientMessage, - ) -> Result<(), reqwest::Error> { - self.publish_client_messages(session, config, [message]).await - } - - async fn publish_client_messages( - &self, - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - messages: impl IntoIterator, - ) -> Result<(), reqwest::Error> { - let events = messages - .into_iter() - .flat_map(|message| code_everywhere_events_for_client_message(session, config, message)) - .collect::>(); - if events.is_empty() { - return Ok(()); - } - - self.publish_events(config, events).await - } - - async fn publish_command_outcome( - &self, - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - outcome: CodeEverywhereCommandOutcome, - ) -> Result<(), reqwest::Error> { - self.publish_events(config, code_everywhere_command_events(session, outcome)) - .await - } - - async fn publish_events( - &self, - config: &RemoteInboxConfig, - events: Vec, - ) -> Result<(), reqwest::Error> { - self.apply_auth( - self.client - .post(local_http_url(&self.base_url, "events")) - .json(&json!({ "events": events })), - config, - ) - .send() - .await? - .error_for_status()?; - Ok(()) - } - - async fn session_is_present( - &self, - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - ) -> Result { - let snapshot = self - .apply_auth( - self.client.get(local_http_url(&self.base_url, "snapshot")), - config, - ) - .send() - .await? - .error_for_status()? - .json::() - .await?; - - Ok(snapshot.sessions.into_iter().any(|candidate| { - candidate.session_id == session.session_id && candidate.session_epoch == session.session_epoch - })) - } - - async fn claim_commands( - &self, - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - ) -> Result, reqwest::Error> { - let response = self - .apply_auth( - self.client - .post(local_http_url(&self.base_url, "commands/claim")) - .json(&json!({ "sessionId": session.session_id })), - config, - ) - .send() - .await? - .error_for_status()? - .json::() - .await?; - - Ok(response - .commands - .into_iter() - .filter_map(|record| parse_code_everywhere_command(record, session)) - .collect()) - } - - fn apply_auth( - &self, - request: reqwest::RequestBuilder, - config: &RemoteInboxConfig, - ) -> reqwest::RequestBuilder { - if let Some(header) = code_everywhere_auth_header_value(config) { - request.header(reqwest::header::AUTHORIZATION, header) - } else { - request - } - } -} - -fn code_everywhere_auth_header_value(config: &RemoteInboxConfig) -> Option { - let token = config.token.as_ref()?.trim(); - if token.is_empty() { - None - } else { - Some(format!("Bearer {token}")) - } -} - -#[derive(Debug, Deserialize)] -struct CodeEverywhereSnapshot { - sessions: Vec, -} - -#[derive(Debug, Deserialize)] -struct CodeEverywhereSessionSnapshot { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, -} - -fn local_http_url(base_url: &str, path: &str) -> String { - let Ok(mut url) = url::Url::parse(base_url) else { - return format!("{}/{}", base_url.trim_end_matches('/'), path); - }; - let base_path = if url.path().ends_with('/') { - &url.path()[..url.path().len() - 1] - } else { - url.path() - }; - url.set_path(&format!("{base_path}/{path}")); - url.to_string() -} - -fn code_everywhere_events_for_client_message( - session: &RemoteInboxSession, - config: &RemoteInboxConfig, - message: ClientMessage, -) -> Vec { - let now = chrono::Utc::now().to_rfc3339(); - match message { - ClientMessage::Hello(hello) => { - let mut session = json!({ - "sessionId": hello.session_id, - "sessionEpoch": hello.session_epoch, - "hostLabel": hello.host_label, - "cwd": hello.cwd, - "branch": hello.branch, - "pid": hello.pid, - "model": "code", - "status": "idle", - "summary": "Connected to Code Everywhere.", - "startedAt": now, - "updatedAt": now, - "currentTurnId": null, - }); - if let Some(host_id) = hello.host_id { - session["hostId"] = json!(host_id); - } - - vec![json!({ - "kind": "session_hello", - "session": session, - })] - } - ClientMessage::StatusChanged(status) => status_changed_events(status, now), - ClientMessage::TurnStep(step) => vec![turn_step_event(step, now)], - ClientMessage::TurnComplete(status) => turn_complete_events(status, now), - ClientMessage::Error(status) => vec![session_status_event(status, "error", now)], - ClientMessage::ApprovalRequest(request) => vec![json!({ - "kind": "approval_requested", - "approval": { - "id": request.approval_id, - "sessionId": request.session_id, - "sessionEpoch": request.session_epoch, - "turnId": request.turn_id, - "title": "Approval required", - "body": request.reason.unwrap_or_else(|| "Approve the requested command.".to_string()), - "command": shell_command_label(&request.command), - "cwd": request.cwd, - "risk": "medium", - "requestedAt": now, - } - })], - ClientMessage::RequestUserInput(request) => vec![json!({ - "kind": "user_input_requested", - "input": { - "id": request.call_id, - "sessionId": request.session_id, - "sessionEpoch": request.session_epoch, - "turnId": request.turn_id, - "title": "Input requested", - "requestedAt": now, - "questions": request.questions.into_iter().map(code_everywhere_question).collect::>(), - } - })], - ClientMessage::Heartbeat(_) - | ClientMessage::UserMessage(_) - | ClientMessage::CommandAck(_) - | ClientMessage::CommandReject(_) - | ClientMessage::ApprovalDecisionAck(_) - | ClientMessage::ApprovalDecisionReject(_) => { - let _ = (session, config); - Vec::new() - } - } -} - -fn turn_step_event(step: RemoteTurnStep, timestamp: String) -> Value { - json!({ - "kind": "turn_step_added", - "sessionId": step.session_id, - "sessionEpoch": step.session_epoch, - "turnId": step.turn_id, - "step": { - "id": step.step_id, - "kind": step.kind, - "title": step.title, - "detail": step.detail, - "timestamp": timestamp, - "state": step.state, - } - }) -} - -fn status_changed_events(status: SessionStatusEvent, updated_at: String) -> Vec { - if status.message.as_deref() == Some("Turn started") { - if let Some(turn_id) = status.turn_id.clone() { - return vec![json!({ - "kind": "turn_started", - "sessionEpoch": status.session_epoch, - "turn": { - "id": turn_id, - "sessionId": status.session_id, - "title": "Every Code turn", - "status": "running", - "actor": "assistant", - "startedAt": updated_at, - "completedAt": null, - "summary": status.message.unwrap_or_else(|| "Turn started".to_string()), - "steps": [], - } - })]; - } - } - - vec![session_status_event(status, "running", updated_at)] -} - -fn turn_complete_events(status: SessionStatusEvent, updated_at: String) -> Vec { - let Some(turn_id) = status.turn_id.clone() else { - return vec![session_status_event(status, "idle", updated_at)]; - }; - let session_id = status.session_id.clone(); - let session_epoch = status.session_epoch.clone(); - - let mut events = Vec::new(); - if let Some(assistant_message) = status - .assistant_message - .clone() - .filter(|message| !message.trim().is_empty()) - { - events.push(json!({ - "kind": "turn_step_added", - "sessionId": session_id, - "sessionEpoch": session_epoch, - "turnId": turn_id.clone(), - "step": { - "id": format!("{turn_id}:assistant-message"), - "kind": "message", - "title": "Assistant message", - "detail": assistant_message, - "timestamp": updated_at.clone(), - "state": "completed", - } - })); - } - - events.push(json!({ - "kind": "turn_status_changed", - "sessionId": status.session_id, - "sessionEpoch": status.session_epoch, - "turnId": turn_id, - "status": "completed", - "summary": status.message.unwrap_or_else(|| "Turn complete.".to_string()), - "completedAt": updated_at, - })); - events -} - -fn session_status_event(status: SessionStatusEvent, state: &str, updated_at: String) -> Value { - json!({ - "kind": "session_status_changed", - "sessionId": status.session_id, - "sessionEpoch": status.session_epoch, - "status": state, - "summary": status.message.unwrap_or_else(|| "Session status changed.".to_string()), - "updatedAt": updated_at, - }) -} - -fn code_everywhere_command_events( - session: &RemoteInboxSession, - outcome: CodeEverywhereCommandOutcome, -) -> Vec { - let mut events = vec![code_everywhere_command_outcome_event(session, &outcome)]; - if outcome.status == "accepted" { - if let Some(resolution) = outcome.accepted_resolution { - events.push(code_everywhere_pending_work_resolved_event(session, resolution)); - } - } - events -} - -fn code_everywhere_command_outcome_event( - session: &RemoteInboxSession, - outcome: &CodeEverywhereCommandOutcome, -) -> Value { - json!({ - "kind": "command_outcome", - "outcome": { - "commandId": outcome.command_id, - "sessionId": session.session_id, - "sessionEpoch": session.session_epoch, - "commandKind": outcome.command_kind, - "status": outcome.status, - "reason": outcome.reason, - "handledAt": chrono::Utc::now().to_rfc3339(), - } - }) -} - -fn code_everywhere_pending_work_resolved_event( - session: &RemoteInboxSession, - resolution: CodeEverywherePendingWorkResolution, -) -> Value { - let resolved_at = chrono::Utc::now().to_rfc3339(); - match resolution { - CodeEverywherePendingWorkResolution::Approval { - approval_id, - decision, - } => json!({ - "kind": "approval_resolved", - "sessionId": session.session_id, - "sessionEpoch": session.session_epoch, - "approvalId": approval_id, - "decision": decision, - "resolvedAt": resolved_at, - }), - CodeEverywherePendingWorkResolution::RequestedInput { input_id } => json!({ - "kind": "user_input_resolved", - "sessionId": session.session_id, - "sessionEpoch": session.session_epoch, - "inputId": input_id, - "resolvedAt": resolved_at, - }), - } -} - -fn code_everywhere_question(question: code_protocol::request_user_input::RequestUserInputQuestion) -> Value { - json!({ - "id": question.id, - "label": question.header, - "prompt": question.question, - "required": !question.is_other, - "options": question.options.unwrap_or_default().into_iter().map(|option| json!({ - "label": option.label, - "value": option.label, - "description": option.description, - })).collect::>(), - }) -} - fn shell_command_label(command: &[String]) -> String { shlex::try_join(command.iter().map(String::as_str)).unwrap_or_else(|_| command.join(" ")) } @@ -1405,365 +815,6 @@ fn truncate_chars(value: &str, max_chars: usize) -> String { format!("{tail}...") } -#[derive(Debug, Deserialize)] -struct CodeEverywhereCommandClaim { - commands: Vec, -} - -#[derive(Debug, Deserialize)] -struct CodeEverywhereCommandRecord { - id: String, - command: CodeEverywhereCommandPayload, -} - -#[derive(Debug, Deserialize)] -#[serde(tag = "kind", rename_all = "snake_case")] -enum CodeEverywhereCommandPayload { - Reply { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - content: String, - }, - ContinueAutonomously { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - }, - PauseCurrentTurn { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - }, - NewSession { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - }, - EndSession { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - }, - StatusRequest { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - }, - ApprovalDecision { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - #[serde(rename = "approvalId")] - approval_id: String, - decision: String, - }, - RequestUserInputResponse { - #[serde(rename = "sessionId")] - session_id: String, - #[serde(rename = "sessionEpoch")] - session_epoch: String, - #[serde(rename = "inputId")] - input_id: Option, - #[serde(rename = "turnId")] - turn_id: String, - answers: Vec, - }, -} - -#[derive(Debug, Deserialize)] -struct CodeEverywhereRequestedInputAnswer { - #[serde(rename = "questionId")] - question_id: String, - value: String, -} - -enum CodeEverywhereCommand { - Stale { - id: String, - command_kind: &'static str, - reason: String, - }, - Reply { - id: String, - text: String, - }, - ContinueAutonomously { - id: String, - }, - PauseCurrentTurn { - id: String, - }, - NewSession { - id: String, - }, - EndSession { - id: String, - }, - StatusRequest { - id: String, - }, - ApprovalDecision { - id: String, - approval_id: String, - decision: ReviewDecision, - }, - RequestUserInputResponse { - id: String, - input_id: Option, - turn_id: String, - response: RequestUserInputResponse, - }, -} - -struct CodeEverywhereCommandOutcome { - command_id: String, - command_kind: &'static str, - status: &'static str, - reason: Option, - accepted_resolution: Option, -} - -enum CodeEverywherePendingWorkResolution { - Approval { - approval_id: String, - decision: &'static str, - }, - RequestedInput { - input_id: String, - }, -} - -impl CodeEverywhereCommand { - fn id(&self) -> &str { - match self { - CodeEverywhereCommand::Stale { id, .. } - | CodeEverywhereCommand::Reply { id, .. } - | CodeEverywhereCommand::ContinueAutonomously { id } - | CodeEverywhereCommand::PauseCurrentTurn { id } - | CodeEverywhereCommand::NewSession { id } - | CodeEverywhereCommand::EndSession { id } - | CodeEverywhereCommand::StatusRequest { id } - | CodeEverywhereCommand::ApprovalDecision { id, .. } - | CodeEverywhereCommand::RequestUserInputResponse { id, .. } => id, - } - } - - fn kind(&self) -> &'static str { - match self { - CodeEverywhereCommand::Stale { command_kind, .. } => command_kind, - CodeEverywhereCommand::Reply { .. } => "reply", - CodeEverywhereCommand::ContinueAutonomously { .. } => "continue_autonomously", - CodeEverywhereCommand::PauseCurrentTurn { .. } => "pause_current_turn", - CodeEverywhereCommand::NewSession { .. } => "new_session", - CodeEverywhereCommand::EndSession { .. } => "end_session", - CodeEverywhereCommand::StatusRequest { .. } => "status_request", - CodeEverywhereCommand::ApprovalDecision { .. } => "approval_decision", - CodeEverywhereCommand::RequestUserInputResponse { .. } => "request_user_input_response", - } - } - - fn accepted_resolution(&self) -> Option { - match self { - CodeEverywhereCommand::ApprovalDecision { - approval_id, - decision, - .. - } => Some(CodeEverywherePendingWorkResolution::Approval { - approval_id: approval_id.clone(), - decision: match decision { - ReviewDecision::Approved | ReviewDecision::ApprovedForSession => "approve", - ReviewDecision::Denied | ReviewDecision::Abort => "deny", - }, - }), - CodeEverywhereCommand::RequestUserInputResponse { - input_id: Some(input_id), - .. - } => Some(CodeEverywherePendingWorkResolution::RequestedInput { - input_id: input_id.clone(), - }), - _ => None, - } - } -} - -fn parse_code_everywhere_command( - record: CodeEverywhereCommandRecord, - session: &RemoteInboxSession, -) -> Option { - let command_kind = record.command.kind(); - let (session_id, session_epoch) = record.command.session_scope(); - if !matches_session(session_id, session_epoch, session) { - return Some(CodeEverywhereCommand::Stale { - id: record.id, - command_kind, - reason: stale_command_reason(session_id, session_epoch, session), - }); - } - - match record.command { - CodeEverywhereCommandPayload::Reply { - session_id: _, - session_epoch: _, - content, - } => Some(CodeEverywhereCommand::Reply { - id: record.id, - text: content, - }), - CodeEverywhereCommandPayload::ContinueAutonomously { - session_id: _, - session_epoch: _, - } => Some(CodeEverywhereCommand::ContinueAutonomously { - id: record.id, - }), - CodeEverywhereCommandPayload::PauseCurrentTurn { - session_id: _, - session_epoch: _, - } => Some(CodeEverywhereCommand::PauseCurrentTurn { - id: record.id, - }), - CodeEverywhereCommandPayload::NewSession { - session_id: _, - session_epoch: _, - } => Some(CodeEverywhereCommand::NewSession { - id: record.id, - }), - CodeEverywhereCommandPayload::EndSession { - session_id: _, - session_epoch: _, - } => Some(CodeEverywhereCommand::EndSession { - id: record.id, - }), - CodeEverywhereCommandPayload::StatusRequest { - session_id: _, - session_epoch: _, - } => Some(CodeEverywhereCommand::StatusRequest { - id: record.id, - }), - CodeEverywhereCommandPayload::ApprovalDecision { - session_id: _, - session_epoch: _, - approval_id, - decision, - } => { - let decision = match decision.as_str() { - "approve" => ReviewDecision::Approved, - "deny" => ReviewDecision::Denied, - _ => return None, - }; - Some(CodeEverywhereCommand::ApprovalDecision { - id: record.id, - approval_id, - decision, - }) - } - CodeEverywhereCommandPayload::RequestUserInputResponse { - session_id: _, - session_epoch: _, - input_id, - turn_id, - answers, - } => { - let response = RequestUserInputResponse { - answers: answers - .into_iter() - .map(|answer| { - ( - answer.question_id, - RequestUserInputAnswer { - answers: vec![answer.value], - }, - ) - }) - .collect::>(), - }; - Some(CodeEverywhereCommand::RequestUserInputResponse { - id: record.id, - input_id, - turn_id, - response, - }) - } - } -} - -impl CodeEverywhereCommandPayload { - fn kind(&self) -> &'static str { - match self { - CodeEverywhereCommandPayload::Reply { .. } => "reply", - CodeEverywhereCommandPayload::ContinueAutonomously { .. } => "continue_autonomously", - CodeEverywhereCommandPayload::PauseCurrentTurn { .. } => "pause_current_turn", - CodeEverywhereCommandPayload::NewSession { .. } => "new_session", - CodeEverywhereCommandPayload::EndSession { .. } => "end_session", - CodeEverywhereCommandPayload::StatusRequest { .. } => "status_request", - CodeEverywhereCommandPayload::ApprovalDecision { .. } => "approval_decision", - CodeEverywhereCommandPayload::RequestUserInputResponse { .. } => "request_user_input_response", - } - } - - fn session_scope(&self) -> (&str, &str) { - match self { - CodeEverywhereCommandPayload::Reply { - session_id, - session_epoch, - .. - } - | CodeEverywhereCommandPayload::ContinueAutonomously { - session_id, - session_epoch, - } - | CodeEverywhereCommandPayload::PauseCurrentTurn { - session_id, - session_epoch, - } - | CodeEverywhereCommandPayload::NewSession { - session_id, - session_epoch, - } - | CodeEverywhereCommandPayload::EndSession { - session_id, - session_epoch, - } - | CodeEverywhereCommandPayload::StatusRequest { - session_id, - session_epoch, - } - | CodeEverywhereCommandPayload::ApprovalDecision { - session_id, - session_epoch, - .. - } - | CodeEverywhereCommandPayload::RequestUserInputResponse { - session_id, - session_epoch, - .. - } => (session_id, session_epoch), - } - } -} - -fn stale_command_reason(session_id: &str, session_epoch: &str, session: &RemoteInboxSession) -> String { - format!( - "stale session scope: command targeted session {session_id} epoch {session_epoch}, active session is {} epoch {}", - session.session_id, session.session_epoch - ) -} - -fn matches_session(session_id: &str, session_epoch: &str, session: &RemoteInboxSession) -> bool { - if session_id != session.session_id || session_epoch != session.session_epoch { - tracing::warn!(session_id, session_epoch, "ignoring stale Code Everywhere command"); - return false; - } - true -} - fn queue_remote_new_session( app_event_tx: &AppEventSender, command_id: String, @@ -1777,142 +828,6 @@ fn queue_remote_new_session( }) } -async fn dispatch_code_everywhere_command( - command: CodeEverywhereCommand, - session: &RemoteInboxSession, - app_event_tx: &AppEventSender, -) -> CodeEverywhereCommandOutcome { - let command_id = command.id().to_string(); - let command_kind = command.kind(); - let accepted_resolution = command.accepted_resolution(); - let result = match command { - CodeEverywhereCommand::Stale { reason, .. } => Err(reason), - CodeEverywhereCommand::Reply { id, text } => { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - if !app_event_tx.send_with_result(AppEvent::RemoteInboxReply { - command_id: id, - text, - issued_by: Some("Code Everywhere".to_string()), - response_tx: Redacted(response_tx), - }) { - Err("app event channel is closed".to_string()) - } else { - wait_for_local_command_acceptance(response_rx).await - } - } - CodeEverywhereCommand::ContinueAutonomously { id } => { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - if !app_event_tx.send_with_result(AppEvent::RemoteInboxContinueAutonomously { - command_id: id, - issued_by: Some("Code Everywhere".to_string()), - response_tx: Redacted(response_tx), - }) { - Err("app event channel is closed".to_string()) - } else { - wait_for_local_command_acceptance(response_rx).await - } - } - CodeEverywhereCommand::PauseCurrentTurn { id } => { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - if !app_event_tx.send_with_result(AppEvent::RemoteInboxPauseCurrentTurn { - command_id: id, - issued_by: Some("Code Everywhere".to_string()), - response_tx: Redacted(response_tx), - }) { - Err("app event channel is closed".to_string()) - } else { - wait_for_local_command_acceptance(response_rx).await - } - } - CodeEverywhereCommand::NewSession { id } => { - if !queue_remote_new_session(app_event_tx, id, Some("Code Everywhere".to_string())) { - Err("app event channel is closed".to_string()) - } else { - Ok(()) - } - } - CodeEverywhereCommand::EndSession { .. } => { - if app_event_tx.send_with_result(AppEvent::ExitRequest) { - Ok(()) - } else { - Err("app event channel is closed".to_string()) - } - } - CodeEverywhereCommand::StatusRequest { .. } => { - tracing::info!( - session_id = session.session_id, - session_epoch = session.session_epoch, - "Code Everywhere requested status" - ); - Ok(()) - } - CodeEverywhereCommand::ApprovalDecision { - id: _, - approval_id, - decision, - } => { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - if !app_event_tx.send_with_result(AppEvent::RemoteInboxApprovalDecision { - approval_id, - decision, - response_tx: Redacted(response_tx), - }) { - Err("app event channel is closed".to_string()) - } else { - wait_for_local_command_acceptance(response_rx).await - } - } - CodeEverywhereCommand::RequestUserInputResponse { - id, - input_id: _, - turn_id, - response, - } => { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - if !app_event_tx.send_with_result(AppEvent::RemoteInboxRequestUserInputAnswer { - command_id: id, - call_id: None, - turn_id, - response, - issued_by: Some("Code Everywhere".to_string()), - response_tx: Redacted(response_tx), - }) { - Err("app event channel is closed".to_string()) - } else { - wait_for_local_command_acceptance(response_rx).await - } - } - }; - - match result { - Ok(()) => CodeEverywhereCommandOutcome { - command_id, - command_kind, - status: "accepted", - reason: None, - accepted_resolution, - }, - Err(reason) => CodeEverywhereCommandOutcome { - command_id, - command_kind, - status: "rejected", - reason: Some(reason), - accepted_resolution: None, - }, - } -} - -async fn wait_for_local_command_acceptance( - response_rx: tokio::sync::oneshot::Receiver>, -) -> Result<(), String> { - match tokio::time::timeout(COMMAND_ACCEPT_TIMEOUT, response_rx).await { - Ok(Ok(Ok(()))) => Ok(()), - Ok(Ok(Err(reason))) => Err(reason), - Ok(Err(_)) => Err("remote inbox command acceptance was canceled".to_string()), - Err(_) => Err("timed out waiting for app to accept command".to_string()), - } -} - async fn connect_once( bridge_url: &str, config: &RemoteInboxConfig, @@ -1932,23 +847,7 @@ async fn connect_once( tracing::info!("remote inbox connected"); let (mut write, mut read) = ws.split(); - send_json( - &mut write, - &ClientMessage::Hello(SessionHello { - session_id: session.session_id.clone(), - session_epoch: session.session_epoch.clone(), - host_id: config.host_id.clone().filter(|id| !id.trim().is_empty()), - host_label: config - .host_label - .clone() - .filter(|label| !label.trim().is_empty()) - .unwrap_or_else(|| "Every Code".to_string()), - cwd: session.cwd.clone(), - branch: session.branch.clone(), - pid: session.pid, - }), - ) - .await?; + send_json(&mut write, &ClientMessage::Hello(session.hello(config))).await?; send_latest_status_snapshot_if_idle(&mut write, status_rx, latest_status_snapshot).await?; let mut heartbeat = tokio::time::interval(HEARTBEAT_INTERVAL); @@ -2621,28 +1520,28 @@ mod tests { } #[test] - fn client_handle_fans_out_status_and_routes_timeline_only_to_code_everywhere() { - let (bridge_tx, mut bridge_rx) = tokio::sync::mpsc::unbounded_channel(); - let (code_everywhere_tx, mut code_everywhere_rx) = tokio::sync::mpsc::unbounded_channel(); + fn client_handle_fans_out_status_and_routes_timeline_to_enabled_sinks() { + let (status_only_tx, mut status_only_rx) = tokio::sync::mpsc::unbounded_channel(); + let (timeline_tx, mut timeline_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = RemoteInboxClientHandle { handles: Vec::new(), - status_txs: vec![bridge_tx, code_everywhere_tx.clone()], - timeline_status_txs: vec![code_everywhere_tx], + status_txs: vec![status_only_tx, timeline_tx.clone()], + timeline_status_txs: vec![timeline_tx], session_id: "session-1".to_string(), session_epoch: "epoch-1".to_string(), latest_status_snapshot: Arc::new(Mutex::new(None)), - code_everywhere_timeline_enabled: true, + timeline_enabled: true, }; handle.send_waiting_for_model(); assert!(matches!( - bridge_rx.try_recv().expect("bridge status"), + status_only_rx.try_recv().expect("status-only sink status"), ClientMessage::StatusChanged(_) )); assert!(matches!( - code_everywhere_rx + timeline_rx .try_recv() - .expect("Code Everywhere status"), + .expect("timeline sink status"), ClientMessage::StatusChanged(_) )); @@ -2657,11 +1556,11 @@ mod tests { state: "running".to_string(), })); - assert!(bridge_rx.try_recv().is_err()); + assert!(status_only_rx.try_recv().is_err()); assert!(matches!( - code_everywhere_rx + timeline_rx .try_recv() - .expect("Code Everywhere timeline step"), + .expect("timeline sink step"), ClientMessage::TurnStep(step) if step.step_id == "turn-1:tool:call-1" )); } @@ -2776,320 +1675,6 @@ mod tests { assert_eq!(step.detail, "failure"); } - #[test] - fn code_everywhere_url_join_preserves_base_path() { - assert_eq!( - local_http_url("http://127.0.0.1:4789", "commands/claim"), - "http://127.0.0.1:4789/commands/claim" - ); - assert_eq!( - local_http_url("http://127.0.0.1:4789/local/", "events"), - "http://127.0.0.1:4789/local/events" - ); - } - - #[test] - fn code_everywhere_http_auth_header_uses_remote_inbox_token() { - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: Some(" apple-proof-token ".to_string()), - host_id: None, - host_label: None, - }; - - assert_eq!( - code_everywhere_auth_header_value(&config).as_deref(), - Some("Bearer apple-proof-token") - ); - - let mut blank_config = config; - blank_config.token = Some(" ".to_string()); - assert_eq!(code_everywhere_auth_header_value(&blank_config), None); - } - - #[test] - fn code_everywhere_maps_remote_inbox_events() { - let session = test_session(); - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: None, - host_id: Some("host-mac-studio".to_string()), - host_label: Some("Mac Studio".to_string()), - }; - let hello = ClientMessage::Hello(session.hello(&config)); - let events = code_everywhere_events_for_client_message(&session, &config, hello); - - assert_eq!(events[0]["kind"], "session_hello"); - assert_eq!(events[0]["session"]["sessionId"], "session-1"); - assert_eq!(events[0]["session"]["hostId"], "host-mac-studio"); - assert_eq!(events[0]["session"]["hostLabel"], "Mac Studio"); - - let approval = ClientMessage::ApprovalRequest(RemoteApprovalRequest { - approval_id: "approval-1".to_string(), - call_id: "call-1".to_string(), - turn_id: "turn-1".to_string(), - session_id: "session-1".to_string(), - session_epoch: "epoch-1".to_string(), - command: vec!["pnpm".to_string(), "test".to_string()], - cwd: "/tmp/project".to_string(), - reason: Some("Run tests".to_string()), - }); - let events = code_everywhere_events_for_client_message(&session, &config, approval); - assert_eq!(events[0]["kind"], "approval_requested"); - assert_eq!(events[0]["approval"]["command"], "pnpm test"); - } - - #[test] - fn code_everywhere_maps_turn_lifecycle_events() { - let session = test_session(); - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: None, - host_id: None, - host_label: Some("Mac Studio".to_string()), - }; - - let mut started = status_event("Turn started", None); - started.turn_id = Some("turn-1".to_string()); - let events = code_everywhere_events_for_client_message( - &session, - &config, - ClientMessage::StatusChanged(started), - ); - assert_eq!(events[0]["kind"], "turn_started"); - assert_eq!(events[0]["sessionEpoch"], "epoch-1"); - assert_eq!(events[0]["turn"]["id"], "turn-1"); - assert_eq!(events[0]["turn"]["status"], "running"); - - let mut complete = status_event("Turn complete", Some("Done.")); - complete.turn_id = Some("turn-1".to_string()); - let events = code_everywhere_events_for_client_message( - &session, - &config, - ClientMessage::TurnComplete(complete), - ); - assert_eq!(events[0]["kind"], "turn_step_added"); - assert_eq!(events[0]["turnId"], "turn-1"); - assert_eq!(events[0]["step"]["id"], "turn-1:assistant-message"); - assert_eq!(events[0]["step"]["detail"], "Done."); - assert_eq!(events[1]["kind"], "turn_status_changed"); - assert_eq!(events[1]["turnId"], "turn-1"); - assert_eq!(events[1]["status"], "completed"); - } - - #[test] - fn code_everywhere_maps_turn_tool_steps() { - let session = test_session(); - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: None, - host_id: None, - host_label: Some("Mac Studio".to_string()), - }; - let events = code_everywhere_events_for_client_message( - &session, - &config, - ClientMessage::TurnStep(RemoteTurnStep { - session_id: "session-1".to_string(), - session_epoch: "epoch-1".to_string(), - turn_id: "turn-1".to_string(), - step_id: "turn-1:tool:call-1".to_string(), - kind: "tool".to_string(), - title: "Shell command".to_string(), - detail: "pnpm test".to_string(), - state: "running".to_string(), - }), - ); - - assert_eq!(events[0]["kind"], "turn_step_added"); - assert_eq!(events[0]["sessionId"], "session-1"); - assert_eq!(events[0]["turnId"], "turn-1"); - assert_eq!(events[0]["step"]["id"], "turn-1:tool:call-1"); - assert_eq!(events[0]["step"]["kind"], "tool"); - assert_eq!(events[0]["step"]["state"], "running"); - } - - #[test] - fn code_everywhere_maps_command_outcomes() { - let session = test_session(); - let outcome = CodeEverywhereCommandOutcome { - command_id: "command-1".to_string(), - command_kind: "status_request", - status: "accepted", - reason: None, - accepted_resolution: None, - }; - let event = code_everywhere_command_outcome_event( - &session, - &outcome, - ); - - assert_eq!(event["kind"], "command_outcome"); - assert_eq!(event["outcome"]["commandId"], "command-1"); - assert_eq!(event["outcome"]["sessionId"], "session-1"); - assert_eq!(event["outcome"]["sessionEpoch"], "epoch-1"); - assert_eq!(event["outcome"]["commandKind"], "status_request"); - assert_eq!(event["outcome"]["status"], "accepted"); - assert!(event["outcome"]["reason"].is_null()); - } - - #[test] - fn code_everywhere_maps_accepted_pending_work_command_resolutions() { - let session = test_session(); - let approval_events = code_everywhere_command_events( - &session, - CodeEverywhereCommandOutcome { - command_id: "command-approval".to_string(), - command_kind: "approval_decision", - status: "accepted", - reason: None, - accepted_resolution: Some(CodeEverywherePendingWorkResolution::Approval { - approval_id: "approval-1".to_string(), - decision: "approve", - }), - }, - ); - - assert_eq!(approval_events.len(), 2); - assert_eq!(approval_events[0]["kind"], "command_outcome"); - assert_eq!(approval_events[1]["kind"], "approval_resolved"); - assert_eq!(approval_events[1]["approvalId"], "approval-1"); - assert_eq!(approval_events[1]["decision"], "approve"); - - let input_events = code_everywhere_command_events( - &session, - CodeEverywhereCommandOutcome { - command_id: "command-input".to_string(), - command_kind: "request_user_input_response", - status: "accepted", - reason: None, - accepted_resolution: Some(CodeEverywherePendingWorkResolution::RequestedInput { - input_id: "input-1".to_string(), - }), - }, - ); - - assert_eq!(input_events.len(), 2); - assert_eq!(input_events[0]["kind"], "command_outcome"); - assert_eq!(input_events[1]["kind"], "user_input_resolved"); - assert_eq!(input_events[1]["inputId"], "input-1"); - } - - #[test] - fn code_everywhere_republish_includes_latest_status_snapshot() { - let session = test_session(); - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: None, - host_id: None, - host_label: Some("Mac Studio".to_string()), - }; - let latest_status_snapshot = Arc::new(Mutex::new(Some(ClientMessage::TurnComplete( - status_event("Turn complete", Some("Done.")), - )))); - - let messages = session_republish_messages(&session, &config, &latest_status_snapshot); - - assert!(matches!(messages.first(), Some(ClientMessage::Hello(_)))); - assert!(matches!( - messages.get(1), - Some(ClientMessage::TurnComplete(status)) - if status.message.as_deref() == Some("Turn complete") - && status.assistant_message.as_deref() == Some("Done.") - )); - } - - #[test] - fn code_everywhere_republish_without_snapshot_sends_only_hello() { - let session = test_session(); - let config = RemoteInboxConfig { - enabled: true, - bridge_url: None, - code_everywhere_url: Some("http://127.0.0.1:4789".to_string()), - token: None, - host_id: None, - host_label: Some("Mac Studio".to_string()), - }; - let latest_status_snapshot = Arc::new(Mutex::new(None)); - - let messages = session_republish_messages(&session, &config, &latest_status_snapshot); - - assert_eq!(messages.len(), 1); - assert!(matches!(messages.first(), Some(ClientMessage::Hello(_)))); - } - - #[test] - fn code_everywhere_claimed_commands_map_to_local_actions() { - let session = test_session(); - let reply = CodeEverywhereCommandRecord { - id: "command-1".to_string(), - command: CodeEverywhereCommandPayload::Reply { - session_id: "session-1".to_string(), - session_epoch: "epoch-1".to_string(), - content: "keep going".to_string(), - }, - }; - let stale = CodeEverywhereCommandRecord { - id: "command-2".to_string(), - command: CodeEverywhereCommandPayload::StatusRequest { - session_id: "session-1".to_string(), - session_epoch: "old-epoch".to_string(), - }, - }; - - assert!(matches!( - parse_code_everywhere_command(reply, &session), - Some(CodeEverywhereCommand::Reply { id, text }) if id == "command-1" && text == "keep going" - )); - assert!(matches!( - parse_code_everywhere_command(stale, &session), - Some(CodeEverywhereCommand::Stale { id, command_kind, reason }) - if id == "command-2" - && command_kind == "status_request" - && reason.contains("old-epoch") - && reason.contains("epoch-1") - )); - } - - #[test] - fn code_everywhere_claim_response_accepts_camel_case_commands() { - let session = test_session(); - let claim: CodeEverywhereCommandClaim = serde_json::from_value(json!({ - "commands": [ - { - "id": "command-1", - "command": { - "kind": "request_user_input_response", - "sessionId": "session-1", - "sessionEpoch": "epoch-1", - "inputId": "input-1", - "turnId": "turn-1", - "answers": [ - { "questionId": "question-1", "value": "Ship it" } - ] - } - } - ] - })) - .expect("claim response should deserialize"); - - assert!(matches!( - parse_code_everywhere_command(claim.commands.into_iter().next().unwrap(), &session), - Some(CodeEverywhereCommand::RequestUserInputResponse { input_id, response, .. }) - if input_id.as_deref() == Some("input-1") && response.answers.contains_key("question-1") - )); - } - fn command_message(command_id: &str) -> String { json!({ "type": "command", @@ -3347,33 +1932,6 @@ mod tests { assert!(pending_command_acceptances.is_empty()); } - #[tokio::test] - async fn code_everywhere_new_session_dispatch_does_not_wait_for_ui_reset_ack() { - let session = test_session(); - let (app_event_tx, app_event_rx) = app_event_sender(); - - let outcome = dispatch_code_everywhere_command( - CodeEverywhereCommand::NewSession { - id: "cmd-1".to_string(), - }, - &session, - &app_event_tx, - ) - .await; - - assert_eq!(outcome.command_id, "cmd-1"); - assert_eq!(outcome.command_kind, "new_session"); - assert_eq!(outcome.status, "accepted"); - assert!(outcome.reason.is_none()); - let event = app_event_rx.try_recv().expect("remote inbox event"); - assert!(matches!( - event, - AppEvent::RemoteInboxNewSession { command_id, issued_by, .. } - if command_id == "cmd-1" - && issued_by.as_deref() == Some("Code Everywhere") - )); - } - #[tokio::test] async fn pause_current_turn_command_sends_app_event() { let session = test_session(); @@ -3549,7 +2107,7 @@ mod tests { session_id: "session-1".to_string(), session_epoch: "epoch-1".to_string(), latest_status_snapshot: latest_status_snapshot.clone(), - code_everywhere_timeline_enabled: true, + timeline_enabled: true, }; handle.send_request_user_input(&RequestUserInputEvent { @@ -3585,7 +2143,7 @@ mod tests { session_id: "session-1".to_string(), session_epoch: "epoch-1".to_string(), latest_status_snapshot: latest_status_snapshot.clone(), - code_everywhere_timeline_enabled: true, + timeline_enabled: true, }; handle.send_request_user_input(&RequestUserInputEvent {