diff --git a/crates/execution/assets/runners/python-runner.mjs b/crates/execution/assets/runners/python-runner.mjs
index f28f821d..4ec50bb0 100644
--- a/crates/execution/assets/runners/python-runner.mjs
+++ b/crates/execution/assets/runners/python-runner.mjs
@@ -638,6 +638,32 @@ function createPythonBridgeRpcBridge() {
         maxBuffer,
       }));
     },
+    // Raw socket bridge (TCP + UDP). Each method issues one synchronous RPC and
+    // returns the response as a JSON string for the Python shim to decode;
+    // payload bytes travel base64-encoded in `bodyBase64`.
+    socketConnectSync(host, port) {
+      return JSON.stringify(requestSync('socketConnect', { hostname: host, port }));
+    },
+    socketSendSync(socketId, dataBase64) {
+      return JSON.stringify(requestSync('socketSend', { socketId, bodyBase64: dataBase64 }));
+    },
+    socketRecvSync(socketId, maxBuffer) {
+      return JSON.stringify(requestSync('socketRecv', { socketId, maxBuffer }));
+    },
+    socketCloseSync(socketId) {
+      return JSON.stringify(requestSync('socketClose', { socketId }));
+    },
+    udpCreateSync() {
+      return JSON.stringify(requestSync('udpCreate', {}));
+    },
+    udpSendtoSync(socketId, host, port, dataBase64) {
+      return JSON.stringify(
+        requestSync('udpSendto', { socketId, hostname: host, port, bodyBase64: dataBase64 }),
+      );
+    },
+    udpRecvfromSync(socketId, maxBuffer) {
+      return JSON.stringify(requestSync('udpRecvfrom', { socketId, maxBuffer }));
+    },
     dispose() {},
   };
 }
@@ -1094,6 +1120,215 @@ def _agentos_gethostbyname(host):
 _agentos_socket.getaddrinfo = _agentos_getaddrinfo
 _agentos_socket.gethostbyname = _agentos_gethostbyname
 
