From a922d44416b4890bc2d8d1675ea4b163676ca951 Mon Sep 17 00:00:00 2001 From: Emily Albini Date: Thu, 19 Feb 2026 11:22:34 -0800 Subject: [PATCH] create wrappers for sending control messages in the agent --- agent/src/control/mod.rs | 322 ++++++++++++++------------------------- 1 file changed, 113 insertions(+), 209 deletions(-) diff --git a/agent/src/control/mod.rs b/agent/src/control/mod.rs index b26493a..d706b78 100644 --- a/agent/src/control/mod.rs +++ b/agent/src/control/mod.rs @@ -1,5 +1,5 @@ /* - * Copyright 2024 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use std::{io::Read, ops::Range, time::Duration}; @@ -33,6 +33,42 @@ impl Stuff { Ok(()) } + async fn req(&mut self, payload: Payload) -> Result { + let message = Message { id: self.ids.next().unwrap(), payload }; + match self.send_and_recv(&message).await { + Ok(response) => Ok(response.payload), + Err(e) => { + /* + * Requests to the agent are relatively simple and over a UNIX + * socket; they should not fail. This implies something has + * gone seriously wrong and it is unlikely that it will be fixed + * without intervention. Don't retry. + */ + bail!("could not talk to the agent: {e}"); + } + } + } + + /* + * Requests to the buildomat core are allowed to fail intermittently. We + * need to retry until we are able to get a successful response of some + * kind back from the server. + */ + async fn req_retry(&mut self, payload: Payload) -> Result { + loop { + match self.req(payload.clone()).await? { + Payload::Error(e) => { + eprintln!( + "WARNING: control request failure (retrying): {e}" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; + } + payload => return Ok(payload), + } + } + } + async fn send_and_recv(&mut self, mout: &Message) -> Result { let us = self.us.as_mut().unwrap(); let dec = self.dec.as_mut().unwrap(); @@ -111,30 +147,12 @@ async fn cmd_address_list(mut l: Level) -> Result<()> { let filter = a.opts().opt_str("f"); - let addrs = { - let mout = Message { - id: l.context_mut().ids.next().unwrap(), - payload: Payload::MetadataAddresses, - }; - - match l.context_mut().send_and_recv(&mout).await { - Ok(min) => match min.payload { - Payload::Error(e) => { - bail!("WARNING: control request failure: {e}"); - } - Payload::MetadataAddressesResult(addrs) => addrs, - other => bail!("unexpected response: {other:?}"), - }, - Err(e) => { - /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. - */ - bail!("could not talk to the agent: {e}"); - } + let addrs = match l.context_mut().req(Payload::MetadataAddresses).await? { + Payload::Error(e) => { + bail!("WARNING: control request failure: {e}"); } + Payload::MetadataAddressesResult(addrs) => addrs, + other => bail!("unexpected response: {other:?}"), }; let mut t = a.table(); @@ -231,31 +249,15 @@ async fn cmd_eng(mut l: Level) -> Result<()> { async fn cmd_eng_metadata(mut l: Level) -> Result<()> { let _ = no_args!(l); - let mout = Message { - id: l.context_mut().ids.next().unwrap(), - payload: Payload::MetadataAddresses, - }; - - match l.context_mut().send_and_recv(&mout).await { - Ok(min) => match min.payload { - Payload::Error(e) => { - bail!("WARNING: control request failure: {e}"); - } - Payload::MetadataAddressesResult(addrs) => { - println!("addrs = {addrs:#?}"); - Ok(()) - } - other => bail!("unexpected response: {other:?}"), - }, - Err(e) => { - /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. - */ - bail!("could not talk to the agent: {e}"); + match l.context_mut().req(Payload::MetadataAddresses).await? { + Payload::Error(e) => { + bail!("WARNING: control request failure: {e}"); + } + Payload::MetadataAddressesResult(addrs) => { + println!("addrs = {addrs:#?}"); + Ok(()) } + other => bail!("unexpected response: {other:?}"), } } @@ -282,69 +284,39 @@ async fn cmd_store_get(mut l: Level) -> Result<()> { let no_wait = a.opts().opt_present("W"); let mut printed_wait = false; + let req = Payload::StoreGet(name.clone()); loop { - let mout = Message { - id: l.context_mut().ids.next().unwrap(), - payload: Payload::StoreGet(name.clone()), - }; + match l.context_mut().req_retry(req.clone()).await? { + Payload::StoreGetResult(Some(ent)) => { + /* + * Output formatting here should be kept consistent with + * what "buildomat job store get" does outside a job; + * see the "buildomat" crate. + */ + if ent.value.ends_with('\n') { + print!("{}", ent.value); + } else { + println!("{}", ent.value); + } + break Ok(()); + } + Payload::StoreGetResult(None) => { + if no_wait { + bail!("the store has no value for {name:?}"); + } - match l.context_mut().send_and_recv(&mout).await { - Ok(min) => { - match min.payload { - Payload::Error(e) => { - /* - * Requests to the buildomat core are allowed to fail - * intermittently. We need to retry until we are able - * to get a successful response of some kind back from - * the server. - */ - eprintln!( - "WARNING: control request failure (retrying): {e}" - ); - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - Payload::StoreGetResult(Some(ent)) => { - /* - * Output formatting here should be kept consistent with - * what "buildomat job store get" does outside a job; - * see the "buildomat" crate. - */ - if ent.value.ends_with('\n') { - print!("{}", ent.value); - } else { - println!("{}", ent.value); - } - break Ok(()); - } - Payload::StoreGetResult(None) => { - if no_wait { - bail!("the store has no value for {name:?}"); - } - - if !printed_wait { - eprintln!( - "WARNING: job store has no value \ + if !printed_wait { + eprintln!( + "WARNING: job store has no value \ for {name:?}; waiting for a value..." - ); - printed_wait = true; - } - - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - other => bail!("unexpected response: {other:?}"), + ); + printed_wait = true; } + + tokio::time::sleep(Duration::from_secs(2)).await; + continue; } - Err(e) => { - /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. - */ - bail!("could not talk to the agent: {e}"); - } + other => bail!("unexpected response: {other:?}"), } } } @@ -384,47 +356,11 @@ async fn cmd_store_put(mut l: Level) -> Result<()> { }; let secret = a.opts().opt_present("s"); + let req = Payload::StorePut(a.args()[0].to_string(), value.clone(), secret); - loop { - let mout = Message { - id: l.context_mut().ids.next().unwrap(), - payload: Payload::StorePut( - a.args()[0].to_string(), - value.clone(), - secret, - ), - }; - - match l.context_mut().send_and_recv(&mout).await { - Ok(min) => { - match min.payload { - Payload::Error(e) => { - /* - * Requests to the buildomat core are allowed to fail - * intermittently. We need to retry until we are able - * to get a successful response of some kind back from - * the server. - */ - eprintln!( - "WARNING: control request failure (retrying): {e}" - ); - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - Payload::Ack => break Ok(()), - other => bail!("unexpected response: {other:?}"), - } - } - Err(e) => { - /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. - */ - bail!("could not talk to the agent: {e}"); - } - } + match l.context_mut().req_retry(req).await? { + Payload::Ack => Ok(()), + other => bail!("unexpected response: {other:?}"), } } @@ -445,81 +381,49 @@ async fn cmd_process_start(mut l: Level) -> Result<()> { bad_args!(l, "specify at least a process name and a command to run"); } - let mout = Message { - id: l.context_mut().ids.next().unwrap(), - payload: Payload::ProcessStart { - name: a.args()[0].to_string(), - cmd: a.args()[1].to_string(), - args: a.args().iter().skip(2).cloned().collect::>(), - - /* - * The process will actually be spawned by the agent, which is - * running under service management. To aid the user, we want - * to forward the environment and current directory so that the - * process can be started as if it were run from the job program - * itself. - */ - env: std::env::vars_os().collect::>(), - pwd: std::env::current_dir()?.to_str().unwrap().to_string(), - - uid: unsafe { libc::geteuid() }, - gid: unsafe { libc::getegid() }, - }, + let payload = Payload::ProcessStart { + name: a.args()[0].to_string(), + cmd: a.args()[1].to_string(), + args: a.args().iter().skip(2).cloned().collect::>(), + + /* + * The process will actually be spawned by the agent, which is + * running under service management. To aid the user, we want + * to forward the environment and current directory so that the + * process can be started as if it were run from the job program + * itself. + */ + env: std::env::vars_os().collect::>(), + pwd: std::env::current_dir()?.to_str().unwrap().to_string(), + + uid: unsafe { libc::geteuid() }, + gid: unsafe { libc::getegid() }, }; - match l.context_mut().send_and_recv(&mout).await { - Ok(min) => { - match min.payload { - Payload::Error(e) => { - /* - * This request is purely local to the agent, so an - * error is not something we should retry indefinitely. - */ - bail!("could not start process: {e}"); - } - Payload::Ack => Ok(()), - other => bail!("unexpected response: {other:?}"), - } - } - Err(e) => { + match l.context_mut().req(payload).await? { + Payload::Error(e) => { /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. */ - bail!("could not talk to the agent: {e}"); + bail!("could not start process: {e}"); } + Payload::Ack => Ok(()), + other => bail!("unexpected response: {other:?}"), } } async fn factory_info(s: &mut Stuff) -> Result { - let mout = - Message { id: s.ids.next().unwrap(), payload: Payload::FactoryInfo }; - - match s.send_and_recv(&mout).await { - Ok(min) => { - match min.payload { - Payload::Error(e) => { - /* - * This request is purely local to the agent, so an - * error is not something we should retry indefinitely. - */ - bail!("could not get factory info: {e}"); - } - Payload::FactoryInfoResult(fi) => Ok(fi), - other => bail!("unexpected response: {other:?}"), - } - } - Err(e) => { + match s.req(Payload::FactoryInfo).await? { + Payload::Error(e) => { /* - * Requests to the agent are relatively simple and over a UNIX - * socket; they should not fail. This implies something has - * gone seriously wrong and it is unlikely that it will be fixed - * without intervention. Don't retry. + * This request is purely local to the agent, so an + * error is not something we should retry indefinitely. */ - bail!("could not talk to the agent: {e}"); + bail!("could not get factory info: {e}"); } + Payload::FactoryInfoResult(fi) => Ok(fi), + other => bail!("unexpected response: {other:?}"), } }