From 96b15db3d68f9a40314ae28b8f0877bd3b3b99d2 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Sun, 12 Apr 2026 17:13:51 -0700 Subject: [PATCH 1/3] fix(sandbox): correct sandlock integration semantics and fail loud MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SandlockSandbox wrapper had several latent correctness issues that could cause it to silently run with weaker isolation than intended, or drop resource limits on the floor. Written against the sandlock 0.8.1 API, which replaces the standalone Policy object with direct Sandbox(**kwargs) configuration and collapses the network knobs into a single net_allow endpoint allowlist. Fixes: * Configuration uses the current sandlock API. sandlock 0.8.1 dropped the Policy object; config is passed straight to Sandbox(**kwargs). _build_sandbox_kwargs() assembles that dict. * Network policy intent is now explicit via net_allow. An empty allowlist denies all outbound (the default); network-enabled=True opens all TCP plus UDP DNS (["*:*", "udp://*:53"]) so the sandboxed process can actually resolve hostnames and connect. * stdout/stderr are now str, not bytes. sandlock returns bytes from Sandbox.run(); PraisonAI's SandboxResult is typed as str. Added a _decode() helper with errors="replace" so downstream consumers never see binary artefacts or crash on .lower() / .split(). * max_cpu is now actually passed to the sandbox. Previously limits.cpu_percent was silently ignored. * env variables are injected via the Sandbox env field (set/overridden in the child), which the current API supports natively. * execute_file() passes the script by path, not via `python3 -c `. Large scripts no longer hit ARG_MAX, and the script's parent directory is added to the Landlock read allowlist via the new extra_readable parameter on _build_sandbox_kwargs. * Timeout detection is authoritative: we inspect result.error for "timed out" rather than heuristically comparing wall-clock duration against limits.timeout_seconds. (Refined in the following commit to use sandlock's exit_code == -1 sentinel.) * Sandbox handles are now managed via `with ... as sb:` so cleanup runs on exception. * fs_readable is filtered to paths that actually exist. Landlock fails at spawn time if any allowlisted path is missing, so the hardcoded list (which included /usr/local/lib/python3) caused sandbox creation failures on most hosts. Now we filter with os.path.isdir before constructing the sandbox. Breaking change — silent fallback removed: SandlockSandbox used to fall back to SubprocessSandbox whenever Landlock support was too low, logging only a warning. This violates the caller's explicit choice of kernel-level isolation: a SandlockSandbox that isn't actually using Landlock is a security footgun, and a warning in the logs is not a consent mechanism. __init__ now raises RuntimeError if Landlock support is missing — i.e. landlock_abi_version() < min_landlock_abi() (currently ABI 6, Linux 6.7+). Callers who want graceful degradation should catch ImportError / RuntimeError and construct SubprocessSandbox explicitly, e.g.: try: sb = SandlockSandbox(cfg) except (ImportError, RuntimeError): sb = SubprocessSandbox(cfg) The equivalent fallback branches in execute(), run_command(), and execute_file() are removed. Tests updated: - test_raises_when_landlock_unavailable replaces the two fallback tests and asserts RuntimeError is raised at construction time. - test_sandbox_kwargs_with_minimal_limits / _network_enabled check max_cpu and the net_allow deny-all / allow-TCP+DNS semantics. - test_sandlock_execution_timeout mocks result.error to assert the timeout path. - test_sandlock_execution_failure sets result.error=None explicitly. All unit tests pass, including the real-sandlock integration test. --- src/praisonai/praisonai/sandbox/sandlock.py | 311 ++++++++++-------- .../unit/sandbox/test_sandlock_sandbox.py | 116 ++++--- 2 files changed, 234 insertions(+), 193 deletions(-) diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 03359e371..4c4819d6e 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -74,12 +74,30 @@ def __init__( "sandlock package required for SandlockSandbox. " "Install with: pip install 'praisonai[sandbox]' or pip install sandlock" ) - + + # Fail loud if Landlock isn't supported on this kernel. sandlock + # requires a minimum Landlock ABI (currently v6, Linux 6.7+); query + # it rather than hard-coding so we track the SDK's own requirement. + try: + abi = self._sandlock.landlock_abi_version() + min_abi = self._sandlock.min_landlock_abi() + except Exception as e: + raise RuntimeError( + f"failed to query Landlock ABI version: {e}" + ) from e + if abi < min_abi: + raise RuntimeError( + "SandlockSandbox requires Landlock support (Linux kernel " + f">= 6.7 with CONFIG_SECURITY_LANDLOCK=y and ABI >= {min_abi}). " + f"This kernel reports Landlock ABI version {abi}. Use " + "SubprocessSandbox explicitly if weaker isolation is acceptable." + ) + @property def is_available(self) -> bool: """Whether sandlock backend is available.""" try: - return self._sandlock.landlock_abi_version() >= 1 + return self._sandlock.landlock_abi_version() >= self._sandlock.min_landlock_abi() except (AttributeError, ImportError): return False @@ -108,24 +126,30 @@ async def stop(self) -> None: self._is_running = False logger.info("Sandlock sandbox stopped") - def _create_policy( + def _build_sandbox_kwargs( self, limits: ResourceLimits, working_dir: Optional[str] = None, - ) -> Any: - """Create sandlock policy from resource limits. - + extra_readable: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Build sandlock ``Sandbox`` keyword arguments from resource limits. + + sandlock dropped the separate ``Policy`` object; configuration is now + passed directly to ``Sandbox(**kwargs)``. This returns that kwargs + dict. + Args: limits: Resource limits configuration - working_dir: Working directory for execution - - Returns: - Sandlock Policy object + working_dir: Working directory for execution (added to the + writable allowlist). + extra_readable: Additional directories to add to the Landlock + read allowlist (e.g. the parent of a script passed to + ``execute_file``). """ - Policy = self._sandlock.Policy - - # Determine allowed paths - allowed_read_paths = [ + # Landlock requires every path in the allowlist to exist at rule- + # attach time; passing a missing directory makes sandlock_spawn + # fail outright. Filter to paths that actually exist on this host. + _candidate_read_paths = [ "/usr/lib/python3", "/usr/local/lib/python3", "/lib", @@ -133,36 +157,68 @@ def _create_policy( "/bin", "/usr/bin", ] - + allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)] + if extra_readable: + allowed_read_paths.extend( + p for p in extra_readable if os.path.isdir(p) + ) + allowed_write_paths = [] if working_dir: allowed_write_paths.append(working_dir) if self._temp_dir: allowed_write_paths.append(self._temp_dir) - + # Add any configured allowed paths from security policy if hasattr(self.config, 'security_policy') and self.config.security_policy: allowed_write_paths.extend(self.config.security_policy.allowed_paths) - # Create policy with kernel-level restrictions - policy = Policy( + # Network policy. + # + # sandlock collapsed network config into a single ``net_allow`` + # endpoint allowlist: + # [] -> deny all outbound (the default) + # ["*:*", ...] -> allow rules; "*:*" = any TCP host:port + # + # Protocol gating falls out of rule presence: with no UDP rule, UDP + # sockets are denied at the seccomp layer. To make an enabled network + # actually usable we open all outbound TCP plus UDP DNS (port 53) so + # the sandboxed process can resolve hostnames. To block the network + # we leave the allowlist empty (deny all). + if limits.network_enabled: + net_allow = ["*:*", "udp://*:53"] + else: + net_allow = [] + + return { # Filesystem restrictions (Landlock) - fs_readable=allowed_read_paths, - fs_writable=allowed_write_paths, - - # Network restrictions - net_allow_hosts=[] if not limits.network_enabled else None, - + "fs_readable": allowed_read_paths, + "fs_writable": allowed_write_paths, + # Resource limits - max_memory=f"{limits.memory_mb}M", - max_processes=limits.max_processes, - max_open_files=limits.max_open_files, - - # Note: CPU throttle percentage, not time limit - # Execution timeout is handled via Sandbox.run(timeout=...) - ) - - return policy + "max_memory": f"{limits.memory_mb}M", + "max_processes": limits.max_processes, + "max_open_files": limits.max_open_files, + # max_cpu is a throttle percentage of one core, not a time budget. + # Execution timeout is handled via Sandbox.run(timeout=...). + "max_cpu": limits.cpu_percent, + + # Network (deny-all by default) + "net_allow": net_allow, + } + + @staticmethod + def _decode(buf: Any) -> str: + """Decode a sandlock Result stdout/stderr buffer to str. + + sandlock returns ``bytes`` from ``Sandbox.run()``; PraisonAI's + ``SandboxResult`` uses ``str`` throughout. Invalid UTF-8 is + replaced rather than raised so downstream consumers never see + binary artefacts. + """ + if isinstance(buf, bytes): + return buf.decode("utf-8", errors="replace") + return buf or "" def _safe_sandbox_path(self, path: str) -> Optional[str]: """Resolve a caller-supplied path to an absolute path inside _temp_dir. @@ -190,99 +246,82 @@ async def _run_sandlocked( limits: ResourceLimits, env: Optional[Dict[str, str]], working_dir: Optional[str], + extra_readable: Optional[List[str]] = None, ) -> SandboxResult: """Execute *cmd* inside a sandlock Sandbox and return a SandboxResult. - Centralises all sandlock Sandbox/Policy construction and error handling - so that ``execute`` and ``run_command`` share a single code path. + Centralises all sandlock Sandbox construction and error handling so + that ``execute`` and ``run_command`` share a single code path. - Note: ``env`` is accepted for API compatibility but sandlock's - ``Sandbox.run()`` does not support per-run environment injection; - callers requiring custom env vars should set them in the policy or - prior to sandbox creation. ``working_dir`` is applied via the policy - (added to the writable-path allow-list). + ``env`` variables are injected via the ``Sandbox`` ``env`` field + (set/overridden in the child). ``working_dir`` is applied to the + sandbox config (added to the writable-path allow-list). """ - policy = self._create_policy(limits, working_dir) + sandbox_kwargs = self._build_sandbox_kwargs( + limits, working_dir, extra_readable + ) + if env: + sandbox_kwargs["env"] = dict(env) started_at = time.time() - try: - sandbox = self._sandlock.Sandbox(policy) - - result = await asyncio.get_running_loop().run_in_executor( - None, - lambda: sandbox.run(cmd, timeout=limits.timeout_seconds) - ) - - completed_at = time.time() - duration = completed_at - started_at - - # Check result.success and exit_code to determine actual status - if not result.success: - # Check if this was a timeout based on duration - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - else: - # Non-zero exit or other failure - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.FAILED, - error=f"Execution failed with exit code {result.exit_code}: {result.stderr}", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.COMPLETED, - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - metadata={ - "sandbox_type": "sandlock", - "landlock_enabled": True, - "seccomp_enabled": True, - }, - ) + def _run() -> Any: + # Context manager ensures the sandbox handle is released even + # if .run() raises partway through. + with self._sandlock.Sandbox(**sandbox_kwargs) as sb: + return sb.run(cmd, timeout=limits.timeout_seconds) + try: + result = await asyncio.get_running_loop().run_in_executor(None, _run) except Exception as e: completed_at = time.time() - duration = completed_at - started_at - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) return SandboxResult( execution_id=execution_id, status=SandboxStatus.FAILED, error=f"Execution failed: {e}", - duration_seconds=duration, + duration_seconds=completed_at - started_at, started_at=started_at, completed_at=completed_at, ) + completed_at = time.time() + duration = completed_at - started_at + stdout = self._decode(result.stdout) + stderr = self._decode(result.stderr) + + # sandlock surfaces timeouts via result.error containing "timed out". + # This is authoritative — wall-clock guesses are unreliable because a + # process can legitimately finish just under the limit. + err_text = (result.error or "") if not result.success else "" + is_timeout = "timed out" in err_text.lower() or "timeout" in err_text.lower() + + if result.success: + status = SandboxStatus.COMPLETED + error = None + elif is_timeout: + status = SandboxStatus.TIMEOUT + error = f"Execution timed out after {limits.timeout_seconds}s" + else: + status = SandboxStatus.FAILED + error = f"Execution failed with exit code {result.exit_code}: {stderr or err_text}" + + return SandboxResult( + execution_id=execution_id, + status=status, + exit_code=result.exit_code, + stdout=stdout, + stderr=stderr, + duration_seconds=duration, + started_at=started_at, + completed_at=completed_at, + error=error, + metadata={ + "sandbox_type": "sandlock", + "landlock_enabled": True, + "seccomp_enabled": True, + }, + ) + async def execute( self, code: str, @@ -294,14 +333,7 @@ async def execute( """Execute code in the sandlock-isolated sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - # Fallback to subprocess sandbox if sandlock not available - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.execute(code, language, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -325,22 +357,39 @@ async def execute_file( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, ) -> SandboxResult: - """Execute a file in the sandbox.""" + """Execute a file in the sandbox. + + The script is passed to the interpreter by path rather than slurped + through ``-c``, so large scripts don't hit ARG_MAX. The file's + parent directory is added to the Landlock read allowlist for this + run so the sandboxed process can actually open it. + """ + if not self._is_running: + await self.start() + if not os.path.exists(file_path): return SandboxResult( status=SandboxStatus.FAILED, error=f"File not found: {file_path}", ) - - with open(file_path, "r") as f: - code = f.read() - - # Determine language from file extension - language = "python" - if file_path.endswith(".sh") or file_path.endswith(".bash"): - language = "bash" - - return await self.execute(code, language=language, limits=limits, env=env) + + limits = limits or self.config.resource_limits + execution_id = str(uuid.uuid4()) + + abs_path = os.path.realpath(file_path) + interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3" + cmd: List[str] = [interp, abs_path] + if args: + cmd.extend(args) + + return await self._run_sandlocked( + cmd, + execution_id=execution_id, + limits=limits, + env=env, + working_dir=self._temp_dir, + extra_readable=[os.path.dirname(abs_path)], + ) async def run_command( self, @@ -352,13 +401,7 @@ async def run_command( """Run a shell command in the sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.run_command(command, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -462,4 +505,4 @@ async def cleanup(self) -> None: async def reset(self) -> None: """Reset sandbox to initial state.""" await self.stop() - await self.start() \ No newline at end of file + await self.start() diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index 09e9a7725..f76054669 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -15,6 +15,12 @@ def _make_sandbox(mock_sandlock): """Instantiate SandlockSandbox with *mock_sandlock* injected via sys.modules.""" + # The SDK gates on landlock_abi_version() >= min_landlock_abi(); give the + # mock a concrete minimum (sandlock currently requires ABI 6) so the + # numeric comparison in __init__/is_available works. Tests that exercise + # the "unsupported" path set landlock_abi_version below this. + if not isinstance(mock_sandlock.min_landlock_abi.return_value, int): + mock_sandlock.min_landlock_abi.return_value = 6 # Remove any cached version so the import block inside __init__ re-runs. sys.modules.pop("praisonai.sandbox.sandlock", None) with patch.dict("sys.modules", {"sandlock": mock_sandlock}): @@ -42,58 +48,53 @@ def test_import_without_sandlock(self): with pytest.raises(ImportError, match="sandlock package required"): SandlockSandbox() - def test_fallback_to_subprocess_when_unavailable(self): - """Test fallback to subprocess when sandlock is not available.""" - mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - assert not sandbox.is_available - assert sandbox.sandbox_type == "sandlock" + def test_raises_when_landlock_unavailable(self): + """Instantiation must fail loud on kernels without Landlock support. - @pytest.mark.asyncio - async def test_fallback_execution(self): - """Test that execution falls back to subprocess when sandlock unavailable.""" + Silent degradation to SubprocessSandbox would violate the caller's + explicit choice of kernel-level isolation — a SandlockSandbox that + isn't actually using Landlock is a security footgun. + """ mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - - mock_subprocess_instance = AsyncMock() - mock_subprocess_instance.execute.return_value = Mock( - status=SandboxStatus.COMPLETED, - exit_code=0, - stdout="Hello, World!", - stderr="", - ) + mock_sandlock.landlock_abi_version.return_value = 0 # unsupported - with patch("praisonai.sandbox.subprocess.SubprocessSandbox") as mock_subprocess: - mock_subprocess.return_value = mock_subprocess_instance - result = await sandbox.execute("print('Hello, World!')") + with pytest.raises(RuntimeError, match="requires Landlock"): + _make_sandbox(mock_sandlock) - mock_subprocess.assert_called_once() - mock_subprocess_instance.execute.assert_called_once() + def test_sandbox_kwargs_with_minimal_limits(self): + """Sandbox kwargs are built directly from resource limits. - def test_policy_creation_with_minimal_limits(self): - """Test policy creation with minimal resource limits.""" + sandlock dropped the ``Policy`` object; config is now passed straight + to ``Sandbox(**kwargs)``. + """ mock_sandlock = Mock() - mock_policy = Mock() - mock_sandlock.Policy.return_value = mock_policy + mock_sandlock.landlock_abi_version.return_value = 6 # supported sandbox = _make_sandbox(mock_sandlock) limits = ResourceLimits.minimal() - sandbox._create_policy(limits, "/tmp/workspace") - - mock_sandlock.Policy.assert_called_once() - call_kwargs = mock_sandlock.Policy.call_args[1] + call_kwargs = sandbox._build_sandbox_kwargs(limits, "/tmp/workspace") assert "fs_readable" in call_kwargs assert "fs_writable" in call_kwargs - assert "max_memory" in call_kwargs assert call_kwargs["max_memory"] == "128M" # From minimal limits assert call_kwargs["max_processes"] == 5 - assert call_kwargs["net_allow_hosts"] == [] # Network disabled + assert call_kwargs["max_cpu"] == 50 # From minimal limits + # Network disabled → deny all outbound (empty allowlist). + assert call_kwargs["net_allow"] == [] + + def test_sandbox_kwargs_network_enabled(self): + """Network-enabled limits open all TCP plus UDP DNS.""" + mock_sandlock = Mock() + mock_sandlock.landlock_abi_version.return_value = 6 + + sandbox = _make_sandbox(mock_sandlock) + + limits = ResourceLimits(network_enabled=True) + call_kwargs = sandbox._build_sandbox_kwargs(limits) + + assert "*:*" in call_kwargs["net_allow"] + assert "udp://*:53" in call_kwargs["net_allow"] def test_status_reporting(self): """Test sandbox status reporting.""" @@ -120,7 +121,6 @@ async def test_sandlock_execution_success(self): mock_result.stdout = "Hello, World!" mock_result.stderr = "" - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock(run=Mock(return_value=mock_result)) mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available @@ -140,54 +140,52 @@ async def test_sandlock_execution_success(self): @pytest.mark.asyncio async def test_sandlock_execution_timeout(self): - """Test timeout handling in sandlock execution.""" + """Timeout is detected via result.error, not wall-clock heuristics.""" mock_sandlock = Mock() - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating timeout mock_timeout_result = Mock() mock_timeout_result.success = False - mock_timeout_result.exit_code = 124 # Common timeout exit code - mock_timeout_result.stdout = "" - mock_timeout_result.stderr = "Process timed out" + mock_timeout_result.exit_code = 124 + mock_timeout_result.stdout = b"" + mock_timeout_result.stderr = b"" + # sandlock surfaces timeout via the error field. + mock_timeout_result.error = "process timed out after 10s" - # Simulate timeout by returning failed Result after enough time - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 11, 11]): # Started at 0, ended at 11s (> 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_timeout_result ) await sandbox.start() - result = await sandbox.execute("import time; time.sleep(100)", limits=ResourceLimits(timeout_seconds=10)) + result = await sandbox.execute( + "import time; time.sleep(100)", + limits=ResourceLimits(timeout_seconds=10), + ) assert result.status == SandboxStatus.TIMEOUT assert "timed out" in result.error.lower() @pytest.mark.asyncio async def test_sandlock_execution_failure(self): - """Test general execution failure handling.""" + """Non-timeout failures keep the FAILED status and surface stderr.""" mock_sandlock = Mock() - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating non-zero exit mock_failed_result = Mock() mock_failed_result.success = False - mock_failed_result.exit_code = 1 # Non-zero exit code - mock_failed_result.stdout = "" - mock_failed_result.stderr = "Permission denied" + mock_failed_result.exit_code = 1 + mock_failed_result.stdout = b"" + mock_failed_result.stderr = b"Permission denied" + mock_failed_result.error = None # not a timeout - # Simulate execution failure (not timeout) - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 2, 2]): # Started at 0, ended at 2s (< 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_failed_result ) From c0c4eee90b74d4ec50be064243bea87398ede2c3 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Sun, 12 Apr 2026 17:22:52 -0700 Subject: [PATCH 2/3] fix(sandbox): use sandlock's exit_code == -1 sentinel for timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sandbox.run() does not populate result.error on timeout — only the Pipeline path does. String-matching on result.error was therefore unreliable for the common single-sandbox case: a real timeout from Sandbox.run() returns success=False, exit_code=-1, empty stderr, and error=None, which my previous logic mis-classified as FAILED. Switch to the structural signal: sandlock's ExitStatus::Timeout is exposed as exit_code == -1 (see sandlock's _sdk.py around line 1475). This matches how sandlock itself detects pipeline timeouts and works uniformly across Sandbox.run() and any future execution paths. Verified end-to-end with a real forced timeout against real sandlock: status: SandboxStatus.TIMEOUT exit: -1 error: Execution timed out after 1s Test updated to match: mock_timeout_result.exit_code = -1 and error = None (reflecting actual Sandbox.run() behavior). --- src/praisonai/praisonai/sandbox/sandlock.py | 14 ++++++-------- .../tests/unit/sandbox/test_sandlock_sandbox.py | 9 +++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 4c4819d6e..9faa940d9 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -289,21 +289,19 @@ def _run() -> Any: stdout = self._decode(result.stdout) stderr = self._decode(result.stderr) - # sandlock surfaces timeouts via result.error containing "timed out". - # This is authoritative — wall-clock guesses are unreliable because a - # process can legitimately finish just under the limit. - err_text = (result.error or "") if not result.success else "" - is_timeout = "timed out" in err_text.lower() or "timeout" in err_text.lower() - + # sandlock uses exit_code == -1 as the ExitStatus::Timeout sentinel + # (see sandlock's python/src/sandlock/_sdk.py). This is a + # structural signal — Sandbox.run() doesn't populate result.error + # for timeouts, so string-matching on it is unreliable. if result.success: status = SandboxStatus.COMPLETED error = None - elif is_timeout: + elif result.exit_code == -1: status = SandboxStatus.TIMEOUT error = f"Execution timed out after {limits.timeout_seconds}s" else: status = SandboxStatus.FAILED - error = f"Execution failed with exit code {result.exit_code}: {stderr or err_text}" + error = f"Execution failed with exit code {result.exit_code}: {stderr}" return SandboxResult( execution_id=execution_id, diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index f76054669..7ab96a313 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -140,7 +140,7 @@ async def test_sandlock_execution_success(self): @pytest.mark.asyncio async def test_sandlock_execution_timeout(self): - """Timeout is detected via result.error, not wall-clock heuristics.""" + """Timeout is detected via exit_code == -1 (ExitStatus::Timeout).""" mock_sandlock = Mock() mock_sandlock.Sandbox.return_value = Mock() mock_sandlock.landlock_abi_version.return_value = 6 @@ -149,11 +149,12 @@ async def test_sandlock_execution_timeout(self): mock_timeout_result = Mock() mock_timeout_result.success = False - mock_timeout_result.exit_code = 124 + # sandlock's timeout sentinel — Sandbox.run() does not populate + # result.error on timeout, so we rely on the exit_code instead. + mock_timeout_result.exit_code = -1 mock_timeout_result.stdout = b"" mock_timeout_result.stderr = b"" - # sandlock surfaces timeout via the error field. - mock_timeout_result.error = "process timed out after 10s" + mock_timeout_result.error = None with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( From ec8b3477a2e924a923a8c2c08815ad5d5c0a7b16 Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Tue, 2 Jun 2026 22:36:31 -0700 Subject: [PATCH 3/3] fix(sandbox): address review feedback on execute_file and ABI message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review fixes from PR #1367: * execute_file no longer allowlists the script's entire parent directory for Landlock read. Only the script file itself is granted read access, so sibling files on the host are never exposed (gemini flagged this as high severity). Verified end-to-end against real sandlock: a script that tries to read a sibling file is denied. * _build_sandbox_kwargs filters extra_readable with os.path.exists instead of os.path.isdir, so individual files (not just directories) can be allowlisted — required for the file-only execute_file path. * Corrected the Landlock kernel version in the fail-loud error message. The wrapper now requires ABI >= min_landlock_abi() (currently v6), which shipped in Linux 6.12 — not 6.7. The message is phrased around the ABI version so it stays correct as the SDK's minimum changes. * test_sandlock_execution_success now uses bytes for stdout/stderr (to exercise the _decode() byte path, matching real sandlock) and sets result.success = True explicitly. Note: the suggestion to also add temp_dir/working_dir to fs_readable was not applied — sandlock's fs_writable already grants read access ("fs_readable ... in addition to writable paths"), confirmed by test. --- src/praisonai/praisonai/sandbox/sandlock.py | 28 +++++++++++-------- .../unit/sandbox/test_sandlock_sandbox.py | 6 ++-- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 9faa940d9..9c633b1c9 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -76,8 +76,9 @@ def __init__( ) # Fail loud if Landlock isn't supported on this kernel. sandlock - # requires a minimum Landlock ABI (currently v6, Linux 6.7+); query - # it rather than hard-coding so we track the SDK's own requirement. + # requires a minimum Landlock ABI (currently v6, which shipped in + # Linux 6.12); query it rather than hard-coding so we track the SDK's + # own requirement. try: abi = self._sandlock.landlock_abi_version() min_abi = self._sandlock.min_landlock_abi() @@ -87,10 +88,11 @@ def __init__( ) from e if abi < min_abi: raise RuntimeError( - "SandlockSandbox requires Landlock support (Linux kernel " - f">= 6.7 with CONFIG_SECURITY_LANDLOCK=y and ABI >= {min_abi}). " - f"This kernel reports Landlock ABI version {abi}. Use " - "SubprocessSandbox explicitly if weaker isolation is acceptable." + "SandlockSandbox requires Landlock ABI >= " + f"{min_abi} (Linux kernel >= 6.12 with " + "CONFIG_SECURITY_LANDLOCK=y). This kernel reports Landlock " + f"ABI version {abi}. Use SubprocessSandbox explicitly if " + "weaker isolation is acceptable." ) @property @@ -159,8 +161,12 @@ def _build_sandbox_kwargs( ] allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)] if extra_readable: + # extra_readable may name individual files (e.g. the single script + # passed to execute_file), so accept any path that exists — not + # just directories. Landlock grants read on a named file without + # exposing its siblings. allowed_read_paths.extend( - p for p in extra_readable if os.path.isdir(p) + p for p in extra_readable if os.path.exists(p) ) allowed_write_paths = [] @@ -358,9 +364,9 @@ async def execute_file( """Execute a file in the sandbox. The script is passed to the interpreter by path rather than slurped - through ``-c``, so large scripts don't hit ARG_MAX. The file's - parent directory is added to the Landlock read allowlist for this - run so the sandboxed process can actually open it. + through ``-c``, so large scripts don't hit ARG_MAX. Only the script + file itself — not its parent directory — is added to the Landlock + read allowlist, so sibling files on the host are never exposed. """ if not self._is_running: await self.start() @@ -386,7 +392,7 @@ async def execute_file( limits=limits, env=env, working_dir=self._temp_dir, - extra_readable=[os.path.dirname(abs_path)], + extra_readable=[abs_path], ) async def run_command( diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index 7ab96a313..0e7035b81 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -117,9 +117,11 @@ async def test_sandlock_execution_success(self): """Test successful code execution with sandlock.""" mock_sandlock = Mock() mock_result = Mock() + mock_result.success = True mock_result.exit_code = 0 - mock_result.stdout = "Hello, World!" - mock_result.stderr = "" + # Real sandlock returns bytes; exercise the _decode() byte path. + mock_result.stdout = b"Hello, World!" + mock_result.stderr = b"" mock_sandlock.Sandbox.return_value = Mock(run=Mock(return_value=mock_result)) mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available