Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 113 additions & 209 deletions agent/src/control/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Oxide Computer Company
* Copyright 2026 Oxide Computer Company
*/

use std::{io::Read, ops::Range, time::Duration};
Expand Down Expand Up @@ -33,6 +33,42 @@ impl Stuff {
Ok(())
}

async fn req(&mut self, payload: Payload) -> Result<Payload> {
let message = Message { id: self.ids.next().unwrap(), payload };
match self.send_and_recv(&message).await {
Ok(response) => Ok(response.payload),
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
*/
bail!("could not talk to the agent: {e}");
}
}
}

/*
* Requests to the buildomat core are allowed to fail intermittently. We
* need to retry until we are able to get a successful response of some
* kind back from the server.
*/
async fn req_retry(&mut self, payload: Payload) -> Result<Payload> {
loop {
match self.req(payload.clone()).await? {
Payload::Error(e) => {
eprintln!(
"WARNING: control request failure (retrying): {e}"
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
payload => return Ok(payload),
}
}
}

async fn send_and_recv(&mut self, mout: &Message) -> Result<Message> {
let us = self.us.as_mut().unwrap();
let dec = self.dec.as_mut().unwrap();
Expand Down Expand Up @@ -111,30 +147,12 @@ async fn cmd_address_list(mut l: Level<Stuff>) -> Result<()> {

let filter = a.opts().opt_str("f");

let addrs = {
let mout = Message {
id: l.context_mut().ids.next().unwrap(),
payload: Payload::MetadataAddresses,
};

match l.context_mut().send_and_recv(&mout).await {
Ok(min) => match min.payload {
Payload::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => addrs,
other => bail!("unexpected response: {other:?}"),
},
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
*/
bail!("could not talk to the agent: {e}");
}
let addrs = match l.context_mut().req(Payload::MetadataAddresses).await? {
Payload::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => addrs,
other => bail!("unexpected response: {other:?}"),
};

let mut t = a.table();
Expand Down Expand Up @@ -231,31 +249,15 @@ async fn cmd_eng(mut l: Level<Stuff>) -> Result<()> {
async fn cmd_eng_metadata(mut l: Level<Stuff>) -> Result<()> {
let _ = no_args!(l);

let mout = Message {
id: l.context_mut().ids.next().unwrap(),
payload: Payload::MetadataAddresses,
};

match l.context_mut().send_and_recv(&mout).await {
Ok(min) => match min.payload {
Payload::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => {
println!("addrs = {addrs:#?}");
Ok(())
}
other => bail!("unexpected response: {other:?}"),
},
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
*/
bail!("could not talk to the agent: {e}");
match l.context_mut().req(Payload::MetadataAddresses).await? {
Payload::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => {
println!("addrs = {addrs:#?}");
Ok(())
}
other => bail!("unexpected response: {other:?}"),
}
}

Expand All @@ -282,69 +284,39 @@ async fn cmd_store_get(mut l: Level<Stuff>) -> Result<()> {
let no_wait = a.opts().opt_present("W");
let mut printed_wait = false;

let req = Payload::StoreGet(name.clone());
loop {
let mout = Message {
id: l.context_mut().ids.next().unwrap(),
payload: Payload::StoreGet(name.clone()),
};
match l.context_mut().req_retry(req.clone()).await? {
Payload::StoreGetResult(Some(ent)) => {
/*
* Output formatting here should be kept consistent with
* what "buildomat job store get" does outside a job;
* see the "buildomat" crate.
*/
if ent.value.ends_with('\n') {
print!("{}", ent.value);
} else {
println!("{}", ent.value);
}
break Ok(());
}
Payload::StoreGetResult(None) => {
if no_wait {
bail!("the store has no value for {name:?}");
}

match l.context_mut().send_and_recv(&mout).await {
Ok(min) => {
match min.payload {
Payload::Error(e) => {
/*
* Requests to the buildomat core are allowed to fail
* intermittently. We need to retry until we are able
* to get a successful response of some kind back from
* the server.
*/
eprintln!(
"WARNING: control request failure (retrying): {e}"
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
Payload::StoreGetResult(Some(ent)) => {
/*
* Output formatting here should be kept consistent with
* what "buildomat job store get" does outside a job;
* see the "buildomat" crate.
*/
if ent.value.ends_with('\n') {
print!("{}", ent.value);
} else {
println!("{}", ent.value);
}
break Ok(());
}
Payload::StoreGetResult(None) => {
if no_wait {
bail!("the store has no value for {name:?}");
}

if !printed_wait {
eprintln!(
"WARNING: job store has no value \
if !printed_wait {
eprintln!(
"WARNING: job store has no value \
for {name:?}; waiting for a value..."
);
printed_wait = true;
}

tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
other => bail!("unexpected response: {other:?}"),
);
printed_wait = true;
}

tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
*/
bail!("could not talk to the agent: {e}");
}
other => bail!("unexpected response: {other:?}"),
}
}
}
Expand Down Expand Up @@ -384,47 +356,11 @@ async fn cmd_store_put(mut l: Level<Stuff>) -> Result<()> {
};

let secret = a.opts().opt_present("s");
let req = Payload::StorePut(a.args()[0].to_string(), value.clone(), secret);

loop {
let mout = Message {
id: l.context_mut().ids.next().unwrap(),
payload: Payload::StorePut(
a.args()[0].to_string(),
value.clone(),
secret,
),
};

match l.context_mut().send_and_recv(&mout).await {
Ok(min) => {
match min.payload {
Payload::Error(e) => {
/*
* Requests to the buildomat core are allowed to fail
* intermittently. We need to retry until we are able
* to get a successful response of some kind back from
* the server.
*/
eprintln!(
"WARNING: control request failure (retrying): {e}"
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
Payload::Ack => break Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
*/
bail!("could not talk to the agent: {e}");
}
}
match l.context_mut().req_retry(req).await? {
Payload::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}

Expand All @@ -445,81 +381,49 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {
bad_args!(l, "specify at least a process name and a command to run");
}

let mout = Message {
id: l.context_mut().ids.next().unwrap(),
payload: Payload::ProcessStart {
name: a.args()[0].to_string(),
cmd: a.args()[1].to_string(),
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),

/*
* The process will actually be spawned by the agent, which is
* running under service management. To aid the user, we want
* to forward the environment and current directory so that the
* process can be started as if it were run from the job program
* itself.
*/
env: std::env::vars_os().collect::<Vec<_>>(),
pwd: std::env::current_dir()?.to_str().unwrap().to_string(),

uid: unsafe { libc::geteuid() },
gid: unsafe { libc::getegid() },
},
let payload = Payload::ProcessStart {
name: a.args()[0].to_string(),
cmd: a.args()[1].to_string(),
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),

/*
* The process will actually be spawned by the agent, which is
* running under service management. To aid the user, we want
* to forward the environment and current directory so that the
* process can be started as if it were run from the job program
* itself.
*/
env: std::env::vars_os().collect::<Vec<_>>(),
pwd: std::env::current_dir()?.to_str().unwrap().to_string(),

uid: unsafe { libc::geteuid() },
gid: unsafe { libc::getegid() },
};

match l.context_mut().send_and_recv(&mout).await {
Ok(min) => {
match min.payload {
Payload::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not start process: {e}");
}
Payload::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}
Err(e) => {
match l.context_mut().req(payload).await? {
Payload::Error(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not talk to the agent: {e}");
bail!("could not start process: {e}");
}
Payload::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}

async fn factory_info(s: &mut Stuff) -> Result<FactoryInfo> {
let mout =
Message { id: s.ids.next().unwrap(), payload: Payload::FactoryInfo };

match s.send_and_recv(&mout).await {
Ok(min) => {
match min.payload {
Payload::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not get factory info: {e}");
}
Payload::FactoryInfoResult(fi) => Ok(fi),
other => bail!("unexpected response: {other:?}"),
}
}
Err(e) => {
match s.req(Payload::FactoryInfo).await? {
Payload::Error(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
* socket; they should not fail. This implies something has
* gone seriously wrong and it is unlikely that it will be fixed
* without intervention. Don't retry.
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not talk to the agent: {e}");
bail!("could not get factory info: {e}");
}
Payload::FactoryInfoResult(fi) => Ok(fi),
other => bail!("unexpected response: {other:?}"),
}
}

Expand Down