+# Raw socket bridge: back socket.socket() with the host (outbound TCP + UDP).
+# Reads poll (the host uses a short read timeout) so the synchronous RPC never
+# stalls the sidecar; the loop below re-polls to emulate blocking semantics.
+import base64 as _agentos_base64
+import errno as _agentos_errno
+import time as _agentos_time
+
+_agentos_original_socket_class = _agentos_socket.socket
+
+class _SecureExecSocket:
+    """Host-backed stand-in for socket.socket (outbound TCP and UDP only).
+
+    Each operation is one synchronous JSON RPC through _agentos_rpc; payload
+    bytes cross the bridge base64-encoded. Error numbers come from the errno
+    module rather than literals, because errno numbering is platform-defined
+    and the interpreter's table need not match Linux's.
+    """
+
+    def __init__(self, family=None, type=None, proto=0, fileno=None):
+        self.family = family if family is not None else _agentos_socket.AF_INET
+        self.type = type if type is not None else _agentos_socket.SOCK_STREAM
+        self.proto = proto
+        self._timeout = None  # None blocks; 0 is non-blocking; >0 is a deadline
+        self._id = None  # host socket handle; None until one exists
+        self._peer = None  # default (host, port) recorded by connect() on UDP
+        self._closed = False
+        self._is_udp = self.type == _agentos_socket.SOCK_DGRAM
+        if self._is_udp:
+            # Created eagerly so recvfrom() has a handle before any sendto().
+            resp = _agentos_json.loads(_agentos_rpc.udpCreateSync())
+            self._id = int(resp["socketId"])
+
+    def connect(self, address):
+        """Open the host TCP stream, or record the default peer for UDP."""
+        if self._closed:
+            raise OSError(_agentos_errno.EBADF, "Bad file descriptor")
+        host, port = address[0], address[1]
+        if self._is_udp:
+            # Datagram connect only fixes the destination; opening a TCP stream
+            # here would overwrite (and leak) the UDP handle from __init__.
+            self._peer = (str(host), int(port))
+            return
+        if self._id is not None:
+            # Reconnecting would drop the only reference to the live handle.
+            raise OSError(_agentos_errno.EISCONN, "Transport endpoint is already connected")
+        resp = _agentos_json.loads(_agentos_rpc.socketConnectSync(str(host), int(port)))
+        self._id = int(resp["socketId"])
+
+    def connect_ex(self, address):
+        """Like connect(), but return an errno (0 on success) instead of raising."""
+        try:
+            self.connect(address)
+            return 0
+        except OSError as exc:
+            return exc.errno or 1
+
+    def _ensure_id(self):
+        """Return the host handle, raising EBADF when there is none."""
+        if self._id is None:
+            raise OSError(_agentos_errno.EBADF, "Bad file descriptor")
+        return self._id
+
+    def send(self, data, flags=0):
+        """Send bytes; returns the count reported by the host. flags is ignored."""
+        if self._is_udp and self._peer is not None:
+            return self.sendto(data, self._peer)
+        sid = self._ensure_id()
+        b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii")
+        resp = _agentos_json.loads(_agentos_rpc.socketSendSync(sid, b64))
+        return int(resp.get("bytesSent", len(data)))
+
+    def sendall(self, data, flags=0):
+        """Send the whole buffer by looping over send()."""
+        payload = bytes(data)
+        total = 0
+        while total < len(payload):
+            total += self.send(payload[total:], flags)
+        return None
+
+    def _poll(self, bufsize, recv_fn):
+        """Call recv_fn until it yields data or EOF, honouring the timeout mode.
+
+        Returns (bytes, response). A poll that times out raises BlockingIOError
+        in non-blocking mode, socket.timeout once the deadline has passed, and
+        is retried otherwise.
+        """
+        deadline = None
+        if self._timeout is not None and self._timeout > 0:
+            deadline = _agentos_time.monotonic() + self._timeout
+        while True:
+            resp = _agentos_json.loads(recv_fn(int(bufsize)))
+            if resp.get("closed"):
+                return b"", resp
+            data = resp.get("dataBase64") or ""
+            if data:
+                return _agentos_base64.b64decode(data), resp
+            if resp.get("timedOut"):
+                if self._timeout == 0:
+                    raise BlockingIOError(_agentos_errno.EAGAIN, "Resource temporarily unavailable")
+                if deadline is not None and _agentos_time.monotonic() >= deadline:
+                    raise _agentos_socket.timeout("timed out")
+                continue
+            # Neither data nor a timeout: an empty payload (e.g. empty datagram).
+            return b"", resp
+
+    def recv(self, bufsize, flags=0):
+        """Receive up to bufsize bytes; an empty result means the peer closed (TCP)."""
+        if self._is_udp:
+            return self.recvfrom(bufsize, flags)[0]
+        sid = self._ensure_id()
+        data, _ = self._poll(bufsize, lambda n: _agentos_rpc.socketRecvSync(sid, n))
+        return data
+
+    def sendto(self, data, *args):
+        """Send one datagram to the address given as the last positional argument."""
+        if self._closed:
+            # Without this a closed socket would allocate a fresh host handle
+            # that close() (already a no-op) could never release.
+            raise OSError(_agentos_errno.EBADF, "Bad file descriptor")
+        address = args[-1]
+        host, port = address[0], address[1]
+        if self._id is None:
+            resp = _agentos_json.loads(_agentos_rpc.udpCreateSync())
+            self._id = int(resp["socketId"])
+        b64 = _agentos_base64.b64encode(bytes(data)).decode("ascii")
+        resp = _agentos_json.loads(
+            _agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64)
+        )
+        return int(resp.get("bytesSent", len(data)))
+
+    def recvfrom(self, bufsize, flags=0):
+        """Receive one datagram; returns (bytes, (host, port))."""
+        sid = self._ensure_id()
+        data, resp = self._poll(bufsize, lambda n: _agentos_rpc.udpRecvfromSync(sid, n))
+        addr = (resp.get("host", ""), int(resp.get("port", 0))) if resp else ("", 0)
+        return data, addr
+
+    def settimeout(self, value):
+        self._timeout = value
+
+    def gettimeout(self):
+        return self._timeout
+
+    def setblocking(self, flag):
+        self._timeout = None if flag else 0.0
+
+    def setsockopt(self, *args, **kwargs):
+        # Options are accepted and ignored; nothing is forwarded to the host.
+        return None
+
+    def getsockopt(self, *args, **kwargs):
+        return 0
+
+    def fileno(self):
+        # NOTE(review): this is the bridge handle, not an OS descriptor.
+        return self._id if self._id is not None else -1
+
+    def getpeername(self):
+        return ("", 0)
+
+    def getsockname(self):
+        return ("0.0.0.0", 0)
+
+    def close(self):
+        """Release the host handle; safe to call more than once."""
+        if self._closed:
+            return
+        self._closed = True
+        if self._id is not None:
+            try:
+                _agentos_rpc.socketCloseSync(self._id)
+            except Exception:
+                # Best effort: close() must not raise during teardown/__del__.
+                pass
+            self._id = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *exc):
+        self.close()
+
+    def __del__(self):
+        try:
+            self.close()
+        except Exception:
+            pass
+
+def _agentos_socket_factory(family=-1, type=-1, proto=0, fileno=None):
+    """Route INET/INET6 stream and datagram sockets to the host bridge.
+
+    Anything else (other families or types, or an existing fileno) falls
+    through to the original socket class.
+    """
+    fam = family if family != -1 else _agentos_socket.AF_INET
+    typ = type if type != -1 else _agentos_socket.SOCK_STREAM
+    if (
+        fileno is None
+        and fam in (_agentos_socket.AF_INET, _agentos_socket.AF_INET6)
+        and typ in (_agentos_socket.SOCK_STREAM, _agentos_socket.SOCK_DGRAM)
+    ):
+        return _SecureExecSocket(fam, typ, proto)
+    return _agentos_original_socket_class(family, type, proto, fileno)
+
+# NOTE(review): socket.socket becomes a plain function here, so
+# isinstance(x, socket.socket) and subclassing socket.socket stop working;
+# confirm no guest library relies on either.
+_agentos_socket.socket = _agentos_socket_factory
+
 class _SecureExecRequestsResponse:
     def __init__(self, payload):
         self.status_code = int(payload.get("status", 0))
diff --git a/crates/execution/src/python.rs b/crates/execution/src/python.rs
index 45fb6413..e3f76a4b 100644
--- a/crates/execution/src/python.rs
+++ b/crates/execution/src/python.rs
@@ -56,6 +56,13 @@ pub enum PythonVfsRpcMethod {
     HttpRequest,
     DnsLookup,
     SubprocessRun,
+    SocketConnect,
+    SocketSend,
+    SocketRecv,
+    SocketClose,
+    UdpCreate,
+    UdpSendto,
+    UdpRecvfrom,
 }
 
 impl PythonVfsRpcMethod {
@@ -72,6 +79,13 @@ impl PythonVfsRpcMethod {
             "httpRequest" => Some(Self::HttpRequest),
             "dnsLookup" => Some(Self::DnsLookup),
             "subprocessRun" => Some(Self::SubprocessRun),
+            "socketConnect" => Some(Self::SocketConnect),
+            "socketSend" => Some(Self::SocketSend),
+            "socketRecv" => Some(Self::SocketRecv),
+            "socketClose" => Some(Self::SocketClose),
+            "udpCreate" => Some(Self::UdpCreate),
+            "udpSendto" => Some(Self::UdpSendto),
+            "udpRecvfrom" => Some(Self::UdpRecvfrom),
             _ => None,
         }
     }
