From f403c53fdfeb8731053da4c9cbfb292d7042f879 Mon Sep 17 00:00:00 2001 From: ethanbailie Date: Thu, 12 Mar 2026 20:20:15 -0400 Subject: [PATCH] add gpu runtime policy and executor coverage --- tako_vm/execution/worker.py | 86 +++++++++++++++++++++++++++++-- tests/test_runtime.py | 100 ++++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+), 5 deletions(-) diff --git a/tako_vm/execution/worker.py b/tako_vm/execution/worker.py index 937e664..a0e3b34 100644 --- a/tako_vm/execution/worker.py +++ b/tako_vm/execution/worker.py @@ -209,16 +209,18 @@ def __init__( self.registry = registry or JobTypeRegistry() self.config = config or get_config() - # Check runtime availability - self._runtime = self._resolve_runtime() + # Resolve and validate baseline non-GPU runtime availability at startup + self._runtime = self._resolve_runtime(use_gpu=False, workload="job") - def _resolve_runtime(self) -> str: + def _resolve_runtime(self, use_gpu: bool = False, workload: str = "job") -> str: """ Resolve the container runtime to use. In strict mode (default), gVisor must be available or we fail. In permissive mode, we fall back to runc with a warning. + GPU execution/session workloads always require runc. + Returns: The runtime to use ('runsc' or 'runc') @@ -228,6 +230,20 @@ def _resolve_runtime(self) -> str: requested_runtime = self.config.container_runtime security_mode = self.config.security_mode + if use_gpu: + if security_mode == "strict": + raise RuntimeUnavailableError( + "GPU workloads require gVisor to be disabled, but security_mode='strict' " + "requires gVisor. Use security_mode='permissive' for GPU sessions/jobs." + ) + + if requested_runtime == "runsc": + logger.info( + "GPU enabled for %s workload; forcing runtime to runc (gVisor disabled)", + workload, + ) + return "runc" + # If runc is explicitly requested, allow it (user knows what they're doing) if requested_runtime == "runc": if security_mode == "strict": @@ -262,6 +278,45 @@ def _resolve_runtime(self) -> str: logger.warning("=" * 60) return "runc" + def resolve_runtime_for_job_type(self, job_type: JobType, workload: str = "job") -> str: + """Resolve runtime for a specific job type, accounting for GPU requirements.""" + return self._resolve_runtime(use_gpu=job_type.gpu_enabled, workload=workload) + + def build_gpu_flags(self, job_type: JobType) -> List[str]: + """Build Docker CLI GPU flags for a job type.""" + if not job_type.gpu_enabled: + return [] + + vendor = (job_type.gpu_vendor or "").lower() + if vendor == "nvidia": + if job_type.gpu_device_ids: + return [f"--gpus=device={','.join(job_type.gpu_device_ids)}"] + if job_type.gpu_count is not None: + return [f"--gpus={job_type.gpu_count}"] + return ["--gpus=all"] + + if vendor == "amd": + return ["--device=/dev/kfd", "--device=/dev/dri"] + + raise RuntimeUnavailableError(f"Unsupported GPU vendor: {job_type.gpu_vendor}") + + def build_gpu_env_vars(self, job_type: JobType) -> Dict[str, str]: + """Build environment variables used for GPU device selection.""" + if not job_type.gpu_enabled or not job_type.gpu_device_ids: + return {} + + device_list = ",".join(job_type.gpu_device_ids) + vendor = (job_type.gpu_vendor or "").lower() + + if vendor == "nvidia": + return {"CUDA_VISIBLE_DEVICES": device_list} + if vendor == "amd": + return { + "ROCR_VISIBLE_DEVICES": device_list, + "HIP_VISIBLE_DEVICES": device_list, + } + return {} + def _get_job_type(self, job_type_name: Optional[str]) -> JobType: """ Get job type configuration. @@ -819,6 +874,20 @@ def _run_container( "For true network isolation, use pre-built images via 'tako-vm build'." ) + try: + runtime = self.resolve_runtime_for_job_type(job_type=job_type, workload="job") + gpu_flags = self.build_gpu_flags(job_type) + gpu_env_vars = self.build_gpu_env_vars(job_type) + except RuntimeUnavailableError as e: + safe_error = sanitize_error(str(e)) + return { + "success": False, + "error": safe_error, + "stdout": "", + "stderr": safe_error, + "exit_code": -1, + } + # Generate container name for tracking (allows cleanup on timeout) container_name = generate_container_name("tako", job_id) @@ -849,8 +918,11 @@ def _run_container( # Only specify runtime explicitly for gVisor (runsc) # runc is the default Docker runtime, so we don't need to specify it explicitly # (and some Docker configurations may not accept --runtime=runc) - if self._runtime == "runsc": - cmd.append(f"--runtime={self._runtime}") + if runtime == "runsc": + cmd.append("--runtime=runsc") + + # GPU access flags (if enabled by job type) + cmd.extend(gpu_flags) # Mount uv cache volume for faster repeated installs if has_runtime_deps: @@ -910,6 +982,10 @@ def _run_container( continue cmd.append(f"--env={key}={value}") + # Add GPU selection env vars (safe, generated by server) + for key, value in gpu_env_vars.items(): + cmd.append(f"--env={key}={value}") + # Pass requirements for runtime installation via uv if all_requirements: # Check requirements limit to prevent env var overflow diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 4987fa8..13cf814 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -14,6 +14,7 @@ RuntimeUnavailableError, reset_gvisor_check, ) +from tako_vm.job_types import JobType @pytest.fixture(autouse=True) @@ -234,3 +235,102 @@ def test_is_native_linux_on_windows(self, monkeypatch): from tako_vm.execution.worker import is_native_linux assert is_native_linux() is False + + +class TestGpuRuntimePolicy: + """Tests for GPU-specific runtime resolution and Docker flags.""" + + @pytest.fixture(autouse=True) + def mock_gvisor_available(self, monkeypatch): + """Mock gVisor as available for baseline executor initialization.""" + monkeypatch.setattr(worker_module, "_gvisor_available", True) + + def test_gpu_workload_rejected_in_strict_mode(self): + """GPU workloads are blocked when strict mode requires gVisor.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="strict") + ) + gpu_job = JobType(name="gpu-job", gpu_enabled=True, gpu_vendor="nvidia") + + with pytest.raises(RuntimeUnavailableError) as exc_info: + executor.resolve_runtime_for_job_type(gpu_job) + + assert "GPU workloads require gVisor to be disabled" in str(exc_info.value) + + def test_gpu_workload_forces_runc_in_permissive_mode(self): + """GPU workloads always use runc in permissive mode.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="permissive") + ) + gpu_job = JobType(name="gpu-job", gpu_enabled=True, gpu_vendor="nvidia") + + assert executor.resolve_runtime_for_job_type(gpu_job) == "runc" + + def test_build_gpu_flags_for_nvidia_variants(self): + """NVIDIA flag generation supports all/count/device selection.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="permissive") + ) + + assert executor.build_gpu_flags( + JobType(name="all", gpu_enabled=True, gpu_vendor="nvidia") + ) == ["--gpus=all"] + assert executor.build_gpu_flags( + JobType(name="count", gpu_enabled=True, gpu_vendor="nvidia", gpu_count=2) + ) == ["--gpus=2"] + assert executor.build_gpu_flags( + JobType( + name="devices", + gpu_enabled=True, + gpu_vendor="nvidia", + gpu_device_ids=["GPU-1", "GPU-2"], + ) + ) == ["--gpus=device=GPU-1,GPU-2"] + + def test_build_gpu_flags_for_amd(self): + """AMD jobs mount required device nodes.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="permissive") + ) + flags = executor.build_gpu_flags(JobType(name="amd", gpu_enabled=True, gpu_vendor="amd")) + assert flags == ["--device=/dev/kfd", "--device=/dev/dri"] + + def test_build_gpu_env_vars_for_device_selection(self): + """Device selection env vars are vendor-specific.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="permissive") + ) + + nvidia_env = executor.build_gpu_env_vars( + JobType( + name="nvidia", + gpu_enabled=True, + gpu_vendor="nvidia", + gpu_device_ids=["0", "2"], + ) + ) + assert nvidia_env == {"CUDA_VISIBLE_DEVICES": "0,2"} + + amd_env = executor.build_gpu_env_vars( + JobType( + name="amd", + gpu_enabled=True, + gpu_vendor="amd", + gpu_device_ids=["card0"], + ) + ) + assert amd_env == { + "ROCR_VISIBLE_DEVICES": "card0", + "HIP_VISIBLE_DEVICES": "card0", + } + + def test_build_gpu_flags_rejects_unknown_vendor(self): + """Unsupported GPU vendor raises a runtime error.""" + executor = CodeExecutor( + config=TakoVMConfig(container_runtime="runsc", security_mode="permissive") + ) + + with pytest.raises(RuntimeUnavailableError) as exc_info: + executor.build_gpu_flags(JobType(name="bad", gpu_enabled=True, gpu_vendor="intel")) + + assert "Unsupported GPU vendor" in str(exc_info.value)