Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 81 additions & 5 deletions tako_vm/execution/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,18 @@ def __init__(
self.registry = registry or JobTypeRegistry()
self.config = config or get_config()

# Check runtime availability
self._runtime = self._resolve_runtime()
# Resolve and validate baseline non-GPU runtime availability at startup
self._runtime = self._resolve_runtime(use_gpu=False, workload="job")

def _resolve_runtime(self) -> str:
def _resolve_runtime(self, use_gpu: bool = False, workload: str = "job") -> str:
"""
Resolve the container runtime to use.

In strict mode (default), gVisor must be available or we fail.
In permissive mode, we fall back to runc with a warning.

GPU execution/session workloads always require runc.

Returns:
The runtime to use ('runsc' or 'runc')

Expand All @@ -228,6 +230,20 @@ def _resolve_runtime(self) -> str:
requested_runtime = self.config.container_runtime
security_mode = self.config.security_mode

if use_gpu:
if security_mode == "strict":
raise RuntimeUnavailableError(
"GPU workloads require gVisor to be disabled, but security_mode='strict' "
"requires gVisor. Use security_mode='permissive' for GPU sessions/jobs."
)

if requested_runtime == "runsc":
logger.info(
"GPU enabled for %s workload; forcing runtime to runc (gVisor disabled)",
workload,
)
return "runc"

# If runc is explicitly requested, allow it (user knows what they're doing)
if requested_runtime == "runc":
if security_mode == "strict":
Expand Down Expand Up @@ -262,6 +278,45 @@ def _resolve_runtime(self) -> str:
logger.warning("=" * 60)
return "runc"

def resolve_runtime_for_job_type(self, job_type: JobType, workload: str = "job") -> str:
    """Resolve the container runtime for a given job type.

    Delegates to ``_resolve_runtime``, passing along whether the job type
    needs GPU access (GPU workloads force runc / forbid gVisor).

    Args:
        job_type: The job type whose runtime requirements should be honored.
        workload: Label used in log messages ("job" or "session").

    Returns:
        The runtime name to use ('runsc' or 'runc').
    """
    needs_gpu = job_type.gpu_enabled
    return self._resolve_runtime(use_gpu=needs_gpu, workload=workload)

def build_gpu_flags(self, job_type: JobType) -> List[str]:
    """Build the Docker CLI flags that grant GPU access for *job_type*.

    Non-GPU job types get no flags. NVIDIA jobs use the ``--gpus`` flag,
    preferring explicit device IDs, then a device count, then all GPUs.
    AMD jobs instead mount the ROCm device nodes.

    Args:
        job_type: The job type whose GPU configuration is inspected.

    Returns:
        A list of Docker CLI arguments (possibly empty).

    Raises:
        RuntimeUnavailableError: If the configured GPU vendor is unsupported.
    """
    if not job_type.gpu_enabled:
        return []

    vendor_name = (job_type.gpu_vendor or "").lower()

    if vendor_name == "nvidia":
        # Precedence: explicit device IDs > device count > all devices.
        if job_type.gpu_device_ids:
            selected = ",".join(job_type.gpu_device_ids)
            return [f"--gpus=device={selected}"]
        if job_type.gpu_count is not None:
            return [f"--gpus={job_type.gpu_count}"]
        return ["--gpus=all"]

    if vendor_name == "amd":
        # ROCm requires the kernel fusion driver and DRI device nodes.
        return ["--device=/dev/kfd", "--device=/dev/dri"]

    raise RuntimeUnavailableError(f"Unsupported GPU vendor: {job_type.gpu_vendor}")

def build_gpu_env_vars(self, job_type: JobType) -> Dict[str, str]:
    """Build vendor-specific env vars that pin the visible GPU devices.

    Only meaningful when the job type both enables GPUs and lists explicit
    device IDs; in every other case no selection env vars are needed.

    Args:
        job_type: The job type whose GPU device selection is inspected.

    Returns:
        A mapping of environment variable names to the device-ID list
        (empty for non-GPU jobs, missing device IDs, or unknown vendors).
    """
    if not (job_type.gpu_enabled and job_type.gpu_device_ids):
        return {}

    visible = ",".join(job_type.gpu_device_ids)
    vendor_name = (job_type.gpu_vendor or "").lower()

    env_by_vendor = {
        "nvidia": {"CUDA_VISIBLE_DEVICES": visible},
        # ROCm stacks honor ROCR_ and/or HIP_ variants depending on version.
        "amd": {
            "ROCR_VISIBLE_DEVICES": visible,
            "HIP_VISIBLE_DEVICES": visible,
        },
    }
    return env_by_vendor.get(vendor_name, {})

def _get_job_type(self, job_type_name: Optional[str]) -> JobType:
"""
Get job type configuration.
Expand Down Expand Up @@ -819,6 +874,20 @@ def _run_container(
"For true network isolation, use pre-built images via 'tako-vm build'."
)

try:
runtime = self.resolve_runtime_for_job_type(job_type=job_type, workload="job")
gpu_flags = self.build_gpu_flags(job_type)
gpu_env_vars = self.build_gpu_env_vars(job_type)
except RuntimeUnavailableError as e:
safe_error = sanitize_error(str(e))
return {
"success": False,
"error": safe_error,
"stdout": "",
"stderr": safe_error,
"exit_code": -1,
}

# Generate container name for tracking (allows cleanup on timeout)
container_name = generate_container_name("tako", job_id)

Expand Down Expand Up @@ -849,8 +918,11 @@ def _run_container(
# Only specify runtime explicitly for gVisor (runsc)
# runc is the default Docker runtime, so we don't need to specify it explicitly
# (and some Docker configurations may not accept --runtime=runc)
if self._runtime == "runsc":
cmd.append(f"--runtime={self._runtime}")
if runtime == "runsc":
cmd.append("--runtime=runsc")

# GPU access flags (if enabled by job type)
cmd.extend(gpu_flags)

# Mount uv cache volume for faster repeated installs
if has_runtime_deps:
Expand Down Expand Up @@ -910,6 +982,10 @@ def _run_container(
continue
cmd.append(f"--env={key}={value}")

# Add GPU selection env vars (safe, generated by server)
for key, value in gpu_env_vars.items():
cmd.append(f"--env={key}={value}")

# Pass requirements for runtime installation via uv
if all_requirements:
# Check requirements limit to prevent env var overflow
Expand Down
100 changes: 100 additions & 0 deletions tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
RuntimeUnavailableError,
reset_gvisor_check,
)
from tako_vm.job_types import JobType


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -234,3 +235,102 @@ def test_is_native_linux_on_windows(self, monkeypatch):
from tako_vm.execution.worker import is_native_linux

assert is_native_linux() is False


class TestGpuRuntimePolicy:
    """Tests for GPU-specific runtime resolution and Docker flag generation."""

    @pytest.fixture(autouse=True)
    def mock_gvisor_available(self, monkeypatch):
        """Pretend gVisor is installed so baseline executor construction succeeds."""
        monkeypatch.setattr(worker_module, "_gvisor_available", True)

    @staticmethod
    def _make_executor(security_mode):
        """Build a CodeExecutor configured with runsc and the given security mode."""
        config = TakoVMConfig(container_runtime="runsc", security_mode=security_mode)
        return CodeExecutor(config=config)

    def test_gpu_workload_rejected_in_strict_mode(self):
        """Strict mode cannot run GPU workloads (gVisor is mandatory there)."""
        executor = self._make_executor("strict")
        gpu_job = JobType(name="gpu-job", gpu_enabled=True, gpu_vendor="nvidia")

        with pytest.raises(RuntimeUnavailableError) as exc_info:
            executor.resolve_runtime_for_job_type(gpu_job)

        assert "GPU workloads require gVisor to be disabled" in str(exc_info.value)

    def test_gpu_workload_forces_runc_in_permissive_mode(self):
        """Permissive mode downgrades GPU workloads to runc even when runsc is set."""
        executor = self._make_executor("permissive")
        gpu_job = JobType(name="gpu-job", gpu_enabled=True, gpu_vendor="nvidia")

        assert executor.resolve_runtime_for_job_type(gpu_job) == "runc"

    def test_build_gpu_flags_for_nvidia_variants(self):
        """NVIDIA flag generation covers all / count / explicit-device selection."""
        executor = self._make_executor("permissive")

        cases = [
            (
                JobType(name="all", gpu_enabled=True, gpu_vendor="nvidia"),
                ["--gpus=all"],
            ),
            (
                JobType(name="count", gpu_enabled=True, gpu_vendor="nvidia", gpu_count=2),
                ["--gpus=2"],
            ),
            (
                JobType(
                    name="devices",
                    gpu_enabled=True,
                    gpu_vendor="nvidia",
                    gpu_device_ids=["GPU-1", "GPU-2"],
                ),
                ["--gpus=device=GPU-1,GPU-2"],
            ),
        ]
        for job_type, expected in cases:
            assert executor.build_gpu_flags(job_type) == expected

    def test_build_gpu_flags_for_amd(self):
        """AMD jobs mount the /dev/kfd and /dev/dri device nodes."""
        executor = self._make_executor("permissive")
        amd_job = JobType(name="amd", gpu_enabled=True, gpu_vendor="amd")

        assert executor.build_gpu_flags(amd_job) == ["--device=/dev/kfd", "--device=/dev/dri"]

    def test_build_gpu_env_vars_for_device_selection(self):
        """Device-selection env vars are specific to the GPU vendor."""
        executor = self._make_executor("permissive")

        nvidia_job = JobType(
            name="nvidia",
            gpu_enabled=True,
            gpu_vendor="nvidia",
            gpu_device_ids=["0", "2"],
        )
        assert executor.build_gpu_env_vars(nvidia_job) == {"CUDA_VISIBLE_DEVICES": "0,2"}

        amd_job = JobType(
            name="amd",
            gpu_enabled=True,
            gpu_vendor="amd",
            gpu_device_ids=["card0"],
        )
        assert executor.build_gpu_env_vars(amd_job) == {
            "ROCR_VISIBLE_DEVICES": "card0",
            "HIP_VISIBLE_DEVICES": "card0",
        }

    def test_build_gpu_flags_rejects_unknown_vendor(self):
        """An unsupported GPU vendor raises RuntimeUnavailableError."""
        executor = self._make_executor("permissive")
        bad_job = JobType(name="bad", gpu_enabled=True, gpu_vendor="intel")

        with pytest.raises(RuntimeUnavailableError) as exc_info:
            executor.build_gpu_flags(bad_job)

        assert "Unsupported GPU vendor" in str(exc_info.value)
Loading