From 44bc373753b7b8ae1b854d9df615a513b247c5bd Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sat, 27 Jun 2026 17:08:06 -0700 Subject: [PATCH 1/2] feat(python): outbound raw TCP + UDP sockets in the Pyodide bridge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Back Python's socket module with host sockets (outbound TCP connect/send/recv, UDP sendto/recvfrom), governed by the network policy + DNS egress guard like the HTTP bridge: - python.rs: SocketConnect/Send/Recv/Close + UdpCreate/Sendto/Recvfrom RPC methods, port/socketId wire fields, and Socket*/Udp* response payloads - execution.rs: handle_python_socket_rpc_request opens host TcpStream/UdpSocket per process (require_network_access + filter_dns_safe_ip_addrs), short read timeout so a recv RPC never stalls the shared event loop - state.rs: per-process PythonHostSocket registry - runner: socket bridge methods + a socket.socket shim that polls recv/recvfrom to emulate blocking semantics Listeners (bind/listen/accept) are out of scope — outbound only. Test: python_runtime_supports_raw_tcp_and_udp_sockets drives TCP + UDP echo servers through Python's stdlib socket module. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../assets/runners/python-runner.mjs | 176 +++++++++ crates/execution/src/python.rs | 67 ++++ crates/sidecar/src/execution.rs | 333 +++++++++++++++++- crates/sidecar/src/filesystem.rs | 15 +- crates/sidecar/src/state.rs | 14 + crates/sidecar/tests/python.rs | 151 +++++++- crates/sidecar/tests/service.rs | 8 + 7 files changed, 747 insertions(+), 17 deletions(-) diff --git a/crates/execution/assets/runners/python-runner.mjs b/crates/execution/assets/runners/python-runner.mjs index f28f821d..4ec50bb0 100644 --- a/crates/execution/assets/runners/python-runner.mjs +++ b/crates/execution/assets/runners/python-runner.mjs @@ -638,6 +638,29 @@ function createPythonBridgeRpcBridge() { maxBuffer, })); }, + socketConnectSync(host, port) { + return JSON.stringify(requestSync('socketConnect', { hostname: host, port })); + }, + socketSendSync(socketId, dataBase64) { + return JSON.stringify(requestSync('socketSend', { socketId, bodyBase64: dataBase64 })); + }, + socketRecvSync(socketId, maxBuffer) { + return JSON.stringify(requestSync('socketRecv', { socketId, maxBuffer })); + }, + socketCloseSync(socketId) { + return JSON.stringify(requestSync('socketClose', { socketId })); + }, + udpCreateSync() { + return JSON.stringify(requestSync('udpCreate', {})); + }, + udpSendtoSync(socketId, host, port, dataBase64) { + return JSON.stringify( + requestSync('udpSendto', { socketId, hostname: host, port, bodyBase64: dataBase64 }), + ); + }, + udpRecvfromSync(socketId, maxBuffer) { + return JSON.stringify(requestSync('udpRecvfrom', { socketId, maxBuffer })); + }, dispose() {}, }; } @@ -1094,6 +1117,159 @@ def _agentos_gethostbyname(host): _agentos_socket.getaddrinfo = _agentos_getaddrinfo _agentos_socket.gethostbyname = _agentos_gethostbyname +# Raw socket bridge: back socket.socket() with the host (outbound TCP + UDP). +# Reads poll (the host uses a short read timeout) so the synchronous RPC never +# stalls the sidecar; the loop below re-polls to emulate blocking semantics. +import base64 as _agentos_base64 +import time as _agentos_time + +_agentos_original_socket_class = _agentos_socket.socket + +class _SecureExecSocket: + def __init__(self, family=None, type=None, proto=0, fileno=None): + self.family = family if family is not None else _agentos_socket.AF_INET + self.type = type if type is not None else _agentos_socket.SOCK_STREAM + self.proto = proto + self._timeout = None # None blocks; 0 is non-blocking; >0 is a deadline + self._id = None + self._closed = False + self._is_udp = self.type == _agentos_socket.SOCK_DGRAM + if self._is_udp: + resp = _agentos_json.loads(_agentos_rpc.udpCreateSync()) + self._id = int(resp["socketId"]) + + def connect(self, address): + host, port = address[0], address[1] + resp = _agentos_json.loads(_agentos_rpc.socketConnectSync(str(host), int(port))) + self._id = int(resp["socketId"]) + + def connect_ex(self, address): + try: + self.connect(address) + return 0 + except OSError as exc: + return exc.errno or 1 + + def _ensure_id(self): + if self._id is None: + raise OSError(9, "Bad file descriptor") + return self._id + + def send(self, data, flags=0): + sid = self._ensure_id() + b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii") + resp = _agentos_json.loads(_agentos_rpc.socketSendSync(sid, b64)) + return int(resp.get("bytesSent", len(data))) + + def sendall(self, data, flags=0): + payload = bytes(data) + total = 0 + while total < len(payload): + total += self.send(payload[total:], flags) + return None + + def _poll(self, bufsize, recv_fn): + deadline = None + if self._timeout is not None and self._timeout > 0: + deadline = _agentos_time.monotonic() + self._timeout + while True: + resp = _agentos_json.loads(recv_fn(int(bufsize))) + if resp.get("closed"): + return b"", resp + data = resp.get("dataBase64") or "" + if data: + return _agentos_base64.b64decode(data), resp + if resp.get("timedOut"): + if self._timeout == 0: + raise BlockingIOError(11, "Resource temporarily unavailable") + if deadline is not None and _agentos_time.monotonic() >= deadline: + raise _agentos_socket.timeout("timed out") + continue + return b"", resp + + def recv(self, bufsize, flags=0): + sid = self._ensure_id() + data, _ = self._poll(bufsize, lambda n: _agentos_rpc.socketRecvSync(sid, n)) + return data + + def sendto(self, data, *args): + address = args[-1] + host, port = address[0], address[1] + if self._id is None: + resp = _agentos_json.loads(_agentos_rpc.udpCreateSync()) + self._id = int(resp["socketId"]) + b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii") + resp = _agentos_json.loads( + _agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64) + ) + return int(resp.get("bytesSent", len(data))) + + def recvfrom(self, bufsize, flags=0): + sid = self._ensure_id() + data, resp = self._poll(bufsize, lambda n: _agentos_rpc.udpRecvfromSync(sid, n)) + addr = (resp.get("host", ""), int(resp.get("port", 0))) if resp else ("", 0) + return data, addr + + def settimeout(self, value): + self._timeout = value + + def gettimeout(self): + return self._timeout + + def setblocking(self, flag): + self._timeout = None if flag else 0.0 + + def setsockopt(self, *args, **kwargs): + return None + + def getsockopt(self, *args, **kwargs): + return 0 + + def fileno(self): + return self._id if self._id is not None else -1 + + def getpeername(self): + return ("", 0) + + def getsockname(self): + return ("0.0.0.0", 0) + + def close(self): + if self._closed: + return + self._closed = True + if self._id is not None: + try: + _agentos_rpc.socketCloseSync(self._id) + except Exception: + pass + self._id = None + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + + def __del__(self): + try: + self.close() + except Exception: + pass + +def _agentos_socket_factory(family=-1, type=-1, proto=0, fileno=None): + fam = family if family != -1 else _agentos_socket.AF_INET + typ = type if type != -1 else _agentos_socket.SOCK_STREAM + if ( + fileno is None + and fam in (_agentos_socket.AF_INET, _agentos_socket.AF_INET6) + and typ in (_agentos_socket.SOCK_STREAM, _agentos_socket.SOCK_DGRAM) + ): + return _SecureExecSocket(fam, typ, proto) + return _agentos_original_socket_class(family, type, proto, fileno) + +_agentos_socket.socket = _agentos_socket_factory + class _SecureExecRequestsResponse: def __init__(self, payload): self.status_code = int(payload.get("status", 0)) diff --git a/crates/execution/src/python.rs b/crates/execution/src/python.rs index 45fb6413..e3f76a4b 100644 --- a/crates/execution/src/python.rs +++ b/crates/execution/src/python.rs @@ -56,6 +56,13 @@ pub enum PythonVfsRpcMethod { HttpRequest, DnsLookup, SubprocessRun, + SocketConnect, + SocketSend, + SocketRecv, + SocketClose, + UdpCreate, + UdpSendto, + UdpRecvfrom, } impl PythonVfsRpcMethod { @@ -72,6 +79,13 @@ impl PythonVfsRpcMethod { "httpRequest" => Some(Self::HttpRequest), "dnsLookup" => Some(Self::DnsLookup), "subprocessRun" => Some(Self::SubprocessRun), + "socketConnect" => Some(Self::SocketConnect), + "socketSend" => Some(Self::SocketSend), + "socketRecv" => Some(Self::SocketRecv), + "socketClose" => Some(Self::SocketClose), + "udpCreate" => Some(Self::UdpCreate), + "udpSendto" => Some(Self::UdpSendto), + "udpRecvfrom" => Some(Self::UdpRecvfrom), _ => None, } } @@ -92,6 +106,10 @@ pub struct PythonVfsRpcRequest { pub body_base64: Option, pub hostname: Option, pub family: Option, + /// Port for socket connect/sendto. + pub port: Option, + /// Socket handle for send/recv/close/sendto/recvfrom. + pub socket_id: Option, pub command: Option, pub args: Vec, pub cwd: Option, @@ -136,6 +154,23 @@ pub enum PythonVfsRpcResponsePayload { stderr: String, max_buffer_exceeded: bool, }, + SocketCreated { + socket_id: u64, + }, + SocketSent { + bytes_sent: usize, + }, + SocketReceived { + data_base64: String, + closed: bool, + timed_out: bool, + }, + UdpReceived { + data_base64: String, + host: String, + port: u16, + timed_out: bool, + }, } #[derive(Debug, Deserialize)] @@ -163,6 +198,10 @@ struct PythonVfsBridgeRequestWire { #[serde(default)] family: Option, #[serde(default)] + port: Option, + #[serde(default, rename = "socketId")] + socket_id: Option, + #[serde(default)] command: Option, #[serde(default)] args: Vec, @@ -477,6 +516,32 @@ impl PythonExecution { "stderr": stderr, "maxBufferExceeded": max_buffer_exceeded, }), + PythonVfsRpcResponsePayload::SocketCreated { socket_id } => json!({ + "socketId": socket_id, + }), + PythonVfsRpcResponsePayload::SocketSent { bytes_sent } => json!({ + "bytesSent": bytes_sent, + }), + PythonVfsRpcResponsePayload::SocketReceived { + data_base64, + closed, + timed_out, + } => json!({ + "dataBase64": data_base64, + "closed": closed, + "timedOut": timed_out, + }), + PythonVfsRpcResponsePayload::UdpReceived { + data_base64, + host, + port, + timed_out, + } => json!({ + "dataBase64": data_base64, + "host": host, + "port": port, + "timedOut": timed_out, + }), }; self.inner @@ -1195,6 +1260,8 @@ fn parse_python_bridge_sync_rpc_request( body_base64: wire.body_base64, hostname: wire.hostname, family: wire.family, + port: wire.port, + socket_id: wire.socket_id, command: wire.command, args: wire.args, cwd: wire.cwd, diff --git a/crates/sidecar/src/execution.rs b/crates/sidecar/src/execution.rs index 86cebb72..1a7590e0 100644 --- a/crates/sidecar/src/execution.rs +++ b/crates/sidecar/src/execution.rs @@ -17,10 +17,9 @@ use crate::protocol::{ OwnershipScope, ProcessExitedEvent, ProcessKilledResponse, ProcessOutputEvent, ProcessSnapshotEntry, ProcessSnapshotResponse, ProcessSnapshotStatus, ProcessStartedResponse, PtyResizedResponse, RequestFrame, ResizePtyRequest, ResponseFrame, ResponsePayload, - SidecarRequestPayload, SignalDispositionAction, - SignalHandlerRegistration, SignalStateResponse, SocketStateEntry, StdinClosedResponse, - StdinWrittenResponse, StreamChannel, VmFetchRequest, VmFetchResponse, WasmPermissionTier, - WriteStdinRequest, ZombieTimerCountResponse, + SidecarRequestPayload, SignalDispositionAction, SignalHandlerRegistration, SignalStateResponse, + SocketStateEntry, StdinClosedResponse, StdinWrittenResponse, StreamChannel, VmFetchRequest, + VmFetchResponse, WasmPermissionTier, WriteStdinRequest, ZombieTimerCountResponse, }; use crate::service::{ audit_fields, dirname, emit_security_audit_event, emit_structured_event, javascript_error, @@ -40,12 +39,13 @@ use crate::state::{ JavascriptTlsBridgeOptions, JavascriptTlsClientHello, JavascriptTlsDataValue, JavascriptTlsMaterial, JavascriptUdpFamily, JavascriptUdpSocketEvent, JavascriptUnixListenerEvent, NetworkResourceCounts, PendingTcpSocket, PendingUnixSocket, - ProcNetEntry, ProcessEventEnvelope, ResolvedChildProcessExecution, ResolvedTcpConnectAddr, - SharedBridge, SharedSidecarRequestClient, SidecarKernel, SocketQueryKind, ToolExecution, - VmDnsConfig, VmListenPolicy, VmState, DEFAULT_JAVASCRIPT_NET_BACKLOG, EXECUTION_DRIVER_NAME, - EXECUTION_SANDBOX_ROOT_ENV, JAVASCRIPT_COMMAND, LOOPBACK_EXEMPT_PORTS_ENV, - MAPPED_HOST_FD_START, PYTHON_COMMAND, TOOL_DRIVER_NAME, - VM_LISTEN_ALLOW_PRIVILEGED_METADATA_KEY, WASM_COMMAND, WASM_STDIO_SYNC_RPC_ENV, + ProcNetEntry, ProcessEventEnvelope, PythonHostSocket, ResolvedChildProcessExecution, + ResolvedTcpConnectAddr, SharedBridge, SharedSidecarRequestClient, SidecarKernel, + SocketQueryKind, ToolExecution, VmDnsConfig, VmListenPolicy, VmState, + DEFAULT_JAVASCRIPT_NET_BACKLOG, EXECUTION_DRIVER_NAME, EXECUTION_SANDBOX_ROOT_ENV, + JAVASCRIPT_COMMAND, LOOPBACK_EXEMPT_PORTS_ENV, MAPPED_HOST_FD_START, PYTHON_COMMAND, + TOOL_DRIVER_NAME, VM_LISTEN_ALLOW_PRIVILEGED_METADATA_KEY, WASM_COMMAND, + WASM_STDIO_SYNC_RPC_ENV, }; use crate::tools::{ format_tool_failure_output, is_tool_command, normalized_tool_command_name, @@ -368,6 +368,8 @@ impl ActiveProcess { next_unix_socket_id: 0, udp_sockets: BTreeMap::new(), next_udp_socket_id: 0, + python_sockets: BTreeMap::new(), + next_python_socket_id: 0, cipher_sessions: BTreeMap::new(), next_cipher_session_id: 0, diffie_hellman_sessions: BTreeMap::new(), @@ -4774,6 +4776,15 @@ where PythonVfsRpcMethod::SubprocessRun => { self.handle_python_subprocess_rpc_request(vm_id, process_id, request) } + PythonVfsRpcMethod::SocketConnect + | PythonVfsRpcMethod::SocketSend + | PythonVfsRpcMethod::SocketRecv + | PythonVfsRpcMethod::SocketClose + | PythonVfsRpcMethod::UdpCreate + | PythonVfsRpcMethod::UdpSendto + | PythonVfsRpcMethod::UdpRecvfrom => { + self.handle_python_socket_rpc_request(vm_id, process_id, request) + } } } @@ -5037,6 +5048,233 @@ where self.respond_python_rpc(vm_id, process_id, request.id, response) } + fn handle_python_socket_rpc_request( + &mut self, + vm_id: &str, + process_id: &str, + request: PythonVfsRpcRequest, + ) -> Result<(), SidecarError> { + if self.vms.get(vm_id).is_none() { + return Ok(()); + } + let response = self.python_socket_op(vm_id, process_id, &request); + self.respond_python_rpc(vm_id, process_id, request.id, response) + } + + fn python_socket_op( + &mut self, + vm_id: &str, + process_id: &str, + request: &PythonVfsRpcRequest, + ) -> Result { + match request.method { + PythonVfsRpcMethod::SocketConnect => { + let host = python_socket_host(request)?; + let port = python_socket_port(request)?; + self.bridge.require_network_access( + vm_id, + NetworkOperation::Http, + format_tcp_resource(&host, port), + )?; + let pinned = self.python_socket_pinned_addrs(vm_id, &host)?; + let stream = python_connect_tcp(&pinned, port)?; + stream + .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL)) + .map_err(python_socket_io_error)?; + let socket_id = + self.store_python_socket(vm_id, process_id, PythonHostSocket::Tcp(stream))?; + Ok(PythonVfsRpcResponsePayload::SocketCreated { socket_id }) + } + PythonVfsRpcMethod::SocketSend => { + let data = python_socket_payload(request)?; + let socket = self.python_socket_mut(vm_id, process_id, request)?; + let PythonHostSocket::Tcp(stream) = socket else { + return Err(python_socket_kind_error("send", "TCP")); + }; + stream.write_all(&data).map_err(python_socket_io_error)?; + Ok(PythonVfsRpcResponsePayload::SocketSent { + bytes_sent: data.len(), + }) + } + PythonVfsRpcMethod::SocketRecv => { + let max = python_socket_recv_len(request); + let socket = self.python_socket_mut(vm_id, process_id, request)?; + let PythonHostSocket::Tcp(stream) = socket else { + return Err(python_socket_kind_error("recv", "TCP")); + }; + let mut buf = vec![0u8; max]; + match stream.read(&mut buf) { + Ok(0) => Ok(PythonVfsRpcResponsePayload::SocketReceived { + data_base64: String::new(), + closed: true, + timed_out: false, + }), + Ok(n) => Ok(PythonVfsRpcResponsePayload::SocketReceived { + data_base64: base64::engine::general_purpose::STANDARD.encode(&buf[..n]), + closed: false, + timed_out: false, + }), + Err(error) if python_socket_would_block(&error) => { + Ok(PythonVfsRpcResponsePayload::SocketReceived { + data_base64: String::new(), + closed: false, + timed_out: true, + }) + } + Err(error) => Err(python_socket_io_error(error)), + } + } + PythonVfsRpcMethod::SocketClose => { + self.remove_python_socket(vm_id, process_id, request); + Ok(PythonVfsRpcResponsePayload::Empty) + } + PythonVfsRpcMethod::UdpCreate => { + let socket = UdpSocket::bind("0.0.0.0:0").map_err(python_socket_io_error)?; + socket + .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL)) + .map_err(python_socket_io_error)?; + let socket_id = + self.store_python_socket(vm_id, process_id, PythonHostSocket::Udp(socket))?; + Ok(PythonVfsRpcResponsePayload::SocketCreated { socket_id }) + } + PythonVfsRpcMethod::UdpSendto => { + let host = python_socket_host(request)?; + let port = python_socket_port(request)?; + let data = python_socket_payload(request)?; + self.bridge.require_network_access( + vm_id, + NetworkOperation::Http, + format_tcp_resource(&host, port), + )?; + let pinned = self.python_socket_pinned_addrs(vm_id, &host)?; + let target = pinned + .first() + .map(|ip| SocketAddr::new(*ip, port)) + .ok_or_else(|| { + SidecarError::Execution(format!("EAI_NONAME: cannot resolve {host}")) + })?; + let socket = self.python_socket_mut(vm_id, process_id, request)?; + let PythonHostSocket::Udp(udp) = socket else { + return Err(python_socket_kind_error("sendto", "UDP")); + }; + let sent = udp.send_to(&data, target).map_err(python_socket_io_error)?; + Ok(PythonVfsRpcResponsePayload::SocketSent { bytes_sent: sent }) + } + PythonVfsRpcMethod::UdpRecvfrom => { + let max = python_socket_recv_len(request); + let socket = self.python_socket_mut(vm_id, process_id, request)?; + let PythonHostSocket::Udp(udp) = socket else { + return Err(python_socket_kind_error("recvfrom", "UDP")); + }; + let mut buf = vec![0u8; max]; + match udp.recv_from(&mut buf) { + Ok((n, addr)) => Ok(PythonVfsRpcResponsePayload::UdpReceived { + data_base64: base64::engine::general_purpose::STANDARD.encode(&buf[..n]), + host: addr.ip().to_string(), + port: addr.port(), + timed_out: false, + }), + Err(error) if python_socket_would_block(&error) => { + Ok(PythonVfsRpcResponsePayload::UdpReceived { + data_base64: String::new(), + host: String::new(), + port: 0, + timed_out: true, + }) + } + Err(error) => Err(python_socket_io_error(error)), + } + } + _ => Err(SidecarError::InvalidState(String::from( + "non-socket python RPC reached the socket dispatcher unexpectedly", + ))), + } + } + + /// Resolve `host` to the egress-guard-approved IP set (literal IPs are + /// validated directly), mirroring the HTTP bridge's DNS pinning. + fn python_socket_pinned_addrs( + &self, + vm_id: &str, + host: &str, + ) -> Result, SidecarError> { + let Some(vm) = self.vms.get(vm_id) else { + return Err(SidecarError::InvalidState(String::from( + "python socket op for unknown vm", + ))); + }; + if let Ok(literal_ip) = host.parse::() { + filter_dns_safe_ip_addrs(vec![literal_ip], host) + } else { + filter_dns_safe_ip_addrs( + resolve_dns_ip_addrs( + &self.bridge, + &vm.kernel, + vm_id, + &vm.dns, + host, + DnsLookupPolicy::SkipPermissions, + )?, + host, + ) + } + } + + fn store_python_socket( + &mut self, + vm_id: &str, + process_id: &str, + socket: PythonHostSocket, + ) -> Result { + let process = self + .vms + .get_mut(vm_id) + .and_then(|vm| vm.active_processes.get_mut(process_id)) + .ok_or_else(|| { + SidecarError::InvalidState(String::from("python socket op for reaped vm/process")) + })?; + let socket_id = process.next_python_socket_id; + process.next_python_socket_id = process.next_python_socket_id.wrapping_add(1); + process.python_sockets.insert(socket_id, socket); + Ok(socket_id) + } + + fn python_socket_mut( + &mut self, + vm_id: &str, + process_id: &str, + request: &PythonVfsRpcRequest, + ) -> Result<&mut PythonHostSocket, SidecarError> { + let socket_id = request.socket_id.ok_or_else(|| { + SidecarError::InvalidState(String::from("python socket op requires socketId")) + })?; + self.vms + .get_mut(vm_id) + .and_then(|vm| vm.active_processes.get_mut(process_id)) + .and_then(|process| process.python_sockets.get_mut(&socket_id)) + .ok_or_else(|| { + SidecarError::Execution(format!("EBADF: unknown python socket {socket_id}")) + }) + } + + fn remove_python_socket( + &mut self, + vm_id: &str, + process_id: &str, + request: &PythonVfsRpcRequest, + ) { + let Some(socket_id) = request.socket_id else { + return; + }; + if let Some(process) = self + .vms + .get_mut(vm_id) + .and_then(|vm| vm.active_processes.get_mut(process_id)) + { + process.python_sockets.remove(&socket_id); + } + } + fn respond_python_rpc( &mut self, vm_id: &str, @@ -12007,6 +12245,79 @@ pub(crate) fn format_tcp_resource(host: &str, port: u16) -> String { format!("tcp://{host}:{port}") } +// --- Guest Python socket bridge helpers ------------------------------------ + +/// Host-socket read timeout for one `recv`/`recvfrom` RPC. Kept short so the +/// synchronous RPC returns promptly (data, or a `timed_out` flag the Python +/// shim re-polls on) and never stalls the shared sidecar event loop. +const PYTHON_SOCKET_READ_POLL: Duration = Duration::from_millis(100); +const PYTHON_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); +const PYTHON_SOCKET_DEFAULT_RECV: usize = 65536; +const PYTHON_SOCKET_MAX_RECV: usize = 4 * 1024 * 1024; + +fn python_socket_host(request: &PythonVfsRpcRequest) -> Result { + request + .hostname + .clone() + .ok_or_else(|| SidecarError::InvalidState(String::from("python socket op requires a host"))) +} + +fn python_socket_port(request: &PythonVfsRpcRequest) -> Result { + request + .port + .ok_or_else(|| SidecarError::InvalidState(String::from("python socket op requires a port"))) +} + +fn python_socket_payload(request: &PythonVfsRpcRequest) -> Result, SidecarError> { + let Some(body) = request.body_base64.as_deref() else { + return Ok(Vec::new()); + }; + base64::engine::general_purpose::STANDARD + .decode(body) + .map_err(|error| { + SidecarError::InvalidState(format!("invalid base64 python socket payload: {error}")) + }) +} + +fn python_socket_recv_len(request: &PythonVfsRpcRequest) -> usize { + request + .max_buffer + .unwrap_or(PYTHON_SOCKET_DEFAULT_RECV) + .clamp(1, PYTHON_SOCKET_MAX_RECV) +} + +fn python_connect_tcp(addrs: &[IpAddr], port: u16) -> Result { + let mut last_error: Option = None; + for ip in addrs { + let addr = SocketAddr::new(*ip, port); + match TcpStream::connect_timeout(&addr, PYTHON_SOCKET_CONNECT_TIMEOUT) { + Ok(stream) => return Ok(stream), + Err(error) => last_error = Some(error.to_string()), + } + } + Err(SidecarError::Execution(format!( + "ECONNREFUSED: {}", + last_error.unwrap_or_else(|| String::from("no resolved addresses")) + ))) +} + +fn python_socket_would_block(error: &std::io::Error) -> bool { + matches!( + error.kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut + ) +} + +fn python_socket_io_error(error: std::io::Error) -> SidecarError { + SidecarError::Execution(format!("EIO: python socket: {error}")) +} + +fn python_socket_kind_error(op: &str, expected: &str) -> SidecarError { + SidecarError::Execution(format!( + "EOPNOTSUPP: python socket {op} requires a {expected} socket" + )) +} + fn is_loopback_ip(ip: IpAddr) -> bool { match ip { IpAddr::V4(ip) => ip.is_loopback(), @@ -16810,7 +17121,7 @@ fn install_kernel_stdin_pipe(kernel: &mut SidecarKernel, pid: u32) -> Result) -> Option<(u16, u16)> { let cols = env diff --git a/crates/sidecar/src/filesystem.rs b/crates/sidecar/src/filesystem.rs index 3fe0361b..f9da7602 100644 --- a/crates/sidecar/src/filesystem.rs +++ b/crates/sidecar/src/filesystem.rs @@ -820,11 +820,16 @@ where } PythonVfsRpcMethod::HttpRequest | PythonVfsRpcMethod::DnsLookup - | PythonVfsRpcMethod::SubprocessRun => { - Err(SidecarError::InvalidState(String::from( - "python non-filesystem RPC reached filesystem dispatcher unexpectedly", - ))) - } + | PythonVfsRpcMethod::SubprocessRun + | PythonVfsRpcMethod::SocketConnect + | PythonVfsRpcMethod::SocketSend + | PythonVfsRpcMethod::SocketRecv + | PythonVfsRpcMethod::SocketClose + | PythonVfsRpcMethod::UdpCreate + | PythonVfsRpcMethod::UdpSendto + | PythonVfsRpcMethod::UdpRecvfrom => Err(SidecarError::InvalidState(String::from( + "python non-filesystem RPC reached filesystem dispatcher unexpectedly", + ))), } } Err(error) => Err(error), diff --git a/crates/sidecar/src/state.rs b/crates/sidecar/src/state.rs index c916cd55..02622b7a 100644 --- a/crates/sidecar/src/state.rs +++ b/crates/sidecar/src/state.rs @@ -474,6 +474,11 @@ pub(crate) struct ActiveProcess { pub(crate) next_unix_socket_id: usize, pub(crate) udp_sockets: BTreeMap, pub(crate) next_udp_socket_id: usize, + /// Synchronous host sockets opened by the guest Python `socket` bridge, + /// keyed by the handle returned to the runner. Distinct from the JS + /// runtime's event-driven `tcp_sockets`/`udp_sockets` above. + pub(crate) python_sockets: BTreeMap, + pub(crate) next_python_socket_id: u64, pub(crate) cipher_sessions: BTreeMap, pub(crate) next_cipher_session_id: u64, pub(crate) diffie_hellman_sessions: BTreeMap, @@ -953,6 +958,15 @@ pub(crate) enum JavascriptUdpSocketEvent { }, } +/// A blocking host socket backing one guest Python socket. Reads use a short +/// timeout (set on the socket) so a `recv`/`recvfrom` RPC never stalls the +/// shared sidecar event loop; the Python shim re-polls to emulate blocking. +#[derive(Debug)] +pub(crate) enum PythonHostSocket { + Tcp(TcpStream), + Udp(UdpSocket), +} + #[derive(Debug)] pub(crate) struct ActiveUdpSocket { pub(crate) family: JavascriptUdpFamily, diff --git a/crates/sidecar/tests/python.rs b/crates/sidecar/tests/python.rs index ab05430d..4c06c47c 100644 --- a/crates/sidecar/tests/python.rs +++ b/crates/sidecar/tests/python.rs @@ -14,7 +14,7 @@ use serde_json::{json, Value}; use std::collections::HashMap; use std::fs; use std::io::{Read, Write}; -use std::net::TcpListener; +use std::net::{TcpListener, UdpSocket}; use std::os::unix::fs::symlink; use std::path::{Component, Path, PathBuf}; use std::sync::{ @@ -199,6 +199,67 @@ fn static_file_server_rejects_traversal_paths() { assert_eq!(static_file_path(root, "/packages/../../secret.txt"), None); } +fn spawn_tcp_echo_server() -> (u16, thread::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind tcp echo listener"); + listener + .set_nonblocking(true) + .expect("set nonblocking echo listener"); + let port = listener.local_addr().expect("echo listener address").port(); + let handle = thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(60); + while Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); + let mut buf = [0_u8; 4096]; + loop { + match stream.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + if stream.write_all(&buf[..n]).is_err() { + break; + } + } + Err(_) => break, + } + } + return; + } + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(25)); + } + Err(_) => break, + } + } + }); + (port, handle) +} + +fn spawn_udp_echo_server() -> (u16, thread::JoinHandle<()>) { + let socket = UdpSocket::bind("127.0.0.1:0").expect("bind udp echo socket"); + socket + .set_read_timeout(Some(Duration::from_secs(1))) + .expect("set udp echo read timeout"); + let port = socket.local_addr().expect("udp echo address").port(); + let handle = thread::spawn(move || { + let deadline = Instant::now() + Duration::from_secs(60); + let mut buf = [0_u8; 4096]; + while Instant::now() < deadline { + match socket.recv_from(&mut buf) { + Ok((n, addr)) => { + let _ = socket.send_to(&buf[..n], addr); + return; + } + Err(error) + if error.kind() == std::io::ErrorKind::WouldBlock + || error.kind() == std::io::ErrorKind::TimedOut => {} + Err(_) => break, + } + } + }); + (port, handle) +} + fn spawn_static_file_server(root: PathBuf) -> (u16, thread::JoinHandle<()>) { let listener = TcpListener::bind("127.0.0.1:0").expect("bind static file listener"); listener @@ -1338,6 +1399,94 @@ print(json.dumps(results)) ); } +#[test] +fn python_runtime_supports_raw_tcp_and_udp_sockets() { + assert_node_available(); + + let (tcp_port, tcp_server) = spawn_tcp_echo_server(); + let (udp_port, udp_server) = spawn_udp_echo_server(); + let mut sidecar = new_sidecar("python-sockets"); + let cwd = temp_dir("python-sockets-cwd"); + let connection_id = authenticate_wire(&mut sidecar, "conn-python"); + let session_id = open_session_wire(&mut sidecar, 2, &connection_id); + let vm_id = create_vm_with_metadata_and_permissions( + &mut sidecar, + 3, + &connection_id, + &session_id, + GuestRuntimeKind::Python, + &cwd, + HashMap::from([( + String::from("env.AGENTOS_LOOPBACK_EXEMPT_PORTS"), + serde_json::to_string(&vec![tcp_port.to_string(), udp_port.to_string()]) + .expect("serialize exempt ports"), + )]), + wire_permissions_allow_all(), + ); + + execute_inline_python( + &mut sidecar, + 4, + &connection_id, + &session_id, + &vm_id, + "proc-python-sockets", + &format!( + r#" +import json +import socket + +result = {{}} + +# Raw outbound TCP against a host echo server. +tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +tcp.settimeout(10) +tcp.connect(("127.0.0.1", {tcp_port})) +tcp.sendall(b"hello sockets") +received = b"" +while len(received) < len(b"hello sockets"): + chunk = tcp.recv(64) + if not chunk: + break + received += chunk +tcp.close() +result["tcp"] = received.decode() + +# Raw UDP against a host echo server. +udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +udp.settimeout(10) +udp.sendto(b"ping udp", ("127.0.0.1", {udp_port})) +data, _addr = udp.recvfrom(64) +udp.close() +result["udp"] = data.decode() + +print(json.dumps(result)) +"#, + ), + ); + + let (stdout, stderr, exit_code) = collect_process_output_with_timeout( + &mut sidecar, + &connection_id, + &session_id, + &vm_id, + "proc-python-sockets", + Duration::from_secs(60), + ); + + let _ = tcp_server.join(); + let _ = udp_server.join(); + assert_eq!(exit_code, 0, "stdout: {stdout}\nstderr: {stderr}"); + let json_line = stdout + .lines() + .rev() + .find(|line| !line.trim().is_empty()) + .expect("python sockets stdout line"); + let parsed: Value = serde_json::from_str(json_line).expect("parse sockets JSON"); + assert_eq!(parsed["tcp"], "hello sockets"); + assert_eq!(parsed["udp"], "ping udp"); +} + fn workspace_files_are_shared_between_javascript_and_python_runtimes() { assert_node_available(); diff --git a/crates/sidecar/tests/service.rs b/crates/sidecar/tests/service.rs index ca640684..6daa847e 100644 --- a/crates/sidecar/tests/service.rs +++ b/crates/sidecar/tests/service.rs @@ -5595,6 +5595,8 @@ ykAheWCsAteSEWVc0w==\n\ body_base64: None, hostname: None, family: None, + port: None, + socket_id: None, command: None, args: Vec::new(), cwd: None, @@ -5632,6 +5634,8 @@ ykAheWCsAteSEWVc0w==\n\ body_base64: None, hostname: None, family: None, + port: None, + socket_id: None, command: None, args: Vec::new(), cwd: None, @@ -10124,6 +10128,8 @@ export async function loadPyodide() { body_base64: None, hostname: None, family: None, + port: None, + socket_id: None, command: None, args: Vec::new(), cwd: None, @@ -10151,6 +10157,8 @@ export async function loadPyodide() { body_base64: None, hostname: None, family: None, + port: None, + socket_id: None, command: None, args: Vec::new(), cwd: None, From 226624af20f16963a6a143307b364eb33798c28a Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sat, 27 Jun 2026 17:48:25 -0700 Subject: [PATCH 2/2] feat(python): complete the Pyodide FS hooks + full review hardening MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FS hooks: symlink, readlink, lstat, and setattr->host (chmod/chown/utimes) — every Emscripten node_op/stream_op. lstat lets pre-existing host symlinks be detected as link nodes; setattr is host-first; chmod/utimes mirror to the host shadow only when the entry already exists there (never creating empty stubs); chown applies uid/gid independently (keeps the unchanged side). Socket hardening: per-process registry bounded by the VM max_sockets limit, read+write timeouts so recv/send can't wedge the shared loop (recv uses a short host poll + a capped guest-side backoff), connect/sendto routed through the loopback-connect gate (blocks DNS-rebind to sidecar-local ports), and host errors mapped to the right OSError/errno. Tests: python_runtime_supports_raw_tcp_and_udp_sockets, python_runtime_supports_symlink_readlink_and_metadata (now also covers a host-preexisting symlink); full python_suite green. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../assets/runners/python-runner.mjs | 133 +++++++++- crates/execution/src/python.rs | 41 +++ crates/sidecar/src/execution.rs | 54 +++- crates/sidecar/src/filesystem.rs | 80 ++++++ crates/sidecar/tests/python.rs | 239 +++++++++++++++++- crates/sidecar/tests/service.rs | 24 ++ 6 files changed, 547 insertions(+), 24 deletions(-) diff --git a/crates/execution/assets/runners/python-runner.mjs b/crates/execution/assets/runners/python-runner.mjs index 4ec50bb0..cbc835b8 100644 --- a/crates/execution/assets/runners/python-runner.mjs +++ b/crates/execution/assets/runners/python-runner.mjs @@ -571,6 +571,10 @@ function createPythonBridgeRpcBridge() { async fsStat(path) { return this.fsStatSync(path); }, + fsLstatSync(path) { + const result = requestSync('fsLstat', { path }); + return result.stat ?? null; + }, fsReaddirSync(path) { const result = requestSync('fsReaddir', { path }); return result.entries ?? []; @@ -596,6 +600,16 @@ function createPythonBridgeRpcBridge() { fsRenameSync(path, destination) { requestSync('fsRename', { path, destination }); }, + fsSymlinkSync(target, path) { + requestSync('fsSymlink', { target, path }); + }, + fsReadlinkSync(path) { + const result = requestSync('fsReadlink', { path }); + return result.target ?? ''; + }, + fsSetattrSync(path, attr) { + requestSync('fsSetattr', { path, ...attr }); + }, httpRequestSync(url, method = 'GET', headersJson = '{}', bodyBase64 = null) { let headers; try { @@ -779,6 +793,10 @@ function createPythonFdRpcBridge() { async fsStat(path) { return this.fsStatSync(path); }, + fsLstatSync(path) { + const result = requestSync('fsLstat', { path }); + return result.stat ?? null; + }, fsReaddirSync(path) { const result = requestSync('fsReaddir', { path }); return result.entries ?? []; @@ -804,6 +822,16 @@ function createPythonFdRpcBridge() { fsRenameSync(path, destination) { requestSync('fsRename', { path, destination }); }, + fsSymlinkSync(target, path) { + requestSync('fsSymlink', { target, path }); + }, + fsReadlinkSync(path) { + const result = requestSync('fsReadlink', { path }); + return result.target ?? ''; + }, + fsSetattrSync(path, attr) { + requestSync('fsSetattr', { path, ...attr }); + }, httpRequestSync(url, method = 'GET', headersJson = '{}', bodyBase64 = null) { let headers; try { @@ -1122,9 +1150,27 @@ _agentos_socket.gethostbyname = _agentos_gethostbyname # stalls the sidecar; the loop below re-polls to emulate blocking semantics. import base64 as _agentos_base64 import time as _agentos_time +import errno as _agentos_errno _agentos_original_socket_class = _agentos_socket.socket +def _agentos_socket_oserror(exc): + # Host errors arrive as "E: message"; recover the errno so Python + # code can catch ConnectionRefusedError/TimeoutError/etc. (OSError picks the + # right subclass from the errno). + message = str(getattr(exc, "message", None) or exc) + head = message.split(":", 1)[0].strip() + code = getattr(_agentos_errno, head, 0) if head[:1] == "E" and head.isupper() else 0 + return OSError(code or 0, message) + +def _agentos_socket_rpc(call): + try: + return _agentos_json.loads(call()) + except OSError: + raise + except Exception as exc: + raise _agentos_socket_oserror(exc) from None + class _SecureExecSocket: def __init__(self, family=None, type=None, proto=0, fileno=None): self.family = family if family is not None else _agentos_socket.AF_INET @@ -1135,12 +1181,12 @@ class _SecureExecSocket: self._closed = False self._is_udp = self.type == _agentos_socket.SOCK_DGRAM if self._is_udp: - resp = _agentos_json.loads(_agentos_rpc.udpCreateSync()) + resp = _agentos_socket_rpc(lambda: _agentos_rpc.udpCreateSync()) self._id = int(resp["socketId"]) def connect(self, address): host, port = address[0], address[1] - resp = _agentos_json.loads(_agentos_rpc.socketConnectSync(str(host), int(port))) + resp = _agentos_socket_rpc(lambda: _agentos_rpc.socketConnectSync(str(host), int(port))) self._id = int(resp["socketId"]) def connect_ex(self, address): @@ -1158,7 +1204,7 @@ class _SecureExecSocket: def send(self, data, flags=0): sid = self._ensure_id() b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii") - resp = _agentos_json.loads(_agentos_rpc.socketSendSync(sid, b64)) + resp = _agentos_socket_rpc(lambda: _agentos_rpc.socketSendSync(sid, b64)) return int(resp.get("bytesSent", len(data))) def sendall(self, data, flags=0): @@ -1172,8 +1218,9 @@ class _SecureExecSocket: deadline = None if self._timeout is not None and self._timeout > 0: deadline = _agentos_time.monotonic() + self._timeout + backoff = 0.0 while True: - resp = _agentos_json.loads(recv_fn(int(bufsize))) + resp = _agentos_socket_rpc(lambda: recv_fn(int(bufsize))) if resp.get("closed"): return b"", resp data = resp.get("dataBase64") or "" @@ -1184,6 +1231,11 @@ class _SecureExecSocket: raise BlockingIOError(11, "Resource temporarily unavailable") if deadline is not None and _agentos_time.monotonic() >= deadline: raise _agentos_socket.timeout("timed out") + # Guest-side capped backoff so a blocking recv on a silent socket + # doesn't hammer the host loop with back-to-back polls. + if backoff: + _agentos_time.sleep(backoff) + backoff = min(backoff * 2 if backoff else 0.005, 0.05) continue return b"", resp @@ -1196,11 +1248,11 @@ class _SecureExecSocket: address = args[-1] host, port = address[0], address[1] if self._id is None: - resp = _agentos_json.loads(_agentos_rpc.udpCreateSync()) + resp = _agentos_socket_rpc(lambda: _agentos_rpc.udpCreateSync()) self._id = int(resp["socketId"]) b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii") - resp = _agentos_json.loads( - _agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64) + resp = _agentos_socket_rpc( + lambda: _agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64) ) return int(resp.get("bytesSent", len(data))) @@ -1641,6 +1693,7 @@ function installPythonWorkspaceFs(pyodide, bridge) { const memfsDirStreamOps = MEMFS.ops_table.dir.stream; const memfsFileNodeOps = MEMFS.ops_table.file.node; const memfsFileStreamOps = MEMFS.ops_table.file.stream; + const memfsLinkNodeOps = MEMFS.ops_table.link.node; const workspaceDirStreamOps = memfsDirStreamOps; function joinGuestPath(parentPath, name) { @@ -1705,6 +1758,8 @@ function installPythonWorkspaceFs(pyodide, bridge) { if (FS.isDir(mode)) { node.node_ops = workspaceDirNodeOps; node.stream_ops = workspaceDirStreamOps; + } else if (FS.isLink(mode)) { + node.node_ops = workspaceLinkNodeOps; } else if (FS.isFile(mode)) { node.node_ops = workspaceFileNodeOps; node.stream_ops = workspaceFileStreamOps; @@ -1732,7 +1787,8 @@ function installPythonWorkspaceFs(pyodide, bridge) { for (const name of entries) { const childPath = joinGuestPath(guestPath, name); - const stat = withFsErrors(() => bridge.fsStatSync(childPath)); + // lstat (don't follow) so a host symlink is created as a link node. + const stat = withFsErrors(() => bridge.fsLstatSync(childPath)); const existing = node.contents[name]; if (existing) { @@ -1806,6 +1862,50 @@ function installPythonWorkspaceFs(pyodide, bridge) { }; } + function toEpochMs(value) { + if (value == null) return null; + if (typeof value === 'number') return value; + if (typeof value.getTime === 'function') return value.getTime(); + return null; + } + + // Propagate chmod/chown/utimes from an Emscripten `setattr` into the host VFS. + // (size/truncate is handled via the dirty-write path, not here.) + function propagateSetattrToHost(node, attr) { + if (!attr) return; + const payload = {}; + if (attr.mode != null) payload.mode = attr.mode & 0o7777; + // `os.chown(p, uid, -1)` keeps a side; never forward a negative sentinel + // (it would saturate to 0 = root on the host). + if (attr.uid != null && attr.uid >= 0) payload.uid = attr.uid; + if (attr.gid != null && attr.gid >= 0) payload.gid = attr.gid; + const atimeMs = toEpochMs(attr.atime ?? attr.timestamp); + const mtimeMs = toEpochMs(attr.mtime ?? attr.timestamp); + if (atimeMs != null && mtimeMs != null) { + payload.atimeMs = Math.trunc(atimeMs); + payload.mtimeMs = Math.trunc(mtimeMs); + } + if (Object.keys(payload).length === 0) return; + withFsErrors(() => bridge.fsSetattrSync(nodeGuestPath(node), payload)); + } + + const workspaceLinkNodeOps = { + // A symlink node reports itself (lstat semantics), not its target — so use + // the in-memory link mode rather than a host stat (which follows the link). + getattr(node) { + return makeStat(node, null); + }, + setattr(node, attr) { + // Host first: if the host op fails (e.g. read-only root) it throws before + // we mutate the in-isolate node, so the two views stay consistent. + propagateSetattrToHost(node, attr); + memfsLinkNodeOps.setattr(node, attr); + }, + readlink(node) { + return withFsErrors(() => bridge.fsReadlinkSync(nodeGuestPath(node))); + }, + }; + const workspaceFileNodeOps = { getattr(node) { const stat = node.agentOSDirty @@ -1817,6 +1917,9 @@ function installPythonWorkspaceFs(pyodide, bridge) { return makeStat(node, stat); }, setattr(node, attr) { + // Host first (see link setattr) so a failed host op leaves the in-isolate + // node untouched. + propagateSetattrToHost(node, attr); memfsFileNodeOps.setattr(node, attr); if (attr?.size != null) { node.agentOSDirty = true; @@ -1865,6 +1968,8 @@ function installPythonWorkspaceFs(pyodide, bridge) { return makeStat(node, stat); }, setattr(node, attr) { + // Host first (see link setattr). + propagateSetattrToHost(node, attr); memfsDirNodeOps.setattr(node, attr); }, lookup(parent, name) { @@ -1877,7 +1982,8 @@ function installPythonWorkspaceFs(pyodide, bridge) { } const guestPath = joinGuestPath(nodeGuestPath(parent), name); - const stat = withFsErrors(() => bridge.fsStatSync(guestPath)); + // lstat (don't follow) so a directly-looked-up host symlink is a link node. + const stat = withFsErrors(() => bridge.fsLstatSync(guestPath)); const child = createWorkspaceNode(parent, name, stat.mode, 0, guestPath); updateNodeFromRemoteStat(child, stat); return child; @@ -1925,8 +2031,13 @@ function installPythonWorkspaceFs(pyodide, bridge) { syncDirectory(node); return memfsDirNodeOps.readdir(node); }, - symlink() { - throw new FS.ErrnoError(ERRNO_CODES.ENOSYS); + symlink(parent, newName, oldPath) { + const guestPath = joinGuestPath(nodeGuestPath(parent), newName); + withFsErrors(() => bridge.fsSymlinkSync(oldPath, guestPath)); + const node = createWorkspaceNode(parent, newName, 0o120777, 0, guestPath); + node.link = oldPath; + node.usedBytes = oldPath.length; + return node; }, }; diff --git a/crates/execution/src/python.rs b/crates/execution/src/python.rs index e3f76a4b..e77979cf 100644 --- a/crates/execution/src/python.rs +++ b/crates/execution/src/python.rs @@ -48,11 +48,15 @@ pub enum PythonVfsRpcMethod { Read, Write, Stat, + Lstat, ReadDir, Mkdir, Unlink, Rmdir, Rename, + Symlink, + ReadLink, + Setattr, HttpRequest, DnsLookup, SubprocessRun, @@ -71,11 +75,15 @@ impl PythonVfsRpcMethod { "fsRead" => Some(Self::Read), "fsWrite" => Some(Self::Write), "fsStat" => Some(Self::Stat), + "fsLstat" => Some(Self::Lstat), "fsReaddir" => Some(Self::ReadDir), "fsMkdir" => Some(Self::Mkdir), "fsUnlink" => Some(Self::Unlink), "fsRmdir" => Some(Self::Rmdir), "fsRename" => Some(Self::Rename), + "fsSymlink" => Some(Self::Symlink), + "fsReadlink" => Some(Self::ReadLink), + "fsSetattr" => Some(Self::Setattr), "httpRequest" => Some(Self::HttpRequest), "dnsLookup" => Some(Self::DnsLookup), "subprocessRun" => Some(Self::SubprocessRun), @@ -98,6 +106,14 @@ pub struct PythonVfsRpcRequest { pub path: String, /// Second path for `Rename` (the destination); `None` for other methods. pub destination: Option, + /// Symlink target (the path the link points at), for `Symlink`. + pub target: Option, + /// `Setattr` metadata fields (each applied only when present). + pub mode: Option, + pub uid: Option, + pub gid: Option, + pub atime_ms: Option, + pub mtime_ms: Option, pub content_base64: Option, pub recursive: bool, pub url: Option, @@ -171,6 +187,9 @@ pub enum PythonVfsRpcResponsePayload { port: u16, timed_out: bool, }, + SymlinkTarget { + target: String, + }, } #[derive(Debug, Deserialize)] @@ -182,6 +201,19 @@ struct PythonVfsBridgeRequestWire { #[serde(default)] destination: Option, #[serde(default)] + target: Option, + // JS numbers cross the bridge as f64; accept that and narrow below. + #[serde(default)] + mode: Option, + #[serde(default)] + uid: Option, + #[serde(default)] + gid: Option, + #[serde(default, rename = "atimeMs")] + atime_ms: Option, + #[serde(default, rename = "mtimeMs")] + mtime_ms: Option, + #[serde(default)] content_base64: Option, #[serde(default)] recursive: bool, @@ -542,6 +574,9 @@ impl PythonExecution { "port": port, "timedOut": timed_out, }), + PythonVfsRpcResponsePayload::SymlinkTarget { target } => json!({ + "target": target, + }), }; self.inner @@ -1252,6 +1287,12 @@ fn parse_python_bridge_sync_rpc_request( method, path: wire.path, destination: wire.destination, + target: wire.target, + mode: wire.mode.map(|value| value as u32), + uid: wire.uid.map(|value| value as u32), + gid: wire.gid.map(|value| value as u32), + atime_ms: wire.atime_ms.map(|value| value as u64), + mtime_ms: wire.mtime_ms.map(|value| value as u64), content_base64: wire.content_base64, recursive: wire.recursive, url: wire.url, diff --git a/crates/sidecar/src/execution.rs b/crates/sidecar/src/execution.rs index 1a7590e0..a659b633 100644 --- a/crates/sidecar/src/execution.rs +++ b/crates/sidecar/src/execution.rs @@ -482,7 +482,8 @@ impl ActiveProcess { + self.tcp_sockets.len() + self.unix_listeners.len() + self.unix_sockets.len() - + self.udp_sockets.len(), + + self.udp_sockets.len() + + self.python_sockets.len(), connections: self.tcp_sockets.len() + self.unix_sockets.len(), }; if let Ok(http2) = self.http2.shared.lock() { @@ -518,7 +519,8 @@ impl ActiveProcess { .udp_sockets .values() .filter(|socket| socket.kernel_socket_id.is_none()) - .count(), + .count() + + self.python_sockets.len(), connections: self .tcp_sockets .values() @@ -4760,11 +4762,15 @@ where PythonVfsRpcMethod::Read | PythonVfsRpcMethod::Write | PythonVfsRpcMethod::Stat + | PythonVfsRpcMethod::Lstat | PythonVfsRpcMethod::ReadDir | PythonVfsRpcMethod::Mkdir | PythonVfsRpcMethod::Unlink | PythonVfsRpcMethod::Rmdir - | PythonVfsRpcMethod::Rename => { + | PythonVfsRpcMethod::Rename + | PythonVfsRpcMethod::Symlink + | PythonVfsRpcMethod::ReadLink + | PythonVfsRpcMethod::Setattr => { filesystem_handle_python_vfs_rpc_request(self, vm_id, process_id, request) } PythonVfsRpcMethod::HttpRequest => { @@ -5071,16 +5077,22 @@ where PythonVfsRpcMethod::SocketConnect => { let host = python_socket_host(request)?; let port = python_socket_port(request)?; + self.check_python_socket_limit(vm_id)?; self.bridge.require_network_access( vm_id, NetworkOperation::Http, format_tcp_resource(&host, port), )?; - let pinned = self.python_socket_pinned_addrs(vm_id, &host)?; + let pinned = self.python_socket_pinned_addrs(vm_id, &host, port)?; let stream = python_connect_tcp(&pinned, port)?; stream .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL)) .map_err(python_socket_io_error)?; + // Bound writes too: without a write timeout `write_all` on a + // stalled peer would block the shared event loop indefinitely. + stream + .set_write_timeout(Some(PYTHON_SOCKET_WRITE_TIMEOUT)) + .map_err(python_socket_io_error)?; let socket_id = self.store_python_socket(vm_id, process_id, PythonHostSocket::Tcp(stream))?; Ok(PythonVfsRpcResponsePayload::SocketCreated { socket_id }) @@ -5129,6 +5141,7 @@ where Ok(PythonVfsRpcResponsePayload::Empty) } PythonVfsRpcMethod::UdpCreate => { + self.check_python_socket_limit(vm_id)?; let socket = UdpSocket::bind("0.0.0.0:0").map_err(python_socket_io_error)?; socket .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL)) @@ -5146,7 +5159,7 @@ where NetworkOperation::Http, format_tcp_resource(&host, port), )?; - let pinned = self.python_socket_pinned_addrs(vm_id, &host)?; + let pinned = self.python_socket_pinned_addrs(vm_id, &host, port)?; let target = pinned .first() .map(|ip| SocketAddr::new(*ip, port)) @@ -5191,22 +5204,26 @@ where } } - /// Resolve `host` to the egress-guard-approved IP set (literal IPs are - /// validated directly), mirroring the HTTP bridge's DNS pinning. + /// Resolve `host` to the egress-guard-approved IP set, then apply the same + /// loopback-connect gate the JS raw-TCP path uses (`filter_tcp_connect_ip_addrs`) + /// so a rebinding DNS server can't map an allowlisted hostname onto a + /// sidecar-local loopback port the VM policy didn't open. fn python_socket_pinned_addrs( &self, vm_id: &str, host: &str, + port: u16, ) -> Result, SidecarError> { let Some(vm) = self.vms.get(vm_id) else { return Err(SidecarError::InvalidState(String::from( "python socket op for unknown vm", ))); }; + let context = build_javascript_socket_path_context(vm)?; if let Ok(literal_ip) = host.parse::() { - filter_dns_safe_ip_addrs(vec![literal_ip], host) + filter_tcp_connect_ip_addrs(vec![literal_ip], host, port, &context) } else { - filter_dns_safe_ip_addrs( + filter_tcp_connect_ip_addrs( resolve_dns_ip_addrs( &self.bridge, &vm.kernel, @@ -5216,10 +5233,24 @@ where DnsLookupPolicy::SkipPermissions, )?, host, + port, + &context, ) } } + /// Enforce the VM's `max_sockets` resource limit before opening another + /// host socket for the guest (the registry is otherwise unbounded — a + /// hostile guest could exhaust the sidecar's fds/memory). + fn check_python_socket_limit(&self, vm_id: &str) -> Result<(), SidecarError> { + let Some(vm) = self.vms.get(vm_id) else { + return Ok(()); + }; + let limit = vm.kernel.resource_limits().max_sockets; + let current = vm_network_resource_counts(vm).sockets; + check_network_resource_limit(limit, current, 1, "socket") + } + fn store_python_socket( &mut self, vm_id: &str, @@ -12250,8 +12281,11 @@ pub(crate) fn format_tcp_resource(host: &str, port: u16) -> String { /// Host-socket read timeout for one `recv`/`recvfrom` RPC. Kept short so the /// synchronous RPC returns promptly (data, or a `timed_out` flag the Python /// shim re-polls on) and never stalls the shared sidecar event loop. -const PYTHON_SOCKET_READ_POLL: Duration = Duration::from_millis(100); +// Short so a recv/recvfrom RPC holds the shared event loop only briefly; the +// guest shim adds a capped backoff between polls to bound the poll rate. +const PYTHON_SOCKET_READ_POLL: Duration = Duration::from_millis(25); const PYTHON_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); +const PYTHON_SOCKET_WRITE_TIMEOUT: Duration = Duration::from_secs(30); const PYTHON_SOCKET_DEFAULT_RECV: usize = 65536; const PYTHON_SOCKET_MAX_RECV: usize = 4 * 1024 * 1024; diff --git a/crates/sidecar/src/filesystem.rs b/crates/sidecar/src/filesystem.rs index f9da7602..36a1a12a 100644 --- a/crates/sidecar/src/filesystem.rs +++ b/crates/sidecar/src/filesystem.rs @@ -776,6 +776,20 @@ where }, }) .map_err(kernel_error), + // Like Stat but does NOT follow symlinks, so the runner can + // represent a host-preexisting symlink as a link node. + PythonVfsRpcMethod::Lstat => vm + .kernel + .lstat(&path) + .map(|stat| PythonVfsRpcResponsePayload::Stat { + stat: PythonVfsRpcStat { + mode: stat.mode, + size: stat.size, + is_directory: stat.is_directory, + is_symbolic_link: stat.is_symbolic_link, + }, + }) + .map_err(kernel_error), PythonVfsRpcMethod::ReadDir => vm .kernel .read_dir(&path) @@ -818,6 +832,72 @@ where Err(error) => Err(error), } } + // Kernel-direct (no shadow mirror): guest Python writes/creates + // land only in the kernel VFS, so mirroring create/modify ops into + // the host-side shadow would leave empty stubs that a later + // shadow->kernel sync resurrects over real content. (Delete/rename + // still mirror — to *remove* stale wire-written shadow entries.) + PythonVfsRpcMethod::Symlink => { + let target = request.target.clone().ok_or_else(|| { + SidecarError::InvalidState(format!( + "python VFS fsSymlink for {} requires a target", + path + )) + })?; + vm.kernel + .symlink(&target, &path) + .map(|()| PythonVfsRpcResponsePayload::Empty) + .map_err(kernel_error) + } + PythonVfsRpcMethod::ReadLink => vm + .kernel + .read_link(&path) + .map(|target| PythonVfsRpcResponsePayload::SymlinkTarget { target }) + .map_err(kernel_error), + // `setattr` carries any of mode/uid/gid/atime+mtime; apply each + // present field to the host VFS. + PythonVfsRpcMethod::Setattr => { + (|| -> Result { + // Mirror metadata into the host shadow only when the entry + // already exists there (a host-mounted / wire-written file), + // so the next shadow->kernel reconcile keeps the guest's + // change. Never *create* a shadow stub for a kernel-only + // guest file (that resurrected empty content). + let mirror = shadow_host_path_for_guest(&vm.cwd, &path).exists(); + if let Some(mode) = request.mode { + vm.kernel.chmod(&path, mode).map_err(kernel_error)?; + if mirror { + mirror_guest_chmod_to_shadow(vm, &path, mode)?; + } + } + // uid/gid apply independently (`os.chown(p, uid, -1)` keeps + // the other side); fill the missing side from the current + // owner rather than dropping the whole chown. + if request.uid.is_some() || request.gid.is_some() { + let current = vm.kernel.stat(&path).map_err(kernel_error)?; + let uid = request.uid.unwrap_or(current.uid); + let gid = request.gid.unwrap_or(current.gid); + vm.kernel.chown(&path, uid, gid).map_err(kernel_error)?; + } + if let (Some(atime_ms), Some(mtime_ms)) = + (request.atime_ms, request.mtime_ms) + { + vm.kernel + .utimes(&path, atime_ms, mtime_ms) + .map_err(kernel_error)?; + if mirror { + mirror_guest_utimes_to_shadow( + vm, + &path, + VirtualUtimeSpec::Set(VirtualTimeSpec::from_millis(atime_ms)), + VirtualUtimeSpec::Set(VirtualTimeSpec::from_millis(mtime_ms)), + true, + )?; + } + } + Ok(PythonVfsRpcResponsePayload::Empty) + })() + } PythonVfsRpcMethod::HttpRequest | PythonVfsRpcMethod::DnsLookup | PythonVfsRpcMethod::SubprocessRun diff --git a/crates/sidecar/tests/python.rs b/crates/sidecar/tests/python.rs index 4c06c47c..8be71990 100644 --- a/crates/sidecar/tests/python.rs +++ b/crates/sidecar/tests/python.rs @@ -206,7 +206,7 @@ fn spawn_tcp_echo_server() -> (u16, thread::JoinHandle<()>) { .expect("set nonblocking echo listener"); let port = listener.local_addr().expect("echo listener address").port(); let handle = thread::spawn(move || { - let deadline = Instant::now() + Duration::from_secs(60); + let deadline = Instant::now() + Duration::from_secs(180); while Instant::now() < deadline { match listener.accept() { Ok((mut stream, _)) => { @@ -242,7 +242,7 @@ fn spawn_udp_echo_server() -> (u16, thread::JoinHandle<()>) { .expect("set udp echo read timeout"); let port = socket.local_addr().expect("udp echo address").port(); let handle = thread::spawn(move || { - let deadline = Instant::now() + Duration::from_secs(60); + let deadline = Instant::now() + Duration::from_secs(180); let mut buf = [0_u8; 4096]; while Instant::now() < deadline { match socket.recv_from(&mut buf) { @@ -686,6 +686,113 @@ fn guest_exists( response.exists.expect("guest filesystem exists flag") } +fn guest_readlink( + sidecar: &mut secure_exec_sidecar::NativeSidecar, + request_id: RequestId, + connection_id: &str, + session_id: &str, + vm_id: &str, + path: &str, +) -> String { + let response = guest_filesystem_call( + sidecar, + request_id, + connection_id, + session_id, + vm_id, + GuestFilesystemCallRequest { + operation: GuestFilesystemOperation::ReadLink, + path: path.to_owned(), + destination_path: None, + target: None, + content: None, + encoding: None, + recursive: false, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, + len: None, + offset: None, + }, + ); + + assert_eq!(response.operation, GuestFilesystemOperation::ReadLink); + response.target.expect("guest filesystem readlink target") +} + +fn guest_symlink( + sidecar: &mut secure_exec_sidecar::NativeSidecar, + request_id: RequestId, + connection_id: &str, + session_id: &str, + vm_id: &str, + link_path: &str, + target: &str, +) { + let response = guest_filesystem_call( + sidecar, + request_id, + connection_id, + session_id, + vm_id, + GuestFilesystemCallRequest { + operation: GuestFilesystemOperation::Symlink, + path: link_path.to_owned(), + destination_path: None, + target: Some(target.to_owned()), + content: None, + encoding: None, + recursive: false, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, + len: None, + offset: None, + }, + ); + + assert_eq!(response.operation, GuestFilesystemOperation::Symlink); +} + +fn guest_stat_mode( + sidecar: &mut secure_exec_sidecar::NativeSidecar, + request_id: RequestId, + connection_id: &str, + session_id: &str, + vm_id: &str, + path: &str, +) -> u32 { + let response = guest_filesystem_call( + sidecar, + request_id, + connection_id, + session_id, + vm_id, + GuestFilesystemCallRequest { + operation: GuestFilesystemOperation::Stat, + path: path.to_owned(), + destination_path: None, + target: None, + content: None, + encoding: None, + recursive: false, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, + len: None, + offset: None, + }, + ); + + response.stat.expect("guest filesystem stat").mode +} + fn write_process_stdin( sidecar: &mut secure_exec_sidecar::NativeSidecar, request_id: RequestId, @@ -1471,7 +1578,7 @@ print(json.dumps(result)) &session_id, &vm_id, "proc-python-sockets", - Duration::from_secs(60), + Duration::from_secs(120), ); let _ = tcp_server.join(); @@ -1487,6 +1594,132 @@ print(json.dumps(result)) assert_eq!(parsed["udp"], "ping udp"); } +#[test] +fn python_runtime_supports_symlink_readlink_and_metadata() { + assert_node_available(); + + let mut sidecar = new_sidecar("python-fs-hooks"); + let cwd = temp_dir("python-fs-hooks-cwd"); + let connection_id = authenticate_wire(&mut sidecar, "conn-python"); + let session_id = open_session_wire(&mut sidecar, 2, &connection_id); + let (vm_id, _) = create_vm_wire( + &mut sidecar, + 3, + &connection_id, + &session_id, + GuestRuntimeKind::Python, + &cwd, + ); + + bootstrap_root_filesystem( + &mut sidecar, + 4, + &connection_id, + &session_id, + &vm_id, + vec![root_dir("/workspace")], + ); + + // A symlink that already exists on the host (created via the wire, not by + // Python) — exercises lstat-based detection of pre-existing links. + guest_symlink( + &mut sidecar, + 5, + &connection_id, + &session_id, + &vm_id, + "/workspace/hostlink.txt", + "file.txt", + ); + + execute_inline_python( + &mut sidecar, + 6, + &connection_id, + &session_id, + &vm_id, + "proc-python-fs-hooks", + r#" +import json +import os + +result = {} + +with open("/workspace/file.txt", "w", encoding="utf-8") as handle: + handle.write("data") + +# symlink + readlink (created by Python) +os.symlink("file.txt", "/workspace/link.txt") +result["readlink"] = os.readlink("/workspace/link.txt") +result["islink"] = os.path.islink("/workspace/link.txt") + +# a symlink that pre-existed on the host is detected as a link +result["host_islink"] = os.path.islink("/workspace/hostlink.txt") +result["host_readlink"] = os.readlink("/workspace/hostlink.txt") + +# chmod (setattr -> host) +os.chmod("/workspace/file.txt", 0o640) +result["mode"] = os.stat("/workspace/file.txt").st_mode & 0o777 + +# utimes (setattr -> host) — just exercise the hook +os.utime("/workspace/file.txt", (1700000000, 1710000000)) + +print(json.dumps(result)) +"#, + ); + + let (stdout, stderr, exit_code) = collect_process_output( + &mut sidecar, + &connection_id, + &session_id, + &vm_id, + "proc-python-fs-hooks", + ); + + assert_eq!(exit_code, 0, "stdout: {stdout}\nstderr: {stderr}"); + assert!(stderr.is_empty(), "unexpected stderr: {stderr}"); + + let parsed: Value = serde_json::from_str(stdout.trim()).expect("parse fs-hooks JSON"); + assert_eq!( + parsed["readlink"], "file.txt", + "os.readlink should return the target" + ); + assert_eq!(parsed["islink"], true, "os.path.islink should be true"); + assert_eq!( + parsed["host_islink"], true, + "a host-preexisting symlink should be detected as a link" + ); + assert_eq!(parsed["host_readlink"], "file.txt"); + assert_eq!(parsed["mode"], 0o640, "os.chmod should be reflected"); + + // Cross-check the host kernel VFS. + let host_target = guest_readlink( + &mut sidecar, + 7, + &connection_id, + &session_id, + &vm_id, + "/workspace/link.txt", + ); + assert_eq!( + host_target, "file.txt", + "host VFS should resolve the symlink" + ); + let host_mode = guest_stat_mode( + &mut sidecar, + 8, + &connection_id, + &session_id, + &vm_id, + "/workspace/file.txt", + ); + assert_eq!( + host_mode & 0o777, + 0o640, + "host VFS should reflect the chmod" + ); +} + fn workspace_files_are_shared_between_javascript_and_python_runtimes() { assert_node_available(); diff --git a/crates/sidecar/tests/service.rs b/crates/sidecar/tests/service.rs index 6daa847e..db657f81 100644 --- a/crates/sidecar/tests/service.rs +++ b/crates/sidecar/tests/service.rs @@ -5587,6 +5587,12 @@ ykAheWCsAteSEWVc0w==\n\ method: PythonVfsRpcMethod::Mkdir, path: String::from("/tmp/stale-python-rpc"), destination: None, + target: None, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, content_base64: None, recursive: false, url: None, @@ -5626,6 +5632,12 @@ ykAheWCsAteSEWVc0w==\n\ method: PythonVfsRpcMethod::Mkdir, path: String::from("/tmp/stale-python-rpc"), destination: None, + target: None, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, content_base64: None, recursive: false, url: None, @@ -10120,6 +10132,12 @@ export async function loadPyodide() { method: PythonVfsRpcMethod::Mkdir, path: String::from("/workspace"), destination: None, + target: None, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, content_base64: None, recursive: false, url: None, @@ -10149,6 +10167,12 @@ export async function loadPyodide() { method: PythonVfsRpcMethod::Write, path: String::from("/workspace/note.txt"), destination: None, + target: None, + mode: None, + uid: None, + gid: None, + atime_ms: None, + mtime_ms: None, content_base64: Some(String::from("aGVsbG8gZnJvbSBzaWRlY2FyIHJwYw==")), recursive: false, url: None,