Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions apps/codex-plus-launcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,21 @@ async fn main() -> Result<()> {
}

fn acquire_single_instance_guard() -> anyhow::Result<Option<std::net::TcpListener>> {
match codex_plus_core::ports::acquire_loopback_port_guard(
match codex_plus_core::ports::acquire_resilient_loopback_port_guard(
codex_plus_core::ports::LAUNCHER_GUARD_PORT,
) {
Ok(listener) => Ok(Some(listener)),
Ok((listener, fallback_port)) => {
if let Some(actual_guard_port) = fallback_port {
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"launcher.guard_fallback",
json!({
"requested_guard_port": codex_plus_core::ports::LAUNCHER_GUARD_PORT,
"actual_guard_port": actual_guard_port
}),
);
}
Ok(Some(listener))
}
Err(error) if error.kind() == std::io::ErrorKind::AddrInUse => {
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"launcher.already_running",
Expand Down
15 changes: 13 additions & 2 deletions apps/codex-plus-manager/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,21 @@ fn install_panic_logger() {
}

fn acquire_single_instance_guard() -> Option<std::net::TcpListener> {
match codex_plus_core::ports::acquire_loopback_port_guard(
match codex_plus_core::ports::acquire_resilient_loopback_port_guard(
codex_plus_core::ports::MANAGER_GUARD_PORT,
) {
Ok(listener) => Some(listener),
Ok((listener, fallback_port)) => {
if let Some(actual_guard_port) = fallback_port {
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"manager.guard_fallback",
serde_json::json!({
"requested_guard_port": codex_plus_core::ports::MANAGER_GUARD_PORT,
"actual_guard_port": actual_guard_port
}),
);
}
Some(listener)
}
Err(error) if error.kind() == std::io::ErrorKind::AddrInUse => {
let _ = codex_plus_core::diagnostic_log::append_diagnostic_log(
"manager.already_running",
Expand Down
121 changes: 118 additions & 3 deletions crates/codex-plus-core/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ pub trait LaunchHooks: Send + Sync {
async fn wait_for_codex_exit(&self, launch: &CodexLaunch) -> anyhow::Result<()>;
async fn shutdown_helper(&self, helper_port: u16);
async fn terminate_codex(&self, launch: &CodexLaunch);
async fn prepare_injection_retry(&self, _app_dir: &Path, _debug_port: u16) -> bool {
false
}
}

#[derive(Default)]
Expand Down Expand Up @@ -221,9 +224,20 @@ where
launched = Some(launch.clone());

if settings.enhancements_enabled {
match hooks.bridge_context(debug_port).await? {
Some(ctx) => hooks.inject_bridge(debug_port, helper_port, ctx).await?,
None => hooks.inject(debug_port, helper_port).await?,
if let Err(error) = inject_from_hooks(hooks.as_ref(), debug_port, helper_port).await {
if hooks.prepare_injection_retry(&app_dir, debug_port).await {
let retry_launch = hooks
.launch_codex(&app_dir, debug_port, &settings.codex_extra_args)
.await?;
launched = Some(retry_launch.clone());
inject_from_hooks(hooks.as_ref(), debug_port, helper_port)
.await
.with_context(|| {
format!("Codex++ injection failed after restarting Codex: {error}")
})?;
} else {
return Err(error);
}
}
hooks.start_bridge_watchdog(debug_port, helper_port).await?;
}
Expand Down Expand Up @@ -268,6 +282,17 @@ where
}
}

async fn inject_from_hooks(
hooks: &dyn LaunchHooks,
debug_port: u16,
helper_port: u16,
) -> anyhow::Result<()> {
match hooks.bridge_context(debug_port).await? {
Some(ctx) => hooks.inject_bridge(debug_port, helper_port, ctx).await,
None => hooks.inject(debug_port, helper_port).await,
}
}

fn relay_protocol_proxy_enabled(settings: &BackendSettings) -> bool {
settings.active_relay_profile().protocol == crate::settings::RelayProtocol::ChatCompletions
}
Expand Down Expand Up @@ -569,6 +594,10 @@ impl LaunchHooks for DefaultLaunchHooks {
} => {}
}
}

async fn prepare_injection_retry(&self, app_dir: &Path, debug_port: u16) -> bool {
prepare_windows_injection_retry(app_dir, debug_port).await
}
}

async fn handle_helper_connection(
Expand Down Expand Up @@ -1137,6 +1166,92 @@ fn runtime_evaluate_result_is_true(result: &Value) -> bool {
.unwrap_or(false)
}

#[cfg(windows)]
async fn prepare_windows_injection_retry(app_dir: &Path, debug_port: u16) -> bool {
if crate::watcher::cdp_listening(debug_port) {
return false;
}

let app_dir = app_dir.to_path_buf();
tokio::task::spawn_blocking(move || {
let process_ids = windows_codex_processes_for_app_dir(&app_dir);
let _ = crate::diagnostic_log::append_diagnostic_log(
"launcher.injection_retry_restart_codex",
serde_json::json!({
"debug_port": debug_port,
"codex_app": app_dir.to_string_lossy(),
"process_count": process_ids.len()
}),
);
if process_ids.is_empty() {
return false;
}
for process_id in &process_ids {
let _ = crate::windows_integration::terminate_process(*process_id);
}
wait_for_windows_processes_exit(&process_ids, std::time::Duration::from_secs(8));
wait_for_cdp_port_closed(debug_port, std::time::Duration::from_secs(5));
true
})
.await
.unwrap_or(false)
}

#[cfg(not(windows))]
async fn prepare_windows_injection_retry(_app_dir: &Path, _debug_port: u16) -> bool {
false
}

#[cfg(windows)]
fn windows_codex_processes_for_app_dir(app_dir: &Path) -> Vec<u32> {
let app_dir = normalize_path_for_process_match(app_dir);
let app_dir_prefix = format!("{app_dir}\\");
crate::windows_integration::enumerate_processes()
.into_iter()
.filter(|process| process.exe_file.eq_ignore_ascii_case("codex.exe"))
.filter_map(|process| {
let path = process.executable_path.as_deref()?;
let exe_path = normalize_path_for_process_match(path);
exe_path
.starts_with(&app_dir_prefix)
.then_some(process.process_id)
})
.collect()
}

#[cfg(windows)]
fn normalize_path_for_process_match(path: &Path) -> String {
path.to_string_lossy()
.replace('/', "\\")
.trim_end_matches('\\')
.to_ascii_lowercase()
}

#[cfg(windows)]
fn wait_for_cdp_port_closed(debug_port: u16, timeout: std::time::Duration) {
let start = std::time::Instant::now();
while start.elapsed() < timeout {
if !crate::watcher::cdp_listening(debug_port) {
return;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}

#[cfg(windows)]
fn wait_for_windows_processes_exit(process_ids: &[u32], timeout: std::time::Duration) {
let start = std::time::Instant::now();
while start.elapsed() < timeout {
let running = crate::windows_integration::enumerate_processes()
.into_iter()
.any(|process| process_ids.contains(&process.process_id));
if !running {
return;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
}

async fn try_inject(debug_port: u16, helper_port: u16) -> anyhow::Result<()> {
let targets = crate::cdp::list_targets(debug_port).await?;
let target = crate::cdp::pick_page_target(&targets)?;
Expand Down
18 changes: 18 additions & 0 deletions crates/codex-plus-core/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,21 @@ pub fn can_connect_loopback_port(port: u16) -> bool {
pub fn acquire_loopback_port_guard(port: u16) -> std::io::Result<TcpListener> {
TcpListener::bind(("127.0.0.1", port))
}

pub fn acquire_resilient_loopback_port_guard(
port: u16,
) -> std::io::Result<(TcpListener, Option<u16>)> {
match acquire_loopback_port_guard(port) {
Ok(listener) => Ok((listener, None)),
Err(error) if error.kind() == std::io::ErrorKind::AddrInUse => {
if can_connect_loopback_port(port) {
Err(error)
} else {
let listener = TcpListener::bind(("127.0.0.1", 0))?;
let actual_port = listener.local_addr().ok().map(|address| address.port());
Ok((listener, actual_port))
}
}
Err(error) => Err(error),
}
}
79 changes: 79 additions & 0 deletions crates/codex-plus-core/tests/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,53 @@ async fn launch_lifecycle_writes_failure_and_cleans_helper_when_injection_fails(
assert!(status.message.contains("inject failed"));
}

#[tokio::test]
async fn launch_lifecycle_restarts_codex_once_when_injection_retry_is_prepared() {
let temp = tempfile::tempdir().unwrap();
let app_dir = temp.path().join("Codex.app");
std::fs::create_dir_all(&app_dir).unwrap();
let status_store = StatusStore::new(temp.path().join("latest-status.json"));
let events = Arc::new(Mutex::new(Vec::<String>::new()));
let hooks = FakeHooks::new(events.clone())
.with_inject_errors(vec![
"failed to query CDP targets".to_string(),
String::new(),
])
.with_prepare_retry(true);

let handle = launch_and_inject_with_hooks(
LaunchOptions {
app_dir: Some(app_dir),
debug_port: 9229,
helper_port: 57321,
status_store,
},
&hooks,
)
.await
.unwrap();
handle.wait_for_codex_exit().await.unwrap();

assert_eq!(
*events.lock().unwrap(),
vec![
"select-debug:9229",
"select-helper:57321",
"load-settings",
"apply-relay",
"start-helper:57321",
"launch:9229",
"inject:9229:57321",
"prepare-retry:9229",
"launch:9229",
"inject:9229:57321",
"status:running",
"wait-codex",
"shutdown-helper:57321",
]
);
}

#[tokio::test]
async fn launch_lifecycle_cleans_helper_when_launch_fails_after_helper_started() {
let temp = tempfile::tempdir().unwrap();
Expand Down Expand Up @@ -1004,6 +1051,8 @@ struct FakeHooks {
launch_result: CodexLaunch,
launch_error: Option<String>,
inject_error: Option<String>,
inject_errors: Arc<Mutex<Vec<String>>>,
prepare_retry: bool,
provider_sync_unsupported: bool,
}

Expand All @@ -1019,6 +1068,8 @@ impl FakeHooks {
},
launch_error: None,
inject_error: None,
inject_errors: Arc::new(Mutex::new(Vec::new())),
prepare_retry: false,
provider_sync_unsupported: false,
}
}
Expand All @@ -1038,6 +1089,16 @@ impl FakeHooks {
self
}

fn with_inject_errors(mut self, messages: Vec<String>) -> Self {
self.inject_errors = Arc::new(Mutex::new(messages));
self
}

fn with_prepare_retry(mut self, prepare_retry: bool) -> Self {
self.prepare_retry = prepare_retry;
self
}

fn with_launch_error(mut self, message: &str) -> Self {
self.launch_error = Some(message.to_string());
self
Expand Down Expand Up @@ -1130,6 +1191,19 @@ impl LaunchHooks for FakeHooks {

async fn inject(&self, debug_port: u16, helper_port: u16) -> anyhow::Result<()> {
self.event(format!("inject:{debug_port}:{helper_port}"));
let next_inject_error = {
let mut errors = self.inject_errors.lock().unwrap();
if errors.is_empty() {
None
} else {
Some(errors.remove(0))
}
};
if let Some(message) = next_inject_error {
if !message.is_empty() {
anyhow::bail!(message);
}
}
if let Some(message) = &self.inject_error {
anyhow::bail!(message.clone());
}
Expand All @@ -1156,4 +1230,9 @@ impl LaunchHooks for FakeHooks {
self.event("terminate-codex");
}
}

async fn prepare_injection_retry(&self, _app_dir: &Path, debug_port: u16) -> bool {
self.event(format!("prepare-retry:{debug_port}"));
self.prepare_retry
}
}