@@ -92,6 +106,10 @@ pub struct PythonVfsRpcRequest {
     pub body_base64: Option,
     pub hostname: Option,
     pub family: Option,
+    /// Port for socket connect/sendto.
+    pub port: Option,
+    /// Socket handle for send/recv/close/sendto/recvfrom.
+ pub socket_id: Option, pub command: Option, pub args: Vec, pub cwd: Option, @@ -136,6 +154,23 @@ pub enum PythonVfsRpcResponsePayload { stderr: String, max_buffer_exceeded: bool, }, + SocketCreated { + socket_id: u64, + }, + SocketSent { + bytes_sent: usize, + }, + SocketReceived { + data_base64: String, + closed: bool, + timed_out: bool, + }, + UdpReceived { + data_base64: String, + host: String, + port: u16, + timed_out: bool, + }, } #[derive(Debug, Deserialize)] @@ -163,6 +198,10 @@ struct PythonVfsBridgeRequestWire { #[serde(default)] family: Option, #[serde(default)] + port: Option, + #[serde(default, rename = "socketId")] + socket_id: Option, + #[serde(default)] command: Option, #[serde(default)] args: Vec, @@ -477,6 +516,32 @@ impl PythonExecution { "stderr": stderr, "maxBufferExceeded": max_buffer_exceeded, }), + PythonVfsRpcResponsePayload::SocketCreated { socket_id } => json!({ + "socketId": socket_id, + }), + PythonVfsRpcResponsePayload::SocketSent { bytes_sent } => json!({ + "bytesSent": bytes_sent, + }), + PythonVfsRpcResponsePayload::SocketReceived { + data_base64, + closed, + timed_out, + } => json!({ + "dataBase64": data_base64, + "closed": closed, + "timedOut": timed_out, + }), + PythonVfsRpcResponsePayload::UdpReceived { + data_base64, + host, + port, + timed_out, + } => json!({ + "dataBase64": data_base64, + "host": host, + "port": port, + "timedOut": timed_out, + }), }; self.inner @@ -1195,6 +1260,8 @@ fn parse_python_bridge_sync_rpc_request( body_base64: wire.body_base64, hostname: wire.hostname, family: wire.family, + port: wire.port, + socket_id: wire.socket_id, command: wire.command, args: wire.args, cwd: wire.cwd, diff --git a/crates/sidecar/src/execution.rs b/crates/sidecar/src/execution.rs index 86cebb72..1a7590e0 100644 --- a/crates/sidecar/src/execution.rs +++ b/crates/sidecar/src/execution.rs @@ -17,10 +17,9 @@ use crate::protocol::{ OwnershipScope, ProcessExitedEvent, ProcessKilledResponse, ProcessOutputEvent, ProcessSnapshotEntry, 
ProcessSnapshotResponse, ProcessSnapshotStatus, ProcessStartedResponse, PtyResizedResponse, RequestFrame, ResizePtyRequest, ResponseFrame, ResponsePayload, - SidecarRequestPayload, SignalDispositionAction, - SignalHandlerRegistration, SignalStateResponse, SocketStateEntry, StdinClosedResponse, - StdinWrittenResponse, StreamChannel, VmFetchRequest, VmFetchResponse, WasmPermissionTier, - WriteStdinRequest, ZombieTimerCountResponse, + SidecarRequestPayload, SignalDispositionAction, SignalHandlerRegistration, SignalStateResponse, + SocketStateEntry, StdinClosedResponse, StdinWrittenResponse, StreamChannel, VmFetchRequest, + VmFetchResponse, WasmPermissionTier, WriteStdinRequest, ZombieTimerCountResponse, }; use crate::service::{ audit_fields, dirname, emit_security_audit_event, emit_structured_event, javascript_error, @@ -40,12 +39,13 @@ use crate::state::{ JavascriptTlsBridgeOptions, JavascriptTlsClientHello, JavascriptTlsDataValue, JavascriptTlsMaterial, JavascriptUdpFamily, JavascriptUdpSocketEvent, JavascriptUnixListenerEvent, NetworkResourceCounts, PendingTcpSocket, PendingUnixSocket, - ProcNetEntry, ProcessEventEnvelope, ResolvedChildProcessExecution, ResolvedTcpConnectAddr, - SharedBridge, SharedSidecarRequestClient, SidecarKernel, SocketQueryKind, ToolExecution, - VmDnsConfig, VmListenPolicy, VmState, DEFAULT_JAVASCRIPT_NET_BACKLOG, EXECUTION_DRIVER_NAME, - EXECUTION_SANDBOX_ROOT_ENV, JAVASCRIPT_COMMAND, LOOPBACK_EXEMPT_PORTS_ENV, - MAPPED_HOST_FD_START, PYTHON_COMMAND, TOOL_DRIVER_NAME, - VM_LISTEN_ALLOW_PRIVILEGED_METADATA_KEY, WASM_COMMAND, WASM_STDIO_SYNC_RPC_ENV, + ProcNetEntry, ProcessEventEnvelope, PythonHostSocket, ResolvedChildProcessExecution, + ResolvedTcpConnectAddr, SharedBridge, SharedSidecarRequestClient, SidecarKernel, + SocketQueryKind, ToolExecution, VmDnsConfig, VmListenPolicy, VmState, + DEFAULT_JAVASCRIPT_NET_BACKLOG, EXECUTION_DRIVER_NAME, EXECUTION_SANDBOX_ROOT_ENV, + JAVASCRIPT_COMMAND, LOOPBACK_EXEMPT_PORTS_ENV, MAPPED_HOST_FD_START, 
PYTHON_COMMAND,
+    TOOL_DRIVER_NAME, VM_LISTEN_ALLOW_PRIVILEGED_METADATA_KEY, WASM_COMMAND,
+    WASM_STDIO_SYNC_RPC_ENV,
 };
 use crate::tools::{
     format_tool_failure_output, is_tool_command, normalized_tool_command_name,
@@ -368,6 +368,8 @@ impl ActiveProcess {
             next_unix_socket_id: 0,
             udp_sockets: BTreeMap::new(),
             next_udp_socket_id: 0,
+            python_sockets: BTreeMap::new(),
+            next_python_socket_id: 0,
             cipher_sessions: BTreeMap::new(),
             next_cipher_session_id: 0,
             diffie_hellman_sessions: BTreeMap::new(),
@@ -4774,6 +4776,15 @@ where
             PythonVfsRpcMethod::SubprocessRun => {
                 self.handle_python_subprocess_rpc_request(vm_id, process_id, request)
             }
+            PythonVfsRpcMethod::SocketConnect
+            | PythonVfsRpcMethod::SocketSend
+            | PythonVfsRpcMethod::SocketRecv
+            | PythonVfsRpcMethod::SocketClose
+            | PythonVfsRpcMethod::UdpCreate
+            | PythonVfsRpcMethod::UdpSendto
+            | PythonVfsRpcMethod::UdpRecvfrom => {
+                self.handle_python_socket_rpc_request(vm_id, process_id, request)
+            }
         }
     }
 
@@ -5037,6 +5048,262 @@ where
         self.respond_python_rpc(vm_id, process_id, request.id, response)
     }
 
