Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 291 additions & 4 deletions crates/execution/assets/runners/python-runner.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,10 @@ function createPythonBridgeRpcBridge() {
async fsStat(path) {
return this.fsStatSync(path);
},
fsLstatSync(path) {
const result = requestSync('fsLstat', { path });
return result.stat ?? null;
},
fsReaddirSync(path) {
const result = requestSync('fsReaddir', { path });
return result.entries ?? [];
Expand All @@ -596,6 +600,16 @@ function createPythonBridgeRpcBridge() {
fsRenameSync(path, destination) {
requestSync('fsRename', { path, destination });
},
fsSymlinkSync(target, path) {
requestSync('fsSymlink', { target, path });
},
fsReadlinkSync(path) {
const result = requestSync('fsReadlink', { path });
return result.target ?? '';
},
fsSetattrSync(path, attr) {
requestSync('fsSetattr', { path, ...attr });
},
httpRequestSync(url, method = 'GET', headersJson = '{}', bodyBase64 = null) {
let headers;
try {
Expand Down Expand Up @@ -638,6 +652,29 @@ function createPythonBridgeRpcBridge() {
maxBuffer,
}));
},
socketConnectSync(host, port) {
return JSON.stringify(requestSync('socketConnect', { hostname: host, port }));
},
socketSendSync(socketId, dataBase64) {
return JSON.stringify(requestSync('socketSend', { socketId, bodyBase64: dataBase64 }));
},
socketRecvSync(socketId, maxBuffer) {
return JSON.stringify(requestSync('socketRecv', { socketId, maxBuffer }));
},
socketCloseSync(socketId) {
return JSON.stringify(requestSync('socketClose', { socketId }));
},
udpCreateSync() {
return JSON.stringify(requestSync('udpCreate', {}));
},
udpSendtoSync(socketId, host, port, dataBase64) {
return JSON.stringify(
requestSync('udpSendto', { socketId, hostname: host, port, bodyBase64: dataBase64 }),
);
},
udpRecvfromSync(socketId, maxBuffer) {
return JSON.stringify(requestSync('udpRecvfrom', { socketId, maxBuffer }));
},
// No-op: this method performs no cleanup.
dispose() {},
};
}
Expand Down Expand Up @@ -756,6 +793,10 @@ function createPythonFdRpcBridge() {
async fsStat(path) {
return this.fsStatSync(path);
},
fsLstatSync(path) {
const result = requestSync('fsLstat', { path });
return result.stat ?? null;
},
fsReaddirSync(path) {
const result = requestSync('fsReaddir', { path });
return result.entries ?? [];
Expand All @@ -781,6 +822,16 @@ function createPythonFdRpcBridge() {
fsRenameSync(path, destination) {
requestSync('fsRename', { path, destination });
},
fsSymlinkSync(target, path) {
requestSync('fsSymlink', { target, path });
},
fsReadlinkSync(path) {
const result = requestSync('fsReadlink', { path });
return result.target ?? '';
},
fsSetattrSync(path, attr) {
requestSync('fsSetattr', { path, ...attr });
},
httpRequestSync(url, method = 'GET', headersJson = '{}', bodyBase64 = null) {
let headers;
try {
Expand Down Expand Up @@ -1094,6 +1145,183 @@ def _agentos_gethostbyname(host):
_agentos_socket.getaddrinfo = _agentos_getaddrinfo
_agentos_socket.gethostbyname = _agentos_gethostbyname

# Raw socket bridge: back socket.socket() with the host (outbound TCP + UDP).
# Reads poll (the host uses a short read timeout) so the synchronous RPC never
# stalls the sidecar; the loop below re-polls to emulate blocking semantics.
import base64 as _agentos_base64
import time as _agentos_time
import errno as _agentos_errno

# Keep a handle on the stock socket class before socket.socket is replaced, so
# the factory below can still fall back to it for sockets it does not bridge.
_agentos_original_socket_class = _agentos_socket.socket

def _agentos_socket_oserror(exc):
    """Convert a host-side failure into an OSError carrying a matching errno.

    Host errors are expected to read "E<NAME>: message". When the text before
    the first colon is an upper-case name starting with "E" that the errno
    module knows, its value becomes the OSError errno (which lets OSError pick
    subclasses such as ConnectionRefusedError); otherwise errno is 0.
    """
    text = getattr(exc, "message", None)
    if not text:
        text = exc
    message = str(text)
    prefix = message.partition(":")[0].strip()
    errno_value = 0
    if prefix.startswith("E") and prefix.isupper():
        errno_value = getattr(_agentos_errno, prefix, 0)
    return OSError(errno_value or 0, message)

def _agentos_socket_rpc(call):
    """Run a zero-argument bridge call and return its JSON reply, decoded.

    OSError instances propagate untouched; any other exception is translated
    by _agentos_socket_oserror and raised with the original chain suppressed.
    """
    try:
        raw_reply = call()
        return _agentos_json.loads(raw_reply)
    except Exception as exc:
        if isinstance(exc, OSError):
            raise
        raise _agentos_socket_oserror(exc) from None

class _SecureExecSocket:
    """Small socket.socket stand-in whose I/O is forwarded to the host bridge.

    TCP sockets obtain their host socket id in connect(); UDP sockets obtain
    one at construction time. Reads are implemented by polling the host (see
    _poll) so that blocking, non-blocking and timeout modes can be emulated.

    Errors are raised with symbolic errno constants rather than literal
    numbers, because errno values are platform-specific (Emscripten's
    numbering differs from Linux's).
    """

    def __init__(self, family=None, type=None, proto=0, fileno=None):
        # fileno is accepted for signature compatibility only; it is not used.
        self.family = family if family is not None else _agentos_socket.AF_INET
        self.type = type if type is not None else _agentos_socket.SOCK_STREAM
        self.proto = proto
        self._timeout = None  # None blocks; 0 is non-blocking; >0 is a deadline
        self._id = None
        self._closed = False
        self._peer = None  # default (host, port) recorded by connect() on UDP
        self._is_udp = self.type == _agentos_socket.SOCK_DGRAM
        if self._is_udp:
            resp = _agentos_socket_rpc(lambda: _agentos_rpc.udpCreateSync())
            self._id = int(resp["socketId"])

    def _bad_fd(self):
        # Build (not raise) the error used for operations on an unusable socket.
        return OSError(_agentos_errno.EBADF, "Bad file descriptor")

    def connect(self, address):
        """Connect to (host, port).

        TCP: opens a host connection and stores its id.
        UDP: only records the default peer used by send()/recv(); opening a
        TCP connection here would replace (and orphan) the UDP socket id.
        Raises OSError(EBADF) once the socket has been closed, so a closed
        object can never acquire a host socket that close() will not release.
        """
        if self._closed:
            raise self._bad_fd()
        host, port = address[0], address[1]
        if self._is_udp:
            self._peer = (str(host), int(port))
            return
        resp = _agentos_socket_rpc(lambda: _agentos_rpc.socketConnectSync(str(host), int(port)))
        self._id = int(resp["socketId"])

    def connect_ex(self, address):
        """Like connect(), but return an errno-style code instead of raising."""
        try:
            self.connect(address)
            return 0
        except OSError as exc:
            # errno 0 means the host error was not recognised; still report failure.
            return exc.errno or 1

    def _ensure_id(self):
        # Return the host socket id, or raise EBADF when there is none
        # (never connected, or already closed).
        if self._id is None:
            raise self._bad_fd()
        return self._id

    def send(self, data, flags=0):
        """Send data and return the number of bytes the host reports as sent.

        On a UDP socket the datagram goes to the peer recorded by connect();
        without one, OSError(EDESTADDRREQ) is raised.
        """
        if self._is_udp:
            if self._peer is None:
                raise OSError(_agentos_errno.EDESTADDRREQ, "Destination address required")
            return self.sendto(data, self._peer)
        sid = self._ensure_id()
        payload = bytes(data)
        b64 = _agentos_base64.b64encode(payload).decode("ascii")
        resp = _agentos_socket_rpc(lambda: _agentos_rpc.socketSendSync(sid, b64))
        # Fall back to the byte length of the payload (len(data) can be an
        # element count rather than a byte count for some buffer objects).
        return int(resp.get("bytesSent", len(payload)))

    def sendall(self, data, flags=0):
        """Call send() repeatedly until every byte of data has been handed off."""
        payload = bytes(data)
        total = 0
        while total < len(payload):
            total += self.send(payload[total:], flags)
        return None

    def _poll(self, bufsize, recv_fn):
        """Poll recv_fn(bufsize) until data, close, or timeout.

        Returns (data, response). data is empty when the host reports the
        socket closed or returns an empty, non-timed-out reply.
        Raises BlockingIOError in non-blocking mode when nothing is ready, and
        socket.timeout once a positive timeout has elapsed.
        """
        deadline = None
        if self._timeout is not None and self._timeout > 0:
            deadline = _agentos_time.monotonic() + self._timeout
        backoff = 0.0
        while True:
            resp = _agentos_socket_rpc(lambda: recv_fn(int(bufsize)))
            if resp.get("closed"):
                return b"", resp
            data = resp.get("dataBase64") or ""
            if data:
                return _agentos_base64.b64decode(data), resp
            if resp.get("timedOut"):
                if self._timeout == 0:
                    raise BlockingIOError(
                        _agentos_errno.EAGAIN, "Resource temporarily unavailable"
                    )
                if deadline is not None and _agentos_time.monotonic() >= deadline:
                    raise _agentos_socket.timeout("timed out")
                # Guest-side capped backoff so a blocking recv on a silent socket
                # doesn't hammer the host loop with back-to-back polls.
                if backoff:
                    _agentos_time.sleep(backoff)
                backoff = min(backoff * 2 if backoff else 0.005, 0.05)
                continue
            return b"", resp

    def recv(self, bufsize, flags=0):
        """Receive up to bufsize bytes; returns an empty bytes object on close."""
        if self._is_udp:
            # UDP ids are only valid for the UDP RPCs; reuse recvfrom and drop
            # the sender address.
            data, _ = self.recvfrom(bufsize, flags)
            return data
        sid = self._ensure_id()
        data, _ = self._poll(bufsize, lambda n: _agentos_rpc.socketRecvSync(sid, n))
        return data

    def sendto(self, data, *args):
        """Send a datagram; the destination (host, port) is the last argument."""
        if self._closed:
            raise self._bad_fd()
        address = args[-1]
        host, port = address[0], address[1]
        if self._id is None:
            resp = _agentos_socket_rpc(lambda: _agentos_rpc.udpCreateSync())
            self._id = int(resp["socketId"])
        payload = bytes(data)
        b64 = _agentos_base64.b64encode(payload).decode("ascii")
        resp = _agentos_socket_rpc(
            lambda: _agentos_rpc.udpSendtoSync(self._id, str(host), int(port), b64)
        )
        return int(resp.get("bytesSent", len(payload)))

    def recvfrom(self, bufsize, flags=0):
        """Receive a datagram; returns (data, (host, port))."""
        sid = self._ensure_id()
        data, resp = self._poll(bufsize, lambda n: _agentos_rpc.udpRecvfromSync(sid, n))
        if not resp:
            return data, ("", 0)
        # Tolerate replies that carry a null host/port (for example on close).
        addr = (resp.get("host") or "", int(resp.get("port") or 0))
        return data, addr

    def settimeout(self, value):
        self._timeout = value

    def gettimeout(self):
        return self._timeout

    def setblocking(self, flag):
        self._timeout = None if flag else 0.0

    def setsockopt(self, *args, **kwargs):
        # Socket options are accepted and ignored.
        return None

    def getsockopt(self, *args, **kwargs):
        # No option state is tracked; every option reads as 0.
        return 0

    def fileno(self):
        return self._id if self._id is not None else -1

    def getpeername(self):
        return ("", 0)

    def getsockname(self):
        return ("0.0.0.0", 0)

    def close(self):
        """Release the host socket once; later calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        self._peer = None
        if self._id is not None:
            try:
                _agentos_rpc.socketCloseSync(self._id)
            except Exception:
                # Best effort: close() must not raise (it also runs from __del__).
                pass
            self._id = None

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

def _agentos_socket_factory(family=-1, type=-1, proto=0, fileno=None):
    """Replacement for socket.socket that bridges plain inet TCP/UDP sockets.

    A -1 family or type means "use the default" (AF_INET / SOCK_STREAM). Only
    sockets created without a fileno, in AF_INET or AF_INET6, of type
    SOCK_STREAM or SOCK_DGRAM are bridged; every other request is passed to the
    original socket class with the caller's arguments unchanged.
    """
    resolved_family = _agentos_socket.AF_INET if family == -1 else family
    resolved_type = _agentos_socket.SOCK_STREAM if type == -1 else type
    if (
        fileno is not None
        or resolved_family not in (_agentos_socket.AF_INET, _agentos_socket.AF_INET6)
        or resolved_type not in (_agentos_socket.SOCK_STREAM, _agentos_socket.SOCK_DGRAM)
    ):
        return _agentos_original_socket_class(family, type, proto, fileno)
    return _SecureExecSocket(resolved_family, resolved_type, proto)

# From here on, every socket.socket(...) call goes through the factory above.
_agentos_socket.socket = _agentos_socket_factory

class _SecureExecRequestsResponse:
def __init__(self, payload):
self.status_code = int(payload.get("status", 0))
Expand Down Expand Up @@ -1465,6 +1693,7 @@ function installPythonWorkspaceFs(pyodide, bridge) {
// Local handles on MEMFS's stock operation tables, so the workspace ops can
// delegate to them.
const memfsDirStreamOps = MEMFS.ops_table.dir.stream;
const memfsFileNodeOps = MEMFS.ops_table.file.node;
const memfsFileStreamOps = MEMFS.ops_table.file.stream;
const memfsLinkNodeOps = MEMFS.ops_table.link.node;
// Workspace directories use MEMFS's directory stream ops as-is.
const workspaceDirStreamOps = memfsDirStreamOps;

function joinGuestPath(parentPath, name) {
Expand Down Expand Up @@ -1529,6 +1758,8 @@ function installPythonWorkspaceFs(pyodide, bridge) {
if (FS.isDir(mode)) {
node.node_ops = workspaceDirNodeOps;
node.stream_ops = workspaceDirStreamOps;
} else if (FS.isLink(mode)) {
node.node_ops = workspaceLinkNodeOps;
} else if (FS.isFile(mode)) {
node.node_ops = workspaceFileNodeOps;
node.stream_ops = workspaceFileStreamOps;
Expand Down Expand Up @@ -1556,7 +1787,8 @@ function installPythonWorkspaceFs(pyodide, bridge) {

for (const name of entries) {
const childPath = joinGuestPath(guestPath, name);
const stat = withFsErrors(() => bridge.fsStatSync(childPath));
// lstat (don't follow) so a host symlink is created as a link node.
const stat = withFsErrors(() => bridge.fsLstatSync(childPath));
const existing = node.contents[name];

if (existing) {
Expand Down Expand Up @@ -1630,6 +1862,50 @@ function installPythonWorkspaceFs(pyodide, bridge) {
};
}

// Normalise a timestamp to epoch milliseconds.
//
// Accepts a number (returned as-is) or any object exposing getTime(), such as
// a Date. Returns null for null/undefined, for unsupported values, and for
// non-finite results (NaN from an invalid Date, +/-Infinity): those cannot be
// represented in a JSON request and must not be forwarded as a timestamp.
function toEpochMs(value) {
  if (value == null) return null;

  let ms = null;
  if (typeof value === 'number') {
    ms = value;
  } else if (typeof value.getTime === 'function') {
    ms = value.getTime();
  }

  return Number.isFinite(ms) ? ms : null;
}

// Mirror the chmod/chown/utimes parts of an Emscripten `setattr` onto the host
// VFS. Size changes are not forwarded here.
//
// Forwarded fields:
//   mode             -> permission bits only (mode & 0o7777)
//   uid / gid        -> only when non-negative; a negative value is the
//                       "leave unchanged" sentinel and is dropped
//   atimeMs/mtimeMs  -> truncated to integers, and only when BOTH times
//                       resolve (each falls back to attr.timestamp)
// Nothing is sent when no field qualifies.
function propagateSetattrToHost(node, attr) {
  if (!attr) {
    return;
  }

  const { mode, uid, gid } = attr;
  const hostAttr = {};

  if (mode != null) {
    hostAttr.mode = mode & 0o7777;
  }
  if (uid != null && uid >= 0) {
    hostAttr.uid = uid;
  }
  if (gid != null && gid >= 0) {
    hostAttr.gid = gid;
  }

  const accessMs = toEpochMs(attr.atime ?? attr.timestamp);
  const modifyMs = toEpochMs(attr.mtime ?? attr.timestamp);
  const haveBothTimes = accessMs != null && modifyMs != null;
  if (haveBothTimes) {
    hostAttr.atimeMs = Math.trunc(accessMs);
    hostAttr.mtimeMs = Math.trunc(modifyMs);
  }

  const nothingToSend = Object.keys(hostAttr).length === 0;
  if (nothingToSend) {
    return;
  }

  withFsErrors(() => bridge.fsSetattrSync(nodeGuestPath(node), hostAttr));
}

// Node operations installed on workspace symlink nodes.
const workspaceLinkNodeOps = {
  // Describe the link itself: the stat is built from the in-memory node alone
  // (no host stat is passed in), because a host stat would follow the link.
  getattr(linkNode) {
    const hostStat = null;
    return makeStat(linkNode, hostStat);
  },
  // Apply the change on the host before touching the in-isolate node: if the
  // host call throws, the MEMFS update below never runs and both views agree.
  setattr(linkNode, changes) {
    propagateSetattrToHost(linkNode, changes);
    memfsLinkNodeOps.setattr(linkNode, changes);
  },
  // Ask the host for the link target on every call.
  readlink(linkNode) {
    const readTarget = () => bridge.fsReadlinkSync(nodeGuestPath(linkNode));
    return withFsErrors(readTarget);
  },
};

const workspaceFileNodeOps = {
getattr(node) {
const stat = node.agentOSDirty
Expand All @@ -1641,6 +1917,9 @@ function installPythonWorkspaceFs(pyodide, bridge) {
return makeStat(node, stat);
},
setattr(node, attr) {
// Host first (see link setattr) so a failed host op leaves the in-isolate
// node untouched.
propagateSetattrToHost(node, attr);
memfsFileNodeOps.setattr(node, attr);
if (attr?.size != null) {
node.agentOSDirty = true;
Expand Down Expand Up @@ -1689,6 +1968,8 @@ function installPythonWorkspaceFs(pyodide, bridge) {
return makeStat(node, stat);
},
setattr(node, attr) {
// Apply the change on the host before the in-isolate node: if the host call
// throws, the MEMFS update below never runs and the two views stay in sync.
propagateSetattrToHost(node, attr);
memfsDirNodeOps.setattr(node, attr);
},
lookup(parent, name) {
Expand All @@ -1701,7 +1982,8 @@ function installPythonWorkspaceFs(pyodide, bridge) {
}

const guestPath = joinGuestPath(nodeGuestPath(parent), name);
const stat = withFsErrors(() => bridge.fsStatSync(guestPath));
// lstat (don't follow) so a directly-looked-up host symlink is a link node.
const stat = withFsErrors(() => bridge.fsLstatSync(guestPath));
const child = createWorkspaceNode(parent, name, stat.mode, 0, guestPath);
updateNodeFromRemoteStat(child, stat);
return child;
Expand Down Expand Up @@ -1749,8 +2031,13 @@ function installPythonWorkspaceFs(pyodide, bridge) {
syncDirectory(node);
return memfsDirNodeOps.readdir(node);
},
symlink() {
throw new FS.ErrnoError(ERRNO_CODES.ENOSYS);
symlink(parent, newName, oldPath) {
const guestPath = joinGuestPath(nodeGuestPath(parent), newName);
withFsErrors(() => bridge.fsSymlinkSync(oldPath, guestPath));
const node = createWorkspaceNode(parent, newName, 0o120777, 0, guestPath);
node.link = oldPath;
node.usedBytes = oldPath.length;
return node;
},
};

Expand Down
Loading
Loading