diff --git a/Cargo.lock b/Cargo.lock index d3bbaa3..60da6e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,15 +651,20 @@ dependencies = [ "chrono", "futures", "glob", + "hex", "hiercmd", "ipnet", "libc", "openssl", + "reqwest", "rusty_ulid", "serde", "serde_json", "slog", + "tar", + "tempfile", "tokio", + "zstd", ] [[package]] @@ -1634,6 +1639,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -2515,6 +2531,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags", "libc", + "redox_syscall 0.7.1", ] [[package]] @@ -2889,7 +2906,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -3225,6 +3242,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_users" version = "0.4.6" @@ -4219,6 +4245,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.24.0" @@ -5258,6 +5295,16 @@ version = "0.6.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "xmlparser" version = "0.13.6" @@ -5426,3 +5473,31 @@ dependencies = [ "quote", "syn 1.0.109", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index f44a2b6..53552a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ futures = "0.3" futures-core = "0.3" getopts = "0.2" glob = "0.3" +hex = "0.4.3" hiercmd = { git = "https://github.com/jclulow/hiercmd" } hmac-sha256 = "1" html-escape = "0.2" @@ -84,6 +85,7 @@ slog-term = "2.7" smf = { git = "https://github.com/illumos/smf-rs.git" } strip-ansi-escapes = "0.2" strum = { version = "0.27", features = [ "derive" ] } +tar = "0.4" tempfile = "3.3" thiserror = "2" tlvc = { git = "https://github.com/oxidecomputer/tlvc", version = "0.3.1" } @@ -95,3 +97,4 @@ toml = "0.8" usdt = "0.6" uuid = { version = "1", features = [ "v4" ] } zone = { version = "0.3", features = [ "async" ], default-features = false } +zstd = "0.13" diff --git 
a/README.md b/README.md index ce132ee..643fa27 100644 --- a/README.md +++ b/README.md @@ -659,6 +659,78 @@ Configuration properties supported for basic jobs include: environment running in an ephemeral virtual machine, with a reasonable set of build tools. 32GB of RAM and 200GB of disk should be available. +## Caching + +Buildomat has native support for caching intermediate artifacts across separate +CI jobs, with both high-level integrations and a low-level manual CLI. + +Caches are identified by a "cache key", a string identifying the content being +cached. Buildomat only restores caches when the cache key requested by the job +is the exact match of the key of an existing cache, and doesn't provide a +way to restore a cache whose key is only a partial match. High-level +integrations will pick the most appropriate cache key automatically, while it's +your responsibility to pick the correct cache key when using the manual CLI. + +Each Buildomat account or GitHub repository has its own isolated caching +storage, to prevent unintentional cross-pollination. While a cache with a given +key exists on the Buildomat servers it won't be possible to upload a new cache +with the same key. Once the cache is removed from the server it will be possible +to upload a cache with that key again. + +### Rust dependencies caching + +Buildomat includes native support for caching Rust dependencies, as part of the +`bmat` CLI (pre-installed in every worker). It saves and restores Cargo's +`target/` directory, making sure to only cache third-party dependencies (as +caching workspace members often has diminishing returns) and choosing the +correct cache key. + +You can use the `bmat cache rust restore` and `bmat cache rust save` commands in +your CI script: + +```bash +#!/bin/bash +bmat cache rust restore +cargo build --locked +bmat cache rust save +``` + +Both commands optionally accept the path of the `Cargo.toml` corresponding to +the *workspace* to cache. 
This is only needed if multiple separate workspaces +are present in the same repository. If no `Cargo.toml` is provided `bmat` will +look in the current working directory for one, and fail if it's missing. + +### Manual caching + +If the files you need to cache are not covered by one of Buildomat's native +integrations, you can use the `bmat` CLI (pre-installed in every worker) to +manually save and restore caches. When using manual caching, it's your +responsibility to pick the right cache key, and to choose which files should be +included as part of the cache. + +The `bmat cache restore` command restores an existing cache, and requires the +cache key as its first argument. + +The `bmat cache save` command stores files in the cache: it requires the cache +key as its first argument, and the list of files to cache to the standard input +(for example by piping `find -type f` into it). + +```bash +#!/bin/bash +# +# This is a worse implementation of `bmat cache rust`. +# + +rust_version="$(rustc --version | cut -d ' ' -f 2)" +rust_host="$(rustc --print=host-tuple)" +cargo_lock="$(sha256sum Cargo.lock | cut -d ' ' -f 1)" +cache_key="rust-$rust_version-$rust_host-$cargo_lock" + +bmat cache restore "$cache_key" +cargo build --locked +find target/ -type f | bmat cache save "$cache_key" +``` + ## Licence Unless otherwise noted, all components are licenced under the [Mozilla Public diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 71ba37b..c67a0d9 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -18,14 +18,19 @@ bytes = { workspace = true } chrono = { workspace = true } futures = { workspace = true } glob = { workspace = true } +hex = { workspace = true } hiercmd = { workspace = true } ipnet = { workspace = true } libc = { workspace = true } +reqwest = { workspace = true } rusty_ulid = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } slog = { workspace = true } +tar = { workspace = true } +tempfile = { workspace = true } tokio = { 
workspace = true } +zstd = { workspace = true } # # I believe it is necessary to pull this in here, so that we can demand the # static linking of the vendored OpenSSL. We don't use it directly, but the diff --git a/agent/src/control/cache/mod.rs b/agent/src/control/cache/mod.rs new file mode 100644 index 0000000..dd0dc33 --- /dev/null +++ b/agent/src/control/cache/mod.rs @@ -0,0 +1,288 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +pub mod rust; + +use std::borrow::BorrowMut as _; +use std::fs::File; +use std::io::SeekFrom; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use anyhow::{anyhow, bail, Error, Result}; +use buildomat_common::{render_bytes, DurationExt as _}; +use reqwest::header::ETAG; +use reqwest::Client as ReqwestClient; +use tar::{Archive as TarArchive, Builder as TarBuilder}; +use tempfile::{NamedTempFile, TempPath}; +use tokio::fs::File as TokioFile; +use tokio::io::{AsyncReadExt, AsyncSeekExt as _, AsyncWriteExt as _}; +use tokio::sync::Semaphore; +use tokio::task::spawn_blocking; +use zstd::stream::read::Decoder as ZstdReadDecoder; +use zstd::stream::write::Encoder as ZstdWriteEncoder; + +use crate::control::protocol::Payload; +use crate::control::Stuff; + +const ZSTD_COMPRESSION_LEVEL: i32 = 3; +const PARALLEL_UPLOADS: usize = 30; + +pub async fn restore(stuff: &mut Stuff, name: &str) -> Result<()> { + /* + * Retrieve the presigned download URL. + */ + let download_url = + match stuff.req_retry(Payload::CacheUrl(name.into())).await? { + Payload::CacheUrlResponse(Some(url)) => { + eprintln!("cache hit: {name}"); + url + } + Payload::CacheUrlResponse(None) => { + eprintln!("cache miss: {name}"); + return Ok(()); + } + other => bail!("unexpected response: {other:?}"), + }; + + /* + * Download the archive into a temporary file. 
+ */ + let start = Instant::now(); + let mut response = reqwest::get(&download_url).await?.error_for_status()?; + let mut temp_file = tokio_temp_file()?; + while let Some(mut item) = response.chunk().await? { + temp_file.as_file_mut().write_all_buf(item.borrow_mut()).await?; + } + let temp_path = temp_file.into_temp_path(); + eprintln!( + "downloaded cache in {}, cache size is {}", + start.elapsed().render(), + render_bytes(temp_path.metadata()?.len()) + ); + + /* + * Extract the contents of the cache into the current working directory. + * This is done within a spawn_blocking as decompressing in the async task + * risks stalling the executor. + */ + let start = Instant::now(); + spawn_blocking(move || { + let file = File::open(temp_path)?; + let mut archive = TarArchive::new(ZstdReadDecoder::new(file)?); + archive.unpack(std::env::current_dir()?)?; + Ok::<_, Error>(()) + }) + .await??; + eprintln!("extracted cache in {}", start.elapsed().render()); + + Ok(()) +} + +pub async fn save( + stuff: &mut Stuff, + name: &str, + paths: Vec, +) -> Result<()> { + /* + * We want to avoid generating the archive if the cache is already present, + * to avoid wasting CI cycles creating compressed archives. + * + * This check is only advisory, as the actual check is done when we begin + * the cache upload: we thus avoid retrying or handling errors. + */ + match stuff.req(Payload::CacheUrl(name.into())).await? { + Payload::CacheUrlResponse(Some(_)) => { + eprintln!("a cache named {name} already exists, skipping upload"); + return Ok(()); + } + _ => {} + } + + /* + * Build the archive that we will upload. 
+ */ + let start = Instant::now(); + let paths_len = paths.len(); + let archive = spawn_blocking(move || create_archive(&paths)).await??; + let archive_size = archive.metadata()?.len(); + eprintln!( + "created archive with {} files in {}, archive size is {}", + paths_len, + start.elapsed().render(), + render_bytes(archive_size), + ); + + /* + * Begin an upload with the buildomat server, returning the pre-signed + * upload URLs we have to use, along with the relevant metadata needed to + * complete the upload. + */ + let upload = match stuff + .req_retry(Payload::BeginCacheUpload { + name: name.into(), + size_bytes: archive_size, + }) + .await? + { + Payload::BeginCacheUploadOk(upload) => upload, + Payload::BeginCacheUploadSkip => { + eprintln!("a cache named {name} already exists, skipping upload"); + return Ok(()); + } + other => bail!("unexpected response: {other:?}"), + }; + eprintln!("registered buildomat cache with ID {}", upload.cache_id); + + /* + * To upload caches we use S3's multipart uploads feature, allowing us to + * upload chunks of the file in separate requests. The reason why we chose + * to adopt them is reliability: very large upload requests are more likely + * to fail, and in case of errors we can retry uploading only the failed + * chunk rather than the whole file. + * + * Since we are already using multipart uploads, we can speed up the upload + * by uploading individual chunks in parallel. We thus start Tokio tasks + * for every chunk at the start of the upload, and use a semaphore to limit + * how many concurrent requests we send to S3. 
+ */ + let start = Instant::now(); + let http = ReqwestClient::new(); + let rate_limit = Arc::new(Semaphore::new(PARALLEL_UPLOADS)); + let chunks_count = upload.chunk_upload_urls.len(); + let mut tasks = Vec::new(); + for (idx, upload_url) in upload.chunk_upload_urls.into_iter().enumerate() { + let http = http.clone(); + let rate_limit = rate_limit.clone(); + let archive = archive.to_path_buf(); + let chunk_size = upload.chunk_size_bytes; + tasks.push(tokio::spawn(async move { + /* + * Do not add any code before acquiring the rate limit permit, as + * that code would be executed for all chunks in parallel. + */ + let _permit = rate_limit.acquire().await?; + + let mut file = TokioFile::open(archive).await?; + file.seek(SeekFrom::Start(idx as u64 * chunk_size as u64)).await?; + + /* + * Each chunk other than the last will contain exactly as many + * bytes as requested by the server. The last chunk will contain + * the remainder of the file. + */ + let content = if idx == chunks_count - 1 { + let mut content = Vec::new(); + file.read_to_end(&mut content).await?; + content + } else { + let mut content = vec![0; chunk_size as _]; + file.read_exact(&mut content).await?; + content + }; + + /* + * Upload the chunk and return the ETag header. There is no need to + * authenticate, as the server returns presigned URLs. + */ + let mut attempt = 1; + loop { + let response = http + .put(&upload_url) + .body(content.clone()) + .send() + .await + .and_then(|result| result.error_for_status()); + match response { + Ok(response) => { + return Ok::<_, Error>( + response + .headers() + .get(ETAG) + .ok_or_else(|| anyhow!("no ETag in response"))? + .to_str()? + .to_string(), + ); + } + Err(e) => { + if attempt >= 5 { + bail!("failed to upload chunk {idx}: {e}"); + } + attempt += 1; + + eprintln!("failed to upload chunk {idx}, retrying..."); + tokio::time::sleep(Duration::from_secs(attempt)).await; + } + } + } + })); + } + + /* + * Join all the chunk upload tasks. 
It's load bearing that we join them in + * the same order as the upload URLs returned by the server, as we need to + * send the corresponding ETag header values in that order. + */ + let mut uploaded_etags = Vec::new(); + let mut poison = false; + for (idx, task) in tasks.into_iter().enumerate() { + match task.await? { + Ok(etag) => uploaded_etags.push(etag), + Err(e) => { + eprintln!("ERROR: failed to upload chunk {idx}: {e:?}"); + poison = true; + } + } + } + if poison { + bail!("one or more chunks failed to upload"); + } + + eprintln!("uploaded {chunks_count} chunks in {}", start.elapsed().render()); + + /* + * Complete the upload of the cache, making it available to other jobs. + */ + match stuff + .req_retry(Payload::CompleteCacheUpload { + cache_id: upload.cache_id, + uploaded_etags, + }) + .await? + { + Payload::Ack => {} + other => bail!("unexpected response: {other:?}"), + } + eprintln!("cache upload for {name} finished"); + + Ok(()) +} + +/* + * This function is sync because we need to compress things while archiving + * them, so we'd need to run parts of it within spawn_blocking anyway. Making + * the whole function sync and calling it within spawn_blocking lets us use the + * more convenient sync APIs for compression and archiving. + */ +fn create_archive(paths: &[PathBuf]) -> Result { + let mut archive = TarBuilder::new(ZstdWriteEncoder::new( + NamedTempFile::new()?, + ZSTD_COMPRESSION_LEVEL, + )?); + for path in paths { + archive.append_path(path)?; + } + archive.finish()?; + /* + * Calling .into_temp_path() closes the fd, while still ensuring the + * underlying file gets removed when dropped. 
+ */ + Ok(archive.into_inner()?.finish()?.into_temp_path()) +} + +fn tokio_temp_file() -> Result> { + let (file, path) = NamedTempFile::new()?.into_parts(); + Ok(NamedTempFile::from_parts(TokioFile::from_std(file), path)) +} diff --git a/agent/src/control/cache/rust.rs b/agent/src/control/cache/rust.rs new file mode 100644 index 0000000..db4b45b --- /dev/null +++ b/agent/src/control/cache/rust.rs @@ -0,0 +1,313 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use crate::control::Stuff; +use anyhow::{anyhow, bail, Error, Result}; +use openssl::sha::Sha256; +use serde::Deserialize; +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use tokio::task::spawn_blocking; + +pub async fn restore(stuff: &mut Stuff, cargo_toml: &Path) -> Result<()> { + let cargo_toml = cargo_toml.to_path_buf(); + let cache_key = spawn_blocking(move || { + let metadata = cargo_metadata(&cargo_toml)?; + cache_key(&metadata) + }) + .await??; + + super::restore(stuff, &cache_key).await +} + +pub async fn save(stuff: &mut Stuff, cargo_toml: &Path) -> Result<()> { + let cargo_toml = cargo_toml.to_path_buf(); + let (cache_key, files_to_cache) = spawn_blocking(move || { + let metadata = cargo_metadata(&cargo_toml)?; + Ok::<_, Error>((cache_key(&metadata)?, files_to_cache(&metadata)?)) + }) + .await??; + + super::save(stuff, &cache_key, files_to_cache).await +} + +fn files_to_cache(metadata: &CargoMetadata) -> Result> { + /* + * Caching the build artifacts of path dependencies has diminishing results, + * as those (for example within a workspace) tend to change a lot. The + * default behavior of Rust caches in buildomat is thus to only cache the + * artifacts of third-party dependencies. + */ + let mut cacheable_packages = HashSet::new(); + let mut cacheable_targets = HashSet::new(); + for package in &metadata.packages { + if is_cacheable_package(package)? 
{ + cacheable_packages.insert(package.name.to_string()); + cacheable_targets.insert(package.name.replace('-', "_")); + for target in &package.targets { + cacheable_targets.insert(target.name.replace('-', "_")); + } + } + } + + let mut ftc = FilesToCache { + files: Vec::new(), + cacheable_packages, + cacheable_targets, + }; + + /* + * Cargo 1.91 introduced the split between "target directories" (containing + * the final artifacts, like executables) and "build directories" (dedicated + * to the intermediate artifacts). As we only want to cache intermediate + * artifacts we look within the build directory. + * + * Previous versions of Cargo didn't emit build_directory in the metadata + * though, so in those cases we have to look within the target directory. + */ + if let Some(build_dir) = &metadata.build_directory { + ftc.find_profile_directories(build_dir)?; + } else { + ftc.find_profile_directories(&metadata.target_directory)?; + } + + /* + * The buildomat cache implementation requires cached paths to be relative, + * but the code above generates absolute paths (we base our search on the + * path returned by `cargo metadata`, which is absolute). + */ + let current_dir = std::env::current_dir()?; + for file in &mut ftc.files { + *file = file + .strip_prefix(¤t_dir) + .map_err(|_| anyhow!("path {file:?} is not in the cwd"))? + .into(); + } + + Ok(ftc.files) +} + +struct FilesToCache { + files: Vec, + cacheable_packages: HashSet, + cacheable_targets: HashSet, +} + +impl FilesToCache { + /* + * "Profile directories" are the directories containing the build artifacts + * to cache. They often are target/debug and target/release, but when cross + * compiling they become target/$host/debug and target/$host/release. + * + * To handle all cases well, we recursively search within target/ for all + * directories that look like a profile directory. 
+ */ + fn find_profile_directories(&mut self, path: &Path) -> Result<()> { + /* + * There is not really a "this is a profile directory" indicator, other + * than looking whether it has the structure of one. We thus check for + * the presence of the .fingerprint/ directory within it. + */ + if path.join(".fingerprint").exists() { + self.handle_profile_directory(path)?; + } else { + for entry in path.read_dir()? { + let entry = entry?; + if entry.file_type()?.is_dir() { + self.find_profile_directories(&entry.path())?; + } + } + } + Ok(()) + } + + fn handle_profile_directory(&mut self, path: &Path) -> Result<()> { + /* + * .fingerprint/ and build/ contain an entry per package. + */ + self.collect_directory_with_packages(&path.join(".fingerprint"))?; + self.collect_directory_with_packages(&path.join("build"))?; + /* + * deps/ contains an entry per package *and* one for each "target" + * included within the package. Targets are e.g. libraries, binaries, + * proc macros, tests, examples. + */ + self.collect_directory_with_targets(&path.join("deps"))?; + Ok(()) + } + + fn collect_directory_with_packages(&mut self, path: &Path) -> Result<()> { + for entry in path.read_dir()? { + let entry = entry?; + let file_name = entry.file_name(); + let Some((name, _hash)) = file_name + .to_str() + .ok_or_else(|| anyhow!("non-utf-8 name"))? + .rsplit_once('-') + else { + continue; + }; + if self.cacheable_packages.contains(name) { + self.collect_recursive(&entry.path())?; + } + } + Ok(()) + } + + fn collect_directory_with_targets(&mut self, path: &Path) -> Result<()> { + for entry in path.read_dir()? { + let entry = entry?; + let file_name = entry.file_name(); + let Some((name, _hash)) = file_name + .to_str() + .ok_or_else(|| anyhow!("non-utf-8 name"))? + .rsplit_once('-') + else { + continue; + }; + /* + * Directories with targets contains both files starting with the + * dependency name, and files starting with `lib${dependency}`. 
+ * + * Note that we cannot use `name.trim_end_matches` to strip the lib + * prefix, as that method removes one *or more* occurrences, not + * just one. That causes problems for crates called "libc", as then + * "liblibc" would be trimmed to "c", rather than "libc". + */ + if self.cacheable_targets.contains(name) + || self + .cacheable_targets + .contains(name.strip_prefix("lib").unwrap_or(name)) + { + self.collect_recursive(&entry.path())?; + } + } + Ok(()) + } + + fn collect_recursive(&mut self, path: &Path) -> Result<()> { + if path.is_dir() { + for child in path.read_dir()? { + self.collect_recursive(&child?.path())?; + } + } else { + self.files.push(path.into()); + } + Ok(()) + } +} + +/** + * Check whether a Cargo package (as returned by `cargo metadata`) should be + * cached. We currently cache every package outside of the local workspace. + * + * https://doc.rust-lang.org/cargo/reference/pkgid-spec.html + */ +fn is_cacheable_package(package: &CargoPackage) -> Result { + let kind = package.id.split_once('+').ok_or_else(|| { + anyhow!("package id {} is not fully qualified", package.id) + })?; + match kind.0 { + "git" => Ok(true), + "registry" => Ok(true), + "path" => Ok(false), + other => bail!("unknown package kind: {other}"), + } +} + +/* + * The cache key of a Rust project needs to include the rustc version (caches + * are version-specific), which host it's being built on (as some caches depend + * on host-specific resources), and what is actually being cached. Since we are + * only caching third-party dependencies, "what's actually being cached" is + * neatly described by Cargo.lock, which we hash. 
+ */ +fn cache_key(metadata: &CargoMetadata) -> Result { + let rustc_version = rustc_version()?; + + let lockfile_path = metadata.workspace_root.join("Cargo.lock"); + let mut lockfile_digest = Sha256::new(); + lockfile_digest.update(&std::fs::read(&lockfile_path)?); + let lockfile_digest = hex::encode(lockfile_digest.finish()); + + Ok(format!("rust-deps-{rustc_version}-{lockfile_digest}")) +} + +fn rustc_version() -> Result { + eprintln!("discovering the rustc version..."); + let rustc = Command::new("rustc") + .arg("-vV") + .stdout(Stdio::piped()) + .spawn()? + .wait_with_output()?; + if !rustc.status.success() { + bail!("invoking rustc failed with {}", rustc.status); + } + let stdout = std::str::from_utf8(&rustc.stdout)?; + + let kv = stdout + .lines() + .filter_map(|line| line.split_once(": ")) + .collect::>(); + + let Some(release) = kv.get("release") else { + bail!("missing release field in `rustc -vV`"); + }; + let Some(commit_date) = kv.get("commit-date") else { + bail!("missing commit-date field in `rustc -vV`"); + }; + let Some(host) = kv.get("host") else { + bail!("missing host field in `rustc -vV`"); + }; + + if release.ends_with("-nightly") { + Ok(format!("nightly-{commit_date}-{host}")) + } else if release.ends_with("-beta") { + Ok(format!("beta-{commit_date}-{host}")) + } else { + Ok(format!("{release}-{host}")) + } +} + +fn cargo_metadata(cargo_toml: &Path) -> Result { + eprintln!("discovering metadata about the Cargo project..."); + let cargo = Command::new("cargo") + .arg("metadata") + .arg("--format-version=1") + .arg("--manifest-path") + .arg(cargo_toml) + .stdout(Stdio::piped()) + .spawn()? + .wait_with_output()?; + if !cargo.status.success() { + bail!("invoking cargo failed with {}", cargo.status); + } + Ok(serde_json::from_slice(&cargo.stdout)?) 
+} + +#[derive(Deserialize)] +struct CargoMetadata { + packages: Vec, + workspace_root: PathBuf, + target_directory: PathBuf, + /* + * The build_directory field was added in Rust 1.91.0, older versions won't + * return it. We thus have to make it optional. + */ + #[serde(default)] + build_directory: Option, +} + +#[derive(Deserialize)] +struct CargoPackage { + id: String, + name: String, + targets: Vec, +} + +#[derive(Deserialize)] +struct CargoTarget { + name: String, +} diff --git a/agent/src/control/mod.rs b/agent/src/control/mod.rs index d706b78..7358d0f 100644 --- a/agent/src/control/mod.rs +++ b/agent/src/control/mod.rs @@ -2,7 +2,12 @@ * Copyright 2026 Oxide Computer Company */ -use std::{io::Read, ops::Range, time::Duration}; +use std::{ + io::Read, + ops::Range, + path::{Path, PathBuf}, + time::Duration, +}; use anyhow::{bail, Result}; use bytes::BytesMut; @@ -15,6 +20,7 @@ use tokio::{ use protocol::{Decoder, FactoryInfo, Message, Payload}; +mod cache; pub(crate) mod protocol; pub(crate) mod server; @@ -116,6 +122,7 @@ pub async fn main() -> Result<()> { l.cmd("address", "manage IP addresses for this job", cmd!(cmd_address))?; l.cmd("process", "manage background processes", cmd!(cmd_process))?; l.cmd("factory", "factory information for this worker", cmd!(cmd_factory))?; + l.cmd("cache", "save and restore caches", cmd!(cmd_cache))?; l.hcmd("eng", "for working on and testing buildomat", cmd!(cmd_eng))?; sel!(l).run().await @@ -482,3 +489,84 @@ async fn cmd_factory_private(mut l: Level) -> Result<()> { Ok(()) } + +async fn cmd_cache(mut l: Level) -> Result<()> { + l.context_mut().connect().await?; + + l.cmd("rust", "cache Rust target directories", cmd!(cmd_cache_rust))?; + l.cmd("save", "low level: save a cache", cmd!(cmd_cache_save))?; + l.cmd("restore", "low level: restore a cache", cmd!(cmd_cache_restore))?; + + sel!(l).run().await +} + +async fn cmd_cache_save(mut l: Level) -> Result<()> { + l.usage_args(Some("CACHE_NAME")); + + let a = args!(l); + if 
a.args().len() != 1 { + bad_args!(l, "you need to provide a cache name"); + } + let name = &a.args()[0]; + + let mut paths = Vec::new(); + for line in std::io::stdin().lines() { + paths.push(PathBuf::from(line?)); + } + if paths.is_empty() { + bad_args!(l, "you need to provide at least one path via stdin"); + } + + cache::save(l.context_mut(), name, paths).await +} + +async fn cmd_cache_restore(mut l: Level) -> Result<()> { + l.usage_args(Some("CACHE_NAME")); + + let a = args!(l); + if a.args().len() != 1 { + bad_args!(l, "you need to provide a cache name"); + } + let name = &a.args()[0]; + + cache::restore(l.context_mut(), name).await +} + +async fn cmd_cache_rust(mut l: Level) -> Result<()> { + l.context_mut().connect().await?; + + l.cmd("save", "save a cache", cmd!(cmd_cache_rust_save))?; + l.cmd("restore", "restore a cache", cmd!(cmd_cache_rust_restore))?; + + sel!(l).run().await +} + +async fn cmd_cache_rust_save(mut l: Level) -> Result<()> { + l.usage_args(Some("CARGO_TOML")); + + let a = args!(l); + let cargo_toml = match a.args() { + [] => "Cargo.toml", + [arg] => arg, + _ => { + bad_args!(l, "only one Cargo.toml is supported"); + } + }; + + cache::rust::save(l.context_mut(), Path::new(cargo_toml)).await +} + +async fn cmd_cache_rust_restore(mut l: Level) -> Result<()> { + l.usage_args(Some("CARGO_TOML")); + + let a = args!(l); + let cargo_toml = match a.args() { + [] => "Cargo.toml", + [arg] => arg, + _ => { + bad_args!(l, "only one Cargo.toml is supported"); + } + }; + + cache::rust::restore(l.context_mut(), Path::new(cargo_toml)).await +} diff --git a/agent/src/control/protocol.rs b/agent/src/control/protocol.rs index ed0808b..af96cc9 100644 --- a/agent/src/control/protocol.rs +++ b/agent/src/control/protocol.rs @@ -1,5 +1,5 @@ /* - * Copyright 2023 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use std::ffi::OsString; @@ -23,6 +23,13 @@ pub struct StoreEntry { pub secret: bool, } +#[derive(Clone, Debug, Serialize, Deserialize, Eq, 
PartialEq)] +pub struct BeginCacheUpload { + pub cache_id: String, + pub chunk_size_bytes: u32, + pub chunk_upload_urls: Vec, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum Payload { Ack, @@ -48,6 +55,21 @@ pub enum Payload { FactoryInfo, FactoryInfoResult(FactoryInfo), + + CacheUrl(String), + CacheUrlResponse(Option), + + BeginCacheUpload { + name: String, + size_bytes: u64, + }, + BeginCacheUploadOk(BeginCacheUpload), + BeginCacheUploadSkip, + + CompleteCacheUpload { + cache_id: String, + uploaded_etags: Vec, + }, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/agent/src/control/server.rs b/agent/src/control/server.rs index 29bfe9a..7160ccc 100644 --- a/agent/src/control/server.rs +++ b/agent/src/control/server.rs @@ -1,5 +1,5 @@ /* - * Copyright 2023 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use std::{io::ErrorKind, os::unix::prelude::PermissionsExt, sync::Arc}; @@ -205,7 +205,10 @@ async fn handle_client_turn( | Payload::StorePut(..) | Payload::MetadataAddresses | Payload::ProcessStart { .. } - | Payload::FactoryInfo => { + | Payload::FactoryInfo + | Payload::CacheUrl(_) + | Payload::BeginCacheUpload { .. } + | Payload::CompleteCacheUpload { .. } => { /* * These are requests from the control program. Pass them * on to the main loop. 
diff --git a/agent/src/main.rs b/agent/src/main.rs index 4705440..26d78d3 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1,5 +1,5 @@ /* - * Copyright 2024 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ #![allow(clippy::many_single_char_names)] @@ -62,7 +62,7 @@ mod os_constants { } use os_constants::*; -use crate::control::protocol::StoreEntry; +use crate::control::protocol::{BeginCacheUpload, StoreEntry}; #[derive(Serialize, Deserialize)] struct ConfigFile { @@ -1467,6 +1467,65 @@ async fn cmd_run(mut l: Level) -> Result<()> { Payload::Error("factory info not available".into()) } } + Payload::CacheUrl(name) => { + match cw + .client + .worker_cache_get() + .job(cw.job_id().unwrap()) + .name(name) + .send() + .await + .map(|res| res.into_inner()) + { + Ok(resp) => { + Payload::CacheUrlResponse(resp.download_url) + } + Err(e) => Payload::Error(e.to_string()), + } + } + Payload::BeginCacheUpload { name, size_bytes } => { + match cw + .client + .worker_cache_upload() + .job(cw.job_id().unwrap()) + .name(name) + .body(WorkerCacheUploadBody { + size_bytes: *size_bytes, + }) + .send() + .await + .map(|res| res.into_inner()) + { + Ok(WorkerCacheUploadResult::Upload { + chunk_size_bytes, + chunk_upload_urls, + cache_id, + }) => Payload::BeginCacheUploadOk(BeginCacheUpload { + cache_id, + chunk_size_bytes, + chunk_upload_urls, + }), + Ok(WorkerCacheUploadResult::Skip) => { + Payload::BeginCacheUploadSkip + } + Err(e) => Payload::Error(e.to_string()), + } + } + Payload::CompleteCacheUpload { cache_id, uploaded_etags } => { + match cw + .client + .worker_cache_upload_complete() + .cache_id(cache_id) + .body(WorkerCacheUploadCompleteBody { + uploaded_etags: uploaded_etags.clone(), + }) + .send() + .await + { + Ok(_) => Payload::Ack, + Err(e) => Payload::Error(e.to_string()), + } + } _ => Payload::Error("unexpected message type".to_string()), }; diff --git a/client/openapi.json b/client/openapi.json index e924e60..0c6baf3 100644 --- 
a/client/openapi.json +++ b/client/openapi.json @@ -1811,6 +1811,53 @@ } } }, + "/0/worker/cache-upload/{cache_id}/complete": { + "post": { + "operationId": "worker_cache_upload_complete", + "parameters": [ + { + "in": "path", + "name": "cache_id", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkerCacheUploadCompleteBody" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Null", + "type": "string", + "enum": [ + null + ] + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/0/worker/diagnostics/complete": { "post": { "operationId": "worker_diagnostics_complete", @@ -1879,6 +1926,96 @@ } } }, + "/0/worker/job/{job}/cache/{name}": { + "get": { + "operationId": "worker_cache_get", + "parameters": [ + { + "in": "path", + "name": "job", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkerCacheGetResponse" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "post": { + "operationId": "worker_cache_upload", + "parameters": [ + { + "in": "path", + "name": "job", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkerCacheUploadBody" + } + } + }, + "required": true + }, 
+ "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkerCacheUploadResult" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/0/worker/job/{job}/chunk": { "post": { "operationId": "worker_job_upload_chunk", @@ -3810,6 +3947,91 @@ "id" ] }, + "WorkerCacheGetResponse": { + "type": "object", + "properties": { + "download_url": { + "nullable": true, + "type": "string" + } + } + }, + "WorkerCacheUploadBody": { + "type": "object", + "properties": { + "size_bytes": { + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "required": [ + "size_bytes" + ] + }, + "WorkerCacheUploadCompleteBody": { + "type": "object", + "properties": { + "uploaded_etags": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "uploaded_etags" + ] + }, + "WorkerCacheUploadResult": { + "oneOf": [ + { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": [ + "upload" + ] + }, + "cache_id": { + "type": "string" + }, + "chunk_size_bytes": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "chunk_upload_urls": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "action", + "cache_id", + "chunk_size_bytes", + "chunk_upload_urls" + ] + }, + { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": [ + "skip" + ] + } + }, + "required": [ + "action" + ] + } + ] + }, "WorkerCompleteDiagnostics": { "type": "object", "properties": { diff --git a/common/src/lib.rs b/common/src/lib.rs index 37a1c4d..5c1ba9d 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,5 +1,5 @@ /* - * Copyright 2025 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use std::io::{IsTerminal, Read}; @@ -215,3 +215,17 @@ pub enum StringOrBool { String(String), Bool(bool), } 
+ +pub fn render_bytes(bytes: u64) -> String { + if bytes < 1000 { + return format!("{bytes} bytes"); + } + let mut bytes = bytes as f64 / 1000.0; + for unit in ["kB", "MB", "GB"] { + if bytes < 1000.0 { + return format!("{bytes:.2} {unit}"); + } + bytes /= 1000.0; + } + format!("{bytes:.2} TB") +} diff --git a/server/schema.sql b/server/schema.sql index 0e8f444..3b97dff 100644 --- a/server/schema.sql +++ b/server/schema.sql @@ -340,3 +340,29 @@ ALTER TABLE job ADD COLUMN -- v 57 CREATE INDEX job_purging_queue ON job (id, complete, time_archived, time_purged) WHERE complete = true AND time_archived IS NOT NULL AND time_purged IS NULL; + +-- v 58 +CREATE TABLE cache_file ( + id TEXT PRIMARY KEY, + owner TEXT NOT NULL, + name TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + time_upload TEXT NOT NULL, + time_last_use TEXT NOT NULL, + + UNIQUE (owner, name) +); + +-- v 59 +CREATE TABLE cache_pending_upload ( + id TEXT PRIMARY KEY, + s3_upload_id TEXT NOT NULL, + owner TEXT NOT NULL, + name TEXT NOT NULL, + worker TEXT NOT NULL, + size_bytes INTEGER NOT NULL, + chunks INTEGER NOT NULL, + etags TEXT, + time_begin TEXT NOT NULL, + time_finish TEXT +); diff --git a/server/src/api/worker.rs b/server/src/api/worker.rs index 544b573..e6b693a 100644 --- a/server/src/api/worker.rs +++ b/server/src/api/worker.rs @@ -1,8 +1,14 @@ /* - * Copyright 2024 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ +use std::time::Duration; + +use aws_sdk_s3::presigning::PresigningConfig; + use super::prelude::*; +use crate::db::CacheFileId; +use buildomat_common::looks_like_a_ulid; trait JobOwns { fn owns(&self, log: &Logger, job: &db::Job) -> DSResult<()>; @@ -925,3 +931,252 @@ pub(crate) async fn worker_diagnostics_enable( Ok(HttpResponseUpdatedNoContent()) } + +#[derive(Deserialize, JsonSchema)] +pub(crate) struct WorkerCachePath { + job: String, + name: String, +} + +#[derive(Serialize, JsonSchema)] +pub(crate) struct WorkerCacheGetResponse { + download_url: Option, +} + 
+#[endpoint { + method = GET, + path = "/0/worker/job/{job}/cache/{name}" +}] +pub(crate) async fn worker_cache_get( + rqctx: RequestContext>, + path: TypedPath, +) -> DSResult> { + const DOWNLOAD_EXPIRY: Duration = Duration::from_secs(600); /* 10 minutes */ + + let c = rqctx.context(); + let log = &rqctx.log; + let path = path.into_inner(); + + let w = c.require_worker(log, &rqctx.request).await?; + let j = c.db.job(path.job()?).or_500()?; /* XXX */ + w.owns(log, &j)?; + + if let Some(cache) = c.db.cache_file(j.owner, &path.name).or_500()? { + c.db.record_cache_use(cache.id).or_500()?; + Ok(HttpResponseOk(WorkerCacheGetResponse { + download_url: Some( + c.s3.get_object() + .bucket(&c.config.storage.bucket) + .key(c.cache_object_key(cache.id)) + .presigned( + PresigningConfig::expires_in(DOWNLOAD_EXPIRY) + .expect("invalid presigned config"), + ) + .await + .or_500()? + .uri() + .to_string(), + ), + })) + } else { + Ok(HttpResponseOk(WorkerCacheGetResponse { download_url: None })) + } +} + +#[derive(Deserialize, JsonSchema)] +pub(crate) struct WorkerCacheUploadBody { + size_bytes: u64, +} + +#[derive(Serialize, JsonSchema)] +#[serde(tag = "action", rename_all = "snake_case")] +pub(crate) enum WorkerCacheUploadResult { + Upload { + cache_id: String, + chunk_size_bytes: u32, + chunk_upload_urls: Vec, + }, + Skip, +} + +impl WorkerCachePath { + fn job(&self) -> DSResult { + self.job.parse::().or_500() + } +} + +#[endpoint { + method = POST, + path = "/0/worker/job/{job}/cache/{name}" +}] +pub(crate) async fn worker_cache_upload( + rqctx: RequestContext>, + path: TypedPath, + body: TypedBody, +) -> DSResult> { + const UPLOAD_EXPIRY: Duration = Duration::from_secs(3600); /* 1 hour */ + const CHUNK_SIZE: i64 = 50 * 1024 * 1024; + + let c = rqctx.context(); + let log = &rqctx.log; + let path = path.into_inner(); + let size_bytes = body.into_inner().size_bytes; + + let w = c.require_worker(log, &rqctx.request).await?; + let j = c.db.job(path.job()?).or_500()?; /* XXX */ + 
w.owns(log, &j)?; + + if size_bytes > c.config.job.max_bytes_per_individual_cache() { + return Err(HttpError::for_client_error( + None, + ClientErrorStatusCode::BAD_REQUEST, + "cache file too large".into(), + )); + } + + let valid_name = path.name.chars().all(|chr| { + chr.is_ascii_alphanumeric() || chr == '-' || chr == '_' || chr == '.' + }); + if !valid_name || looks_like_a_ulid(&path.name) { + return Err(HttpError::for_client_error( + None, + ClientErrorStatusCode::BAD_REQUEST, + format!("invalid cache name: {:?}", path.name), + )); + } + + /* + * If a cache with this name already exists, avoid uploading it again to + * reduce time spent uploading in the CI job. + * + * Note that this check is not perfect, and it's possible that a cache gets + * uploaded between this request returning and the upload completing. The + * complete upload endpoint has the load-bearing check, this one is just a + * short circuit to avoid wasting time. + */ + if c.db.cache_file(j.owner, &path.name).or_500()?.is_some() { + return Ok(HttpResponseOk(WorkerCacheUploadResult::Skip)); + } + + let cache_id = CacheFileId::generate(); + + /* + * Caches can reach considerable size, so we parallelize their upload with + * S3's multipart upload feature, speeding up uploads in CI. + * + * We create a multipart upload in S3, which gives us an S3 upload ID. We + * can then generate a bunch of pre-signed URLs, one per chunk, and send + * them to the client. The client will use them to upload the file, and + * then call worker_cache_upload_complete to save the file. + */ + let s3_upload_id = c + .s3 + .create_multipart_upload() + .bucket(&c.config.storage.bucket) + .key(c.cache_object_key(cache_id)) + .send() + .await + .or_500()?
+ .upload_id + .ok_or_else(|| anyhow::anyhow!("missing upload ID in the AWS response")) + .or_500()?; + + let preconf = PresigningConfig::expires_in(UPLOAD_EXPIRY) + .expect("invalid presigned config"); + + let mut part_number: u32 = 0; + let mut remaining_bytes = size_bytes as i64; + let mut chunk_upload_urls = Vec::new(); + while remaining_bytes > 0 { + part_number += 1; + chunk_upload_urls.push( + c.s3.upload_part() + .bucket(&c.config.storage.bucket) + .key(c.cache_object_key(cache_id)) + .upload_id(&s3_upload_id) + .part_number(part_number as _) + .content_length(remaining_bytes.min(CHUNK_SIZE)) + .presigned(preconf.clone()) + .await + .or_500()? + .uri() + .to_string(), + ); + remaining_bytes -= CHUNK_SIZE; + } + + c.db.record_pending_cache_upload( + cache_id, + j.owner, + &path.name, + w.id, + size_bytes, + &s3_upload_id, + part_number, + ) + .or_500()?; + + Ok(HttpResponseOk(WorkerCacheUploadResult::Upload { + cache_id: cache_id.to_string(), + chunk_size_bytes: CHUNK_SIZE as _, + chunk_upload_urls, + })) +} + +#[derive(Deserialize, JsonSchema)] +pub(crate) struct WorkerCacheUploadCompletePath { + cache_id: String, +} + +#[derive(Deserialize, JsonSchema)] +pub(crate) struct WorkerCacheUploadCompleteBody { + uploaded_etags: Vec, +} + +#[endpoint { + method = POST, + path = "/0/worker/cache-upload/{cache_id}/complete" +}] +pub(crate) async fn worker_cache_upload_complete( + rqctx: RequestContext>, + path: TypedPath, + body: TypedBody, +) -> DSResult> { + let c = rqctx.context(); + let log = &rqctx.log; + let p = path.into_inner(); + + let upload_id = p.cache_id.parse::().or_500()?; + let etags = body.into_inner().uploaded_etags; + + let w = c.require_worker(log, &rqctx.request).await?; + let upload = c.db.pending_cache_upload(upload_id).or_500()?; + if upload.worker != w.id { + return Err(HttpError::for_client_error( + None, + ClientErrorStatusCode::FORBIDDEN, + "not your upload".into(), + )); + } + + /* + * Double-check that the client sent as many etags as 
we expect. + */ + if upload.chunks as usize != etags.len() { + return Err(HttpError::for_bad_request( + None, + format!("expected {} etags, found {}", upload.chunks, etags.len()), + )); + } + + /* + * According to the documentation for the request, "The processing of a + * CompleteMultipartUpload request could take several minutes to finalize". + * We don't want to block the CI job until the request succeeds. Rather, + * we mark the upload as finished and let a background task take care of + * persisting it to S3 and in the database. + */ + c.db.finish_cache_upload(upload.id, &etags).or_500()?; + + Ok(HttpResponseOk(())) +} diff --git a/server/src/caches.rs b/server/src/caches.rs new file mode 100644 index 0000000..2615f03 --- /dev/null +++ b/server/src/caches.rs @@ -0,0 +1,160 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, Error, Result}; +use aws_sdk_s3::error::ProvideErrorMetadata as _; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; +use slog::{debug, error, info, warn, Logger}; + +use crate::db::CachePendingUpload; +use crate::Central; + +async fn persist_upload( + log: &Logger, + c: &Arc, + upload: &CachePendingUpload, +) -> Result<()> { + /* + * We cannot have more than one cache for the same owner and name. If one + * already exists, we have to discard this one, or inserting the upload in + * the database would fail. + * + * It's fine to do the check at the start of the function, as only one + * persist job will run at a time in the background task. + */ + if c.db.cache_file(upload.owner, &upload.name)?.is_some() { + warn!( + log, + "there is already a cache with owner {} and name {}, \ + discarding cache ID {}", + upload.owner, + upload.name, + upload.id, + ); + + /* + * Aborting the multipart upload will both clean up the temporary chunks + * stored in S3 and avoid creating the actual object in S3.
+ */ + let response = + c.s3.abort_multipart_upload() + .bucket(&c.config.storage.bucket) + .key(c.cache_object_key(upload.id)) + .upload_id(&upload.s3_upload_id) + .send() + .await; + match response { + /* + * The NoSuchUpload error would happen if the multipart upload is + * not present in S3. In those cases the data is lost anyway, so we + * should discard the upload from our database instead of erroring + * out (which would lead to a retry a second later). + */ + Err(err) if err.code() != Some("NoSuchUpload") => { + return Err(Error::from(err).context(format!( + "failed to abort multipart upload {} for cache {}", + upload.s3_upload_id, upload.id + ))); + } + _ => { + c.db.discard_cache_upload(upload.id)?; + return Ok(()); + } + } + } + + info!(log, "persisting upload of cache {}", upload.id); + /* + * Note that AWS documents that completing a multipart upload could take + * several minutes in case of large objects. + */ + let response = + c.s3.complete_multipart_upload() + .bucket(&c.config.storage.bucket) + .key(c.cache_object_key(upload.id)) + .upload_id(&upload.s3_upload_id) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some( + upload + .etags + .as_ref() + .ok_or_else(|| { + anyhow!("missing etags in finished upload") + })? + .iter() + .enumerate() + .map(|(idx, etag)| { + CompletedPart::builder() + .part_number(idx as i32 + 1) + .e_tag(etag) + .build() + }) + .collect(), + )) + .build(), + ) + .send() + .await; + match response { + Ok(_) => { + c.db.persist_cache_upload(upload.id)?; + info!(log, "successfully persisted cache {}", upload.id); + Ok(()) + } + Err(err) if err.code() == Some("NoSuchUpload") => { + /* + * There might be a case where the Buildomat server crashes after + * completing the multipart upload but before updating the database. 
+ * + * In most cases that should not result in a problem, as the job + * would be retried and the S3 API appears to be idempotent when + * dealing with multipart uploads (that's undocumented though). + * + * In case Buildomat crashes for so long that we are past the + * idempotency window, S3 is documented to return a NoSuchUpload + * error. In that case we discard the cache upload to remove it + * from the queue: a worker will have to upload it again. + */ + c.db.discard_cache_upload(upload.id)?; + warn!(log, "multipart upload for cache {} disappeared", upload.id); + Ok(()) + } + Err(err) => Err(Error::from(err).context(format!( + "failed to complete multipart upload {} for cache {}", + upload.s3_upload_id, upload.id + ))), + } +} + +pub(crate) async fn persist_uploads_task(log: Logger, c: Arc) { + let delay = Duration::from_secs(1); + info!(log, "start persist cache uploads task"); + + loop { + tokio::time::sleep(delay).await; + + let to_persist = match c.db.finished_cache_uploads() { + Ok(to_persist) => to_persist, + Err(err) => { + error!(log, "failed to get finished cache uploads: {err}"); + continue; + } + }; + if to_persist.is_empty() { + debug!(log, "no cache uploads to persist"); + continue; + } + + info!(log, "found {} cache uploads to persist", to_persist.len()); + for upload in to_persist { + if let Err(err) = persist_upload(&log, &c, &upload).await { + error!(log, "failed to persist cache {}: {err}", upload.id); + } + } + } +} diff --git a/server/src/config.rs b/server/src/config.rs index 5115eda..ca87147 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -36,6 +36,8 @@ pub struct ConfigFileJob { pub max_runtime: u64, #[serde(default = "default_max_size_per_file_mb")] pub max_size_per_file_mb: u64, + #[serde(default = "default_max_size_per_individual_cache_mb")] + pub max_size_per_individual_cache_mb: u64, #[serde(default)] pub auto_archive: bool, #[serde(default)] @@ -52,6 +54,10 @@ impl ConfigFileJob { pub fn
max_bytes_per_input(&self) -> u64 { self.max_size_per_file_mb.saturating_mul(1024 * 1024) } + + pub fn max_bytes_per_individual_cache(&self) -> u64 { + self.max_size_per_individual_cache_mb.saturating_mul(1024 * 1024) + } } fn default_max_size_per_file_mb() -> u64 { @@ -61,6 +67,13 @@ fn default_max_size_per_file_mb() -> u64 { 1024 } +fn default_max_size_per_individual_cache_mb() -> u64 { + /* + * By default, allow 10GB cache files to be uploaded: + */ + 10240 +} + fn default_purge_delay_msec() -> u64 { /* * By default, wait half a second after a successful purge before purging diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 9c78fb8..cd87117 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -1,5 +1,5 @@ /* - * Copyright 2025 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use std::collections::HashMap; @@ -37,6 +37,7 @@ mod types { sqlite_ulid_new_type!(WorkerId); sqlite_ulid_new_type!(FactoryId); sqlite_ulid_new_type!(TargetId); + sqlite_ulid_new_type!(CacheFileId); pub use buildomat_database::{Dictionary, IsoDate, JsonValue}; } @@ -3088,4 +3089,160 @@ impl Database { Ok(nt) }) } + + pub fn cache_file( + &self, + owner: UserId, + name: &str, + ) -> DBResult> { + self.sql.tx(|h| { + h.get_row_opt( + Query::select() + .from(CacheFileDef::Table) + .columns(CacheFile::columns()) + .and_where(Expr::col(CacheFileDef::Owner).eq(owner)) + .and_where(Expr::col(CacheFileDef::Name).eq(name)) + .to_owned(), + ) + }) + } + + pub fn record_cache_use(&self, id: CacheFileId) -> DBResult<()> { + self.sql.tx_immediate(|h| { + h.exec_update( + Query::update() + .table(CacheFileDef::Table) + .value(CacheFileDef::TimeLastUse, IsoDate::now()) + .and_where(Expr::col(CacheFileDef::Id).eq(id)) + .to_owned(), + )?; + Ok(()) + }) + } + + pub fn record_pending_cache_upload( + &self, + id: CacheFileId, + owner: UserId, + name: &str, + worker: WorkerId, + size_bytes: u64, + s3_upload_id: &str, + chunks: u32, + ) -> DBResult<()> { + 
self.sql.tx_immediate(|h| { + h.exec_insert( + Query::insert() + .into_table(CachePendingUploadDef::Table) + .columns(CachePendingUpload::bare_columns()) + .values_panic([ + id.into(), + s3_upload_id.into(), + owner.into(), + name.into(), + worker.into(), + size_bytes.into(), + chunks.into(), + None::.into(), /* etags */ + IsoDate::now().into(), /* time_begin */ + None::.into(), /* time_finish */ + ]) + .to_owned(), + )?; + Ok(()) + }) + } + + pub fn pending_cache_upload( + &self, + id: CacheFileId, + ) -> DBResult { + self.sql.tx(|h| { + h.get_row( + Query::select() + .from(CachePendingUploadDef::Table) + .columns(CachePendingUpload::columns()) + .and_where(Expr::col(CachePendingUploadDef::Id).eq(id)) + .to_owned(), + ) + }) + } + + pub fn finish_cache_upload( + &self, + id: CacheFileId, + etags: &[String], + ) -> DBResult<()> { + self.sql.tx_immediate(|h| { + let count = h.exec_update( + Query::update() + .table(CachePendingUploadDef::Table) + .value(CachePendingUploadDef::Etags, etags.join(",")) + .value(CachePendingUploadDef::TimeFinish, IsoDate::now()) + .and_where(Expr::col(CachePendingUploadDef::Id).eq(id)) + .to_owned(), + )?; + if count == 1 { + Ok(()) + } else { + conflict!("no pending cache upload with ID {id}"); + } + }) + } + + pub fn finished_cache_uploads(&self) -> DBResult> { + self.sql.tx(|h| { + h.get_rows( + Query::select() + .from(CachePendingUploadDef::Table) + .columns(CachePendingUpload::columns()) + .and_where( + Expr::col(CachePendingUploadDef::TimeFinish) + .is_not_null(), + ) + .order_by(CachePendingUploadDef::TimeFinish, Order::Asc) + .to_owned(), + ) + }) + } + + pub fn persist_cache_upload(&self, id: CacheFileId) -> DBResult<()> { + let pending = self.pending_cache_upload(id)?; + + self.sql.tx_immediate(|h| { + h.exec_insert( + Query::insert() + .into_table(CacheFileDef::Table) + .columns(CacheFile::bare_columns()) + .values_panic([ + id.into(), + pending.owner.into(), + pending.name.into(), + pending.size_bytes.into(), + 
IsoDate::now().into(), + IsoDate::now().into(), + ]) + .to_owned(), + )?; + h.exec_delete( + Query::delete() + .from_table(CachePendingUploadDef::Table) + .and_where(Expr::col(CachePendingUploadDef::Id).eq(id)) + .to_owned(), + )?; + Ok(()) + }) + } + + pub fn discard_cache_upload(&self, id: CacheFileId) -> DBResult<()> { + self.sql.tx_immediate(|h| { + h.exec_delete( + Query::delete() + .from_table(CachePendingUploadDef::Table) + .and_where(Expr::col(CachePendingUploadDef::Id).eq(id)) + .to_owned(), + )?; + Ok(()) + }) + } } diff --git a/server/src/db/tables/cache_file.rs b/server/src/db/tables/cache_file.rs new file mode 100644 index 0000000..fcb1476 --- /dev/null +++ b/server/src/db/tables/cache_file.rs @@ -0,0 +1,53 @@ +/* + * Copyright 2026 Oxide Computer Company + */ + +use super::sublude::*; + +#[derive(Debug, Clone)] +#[enum_def(prefix = "", suffix = "Def")] +pub struct CacheFile { + pub id: CacheFileId, + #[expect(unused)] + pub owner: UserId, + #[expect(unused)] + pub name: String, + #[expect(unused)] + pub size_bytes: DataSize, + #[expect(unused)] + pub time_upload: IsoDate, + #[expect(unused)] + pub time_last_use: IsoDate, +} + +impl FromRow for CacheFile { + fn columns() -> Vec { + [ + CacheFileDef::Id, + CacheFileDef::Owner, + CacheFileDef::Name, + CacheFileDef::SizeBytes, + CacheFileDef::TimeUpload, + CacheFileDef::TimeLastUse, + ] + .into_iter() + .map(|col| { + ColumnRef::TableColumn( + SeaRc::new(CacheFileDef::Table), + SeaRc::new(col), + ) + }) + .collect() + } + + fn from_row(row: &Row) -> rusqlite::Result { + Ok(CacheFile { + id: row.get(0)?, + owner: row.get(1)?, + name: row.get(2)?, + size_bytes: row.get(3)?, + time_upload: row.get(4)?, + time_last_use: row.get(5)?, + }) + } +} diff --git a/server/src/db/tables/cache_pending_upload.rs b/server/src/db/tables/cache_pending_upload.rs new file mode 100644 index 0000000..20539c7 --- /dev/null +++ b/server/src/db/tables/cache_pending_upload.rs @@ -0,0 +1,67 @@ +/* + * Copyright 2026 Oxide Computer 
Company + */ + +use super::sublude::*; + +#[derive(Debug, Clone)] +#[enum_def(prefix = "", suffix = "Def")] +pub struct CachePendingUpload { + pub id: CacheFileId, + /** + * Opaque ID given to us by S3's multipart upload API. + */ + pub s3_upload_id: String, + pub owner: UserId, + pub name: String, + pub worker: WorkerId, + pub size_bytes: DataSize, + pub chunks: u32, + pub etags: Option>, + #[expect(unused)] + pub time_begin: IsoDate, + #[expect(unused)] + pub time_finish: Option, +} + +impl FromRow for CachePendingUpload { + fn columns() -> Vec { + [ + CachePendingUploadDef::Id, + CachePendingUploadDef::S3UploadId, + CachePendingUploadDef::Owner, + CachePendingUploadDef::Name, + CachePendingUploadDef::Worker, + CachePendingUploadDef::SizeBytes, + CachePendingUploadDef::Chunks, + CachePendingUploadDef::Etags, + CachePendingUploadDef::TimeBegin, + CachePendingUploadDef::TimeFinish, + ] + .into_iter() + .map(|col| { + ColumnRef::TableColumn( + SeaRc::new(CachePendingUploadDef::Table), + SeaRc::new(col), + ) + }) + .collect() + } + + fn from_row(row: &Row) -> rusqlite::Result { + Ok(CachePendingUpload { + id: row.get(0)?, + s3_upload_id: row.get(1)?, + owner: row.get(2)?, + name: row.get(3)?, + worker: row.get(4)?, + size_bytes: row.get(5)?, + chunks: row.get(6)?, + etags: row.get::<_, Option>(7)?.map(|string| { + string.split(',').map(|s| s.to_string()).collect() + }), + time_begin: row.get(8)?, + time_finish: row.get(9)?, + }) + } +} diff --git a/server/src/db/tables/mod.rs b/server/src/db/tables/mod.rs index 0b3d96e..1263295 100644 --- a/server/src/db/tables/mod.rs +++ b/server/src/db/tables/mod.rs @@ -1,5 +1,5 @@ /* - * Copyright 2024 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ use rusqlite::Row; @@ -25,6 +25,8 @@ mod sublude { }; } +mod cache_file; +mod cache_pending_upload; mod factory; mod job; mod job_depend; @@ -44,6 +46,8 @@ mod user_privilege; mod worker; mod worker_event; +pub use cache_file::*; +pub use cache_pending_upload::*; 
pub use factory::*; pub use job::*; pub use job_depend::*; diff --git a/server/src/main.rs b/server/src/main.rs index b7358da..4f90e25 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,5 +1,5 @@ /* - * Copyright 2025 Oxide Computer Company + * Copyright 2026 Oxide Computer Company */ #![allow(clippy::many_single_char_names)] @@ -32,6 +32,7 @@ use slog::{error, info, o, warn, Logger}; mod api; mod archive; +mod caches; mod chunks; mod config; mod db; @@ -40,7 +41,7 @@ mod jobs; mod workers; use db::{ - AuthUser, Job, JobEvent, JobFile, JobFileId, JobId, JobOutput, + AuthUser, CacheFileId, Job, JobEvent, JobFile, JobFileId, JobId, JobOutput, JobOutputAndFile, Worker, WorkerEvent, }; @@ -106,6 +107,18 @@ impl MakeInternalError for serde_json::Result { } } +impl MakeInternalError for Result> +where + E: std::fmt::Debug, +{ + fn or_500(self) -> SResult { + self.map_err(|e| { + let msg = format!("AWS error: {:?}", e); + HttpError::for_internal_error(msg) + }) + } +} + struct FilePresignedUrl { pub info: String, pub url: String, @@ -800,6 +813,10 @@ impl Central { ) -> Result> { Ok(self.db.worker_events(worker.id, minseq, limit)?) 
} + + fn cache_object_key(&self, id: CacheFileId) -> String { + self.object_key("cache", &id.to_string()) + } } #[allow(dead_code)] @@ -995,6 +1012,9 @@ async fn main() -> Result<()> { ad.register(api::worker::worker_fail)?; ad.register(api::worker::worker_diagnostics_enable)?; ad.register(api::worker::worker_diagnostics_complete)?; + ad.register(api::worker::worker_cache_get)?; + ad.register(api::worker::worker_cache_upload)?; + ad.register(api::worker::worker_cache_upload_complete)?; ad.register(api::worker::worker_append)?; ad.register(api::worker::worker_job_append)?; ad.register(api::worker::worker_job_append_one)?; @@ -1127,6 +1147,12 @@ async fn main() -> Result<()> { .context("worker cleanup task failure") }); + let c0 = Arc::clone(&c); + let log0 = log.new(o!("component" => "persist_cache_uploads")); + let t_persist_cache_uploads = tokio::task::spawn(async move { + caches::persist_uploads_task(log0, c0).await + }); + let server = HttpServerStarter::new( #[allow(clippy::needless_update)] &ConfigDropshot { @@ -1150,6 +1176,9 @@ async fn main() -> Result<()> { _ = t_archive_jobs => bail!("archive jobs task stopped early"), _ = t_purge_jobs => bail!("purge jobs task stopped early"), _ = t_workers => bail!("worker cleanup task stopped early"), + _ = t_persist_cache_uploads => { + bail!("persist cache uploads task stopped early"); + } _ = server_task => bail!("server stopped early"), } }