+    /// Entry point for guest Python socket RPCs: runs the operation and sends
+    /// its result (or error) back to the runner. Requests for a VM that no
+    /// longer exists are dropped without a reply.
+    fn handle_python_socket_rpc_request(
+        &mut self,
+        vm_id: &str,
+        process_id: &str,
+        request: PythonVfsRpcRequest,
+    ) -> Result<(), SidecarError> {
+        if self.vms.get(vm_id).is_none() {
+            return Ok(());
+        }
+        let response = self.python_socket_op(vm_id, process_id, &request);
+        self.respond_python_rpc(vm_id, process_id, request.id, response)
+    }
+
+    /// Execute one socket RPC against the host network.
+    ///
+    /// `SocketConnect` and `UdpSendto` are gated by `require_network_access`
+    /// and only use addresses returned by `python_socket_pinned_addrs`. Reads
+    /// report an elapsed poll interval as `timed_out` rather than an error so
+    /// the Python shim can re-poll.
+    fn python_socket_op(
+        &mut self,
+        vm_id: &str,
+        process_id: &str,
+        request: &PythonVfsRpcRequest,
+    ) -> Result<PythonVfsRpcResponsePayload, SidecarError> {
+        match request.method {
+            PythonVfsRpcMethod::SocketConnect => {
+                let host = python_socket_host(request)?;
+                let port = python_socket_port(request)?;
+                self.bridge.require_network_access(
+                    vm_id,
+                    NetworkOperation::Http,
+                    format_tcp_resource(&host, port),
+                )?;
+                let pinned = self.python_socket_pinned_addrs(vm_id, &host)?;
+                let stream = python_connect_tcp(&pinned, port)?;
+                stream
+                    .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL))
+                    .map_err(python_socket_io_error)?;
+                let socket_id =
+                    self.store_python_socket(vm_id, process_id, PythonHostSocket::Tcp(stream))?;
+                Ok(PythonVfsRpcResponsePayload::SocketCreated { socket_id })
+            }
+            PythonVfsRpcMethod::SocketSend => {
+                let data = python_socket_payload(request)?;
+                let socket = self.python_socket_mut(vm_id, process_id, request)?;
+                let PythonHostSocket::Tcp(stream) = socket else {
+                    return Err(python_socket_kind_error("send", "TCP"));
+                };
+                // NOTE(review): only a read timeout is set on the stream, so this
+                // write can block for as long as the peer stops draining — confirm
+                // that is acceptable on the shared sidecar loop.
+                stream.write_all(&data).map_err(python_socket_io_error)?;
+                Ok(PythonVfsRpcResponsePayload::SocketSent {
+                    bytes_sent: data.len(),
+                })
+            }
+            PythonVfsRpcMethod::SocketRecv => {
+                let max = python_socket_recv_len(request);
+                let socket = self.python_socket_mut(vm_id, process_id, request)?;
+                let PythonHostSocket::Tcp(stream) = socket else {
+                    return Err(python_socket_kind_error("recv", "TCP"));
+                };
+                let mut buf = vec![0u8; max];
+                match stream.read(&mut buf) {
+                    Ok(0) => Ok(PythonVfsRpcResponsePayload::SocketReceived {
+                        data_base64: String::new(),
+                        closed: true,
+                        timed_out: false,
+                    }),
+                    Ok(n) => Ok(PythonVfsRpcResponsePayload::SocketReceived {
+                        data_base64: base64::engine::general_purpose::STANDARD.encode(&buf[..n]),
+                        closed: false,
+                        timed_out: false,
+                    }),
+                    Err(error) if python_socket_would_block(&error) => {
+                        Ok(PythonVfsRpcResponsePayload::SocketReceived {
+                            data_base64: String::new(),
+                            closed: false,
+                            timed_out: true,
+                        })
+                    }
+                    Err(error) => Err(python_socket_io_error(error)),
+                }
+            }
+            PythonVfsRpcMethod::SocketClose => {
+                self.remove_python_socket(vm_id, process_id, request);
+                Ok(PythonVfsRpcResponsePayload::Empty)
+            }
+            PythonVfsRpcMethod::UdpCreate => {
+                let socket = UdpSocket::bind("0.0.0.0:0").map_err(python_socket_io_error)?;
+                socket
+                    .set_read_timeout(Some(PYTHON_SOCKET_READ_POLL))
+                    .map_err(python_socket_io_error)?;
+                let socket_id =
+                    self.store_python_socket(vm_id, process_id, PythonHostSocket::Udp(socket))?;
+                Ok(PythonVfsRpcResponsePayload::SocketCreated { socket_id })
+            }
+            PythonVfsRpcMethod::UdpSendto => {
+                let host = python_socket_host(request)?;
+                let port = python_socket_port(request)?;
+                let data = python_socket_payload(request)?;
+                self.bridge.require_network_access(
+                    vm_id,
+                    NetworkOperation::Http,
+                    format_tcp_resource(&host, port),
+                )?;
+                let pinned = self.python_socket_pinned_addrs(vm_id, &host)?;
+                if pinned.is_empty() {
+                    return Err(SidecarError::Execution(format!(
+                        "EAI_NONAME: cannot resolve {host}"
+                    )));
+                }
+                let socket = self.python_socket_mut(vm_id, process_id, request)?;
+                let PythonHostSocket::Udp(udp) = socket else {
+                    return Err(python_socket_kind_error("sendto", "UDP"));
+                };
+                // The host socket is bound to a single address family, so only a
+                // resolved address of that family is deliverable. Taking the first
+                // entry blindly fails whenever the resolver lists the other family
+                // first (e.g. an AAAA record ahead of the A record).
+                let local_is_ipv4 = udp.local_addr().map_err(python_socket_io_error)?.is_ipv4();
+                let target = pinned
+                    .iter()
+                    .find(|ip| ip.is_ipv4() == local_is_ipv4)
+                    .map(|ip| SocketAddr::new(*ip, port))
+                    .ok_or_else(|| {
+                        SidecarError::Execution(format!(
+                            "EAFNOSUPPORT: no address for {host} matches the socket family"
+                        ))
+                    })?;
+                let sent = udp.send_to(&data, target).map_err(python_socket_io_error)?;
+                Ok(PythonVfsRpcResponsePayload::SocketSent { bytes_sent: sent })
+            }
+            PythonVfsRpcMethod::UdpRecvfrom => {
+                let max = python_socket_recv_len(request);
+                let socket = self.python_socket_mut(vm_id, process_id, request)?;
+                let PythonHostSocket::Udp(udp) = socket else {
+                    return Err(python_socket_kind_error("recvfrom", "UDP"));
+                };
+                let mut buf = vec![0u8; max];
+                match udp.recv_from(&mut buf) {
+                    Ok((n, addr)) => Ok(PythonVfsRpcResponsePayload::UdpReceived {
+                        data_base64: base64::engine::general_purpose::STANDARD.encode(&buf[..n]),
+                        host: addr.ip().to_string(),
+                        port: addr.port(),
+                        timed_out: false,
+                    }),
+                    Err(error) if python_socket_would_block(&error) => {
+                        Ok(PythonVfsRpcResponsePayload::UdpReceived {
+                            data_base64: String::new(),
+                            host: String::new(),
+                            port: 0,
+                            timed_out: true,
+                        })
+                    }
+                    Err(error) => Err(python_socket_io_error(error)),
+                }
+            }
+            _ => Err(SidecarError::InvalidState(String::from(
+                "non-socket python RPC reached the socket dispatcher unexpectedly",
+            ))),
+        }
+    }
+
+    /// Resolve `host` to the egress-guard-approved IP set (literal IPs are
+    /// validated directly), mirroring the HTTP bridge's DNS pinning.
+    fn python_socket_pinned_addrs(
+        &self,
+        vm_id: &str,
+        host: &str,
+    ) -> Result<Vec<IpAddr>, SidecarError> {
+        let Some(vm) = self.vms.get(vm_id) else {
+            return Err(SidecarError::InvalidState(String::from(
+                "python socket op for unknown vm",
+            )));
+        };
+        if let Ok(literal_ip) = host.parse::<IpAddr>() {
+            filter_dns_safe_ip_addrs(vec![literal_ip], host)
+        } else {
+            filter_dns_safe_ip_addrs(
+                resolve_dns_ip_addrs(
+                    &self.bridge,
+                    &vm.kernel,
+                    vm_id,
+                    &vm.dns,
+                    host,
+                    DnsLookupPolicy::SkipPermissions,
+                )?,
+                host,
+            )
+        }
+    }
+
+    /// Register `socket` on the owning process and return its new handle.
+    fn store_python_socket(
+        &mut self,
+        vm_id: &str,
+        process_id: &str,
+        socket: PythonHostSocket,
+    ) -> Result<u64, SidecarError> {
+        let process = self
+            .vms
+            .get_mut(vm_id)
+            .and_then(|vm| vm.active_processes.get_mut(process_id))
+            .ok_or_else(|| {
+                SidecarError::InvalidState(String::from("python socket op for reaped vm/process"))
+            })?;
+        let socket_id = process.next_python_socket_id;
+        process.next_python_socket_id = process.next_python_socket_id.wrapping_add(1);
+        process.python_sockets.insert(socket_id, socket);
+        Ok(socket_id)
+    }
+
+    /// Look up the socket named by `request.socket_id` on the given process.
+    fn python_socket_mut(
+        &mut self,
+        vm_id: &str,
+        process_id: &str,
+        request: &PythonVfsRpcRequest,
+    ) -> Result<&mut PythonHostSocket, SidecarError> {
+        let socket_id = request.socket_id.ok_or_else(|| {
+            SidecarError::InvalidState(String::from("python socket op requires socketId"))
+        })?;
+        self.vms
+            .get_mut(vm_id)
+            .and_then(|vm| vm.active_processes.get_mut(process_id))
+            .and_then(|process| process.python_sockets.get_mut(&socket_id))
+            .ok_or_else(|| {
+                SidecarError::Execution(format!("EBADF: unknown python socket {socket_id}"))
+            })
+    }
+
+    /// Drop the socket named by `request.socket_id`, if any. Missing ids,
+    /// processes, or VMs are ignored, so closing twice is harmless.
+    fn remove_python_socket(
+        &mut self,
+        vm_id: &str,
+        process_id: &str,
+        request: &PythonVfsRpcRequest,
+    ) {
+        let Some(socket_id) = request.socket_id else {
+            return;
+        };
+        if let Some(process) = self
+            .vms
+            .get_mut(vm_id)
+            .and_then(|vm| vm.active_processes.get_mut(process_id))
+        {
+            process.python_sockets.remove(&socket_id);
+        }
+    }
+
     fn respond_python_rpc(
         &mut self,
         vm_id: &str,
@@ -12007,6 +12274,93 @@ pub(crate) fn format_tcp_resource(host: &str, port: u16) -> String {
     format!("tcp://{host}:{port}")
 }
 
+// --- Guest Python socket bridge helpers ------------------------------------
+
+/// Host-socket read timeout for one `recv`/`recvfrom` RPC. Kept short so the
+/// synchronous RPC returns promptly (data, or a `timed_out` flag the Python
+/// shim re-polls on) and never stalls the shared sidecar event loop.
+const PYTHON_SOCKET_READ_POLL: Duration = Duration::from_millis(100);
+/// Upper bound on each TCP connect attempt (applied per candidate address).
+const PYTHON_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
+/// Receive buffer size used when the request carries no `max_buffer`.
+const PYTHON_SOCKET_DEFAULT_RECV: usize = 65536;
+/// Largest receive buffer allocated for one RPC, whatever the guest requests.
+const PYTHON_SOCKET_MAX_RECV: usize = 4 * 1024 * 1024;
+
+/// Extract the required `hostname` field.
+fn python_socket_host(request: &PythonVfsRpcRequest) -> Result<String, SidecarError> {
+    request
+        .hostname
+        .clone()
+        .ok_or_else(|| SidecarError::InvalidState(String::from("python socket op requires a host")))
+}
+
+/// Extract the required `port` field.
+fn python_socket_port(request: &PythonVfsRpcRequest) -> Result<u16, SidecarError> {
+    request
+        .port
+        .ok_or_else(|| SidecarError::InvalidState(String::from("python socket op requires a port")))
+}
+
+/// Decode the request's base64 body; a missing body is an empty payload.
+fn python_socket_payload(request: &PythonVfsRpcRequest) -> Result<Vec<u8>, SidecarError> {
+    let Some(body) = request.body_base64.as_deref() else {
+        return Ok(Vec::new());
+    };
+    base64::engine::general_purpose::STANDARD
+        .decode(body)
+        .map_err(|error| {
+            SidecarError::InvalidState(format!("invalid base64 python socket payload: {error}"))
+        })
+}
+
+/// Requested receive size, defaulted and clamped to `1..=PYTHON_SOCKET_MAX_RECV`.
+fn python_socket_recv_len(request: &PythonVfsRpcRequest) -> usize {
+    request
+        .max_buffer
+        .unwrap_or(PYTHON_SOCKET_DEFAULT_RECV)
+        .clamp(1, PYTHON_SOCKET_MAX_RECV)
+}
+
+/// Try each address in order and return the first stream that connects.
+///
+/// NOTE(review): every failure — including a connect timeout or an empty
+/// address list — is reported under the `ECONNREFUSED` label.
+fn python_connect_tcp(addrs: &[IpAddr], port: u16) -> Result<TcpStream, SidecarError> {
+    let mut last_error: Option<String> = None;
+    for ip in addrs {
+        let addr = SocketAddr::new(*ip, port);
+        match TcpStream::connect_timeout(&addr, PYTHON_SOCKET_CONNECT_TIMEOUT) {
+            Ok(stream) => return Ok(stream),
+            Err(error) => last_error = Some(error.to_string()),
+        }
+    }
+    Err(SidecarError::Execution(format!(
+        "ECONNREFUSED: {}",
+        last_error.unwrap_or_else(|| String::from("no resolved addresses"))
+    )))
+}
+
+/// True when a read merely ran out its timeout (`WouldBlock` or `TimedOut`).
+fn python_socket_would_block(error: &std::io::Error) -> bool {
+    matches!(
+        error.kind(),
+        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
+    )
+}
+
+/// Wrap an I/O error as an `EIO`-labelled execution error.
+fn python_socket_io_error(error: std::io::Error) -> SidecarError {
+    SidecarError::Execution(format!("EIO: python socket: {error}"))
+}
+
+/// Error for an operation issued against the wrong kind of socket.
+fn python_socket_kind_error(op: &str, expected: &str) -> SidecarError {
+    SidecarError::Execution(format!(
+        "EOPNOTSUPP: python socket {op} requires a {expected} socket"
+    ))
+}
+
 fn is_loopback_ip(ip: IpAddr) -> bool {
     match ip {
         IpAddr::V4(ip) => ip.is_loopback(),
@@ -16810,7 +17164,7 @@ fn install_kernel_stdin_pipe(kernel: &mut SidecarKernel, pid: u32) -> Result) -> Option<(u16, u16)> {
     let cols = env
diff --git a/crates/sidecar/src/filesystem.rs b/crates/sidecar/src/filesystem.rs
index 3fe0361b..f9da7602 100644
--- a/crates/sidecar/src/filesystem.rs
+++ b/crates/sidecar/src/filesystem.rs
@@ -820,11 +820,16 @@ where
                     }
                     PythonVfsRpcMethod::HttpRequest
                     | PythonVfsRpcMethod::DnsLookup
-                    | PythonVfsRpcMethod::SubprocessRun => {
-                        Err(SidecarError::InvalidState(String::from(
-                            "python non-filesystem RPC reached filesystem dispatcher unexpectedly",
-                        )))
-                    }
+                    | PythonVfsRpcMethod::SubprocessRun
+                    | PythonVfsRpcMethod::SocketConnect
+                    | PythonVfsRpcMethod::SocketSend
+                    | PythonVfsRpcMethod::SocketRecv
+                    | PythonVfsRpcMethod::SocketClose
+                    | PythonVfsRpcMethod::UdpCreate
+                    | PythonVfsRpcMethod::UdpSendto
+                    | PythonVfsRpcMethod::UdpRecvfrom => Err(SidecarError::InvalidState(String::from(
+                        "python non-filesystem RPC reached filesystem dispatcher unexpectedly",
+                    ))),
                 }
             }
             Err(error) => Err(error),
