Skip to content

Commit 88e7f1c

Browse files
committed
feat(guest-agent,orchestrator): generalize op_id to all guest agent communications
Every request now carries an op_id (auto-generated UUID4 hex on the Python side), and every response echoes it back.

The Rust guest-agent extracts op_id from the raw JSON before serde deserialization (two-phase parse), then injects it into all outgoing messages via a new ResponseWriter struct (using entry().or_insert_with() to preserve an existing op_id on file transfer responses).

On the Python side, the DualPortChannel._with_op_id() context manager handles op_id generation, queue registration, command dispatch, and cleanup — it is used by both send_request() and stream_messages(). This eliminates the shared default queue that caused stale messages (e.g. a PongMessage from reconnection probes) to pollute subsequent operations.

Key changes:

- Rust: ResponseWriter wraps mpsc::Sender + op_id, replaces direct send_response/send_streaming_error calls in all handler paths
- Rust: Remove dead CmdError::Reply.op_id field, with_op_id() method, and CmdError::io() second parameter (ResponseWriter handles injection)
- Rust: Remove ActiveWriteHandle.op_id field and unused write_tx param from handle_write_file
- Python: GuestAgentRequest base gets optional op_id; GuestAgentResponse base class added with optional op_id for all response types
- Python: FileOpDispatcher routes ALL messages by op_id; _default_queue, get_default_message(), stream_events() removed as dead code
- Python: Messages without op_id logged and discarded (prevents memory leak)
- Python: ValidationError guard in dispatch loop prevents malformed messages from killing all routing
- Python: RuntimeError added to _probe_guest_ready exception tuple
- Python: Remove send_raw_request from both GuestChannel protocol and implementations (replaced by op_id-routed send_request)
- Python: OP_QUEUE_DEPTH=64 constant for bounded per-op queues
1 parent b0673ae commit 88e7f1c

10 files changed

Lines changed: 1024 additions & 630 deletions

File tree

guest-agent/src/connection.rs

Lines changed: 200 additions & 150 deletions
Large diffs are not rendered by default.

guest-agent/src/error.rs

Lines changed: 267 additions & 49 deletions
Large diffs are not rendered by default.

guest-agent/src/file_io.rs

Lines changed: 42 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
44
use tokio::sync::mpsc;
55

66
use crate::constants::*;
7-
use crate::error::{CmdError, send_response};
7+
use crate::error::{CmdError, ResponseWriter};
88
use crate::types::{FileEntry, GuestResponse};
99

1010
// ============================================================================
@@ -67,7 +67,6 @@ pub(crate) struct ActiveWriteHandle {
6767
pub(crate) chunk_tx: mpsc::Sender<Vec<u8>>,
6868
pub(crate) task: tokio::task::JoinHandle<Result<WriteResult, WriteError>>,
6969
pub(crate) path_display: String,
70-
pub(crate) op_id: String,
7170
}
7271

7372
pub(crate) struct WriteResult {
@@ -149,105 +148,87 @@ pub(crate) fn validate_file_path(relative_path: &str) -> Result<std::path::PathB
149148
pub(crate) async fn handle_read_file(
150149
op_id: &str,
151150
path: &str,
152-
write_tx: &mpsc::Sender<Vec<u8>>,
151+
writer: &ResponseWriter,
153152
) -> Result<(), CmdError> {
154-
let resolved_path = validate_file_path(path).map_err(|e| {
155-
CmdError::validation(format!("Invalid path '{path}': {e}")).with_op_id(op_id)
156-
})?;
153+
let resolved_path = validate_file_path(path)
154+
.map_err(|e| CmdError::validation(format!("Invalid path '{path}': {e}")))?;
157155

158156
if resolved_path == std::path::Path::new(SANDBOX_ROOT) {
159-
return Err(CmdError::validation("Cannot read a directory").with_op_id(op_id));
157+
return Err(CmdError::validation("Cannot read a directory"));
160158
}
161159

162-
let canonical_path = tokio::fs::canonicalize(&resolved_path).await.map_err(|e| {
163-
CmdError::io(
164-
format!("File not found or inaccessible '{path}': {e}"),
165-
Some(op_id),
166-
)
167-
})?;
160+
let canonical_path = tokio::fs::canonicalize(&resolved_path)
161+
.await
162+
.map_err(|e| CmdError::io(format!("File not found or inaccessible '{path}': {e}")))?;
168163

169164
let sandbox_canonical = tokio::fs::canonicalize(SANDBOX_ROOT)
170165
.await
171166
.unwrap_or_else(|_| std::path::PathBuf::from(SANDBOX_ROOT));
172167
if !canonical_path.starts_with(&sandbox_canonical) {
173-
return Err(
174-
CmdError::validation(format!("Path '{path}' resolves outside sandbox"))
175-
.with_op_id(op_id),
176-
);
168+
return Err(CmdError::validation(format!(
169+
"Path '{path}' resolves outside sandbox"
170+
)));
177171
}
178172

179173
let metadata = tokio::fs::metadata(&canonical_path)
180174
.await
181-
.map_err(|e| CmdError::io(format!("Cannot read '{path}': {e}"), Some(op_id)))?;
175+
.map_err(|e| CmdError::io(format!("Cannot read '{path}': {e}")))?;
182176

183177
if metadata.is_dir() {
184-
return Err(
185-
CmdError::validation(format!("'{path}' is a directory, not a file")).with_op_id(op_id),
186-
);
178+
return Err(CmdError::validation(format!(
179+
"'{path}' is a directory, not a file"
180+
)));
187181
}
188182
if metadata.len() as usize > MAX_FILE_SIZE_BYTES {
189183
return Err(CmdError::validation(format!(
190184
"File too large: {} bytes (max {})",
191185
metadata.len(),
192186
MAX_FILE_SIZE_BYTES
193-
))
194-
.with_op_id(op_id));
187+
)));
195188
}
196189

