Skip to content

Commit ae9e766

Browse files
authored
fix(server): prevent unbounded bus entry growth for sandbox IDs (#138)
Closes NVIDIA#27 Add remove() methods to TracingLogBus, PlatformEventBus, and SandboxWatchBus to clean up entries when sandboxes are deleted. Wire cleanup into both handle_deleted (K8s reconciler) and delete_sandbox (gRPC handler). Reorder watch_sandbox to validate sandbox existence before subscribing to buses, preventing entries for non-existent IDs. Add one-time sandbox validation at stream open in push_sandbox_logs. Co-authored-by: John Myers <johntmyers@users.noreply.github.com>
1 parent 024150e commit ae9e766

File tree

6 files changed

+253
-3
lines changed

6 files changed

+253
-3
lines changed

architecture/gateway.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,10 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size
340340

341341
Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.
342342

343+
**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into both the `handle_deleted` reconciler (Kubernetes watcher) and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
344+
345+
**Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.
346+
343347
## Remote Exec via SSH
344348

345349
The `ExecSandbox` RPC (`crates/navigator-server/src/grpc.rs`) executes a command inside a sandbox pod over SSH and streams stdout/stderr/exit back to the client.

crates/navigator-server/src/grpc.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,23 @@ impl Navigator for NavigatorService {
205205

206206
// Spawn producer task.
207207
tokio::spawn(async move {
208-
// Subscribe to all buses BEFORE reading the initial snapshot to avoid
208+
// Validate that the sandbox exists BEFORE subscribing to any buses.
209+
// This prevents creating bus entries for non-existent sandbox IDs.
210+
match state.store.get_message::<Sandbox>(&sandbox_id).await {
211+
Ok(Some(_)) => {} // sandbox exists, proceed
212+
Ok(None) => {
213+
let _ = tx.send(Err(Status::not_found("sandbox not found"))).await;
214+
return;
215+
}
216+
Err(e) => {
217+
let _ = tx
218+
.send(Err(Status::internal(format!("fetch sandbox failed: {e}"))))
219+
.await;
220+
return;
221+
}
222+
}
223+
224+
// Subscribe to all buses BEFORE reading the snapshot to avoid
209225
// missing notifications that fire between the snapshot read and subscribe.
210226
let mut status_rx = if follow_status {
211227
Some(state.sandbox_watch_bus.subscribe(&sandbox_id))
@@ -228,7 +244,8 @@ impl Navigator for NavigatorService {
228244
None
229245
};
230246

231-
// Always start with a snapshot if present.
247+
// Re-read the snapshot now that we have subscriptions active
248+
// (avoids missing notifications between validate and subscribe).
232249
match state.store.get_message::<Sandbox>(&sandbox_id).await {
233250
Ok(Some(sandbox)) => {
234251
state.sandbox_index.update_from_sandbox(&sandbox);
@@ -253,6 +270,7 @@ impl Navigator for NavigatorService {
253270
}
254271
}
255272
Ok(None) => {
273+
// Sandbox was deleted between validate and subscribe — end stream.
256274
let _ = tx.send(Err(Status::not_found("sandbox not found"))).await;
257275
return;
258276
}
@@ -481,6 +499,11 @@ impl Navigator for NavigatorService {
481499
warn!(sandbox_id = %id, error = %e, "Failed to clean up store after delete");
482500
}
483501

502+
// Clean up bus entries to prevent unbounded memory growth.
503+
self.state.tracing_log_bus.remove(&id);
504+
self.state.tracing_log_bus.platform_event_bus.remove(&id);
505+
self.state.sandbox_watch_bus.remove(&id);
506+
484507
info!(
485508
sandbox_id = %id,
486509
sandbox_name = %sandbox.name,
@@ -1147,6 +1170,7 @@ impl Navigator for NavigatorService {
11471170
request: Request<tonic::Streaming<PushSandboxLogsRequest>>,
11481171
) -> Result<Response<PushSandboxLogsResponse>, Status> {
11491172
let mut stream = request.into_inner();
1173+
let mut validated = false;
11501174

11511175
while let Some(batch) = stream
11521176
.message()
@@ -1157,6 +1181,20 @@ impl Navigator for NavigatorService {
11571181
continue;
11581182
}
11591183

1184+
// Validate sandbox existence once at stream open (first batch).
1185+
// Subsequent batches trust the validated sandbox_id. If the sandbox
1186+
// is deleted mid-stream, bus remove() drops the sender and publish
1187+
// silently discards via `let _ = tx.send(...)`.
1188+
if !validated {
1189+
self.state
1190+
.store
1191+
.get_message::<Sandbox>(&batch.sandbox_id)
1192+
.await
1193+
.map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))?
1194+
.ok_or_else(|| Status::not_found("sandbox not found"))?;
1195+
validated = true;
1196+
}
1197+
11601198
// Cap lines per batch to prevent abuse.
11611199
for log in batch.logs.into_iter().take(100) {
11621200
let mut log = log;

crates/navigator-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub async fn run_server(config: Config, tracing_log_bus: TracingLogBus) -> Resul
128128
state.sandbox_client.clone(),
129129
state.sandbox_index.clone(),
130130
state.sandbox_watch_bus.clone(),
131+
state.tracing_log_bus.clone(),
131132
);
132133
spawn_kube_event_tailer(state.clone());
133134

crates/navigator-server/src/sandbox/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ pub fn spawn_sandbox_watcher(
217217
client: SandboxClient,
218218
index: crate::sandbox_index::SandboxIndex,
219219
watch_bus: crate::sandbox_watch::SandboxWatchBus,
220+
tracing_log_bus: crate::tracing_bus::TracingLogBus,
220221
) {
221222
let namespace = client.namespace().to_string();
222223
info!(namespace = %namespace, "Starting sandbox watcher");
@@ -240,7 +241,9 @@ pub fn spawn_sandbox_watcher(
240241
Event::Deleted(obj) => {
241242
let obj_name = obj.metadata.name.clone().unwrap_or_default();
242243
debug!(sandbox_name = %obj_name, "Received Deleted event from Kubernetes");
243-
if let Err(err) = handle_deleted(&store, &index, &watch_bus, obj).await {
244+
if let Err(err) =
245+
handle_deleted(&store, &index, &watch_bus, &tracing_log_bus, obj).await
246+
{
244247
warn!(sandbox_name = %obj_name, error = %err, "Failed to delete sandbox record");
245248
}
246249
}
@@ -363,6 +366,7 @@ async fn handle_deleted(
363366
store: &Store,
364367
index: &crate::sandbox_index::SandboxIndex,
365368
watch_bus: &crate::sandbox_watch::SandboxWatchBus,
369+
tracing_log_bus: &crate::tracing_bus::TracingLogBus,
366370
obj: DynamicObject,
367371
) -> Result<(), String> {
368372
let id = sandbox_id_from_object(&obj)?;
@@ -373,6 +377,12 @@ async fn handle_deleted(
373377
debug!(sandbox_id = %id, deleted, "Deleted sandbox record");
374378
index.remove_sandbox(&id);
375379
watch_bus.notify(&id);
380+
381+
// Clean up bus entries to prevent unbounded memory growth.
382+
tracing_log_bus.remove(&id);
383+
tracing_log_bus.platform_event_bus.remove(&id);
384+
watch_bus.remove(&id);
385+
376386
Ok(())
377387
}
378388

crates/navigator-server/src/sandbox_watch.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ impl SandboxWatchBus {
5757
pub fn subscribe(&self, sandbox_id: &str) -> broadcast::Receiver<()> {
5858
self.sender_for(sandbox_id).subscribe()
5959
}
60+
61+
/// Remove the bus entry for the given sandbox id.
62+
///
63+
/// This drops the broadcast sender, closing any active receivers with
64+
/// `RecvError::Closed`.
65+
pub fn remove(&self, sandbox_id: &str) {
66+
let mut inner = self.inner.lock().expect("sandbox watch bus lock poisoned");
67+
inner.remove(sandbox_id);
68+
}
6069
}
6170

6271
/// Spawn a background Kubernetes Event tailer.
@@ -160,6 +169,53 @@ fn map_kube_event_to_platform(
160169
))
161170
}
162171

172+
#[cfg(test)]
173+
mod tests {
174+
use super::*;
175+
176+
#[test]
177+
fn sandbox_watch_bus_remove_cleans_up() {
178+
let bus = SandboxWatchBus::new();
179+
let sandbox_id = "sb-1";
180+
181+
let mut rx = bus.subscribe(sandbox_id);
182+
183+
// Notify and receive
184+
bus.notify(sandbox_id);
185+
assert!(rx.try_recv().is_ok());
186+
187+
// Remove
188+
bus.remove(sandbox_id);
189+
190+
// Receiver should be closed
191+
match rx.try_recv() {
192+
Err(broadcast::error::TryRecvError::Closed) => {} // expected
193+
other => panic!("expected Closed, got {other:?}"),
194+
}
195+
}
196+
197+
#[test]
198+
fn sandbox_watch_bus_subscribe_after_remove_creates_fresh_channel() {
199+
let bus = SandboxWatchBus::new();
200+
let sandbox_id = "sb-2";
201+
202+
let _old_rx = bus.subscribe(sandbox_id);
203+
bus.remove(sandbox_id);
204+
205+
// New subscription should work
206+
let mut new_rx = bus.subscribe(sandbox_id);
207+
bus.notify(sandbox_id);
208+
assert!(new_rx.try_recv().is_ok());
209+
}
210+
211+
#[test]
212+
fn sandbox_watch_bus_remove_nonexistent_is_noop() {
213+
let bus = SandboxWatchBus::new();
214+
// Should not panic
215+
bus.remove("nonexistent");
216+
}
217+
}
218+
163219
/// Helper to translate broadcast lag into a gRPC status.
164220
pub fn broadcast_to_status(err: broadcast::error::RecvError) -> Status {
165221
match err {

crates/navigator-server/src/tracing_bus.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ impl TracingLogBus {
7575
self.sender_for(sandbox_id).subscribe()
7676
}
7777

78+
/// Remove all bus entries for the given sandbox id.
79+
///
80+
/// This drops the broadcast sender (closing any active receivers with
81+
/// `RecvError::Closed`) and frees the tail buffer.
82+
pub fn remove(&self, sandbox_id: &str) {
83+
let mut inner = self.inner.lock().expect("tracing bus lock poisoned");
84+
inner.per_id.remove(sandbox_id);
85+
inner.tails.remove(sandbox_id);
86+
}
87+
7888
pub fn tail(&self, sandbox_id: &str, max: usize) -> Vec<SandboxStreamEvent> {
7989
let inner = self.inner.lock().expect("tracing bus lock poisoned");
8090
inner
@@ -186,6 +196,129 @@ fn current_time_ms() -> Option<i64> {
186196
i64::try_from(now.as_millis()).ok()
187197
}
188198

199+
#[cfg(test)]
200+
mod tests {
201+
use super::*;
202+
203+
fn make_log_event(sandbox_id: &str, message: &str) -> SandboxLogLine {
204+
SandboxLogLine {
205+
sandbox_id: sandbox_id.to_string(),
206+
timestamp_ms: 1000,
207+
level: "INFO".to_string(),
208+
target: "test".to_string(),
209+
message: message.to_string(),
210+
source: "gateway".to_string(),
211+
fields: HashMap::new(),
212+
}
213+
}
214+
215+
#[test]
216+
fn tracing_log_bus_remove_cleans_up_all_maps() {
217+
let bus = TracingLogBus::new();
218+
let sandbox_id = "sb-1";
219+
220+
// Create entries via subscribe and publish
221+
let _rx = bus.subscribe(sandbox_id);
222+
bus.publish_external(make_log_event(sandbox_id, "hello"));
223+
224+
// Verify entries exist
225+
assert_eq!(bus.tail(sandbox_id, 10).len(), 1);
226+
227+
// Remove
228+
bus.remove(sandbox_id);
229+
230+
// Verify entries are gone
231+
assert!(bus.tail(sandbox_id, 10).is_empty());
232+
}
233+
234+
#[test]
235+
fn tracing_log_bus_subscribe_after_remove_creates_fresh_channel() {
236+
let bus = TracingLogBus::new();
237+
let sandbox_id = "sb-2";
238+
239+
// Create and remove
240+
bus.publish_external(make_log_event(sandbox_id, "old message"));
241+
bus.remove(sandbox_id);
242+
243+
// Subscribe again — should get a fresh channel with no history
244+
let mut rx = bus.subscribe(sandbox_id);
245+
assert!(bus.tail(sandbox_id, 10).is_empty());
246+
247+
// New publish should reach the new subscriber
248+
bus.publish_external(make_log_event(sandbox_id, "new message"));
249+
let evt = rx.try_recv().expect("should receive new event");
250+
assert!(evt.payload.is_some());
251+
}
252+
253+
#[test]
254+
fn tracing_log_bus_remove_closes_active_receivers() {
255+
let bus = TracingLogBus::new();
256+
let sandbox_id = "sb-3";
257+
258+
let mut rx = bus.subscribe(sandbox_id);
259+
260+
// Remove drops the sender
261+
bus.remove(sandbox_id);
262+
263+
// Existing receiver should get Closed error
264+
match rx.try_recv() {
265+
Err(broadcast::error::TryRecvError::Closed) => {} // expected
266+
other => panic!("expected Closed, got {other:?}"),
267+
}
268+
}
269+
270+
#[test]
271+
fn tracing_log_bus_remove_nonexistent_is_noop() {
272+
let bus = TracingLogBus::new();
273+
// Should not panic
274+
bus.remove("nonexistent");
275+
}
276+
277+
#[test]
278+
fn platform_event_bus_remove_cleans_up() {
279+
let bus = PlatformEventBus::new();
280+
let sandbox_id = "sb-4";
281+
282+
let mut rx = bus.subscribe(sandbox_id);
283+
284+
// Publish an event
285+
let evt = SandboxStreamEvent { payload: None };
286+
bus.publish(sandbox_id, evt);
287+
assert!(rx.try_recv().is_ok());
288+
289+
// Remove
290+
bus.remove(sandbox_id);
291+
292+
// Receiver should be closed
293+
match rx.try_recv() {
294+
Err(broadcast::error::TryRecvError::Closed) => {} // expected
295+
other => panic!("expected Closed, got {other:?}"),
296+
}
297+
}
298+
299+
#[test]
300+
fn platform_event_bus_subscribe_after_remove_creates_fresh_channel() {
301+
let bus = PlatformEventBus::new();
302+
let sandbox_id = "sb-5";
303+
304+
let _old_rx = bus.subscribe(sandbox_id);
305+
bus.remove(sandbox_id);
306+
307+
// New subscription should work
308+
let mut new_rx = bus.subscribe(sandbox_id);
309+
let evt = SandboxStreamEvent { payload: None };
310+
bus.publish(sandbox_id, evt);
311+
assert!(new_rx.try_recv().is_ok());
312+
}
313+
314+
#[test]
315+
fn platform_event_bus_remove_nonexistent_is_noop() {
316+
let bus = PlatformEventBus::new();
317+
// Should not panic
318+
bus.remove("nonexistent");
319+
}
320+
}
321+
189322
/// Separate bus for platform event stream events.
190323
///
191324
/// This keeps platform events isolated from tracing capture.
@@ -220,4 +353,12 @@ impl PlatformEventBus {
220353
let tx = self.sender_for(sandbox_id);
221354
let _ = tx.send(event);
222355
}
356+
357+
/// Remove the bus entry for the given sandbox id.
358+
///
359+
/// This drops the broadcast sender, closing any active receivers.
360+
pub(crate) fn remove(&self, sandbox_id: &str) {
361+
let mut inner = self.inner.lock().expect("platform event bus lock poisoned");
362+
inner.remove(sandbox_id);
363+
}
223364
}

0 commit comments

Comments
 (0)