diff --git a/crates/sidecar/src/state.rs b/crates/sidecar/src/state.rs
index c916cd55..02622b7a 100644
--- a/crates/sidecar/src/state.rs
+++ b/crates/sidecar/src/state.rs
@@ -474,6 +474,11 @@ pub(crate) struct ActiveProcess {
     pub(crate) next_unix_socket_id: usize,
     pub(crate)
udp_sockets: BTreeMap,
     pub(crate) next_udp_socket_id: usize,
+    /// Synchronous host sockets opened by the guest Python `socket` bridge,
+    /// keyed by the handle returned to the runner. Distinct from the JS
+    /// runtime's event-driven `tcp_sockets`/`udp_sockets` above.
+    pub(crate) python_sockets: BTreeMap<u64, PythonHostSocket>,
+    pub(crate) next_python_socket_id: u64,
     pub(crate) cipher_sessions: BTreeMap,
     pub(crate) next_cipher_session_id: u64,
     pub(crate) diffie_hellman_sessions: BTreeMap,
@@ -953,6 +958,15 @@ pub(crate) enum JavascriptUdpSocketEvent {
     },
 }
 
+/// A blocking host socket backing one guest Python socket. Reads use a short
+/// timeout (set on the socket) so a `recv`/`recvfrom` RPC never stalls the
+/// shared sidecar event loop; the Python shim re-polls to emulate blocking.
+#[derive(Debug)]
+pub(crate) enum PythonHostSocket {
+    Tcp(TcpStream),
+    Udp(UdpSocket),
+}
+
 #[derive(Debug)]
 pub(crate) struct ActiveUdpSocket {
     pub(crate) family: JavascriptUdpFamily,
diff --git a/crates/sidecar/tests/python.rs b/crates/sidecar/tests/python.rs
index ab05430d..4c06c47c 100644
--- a/crates/sidecar/tests/python.rs
+++ b/crates/sidecar/tests/python.rs
@@ -14,7 +14,7 @@ use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::fs;
 use std::io::{Read, Write};
-use std::net::TcpListener;
+use std::net::{TcpListener, UdpSocket};
 use std::os::unix::fs::symlink;
 use std::path::{Component, Path, PathBuf};
 use std::sync::{
@@ -199,6 +199,76 @@ fn static_file_server_rejects_traversal_paths() {
     assert_eq!(static_file_path(root, "/packages/../../secret.txt"), None);
 }
 
+/// Spawn a one-shot TCP echo server on an ephemeral loopback port.
+///
+/// Serves the first connection until EOF or error, then exits; gives up if
+/// nothing connects within 60 seconds. Returns the port and the thread handle.
+fn spawn_tcp_echo_server() -> (u16, thread::JoinHandle<()>) {
+    let listener = TcpListener::bind("127.0.0.1:0").expect("bind tcp echo listener");
+    listener
+        .set_nonblocking(true)
+        .expect("set nonblocking echo listener");
+    let port = listener.local_addr().expect("echo listener address").port();
+    let handle = thread::spawn(move || {
+        let deadline = Instant::now() + Duration::from_secs(60);
+        while Instant::now() < deadline {
+            match listener.accept() {
+                Ok((mut stream, _)) => {
+                    // Accepted sockets can inherit the listener's non-blocking
+                    // mode (BSD/macOS); force blocking so the timeout below applies.
+                    stream.set_nonblocking(false).ok();
+                    stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
+                    let mut buf = [0_u8; 4096];
+                    loop {
+                        match stream.read(&mut buf) {
+                            Ok(0) => break,
+                            Ok(n) => {
+                                if stream.write_all(&buf[..n]).is_err() {
+                                    break;
+                                }
+                            }
+                            Err(_) => break,
+                        }
+                    }
+                    return;
+                }
+                Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
+                    thread::sleep(Duration::from_millis(25));
+                }
+                Err(_) => break,
+            }
+        }
+    });
+    (port, handle)
+}
+
+/// Spawn a UDP echo server on an ephemeral loopback port that answers exactly
+/// one datagram and exits (or gives up after 60 seconds).
+fn spawn_udp_echo_server() -> (u16, thread::JoinHandle<()>) {
+    let socket = UdpSocket::bind("127.0.0.1:0").expect("bind udp echo socket");
+    socket
+        .set_read_timeout(Some(Duration::from_secs(1)))
+        .expect("set udp echo read timeout");
+    let port = socket.local_addr().expect("udp echo address").port();
+    let handle = thread::spawn(move || {
+        let deadline = Instant::now() + Duration::from_secs(60);
+        let mut buf = [0_u8; 4096];
+        while Instant::now() < deadline {
+            match socket.recv_from(&mut buf) {
+                Ok((n, addr)) => {
+                    let _ = socket.send_to(&buf[..n], addr);
+                    return;
+                }
+                Err(error)
+                    if error.kind() == std::io::ErrorKind::WouldBlock
+                        || error.kind() == std::io::ErrorKind::TimedOut => {}
+                Err(_) => break,
+            }
+        }
+    });
+    (port, handle)
+}
+
 fn spawn_static_file_server(root: PathBuf) -> (u16, thread::JoinHandle<()>) {
     let listener = TcpListener::bind("127.0.0.1:0").expect("bind static file listener");
     listener
@@ -1338,6 +1408,96 @@ print(json.dumps(results))
     );
 }
 
