diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 03359e371..9c633b1c9 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -74,12 +74,32 @@ def __init__( "sandlock package required for SandlockSandbox. " "Install with: pip install 'praisonai[sandbox]' or pip install sandlock" ) - + + # Fail loud if Landlock isn't supported on this kernel. sandlock + # requires a minimum Landlock ABI (currently v6, which shipped in + # Linux 6.12); query it rather than hard-coding so we track the SDK's + # own requirement. + try: + abi = self._sandlock.landlock_abi_version() + min_abi = self._sandlock.min_landlock_abi() + except Exception as e: + raise RuntimeError( + f"failed to query Landlock ABI version: {e}" + ) from e + if abi < min_abi: + raise RuntimeError( + "SandlockSandbox requires Landlock ABI >= " + f"{min_abi} (Linux kernel >= 6.12 with " + "CONFIG_SECURITY_LANDLOCK=y). This kernel reports Landlock " + f"ABI version {abi}. Use SubprocessSandbox explicitly if " + "weaker isolation is acceptable." + ) + @property def is_available(self) -> bool: """Whether sandlock backend is available.""" try: - return self._sandlock.landlock_abi_version() >= 1 + return self._sandlock.landlock_abi_version() >= self._sandlock.min_landlock_abi() except (AttributeError, ImportError): return False @@ -108,24 +128,30 @@ async def stop(self) -> None: self._is_running = False logger.info("Sandlock sandbox stopped") - def _create_policy( + def _build_sandbox_kwargs( self, limits: ResourceLimits, working_dir: Optional[str] = None, - ) -> Any: - """Create sandlock policy from resource limits. - + extra_readable: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Build sandlock ``Sandbox`` keyword arguments from resource limits. + + sandlock dropped the separate ``Policy`` object; configuration is now + passed directly to ``Sandbox(**kwargs)``. This returns that kwargs + dict. + Args: limits: Resource limits configuration - working_dir: Working directory for execution - - Returns: - Sandlock Policy object + working_dir: Working directory for execution (added to the + writable allowlist). + extra_readable: Additional directories to add to the Landlock + read allowlist (e.g. the parent of a script passed to + ``execute_file``). """ - Policy = self._sandlock.Policy - - # Determine allowed paths - allowed_read_paths = [ + # Landlock requires every path in the allowlist to exist at rule- + # attach time; passing a missing directory makes sandlock_spawn + # fail outright. Filter to paths that actually exist on this host. + _candidate_read_paths = [ "/usr/lib/python3", "/usr/local/lib/python3", "/lib", @@ -133,36 +159,72 @@ def _create_policy( "/bin", "/usr/bin", ] - + allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)] + if extra_readable: + # extra_readable may name individual files (e.g. the single script + # passed to execute_file), so accept any path that exists — not + # just directories. Landlock grants read on a named file without + # exposing its siblings. + allowed_read_paths.extend( + p for p in extra_readable if os.path.exists(p) + ) + allowed_write_paths = [] if working_dir: allowed_write_paths.append(working_dir) if self._temp_dir: allowed_write_paths.append(self._temp_dir) - + # Add any configured allowed paths from security policy if hasattr(self.config, 'security_policy') and self.config.security_policy: allowed_write_paths.extend(self.config.security_policy.allowed_paths) - # Create policy with kernel-level restrictions - policy = Policy( + # Network policy. + # + # sandlock collapsed network config into a single ``net_allow`` + # endpoint allowlist: + # [] -> deny all outbound (the default) + # ["*:*", ...] -> allow rules; "*:*" = any TCP host:port + # + # Protocol gating falls out of rule presence: with no UDP rule, UDP + # sockets are denied at the seccomp layer. To make an enabled network + # actually usable we open all outbound TCP plus UDP DNS (port 53) so + # the sandboxed process can resolve hostnames. To block the network + # we leave the allowlist empty (deny all). + if limits.network_enabled: + net_allow = ["*:*", "udp://*:53"] + else: + net_allow = [] + + return { # Filesystem restrictions (Landlock) - fs_readable=allowed_read_paths, - fs_writable=allowed_write_paths, - - # Network restrictions - net_allow_hosts=[] if not limits.network_enabled else None, - + "fs_readable": allowed_read_paths, + "fs_writable": allowed_write_paths, + # Resource limits - max_memory=f"{limits.memory_mb}M", - max_processes=limits.max_processes, - max_open_files=limits.max_open_files, - - # Note: CPU throttle percentage, not time limit - # Execution timeout is handled via Sandbox.run(timeout=...) - ) - - return policy + "max_memory": f"{limits.memory_mb}M", + "max_processes": limits.max_processes, + "max_open_files": limits.max_open_files, + # max_cpu is a throttle percentage of one core, not a time budget. + # Execution timeout is handled via Sandbox.run(timeout=...). + "max_cpu": limits.cpu_percent, + + # Network (deny-all by default) + "net_allow": net_allow, + } + + @staticmethod + def _decode(buf: Any) -> str: + """Decode a sandlock Result stdout/stderr buffer to str. + + sandlock returns ``bytes`` from ``Sandbox.run()``; PraisonAI's + ``SandboxResult`` uses ``str`` throughout. Invalid UTF-8 is + replaced rather than raised so downstream consumers never see + binary artefacts. + """ + if isinstance(buf, bytes): + return buf.decode("utf-8", errors="replace") + return buf or "" def _safe_sandbox_path(self, path: str) -> Optional[str]: """Resolve a caller-supplied path to an absolute path inside _temp_dir. @@ -190,99 +252,80 @@ async def _run_sandlocked( limits: ResourceLimits, env: Optional[Dict[str, str]], working_dir: Optional[str], + extra_readable: Optional[List[str]] = None, ) -> SandboxResult: """Execute *cmd* inside a sandlock Sandbox and return a SandboxResult. - Centralises all sandlock Sandbox/Policy construction and error handling - so that ``execute`` and ``run_command`` share a single code path. + Centralises all sandlock Sandbox construction and error handling so + that ``execute`` and ``run_command`` share a single code path. - Note: ``env`` is accepted for API compatibility but sandlock's - ``Sandbox.run()`` does not support per-run environment injection; - callers requiring custom env vars should set them in the policy or - prior to sandbox creation. ``working_dir`` is applied via the policy - (added to the writable-path allow-list). + ``env`` variables are injected via the ``Sandbox`` ``env`` field + (set/overridden in the child). ``working_dir`` is applied to the + sandbox config (added to the writable-path allow-list). """ - policy = self._create_policy(limits, working_dir) + sandbox_kwargs = self._build_sandbox_kwargs( + limits, working_dir, extra_readable + ) + if env: + sandbox_kwargs["env"] = dict(env) started_at = time.time() - try: - sandbox = self._sandlock.Sandbox(policy) - - result = await asyncio.get_running_loop().run_in_executor( - None, - lambda: sandbox.run(cmd, timeout=limits.timeout_seconds) - ) - - completed_at = time.time() - duration = completed_at - started_at - - # Check result.success and exit_code to determine actual status - if not result.success: - # Check if this was a timeout based on duration - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - else: - # Non-zero exit or other failure - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.FAILED, - error=f"Execution failed with exit code {result.exit_code}: {result.stderr}", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.COMPLETED, - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - metadata={ - "sandbox_type": "sandlock", - "landlock_enabled": True, - "seccomp_enabled": True, - }, - ) + def _run() -> Any: + # Context manager ensures the sandbox handle is released even + # if .run() raises partway through. + with self._sandlock.Sandbox(**sandbox_kwargs) as sb: + return sb.run(cmd, timeout=limits.timeout_seconds) + try: + result = await asyncio.get_running_loop().run_in_executor(None, _run) except Exception as e: completed_at = time.time() - duration = completed_at - started_at - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) return SandboxResult( execution_id=execution_id, status=SandboxStatus.FAILED, error=f"Execution failed: {e}", - duration_seconds=duration, + duration_seconds=completed_at - started_at, started_at=started_at, completed_at=completed_at, ) + completed_at = time.time() + duration = completed_at - started_at + stdout = self._decode(result.stdout) + stderr = self._decode(result.stderr) + + # sandlock uses exit_code == -1 as the ExitStatus::Timeout sentinel + # (see sandlock's python/src/sandlock/_sdk.py). This is a + # structural signal — Sandbox.run() doesn't populate result.error + # for timeouts, so string-matching on it is unreliable. + if result.success: + status = SandboxStatus.COMPLETED + error = None + elif result.exit_code == -1: + status = SandboxStatus.TIMEOUT + error = f"Execution timed out after {limits.timeout_seconds}s" + else: + status = SandboxStatus.FAILED + error = f"Execution failed with exit code {result.exit_code}: {stderr}" + + return SandboxResult( + execution_id=execution_id, + status=status, + exit_code=result.exit_code, + stdout=stdout, + stderr=stderr, + duration_seconds=duration, + started_at=started_at, + completed_at=completed_at, + error=error, + metadata={ + "sandbox_type": "sandlock", + "landlock_enabled": True, + "seccomp_enabled": True, + }, + ) + async def execute( self, code: str, @@ -294,14 +337,7 @@ async def execute( """Execute code in the sandlock-isolated sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - # Fallback to subprocess sandbox if sandlock not available - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.execute(code, language, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -325,22 +361,39 @@ async def execute_file( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, ) -> SandboxResult: - """Execute a file in the sandbox.""" + """Execute a file in the sandbox. + + The script is passed to the interpreter by path rather than slurped + through ``-c``, so large scripts don't hit ARG_MAX. Only the script + file itself — not its parent directory — is added to the Landlock + read allowlist, so sibling files on the host are never exposed. + """ + if not self._is_running: + await self.start() + if not os.path.exists(file_path): return SandboxResult( status=SandboxStatus.FAILED, error=f"File not found: {file_path}", ) - - with open(file_path, "r") as f: - code = f.read() - - # Determine language from file extension - language = "python" - if file_path.endswith(".sh") or file_path.endswith(".bash"): - language = "bash" - - return await self.execute(code, language=language, limits=limits, env=env) + + limits = limits or self.config.resource_limits + execution_id = str(uuid.uuid4()) + + abs_path = os.path.realpath(file_path) + interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3" + cmd: List[str] = [interp, abs_path] + if args: + cmd.extend(args) + + return await self._run_sandlocked( + cmd, + execution_id=execution_id, + limits=limits, + env=env, + working_dir=self._temp_dir, + extra_readable=[abs_path], + ) async def run_command( self, @@ -352,13 +405,7 @@ async def run_command( """Run a shell command in the sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.run_command(command, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -462,4 +509,4 @@ async def cleanup(self) -> None: async def reset(self) -> None: """Reset sandbox to initial state.""" await self.stop() - await self.start() \ No newline at end of file + await self.start() diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index 09e9a7725..0e7035b81 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -15,6 +15,12 @@ def _make_sandbox(mock_sandlock): """Instantiate SandlockSandbox with *mock_sandlock* injected via sys.modules.""" + # The SDK gates on landlock_abi_version() >= min_landlock_abi(); give the + # mock a concrete minimum (sandlock currently requires ABI 6) so the + # numeric comparison in __init__/is_available works. Tests that exercise + # the "unsupported" path set landlock_abi_version below this. + if not isinstance(mock_sandlock.min_landlock_abi.return_value, int): + mock_sandlock.min_landlock_abi.return_value = 6 # Remove any cached version so the import block inside __init__ re-runs. sys.modules.pop("praisonai.sandbox.sandlock", None) with patch.dict("sys.modules", {"sandlock": mock_sandlock}): @@ -42,58 +48,53 @@ def test_import_without_sandlock(self): with pytest.raises(ImportError, match="sandlock package required"): SandlockSandbox() - def test_fallback_to_subprocess_when_unavailable(self): - """Test fallback to subprocess when sandlock is not available.""" - mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - assert not sandbox.is_available - assert sandbox.sandbox_type == "sandlock" + def test_raises_when_landlock_unavailable(self): + """Instantiation must fail loud on kernels without Landlock support. - @pytest.mark.asyncio - async def test_fallback_execution(self): - """Test that execution falls back to subprocess when sandlock unavailable.""" + Silent degradation to SubprocessSandbox would violate the caller's + explicit choice of kernel-level isolation — a SandlockSandbox that + isn't actually using Landlock is a security footgun. + """ mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - - mock_subprocess_instance = AsyncMock() - mock_subprocess_instance.execute.return_value = Mock( - status=SandboxStatus.COMPLETED, - exit_code=0, - stdout="Hello, World!", - stderr="", - ) + mock_sandlock.landlock_abi_version.return_value = 0 # unsupported - with patch("praisonai.sandbox.subprocess.SubprocessSandbox") as mock_subprocess: - mock_subprocess.return_value = mock_subprocess_instance - result = await sandbox.execute("print('Hello, World!')") + with pytest.raises(RuntimeError, match="requires Landlock"): + _make_sandbox(mock_sandlock) - mock_subprocess.assert_called_once() - mock_subprocess_instance.execute.assert_called_once() + def test_sandbox_kwargs_with_minimal_limits(self): + """Sandbox kwargs are built directly from resource limits. - def test_policy_creation_with_minimal_limits(self): - """Test policy creation with minimal resource limits.""" + sandlock dropped the ``Policy`` object; config is now passed straight + to ``Sandbox(**kwargs)``. + """ mock_sandlock = Mock() - mock_policy = Mock() - mock_sandlock.Policy.return_value = mock_policy + mock_sandlock.landlock_abi_version.return_value = 6 # supported sandbox = _make_sandbox(mock_sandlock) limits = ResourceLimits.minimal() - sandbox._create_policy(limits, "/tmp/workspace") - - mock_sandlock.Policy.assert_called_once() - call_kwargs = mock_sandlock.Policy.call_args[1] + call_kwargs = sandbox._build_sandbox_kwargs(limits, "/tmp/workspace") assert "fs_readable" in call_kwargs assert "fs_writable" in call_kwargs - assert "max_memory" in call_kwargs assert call_kwargs["max_memory"] == "128M" # From minimal limits assert call_kwargs["max_processes"] == 5 - assert call_kwargs["net_allow_hosts"] == [] # Network disabled + assert call_kwargs["max_cpu"] == 50 # From minimal limits + # Network disabled → deny all outbound (empty allowlist). + assert call_kwargs["net_allow"] == [] + + def test_sandbox_kwargs_network_enabled(self): + """Network-enabled limits open all TCP plus UDP DNS.""" + mock_sandlock = Mock() + mock_sandlock.landlock_abi_version.return_value = 6 + + sandbox = _make_sandbox(mock_sandlock) + + limits = ResourceLimits(network_enabled=True) + call_kwargs = sandbox._build_sandbox_kwargs(limits) + + assert "*:*" in call_kwargs["net_allow"] + assert "udp://*:53" in call_kwargs["net_allow"] def test_status_reporting(self): """Test sandbox status reporting.""" @@ -116,11 +117,12 @@ async def test_sandlock_execution_success(self): """Test successful code execution with sandlock.""" mock_sandlock = Mock() mock_result = Mock() + mock_result.success = True mock_result.exit_code = 0 - mock_result.stdout = "Hello, World!" - mock_result.stderr = "" + # Real sandlock returns bytes; exercise the _decode() byte path. + mock_result.stdout = b"Hello, World!" + mock_result.stderr = b"" - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock(run=Mock(return_value=mock_result)) mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available @@ -140,54 +142,53 @@ async def test_sandlock_execution_success(self): @pytest.mark.asyncio async def test_sandlock_execution_timeout(self): - """Test timeout handling in sandlock execution.""" + """Timeout is detected via exit_code == -1 (ExitStatus::Timeout).""" mock_sandlock = Mock() - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating timeout mock_timeout_result = Mock() mock_timeout_result.success = False - mock_timeout_result.exit_code = 124 # Common timeout exit code - mock_timeout_result.stdout = "" - mock_timeout_result.stderr = "Process timed out" + # sandlock's timeout sentinel — Sandbox.run() does not populate + # result.error on timeout, so we rely on the exit_code instead. + mock_timeout_result.exit_code = -1 + mock_timeout_result.stdout = b"" + mock_timeout_result.stderr = b"" + mock_timeout_result.error = None - # Simulate timeout by returning failed Result after enough time - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 11, 11]): # Started at 0, ended at 11s (> 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_timeout_result ) await sandbox.start() - result = await sandbox.execute("import time; time.sleep(100)", limits=ResourceLimits(timeout_seconds=10)) + result = await sandbox.execute( + "import time; time.sleep(100)", + limits=ResourceLimits(timeout_seconds=10), + ) assert result.status == SandboxStatus.TIMEOUT assert "timed out" in result.error.lower() @pytest.mark.asyncio async def test_sandlock_execution_failure(self): - """Test general execution failure handling.""" + """Non-timeout failures keep the FAILED status and surface stderr.""" mock_sandlock = Mock() - mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating non-zero exit mock_failed_result = Mock() mock_failed_result.success = False - mock_failed_result.exit_code = 1 # Non-zero exit code - mock_failed_result.stdout = "" - mock_failed_result.stderr = "Permission denied" + mock_failed_result.exit_code = 1 + mock_failed_result.stdout = b"" + mock_failed_result.stderr = b"Permission denied" + mock_failed_result.error = None # not a timeout - # Simulate execution failure (not timeout) - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 2, 2]): # Started at 0, ended at 2s (< 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_failed_result )