197190
let file_size = metadata.len();
198191
let file = std::fs::File::open(&canonical_path)
199-
.map_err(|e| CmdError::io(format!("Failed to read '{path}': {e}"), Some(op_id)))?;
192+
.map_err(|e| CmdError::io(format!("Failed to read '{path}': {e}")))?;
200193

201-
let mut encoder =
202-
zstd::stream::read::Encoder::new(file, FILE_TRANSFER_ZSTD_LEVEL).map_err(|e| {
203-
CmdError::io(
204-
format!("Compression init failed for '{path}': {e}"),
205-
Some(op_id),
206-
)
207-
})?;
194+
let mut encoder = zstd::stream::read::Encoder::new(file, FILE_TRANSFER_ZSTD_LEVEL)
195+
.map_err(|e| CmdError::io(format!("Compression init failed for '{path}': {e}")))?;
208196

209197
let mut buf = vec![0u8; FILE_TRANSFER_CHUNK_SIZE];
210198
loop {
211199
let n = match std::io::Read::read(&mut encoder, &mut buf) {
212200
Ok(0) => break,
213201
Ok(n) => n,
214202
Err(e) => {
215-
return Err(CmdError::io(format!("Compression error: {e}"), Some(op_id)));
203+
return Err(CmdError::io(format!("Compression error: {e}")));
216204
}
217205
};
218206
let chunk_b64 = BASE64.encode(&buf[..n]);
219-
send_response(
220-
write_tx,
221-
&GuestResponse::FileChunk {
207+
writer
208+
.send(&GuestResponse::FileChunk {
222209
op_id: op_id.to_string(),
223210
data: chunk_b64,
224-
},
225-
)
226-
.await?;
211+
})
212+
.await?;
227213
}
228214

229-
send_response(
230-
write_tx,
231-
&GuestResponse::FileReadComplete {
215+
writer
216+
.send(&GuestResponse::FileReadComplete {
232217
op_id: op_id.to_string(),
233218
path: path.to_string(),
234219
size: file_size,
235-
},
236-
)
237-
.await
220+
})
221+
.await
238222
}
239223