+/// End-to-end check that guest Python `socket` TCP and UDP traffic round-trips
+/// through the host bridge to loopback echo servers.
+#[test]
+fn python_runtime_supports_raw_tcp_and_udp_sockets() {
+    assert_node_available();
+
+    let (tcp_port, tcp_server) = spawn_tcp_echo_server();
+    let (udp_port, udp_server) = spawn_udp_echo_server();
+    let mut sidecar = new_sidecar("python-sockets");
+    let cwd = temp_dir("python-sockets-cwd");
+    let connection_id = authenticate_wire(&mut sidecar, "conn-python");
+    let session_id = open_session_wire(&mut sidecar, 2, &connection_id);
+    let vm_id = create_vm_with_metadata_and_permissions(
+        &mut sidecar,
+        3,
+        &connection_id,
+        &session_id,
+        GuestRuntimeKind::Python,
+        &cwd,
+        HashMap::from([(
+            String::from("env.AGENTOS_LOOPBACK_EXEMPT_PORTS"),
+            serde_json::to_string(&vec![tcp_port.to_string(), udp_port.to_string()])
+                .expect("serialize exempt ports"),
+        )]),
+        wire_permissions_allow_all(),
+    );
+
+    execute_inline_python(
+        &mut sidecar,
+        4,
+        &connection_id,
+        &session_id,
+        &vm_id,
+        "proc-python-sockets",
+        &format!(
+            r#"
+import json
+import socket
+
+result = {{}}
+
+# Raw outbound TCP against a host echo server.
+tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+tcp.settimeout(10)
+tcp.connect(("127.0.0.1", {tcp_port}))
+tcp.sendall(b"hello sockets")
+received = b""
+while len(received) < len(b"hello sockets"):
+    chunk = tcp.recv(64)
+    if not chunk:
+        break
+    received += chunk
+tcp.close()
+result["tcp"] = received.decode()
+
+# Raw UDP against a host echo server.
+udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+udp.settimeout(10)
+udp.sendto(b"ping udp", ("127.0.0.1", {udp_port}))
+data, _addr = udp.recvfrom(64)
+udp.close()
+result["udp"] = data.decode()
+
+print(json.dumps(result))
+"#,
+        ),
+    );
+
+    let (stdout, stderr, exit_code) = collect_process_output_with_timeout(
+        &mut sidecar,
+        &connection_id,
+        &session_id,
+        &vm_id,
+        "proc-python-sockets",
+        Duration::from_secs(60),
+    );
+
+    let _ = tcp_server.join();
+    let _ = udp_server.join();
+    assert_eq!(exit_code, 0, "stdout: {stdout}\nstderr: {stderr}");
+    let json_line = stdout
+        .lines()
+        .rev()
+        .find(|line| !line.trim().is_empty())
+        .expect("python sockets stdout line");
+    let parsed: Value = serde_json::from_str(json_line).expect("parse sockets JSON");
+    assert_eq!(parsed["tcp"], "hello sockets");
+    assert_eq!(parsed["udp"], "ping udp");
+}
+
 fn workspace_files_are_shared_between_javascript_and_python_runtimes() {
     assert_node_available();
 
diff --git a/crates/sidecar/tests/service.rs b/crates/sidecar/tests/service.rs
index ca640684..6daa847e 100644
--- a/crates/sidecar/tests/service.rs
+++ b/crates/sidecar/tests/service.rs
@@ -5595,6 +5595,8 @@ ykAheWCsAteSEWVc0w==\n\
             body_base64: None,
             hostname: None,
             family: None,
+            port: None,
+            socket_id: None,
             command: None,
             args: Vec::new(),
             cwd: None,
@@ -5632,6 +5634,8 @@ ykAheWCsAteSEWVc0w==\n\
             body_base64: None,
             hostname: None,
             family: None,
+            port: None,
+            socket_id: None,
             command: None,
             args: Vec::new(),
             cwd: None,
@@ -10124,6 +10128,8 @@ export async function loadPyodide() {
             body_base64: None,
             hostname: None,
             family: None,
+            port: None,
+            socket_id: None,
             command: None,
             args: Vec::new(),
             cwd: None,
@@ -10151,6 +10157,8 @@ export async function loadPyodide() {
             body_base64: None,
             hostname: None,
             family: None,
+            port: None,
+            socket_id: None,
             command: None,
             args: Vec::new(),
             cwd: None,