Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions crates/forkd-controller/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub struct AppState {
/// of memory.bin; without a cap, an attacker can fill the disk by
/// firing many BRANCHes in parallel.
pub branch_sem: Arc<Semaphore>,
/// The configured maximum the `branch_sem` was constructed with.
/// Tracked separately for `/metrics` (`forkd_branch_concurrency_cap`)
/// because `Semaphore` doesn't expose its initial permit count.
pub branch_concurrency_cap: usize,
/// Scratch directory used for prewarm throwaway snapshots when
/// `CreateSandboxRequest::prewarm` is set. Mirror of
/// `DaemonConfig::prewarm_scratch_dir`.
Expand Down Expand Up @@ -178,6 +182,8 @@ async fn version() -> impl IntoResponse {

async fn metrics(State(s): State<SharedState>) -> impl IntoResponse {
let (snap_count, sb_count) = s.registry.counts();
let branches_in_flight = s.branch_in_flight.lock().len();
let branch_cap = s.branch_concurrency_cap;
// Prometheus text format. Keep names stable — exporters depend on them.
let body = format!(
"# HELP forkd_snapshots_total Number of snapshots known to the controller.\n\
Expand All @@ -186,6 +192,12 @@ async fn metrics(State(s): State<SharedState>) -> impl IntoResponse {
# HELP forkd_sandboxes_active Number of active sandboxes (child VMs).\n\
# TYPE forkd_sandboxes_active gauge\n\
forkd_sandboxes_active {sb_count}\n\
# HELP forkd_branches_in_flight Number of BRANCH operations currently writing memory.bin.\n\
# TYPE forkd_branches_in_flight gauge\n\
forkd_branches_in_flight {branches_in_flight}\n\
# HELP forkd_branch_concurrency_cap Configured maximum concurrent BRANCH operations.\n\
# TYPE forkd_branch_concurrency_cap gauge\n\
forkd_branch_concurrency_cap {branch_cap}\n\
# HELP forkd_build_info Build version of the controller binary.\n\
# TYPE forkd_build_info gauge\n\
forkd_build_info{{version=\"{BUILD_VERSION}\"}} 1\n"
Expand Down Expand Up @@ -1609,6 +1621,7 @@ mod tests {
snapshot_root,
branch_in_flight: Mutex::new(HashSet::new()),
branch_sem: Arc::new(Semaphore::new(DEFAULT_BRANCH_CONCURRENCY)),
branch_concurrency_cap: DEFAULT_BRANCH_CONCURRENCY,
prewarm_scratch_dir: std::env::temp_dir().join("forkd-test-prewarm"),
})
}
Expand Down Expand Up @@ -1663,6 +1676,61 @@ mod tests {
let s = std::str::from_utf8(&body).unwrap();
assert!(s.contains("forkd_sandboxes_active 0"));
assert!(s.contains("forkd_build_info"));
// BRANCH concurrency observability (see #177 / #179 follow-up).
assert!(s.contains("forkd_branches_in_flight 0"));
assert!(
s.contains(&format!(
"forkd_branch_concurrency_cap {DEFAULT_BRANCH_CONCURRENCY}"
)),
"expected cap to surface as the test_state default; got body:\n{s}"
);
}

#[tokio::test]
async fn metrics_branches_in_flight_tracks_slot_acquisitions() {
    // Regression guard (#179 follow-up): the `forkd_branches_in_flight`
    // gauge has to follow held BranchSlots — up while they are alive,
    // back down once they are dropped. Operators need that signal to
    // size FORKD_BRANCH_CONCURRENCY from observed load.
    let state = test_state();
    let first_slot = state.try_acquire_branch_slot("t1").unwrap();
    let second_slot = state.try_acquire_branch_slot("t2").unwrap();

    // First scrape: both slots are still held, so the gauge must read 2.
    let scrape_req = Request::builder()
        .uri("/metrics")
        .body(Body::empty())
        .unwrap();
    let scrape_resp = router(state.clone()).oneshot(scrape_req).await.unwrap();
    let raw = to_bytes(scrape_resp.into_body(), 4096).await.unwrap();
    let s = std::str::from_utf8(&raw).unwrap();
    assert!(
        s.contains("forkd_branches_in_flight 2"),
        "expected 2 in-flight branches while two slots are held; got:\n{s}"
    );

    // Release both slots (same order they were acquired), then scrape
    // again: the gauge must be back at 0.
    drop(first_slot);
    drop(second_slot);
    let scrape_req = Request::builder()
        .uri("/metrics")
        .body(Body::empty())
        .unwrap();
    let scrape_resp = router(state).oneshot(scrape_req).await.unwrap();
    let raw = to_bytes(scrape_resp.into_body(), 4096).await.unwrap();
    let s = std::str::from_utf8(&raw).unwrap();
    assert!(
        s.contains("forkd_branches_in_flight 0"),
        "expected gauge to return to 0 after slot drops; got:\n{s}"
    );
}

#[tokio::test]
Expand Down Expand Up @@ -1757,6 +1825,7 @@ mod tests {
snapshot_root,
branch_in_flight: Mutex::new(HashSet::new()),
branch_sem: Arc::new(Semaphore::new(2)),
branch_concurrency_cap: 2,
prewarm_scratch_dir: std::env::temp_dir().join("forkd-test-prewarm"),
});
let _a = s.try_acquire_branch_slot("t1").unwrap();
Expand All @@ -1779,6 +1848,7 @@ mod tests {
snapshot_root,
branch_in_flight: Mutex::new(HashSet::new()),
branch_sem: Arc::new(Semaphore::new(1)),
branch_concurrency_cap: 1,
prewarm_scratch_dir: std::env::temp_dir().join("forkd-test-prewarm"),
});
let a = s.try_acquire_branch_slot("t1").unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/forkd-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub async fn run_daemon(cfg: DaemonConfig) -> Result<()> {
snapshot_root: cfg.snapshot_root.clone(),
branch_in_flight: Mutex::new(std::collections::HashSet::new()),
branch_sem: std::sync::Arc::new(tokio::sync::Semaphore::new(branch_concurrency)),
branch_concurrency_cap: branch_concurrency,
prewarm_scratch_dir: cfg.prewarm_scratch_dir.clone(),
});

Expand Down
Loading