240224
/// Handle list_files command: read directory entries.
241-
pub(crate) async fn handle_list_files(
242-
path: &str,
243-
write_tx: &mpsc::Sender<Vec<u8>>,
244-
) -> Result<(), CmdError> {
225+
pub(crate) async fn handle_list_files(path: &str, writer: &ResponseWriter) -> Result<(), CmdError> {
245226
let resolved_path = validate_file_path(path)
246227
.map_err(|e| CmdError::validation(format!("Invalid path '{path}': {e}")))?;
247228

248229
let mut read_dir = tokio::fs::read_dir(&resolved_path)
249230
.await
250-
.map_err(|e| CmdError::io(format!("Cannot list '{path}': {e}"), None))?;
231+
.map_err(|e| CmdError::io(format!("Cannot list '{path}': {e}")))?;
251232

252233
let mut entries = Vec::new();
253234
while let Ok(Some(entry)) = read_dir.next_entry().await {
@@ -264,14 +245,12 @@ pub(crate) async fn handle_list_files(
264245

265246
entries.sort_by(|a, b| a.name.cmp(&b.name));
266247

267-
send_response(
268-
write_tx,
269-
&GuestResponse::FileList {
248+
writer
249+
.send(&GuestResponse::FileList {
270250
path: path.to_string(),
271251
entries,
272-
},
273-
)
274-
.await
252+
})
253+
.await
275254
}
276255

277256
/// Set up a pipelined file write operation.
@@ -281,25 +260,20 @@ pub(crate) async fn handle_write_file(
281260
op_id: &str,
282261
path: &str,
283262
make_executable: bool,
284-
write_tx: &mpsc::Sender<Vec<u8>>,
285263
) -> Result<ActiveWriteHandle, CmdError> {
286-
let resolved_path = validate_file_path(path).map_err(|e| {
287-
CmdError::validation(format!("Invalid path '{path}': {e}")).with_op_id(op_id)
288-
})?;
264+
let resolved_path = validate_file_path(path)
265+
.map_err(|e| CmdError::validation(format!("Invalid path '{path}': {e}")))?;
289266

290267
if resolved_path == std::path::Path::new(SANDBOX_ROOT) {
291-
return Err(
292-
CmdError::validation("Cannot write to sandbox root directory").with_op_id(op_id),
293-
);
268+
return Err(CmdError::validation(
269+
"Cannot write to sandbox root directory",
270+
));
294271
}
295272

296273
if let Some(parent) = resolved_path.parent()
297274
&& let Err(e) = std::fs::create_dir_all(parent)
298275
{
299-
return Err(CmdError::io(
300-
format!("Failed to create directories: {e}"),
301-
Some(op_id),
302-
));
276+
return Err(CmdError::io(format!("Failed to create directories: {e}")));
303277
}
304278

305279
let tmp_path = resolved_path
@@ -308,7 +282,7 @@ pub(crate) async fn handle_write_file(
308282
.join(format!(".wr.{op_id}.tmp"));
309283

310284
let file = std::fs::File::create(&tmp_path)
311-
.map_err(|e| CmdError::io(format!("Failed to create temp file: {e}"), Some(op_id)))?;
285+
.map_err(|e| CmdError::io(format!("Failed to create temp file: {e}")))?;
312286

313287
let counting_writer = CountingWriter {
314288
inner: file,
@@ -319,10 +293,7 @@ pub(crate) async fn handle_write_file(
319293
Ok(d) => d,
320294
Err(e) => {
321295
let _ = std::fs::remove_file(&tmp_path);
322-
return Err(CmdError::io(
323-
format!("Decompression init failed: {e}"),
324-
Some(op_id),
325-
));
296+
return Err(CmdError::io(format!("Decompression init failed: {e}")));
326297
}
327298
};
328299

@@ -391,17 +362,10 @@ pub(crate) async fn handle_write_file(
391362
})
392363
});
393364

394-
// Note: handle_write_file previously returned Ok(None) on validation errors.
395-
// Now those are Err(CmdError::Reply), and success always returns the handle.
396-
// The unused write_tx parameter is kept for API consistency; validation errors
397-
// now propagate via CmdError to the dispatcher.
398-
let _ = write_tx;
399-
400365
Ok(ActiveWriteHandle {
401366
chunk_tx,
402367
task,
403368
path_display: path.to_string(),
404-
op_id: op_id.to_string(),
405369
})
406370
}
407371

guest-agent/src/packages.rs

Lines changed: 13 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
//! Package installation (pip, bun) with validation.
22
33
use tokio::process::Command;
4-
use tokio::sync::mpsc;
54

65
use crate::constants::*;
76
use crate::error::*;
@@ -115,7 +114,7 @@ pub(crate) fn validate_package_name(pkg: &str) -> Result<(), String> {
115114
pub(crate) async fn install_packages(
116115
language: Language,
117116
packages: &[String],
118-
write_tx: &mpsc::Sender<Vec<u8>>,
117+
writer: &ResponseWriter,
119118
) -> Result<(), CmdError> {
120119
use std::time::Instant;
121120
use tokio::time::Duration;
@@ -227,35 +226,29 @@ pub(crate) async fn install_packages(
227226
}
228227

229228
if !stdout_lines.is_empty() {
230-
send_response(
231-
write_tx,
232-
&GuestResponse::Stdout {
229+
writer
230+
.send(&GuestResponse::Stdout {
233231
chunk: stdout_lines.join("\n") + "\n",
234-
},
235-
)
236-
.await?;
232+
})
233+
.await?;
237234
}
238235

239236
if !stderr_lines.is_empty() {
240-
send_response(
241-
write_tx,
242-
&GuestResponse::Stderr {
237+
writer
238+
.send(&GuestResponse::Stderr {
243239
chunk: stderr_lines.join("\n") + "\n",
244-
},
245-
)
246-
.await?;
240+
})
241+
.await?;
247242
}
248243

249-
send_response(
250-
write_tx,
251-
&GuestResponse::Complete {
244+
writer
245+
.send(&GuestResponse::Complete {
252246
exit_code,
253247
execution_time_ms: duration_ms,
254248
spawn_ms: None,
255249
process_ms: None,
256-
},
257-
)
258-
.await?;
250+
})
251+
.await?;
259252

260253
Ok(())
261254
}

0 commit comments

Comments
 (0)