diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index 7d2059e..ca043bc 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -124,103 +124,6 @@ jobs: if-no-files-found: error retention-days: 1 - build-operator: - runs-on: ${{ matrix.runner }} - needs: [get-version] - permissions: - contents: read - packages: write - strategy: - fail-fast: false - matrix: - include: - - platform: linux/amd64 - runner: ubuntu-latest - - platform: linux/arm64 - runner: ubuntu-24.04-arm - - steps: - - name: Set repository and image name to lowercase - run: | - echo "FULL_IMAGE_NAME=ghcr.io/${GITHUB_REPOSITORY,,}-operator" >> $GITHUB_ENV - - - name: Prepare - run: | - platform=${{ matrix.platform }} - echo "PLATFORM_PAIR=${platform//\//-}" >> $GITHUB_ENV - - - name: Checkout repository - uses: actions/checkout@v5 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - type=raw,value=${{ needs.get-version.outputs.version }} - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - - - name: Extract metadata for Docker cache - id: cache-meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - ${{ github.ref_type == 'tag' && 'type=raw,value=main' || '' }} - flavor: | - prefix=cache-operator-${{ matrix.platform }}- - latest=false - - - name: Build Docker image - uses: docker/build-push-action@v5 - id: build - with: - context: ./operator - file: ./operator/Dockerfile - push: true - platforms: ${{ matrix.platform }} - labels: ${{ steps.meta.outputs.labels }} - outputs: type=image,name=${{ env.FULL_IMAGE_NAME }},push-by-digest=true,name-canonical=true,push=true - cache-from: type=registry,ref=${{ steps.cache-meta.outputs.tags }} - cache-to: type=registry,ref=${{ steps.cache-meta.outputs.tags }},mode=max - build-args: | - BUILD_HASH=${{ github.sha }} - - - name: Export digest - run: | - mkdir -p /tmp/digests - digest="${{ steps.build.outputs.digest }}" - touch "/tmp/digests/${digest#sha256:}" - - - name: Upload digest - uses: actions/upload-artifact@v4 - with: - name: digests-operator-${{ env.PLATFORM_PAIR }} - path: /tmp/digests/* - if-no-files-found: error - retention-days: 1 - merge-orchestrator-images: runs-on: ubuntu-latest needs: [get-version, build-orchestrator] @@ -274,63 +177,10 @@ jobs: run: | docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ needs.get-version.outputs.version }} - merge-operator-images: - runs-on: ubuntu-latest - needs: [get-version, build-operator] - permissions: - contents: read - packages: write - steps: - - name: Set repository and image name to lowercase - run: | - echo "FULL_IMAGE_NAME=ghcr.io/${GITHUB_REPOSITORY,,}-operator" >> $GITHUB_ENV - - - name: Download digests - uses: actions/download-artifact@v5 - with: - pattern: digests-operator-* - path: /tmp/digests - merge-multiple: true - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - - name: Log in to the Container registry - uses: docker/login-action@v3 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Extract metadata for Docker images - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ env.FULL_IMAGE_NAME }} - tags: | - type=ref,event=branch - type=ref,event=tag - type=sha,prefix=git- - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - type=raw,value=${{ needs.get-version.outputs.version }} - flavor: | - latest=${{ github.ref == 'refs/heads/main' }} - - - name: Create manifest list and push - working-directory: /tmp/digests - run: | - docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \ - $(printf '${{ env.FULL_IMAGE_NAME }}@sha256:%s ' *) - - - name: Inspect image - run: | - docker buildx imagetools inspect ${{ env.FULL_IMAGE_NAME }}:${{ needs.get-version.outputs.version }} - create-release: runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' - needs: [get-version, merge-orchestrator-images, merge-operator-images] + needs: [get-version, merge-orchestrator-images] permissions: contents: write steps: diff --git a/README.md b/README.md index dafe8ff..f9fe942 100644 --- a/README.md +++ b/README.md @@ -30,17 +30,51 @@ docker run -p 3000:3000 \ **Prerequisites:** Docker running on the host. -### Kubernetes Operator (recommended for clusters) - -For Kubernetes deployments, the operator manages `Terminal` custom resources automatically — handling pod creation, storage, and cleanup through CRDs. +### Kubernetes with Agent Sandbox (recommended for clusters) + +For Kubernetes deployments, Terminals builds on the upstream +[Agent Sandbox](https://github.com/kubernetes-sigs/agent-sandbox) project (SIG Apps). +Each user+policy maps to a single `Sandbox` custom resource; the agent-sandbox +controller reconciles it into a Pod, a headless Service (a stable `serviceFQDN`), +and a PersistentVolume when a workspace is requested. Idle terminals are +**suspended** (`operatingMode: Suspended`, scale-to-zero with storage and identity +preserved) and resumed on the next request. + +Workspace persistence: the per-user `PersistentVolumeClaim` is created from the +Sandbox's `volumeClaimTemplates` and owned by the Sandbox. Suspending keeps the +Sandbox object, so `/workspace` data survives idle and resume. **Tearing a terminal +down deletes the Sandbox and its PVC** (workspace data is removed) — idle reaping uses +suspend, not teardown, so normal inactivity never destroys data. ```bash -# Install the CRD and operator -kubectl apply -f manifests/terminal-crd.yaml -kubectl apply -f manifests/operator-deployment.yaml +# 1. Install the agent-sandbox controller + extensions (pin a release version) +export VERSION="v0.5.0rc1" # see https://github.com/kubernetes-sigs/agent-sandbox/releases +kubectl apply -f https://github.com/kubernetes-sigs/agent-sandbox/releases/download/${VERSION}/manifest.yaml +kubectl apply -f https://github.com/kubernetes-sigs/agent-sandbox/releases/download/${VERSION}/extensions.yaml + +# 2. Grant the Terminals service access to the Sandbox CRDs +kubectl apply -f manifests/sandbox-rbac.yaml ``` -Set `TERMINALS_BACKEND=kubernetes-operator` when deploying the Terminals service. +Set `TERMINALS_BACKEND=kubernetes-sandbox` when deploying the Terminals service +(with `serviceAccountName: terminals`). For stronger isolation of user code, set +`TERMINALS_SANDBOX_RUNTIME_CLASS=gvisor` (or `kata-qemu`) once the runtime is +installed on your nodes. + +> [!NOTE] +> Agent Sandbox is a young upstream project — this backend targets `v1beta1` (v0.5.x); +> pin a release version and track changes. Two trade-offs to be aware of: the per-user +> API key lives as a plaintext env value in the `Sandbox` pod template (rely on RBAC + +> etcd encryption-at-rest), and **idle tracking is in-memory** in the Terminals process +> (run a single Terminals replica, or expect idle to be tracked per-replica). Warm pools +> are intentionally not used — they are mutually exclusive with per-user API keys (an +> env-injecting claim bypasses the pool), so first connection pays pod start-up (the +> image is cached on the node after the first pull). +> +> These last two are things the backend self-manages **because the controller does not +> yet**. Both are on the [upstream roadmap](https://github.com/kubernetes-sigs/agent-sandbox/blob/main/roadmap.md) +> (*Auto Suspend/Resume*, *Scale to Zero*, *Sandbox/Pod Identity Association*); the +> backend is pinned to `v1beta1` specifically so we can drop these shims as they land. ### From source (development) @@ -54,7 +88,7 @@ terminals serve | Backend | Best for | How it works | |---------|----------|-------------| | `docker` | Single-node, local dev | One container per user via Docker socket | -| `kubernetes-operator` | Production K8s clusters | Operator watches `Terminal` CRDs for automated lifecycle | +| `kubernetes-sandbox` | Production K8s clusters | One [Agent Sandbox](https://github.com/kubernetes-sigs/agent-sandbox) `Sandbox` per user; suspend/resume on idle | | `kubernetes` | K8s without CRDs | Direct Pod + PVC + Service per user (you manage resources) | Set the backend with `TERMINALS_BACKEND` (defaults to `docker`). @@ -109,7 +143,7 @@ All settings are configured through environment variables prefixed with `TERMINA | Variable | Default | Description | |----------|---------|-------------| -| `TERMINALS_BACKEND` | `docker` | `docker`, `kubernetes`, or `kubernetes-operator` | +| `TERMINALS_BACKEND` | `docker` | `docker`, `kubernetes`, or `kubernetes-sandbox` | | `TERMINALS_API_KEY` | *(auto-generated)* | Bearer token for API auth | | `TERMINALS_IMAGE` | `ghcr.io/open-webui/open-terminal:latest` | Default container image | | `TERMINALS_MAX_CPU` | | Hard cap on CPU per container | @@ -117,6 +151,7 @@ All settings are configured through environment variables prefixed with `TERMINA | `TERMINALS_MAX_STORAGE` | | Hard cap on storage per container | | `TERMINALS_ALLOWED_IMAGES` | | Comma-separated list of allowed image patterns | | `TERMINALS_KUBERNETES_STORAGE_MODE` | `per-user` | `per-user`, `shared`, or `shared-rwo` | +| `TERMINALS_SANDBOX_RUNTIME_CLASS` | | RuntimeClass for sandbox isolation, e.g. `gvisor` or `kata-qemu` | See [`config.py`](terminals/config.py) for the full list. diff --git a/manifests/operator-deployment.yaml b/manifests/operator-deployment.yaml deleted file mode 100644 index 12fd585..0000000 --- a/manifests/operator-deployment.yaml +++ /dev/null @@ -1,79 +0,0 @@ ---- -apiVersion: v1 -kind: Namespace -metadata: - name: terminals ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: open-terminal-operator - namespace: terminals ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: open-terminal-operator -rules: - # Terminal CRDs - - apiGroups: ["openwebui.com"] - resources: ["terminals", "terminals/status"] - verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - # Pods, Services, PVCs managed by the operator - - apiGroups: [""] - resources: ["pods", "services", "persistentvolumeclaims"] - verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] - # Events (Kopf posts events for status) - - apiGroups: [""] - resources: ["events"] - verbs: ["create"] - # Kopf needs these for leader election and state tracking - - apiGroups: [""] - resources: ["configmaps"] - verbs: ["get", "list", "watch", "create", "update", "patch"] - - apiGroups: ["coordination.k8s.io"] - resources: ["leases"] - verbs: ["get", "list", "watch", "create", "update", "patch"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: open-terminal-operator -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: open-terminal-operator -subjects: - - kind: ServiceAccount - name: open-terminal-operator - namespace: terminals ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: open-terminal-operator - namespace: terminals - labels: - app.kubernetes.io/name: open-terminal-operator - app.kubernetes.io/part-of: open-terminal -spec: - replicas: 1 - selector: - matchLabels: - app.kubernetes.io/name: open-terminal-operator - template: - metadata: - labels: - app.kubernetes.io/name: open-terminal-operator - spec: - serviceAccountName: open-terminal-operator - containers: - - name: operator - image: ghcr.io/open-webui/open-terminal-operator:latest - resources: - requests: - memory: "64Mi" - cpu: "50m" - limits: - memory: "256Mi" - cpu: "200m" diff --git a/manifests/sandbox-rbac.yaml b/manifests/sandbox-rbac.yaml new file mode 100644 index 0000000..9a54c98 --- /dev/null +++ b/manifests/sandbox-rbac.yaml @@ -0,0 +1,50 @@ +# RBAC for the Terminals service to drive Agent Sandbox CRDs. +# +# Apply AFTER installing the agent-sandbox controller + extensions: +# kubectl apply -f https://github.com/kubernetes-sigs/agent-sandbox/releases/download//manifest.yaml +# kubectl apply -f https://github.com/kubernetes-sigs/agent-sandbox/releases/download//extensions.yaml +# kubectl apply -f manifests/sandbox-rbac.yaml +# +# The Terminals Deployment must set `serviceAccountName: terminals` and +# `TERMINALS_BACKEND=kubernetes-sandbox`. +--- +apiVersion: v1 +kind: Namespace +metadata: + name: terminals +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: terminals + namespace: terminals +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: terminals-sandbox + namespace: terminals +rules: + # Core Sandbox resource: create/delete per user, patch replicas for + # suspend/resume, and read status (serviceFQDN, Ready condition). + - apiGroups: ["agents.x-k8s.io"] + resources: ["sandboxes", "sandboxes/status"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] + # Read-only access to the Services/Pods the controller creates. + - apiGroups: [""] + resources: ["services", "pods"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: terminals-sandbox + namespace: terminals +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: terminals-sandbox +subjects: + - kind: ServiceAccount + name: terminals + namespace: terminals diff --git a/manifests/terminal-crd.yaml b/manifests/terminal-crd.yaml deleted file mode 100644 index c9f7003..0000000 --- a/manifests/terminal-crd.yaml +++ /dev/null @@ -1,153 +0,0 @@ -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - name: terminals.openwebui.com - labels: - app.kubernetes.io/part-of: open-terminal -spec: - group: openwebui.com - scope: Namespaced - names: - plural: terminals - singular: terminal - kind: Terminal - listKind: TerminalList - shortNames: - - term - versions: - - name: v1alpha1 - served: true - storage: true - schema: - openAPIV3Schema: - type: object - required: - - spec - properties: - spec: - type: object - required: - - userId - properties: - userId: - type: string - description: Open WebUI user ID that owns this terminal. - image: - type: string - default: "ghcr.io/open-webui/open-terminal:latest" - description: Container image for the Open Terminal instance. - resources: - type: object - properties: - requests: - type: object - properties: - cpu: - type: string - default: "100m" - memory: - type: string - default: "256Mi" - limits: - type: object - properties: - cpu: - type: string - default: "1" - memory: - type: string - default: "1Gi" - idleTimeoutMinutes: - type: integer - default: 30 - minimum: 1 - description: >- - Minutes of inactivity before the terminal pod is stopped. - packages: - type: array - items: - type: string - default: [] - description: Apt packages to pre-install in the terminal. - pipPackages: - type: array - items: - type: string - default: [] - description: Pip packages to pre-install in the terminal. - persistence: - type: object - properties: - enabled: - type: boolean - default: true - size: - type: string - default: "1Gi" - storageClass: - type: string - default: "" - status: - type: object - properties: - phase: - type: string - enum: - - Pending - - Provisioning - - Running - - Idle - - Error - podName: - type: string - serviceName: - type: string - serviceUrl: - type: string - description: Full URL (http://svc:port) reachable within the cluster. - apiKeySecret: - type: string - description: Name of the Secret holding the terminal API key. - lastActivityAt: - type: string - format: date-time - description: Timestamp of the last proxied request. - message: - type: string - description: Human-readable status message. - conditions: - type: array - items: - type: object - properties: - type: - type: string - status: - type: string - enum: ["True", "False", "Unknown"] - lastTransitionTime: - type: string - format: date-time - reason: - type: string - message: - type: string - subresources: - status: {} - additionalPrinterColumns: - - name: User - type: string - jsonPath: .spec.userId - - name: Phase - type: string - jsonPath: .status.phase - - name: Service URL - type: string - jsonPath: .status.serviceUrl - priority: 1 - - name: Last Activity - type: date - jsonPath: .status.lastActivityAt - - name: Age - type: date - jsonPath: .metadata.creationTimestamp diff --git a/operator/Dockerfile b/operator/Dockerfile deleted file mode 100644 index cb8e74a..0000000 --- a/operator/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM python:3.12-slim - -WORKDIR /app - -RUN pip install --no-cache-dir \ - kopf>=1.37.0 \ - kubernetes>=31.0.0 - -COPY handler.py /app/handler.py - -USER 65534:65534 - -ENTRYPOINT ["kopf", "run", "/app/handler.py", "--verbose"] diff --git a/operator/handler.py b/operator/handler.py deleted file mode 100644 index a0e9801..0000000 --- a/operator/handler.py +++ /dev/null @@ -1,547 +0,0 @@ -"""Open Terminal Operator — Kopf handlers for the Terminal CRD. - -Watches ``Terminal`` custom resources (``terminals.openwebui.com/v1alpha1``) -and reconciles the underlying Kubernetes resources: - -- **Secret** holding a generated API key -- **Pod** running the open-terminal container -- **Service** (ClusterIP) exposing port 8000 -- **PVC** (optional) for persistent ``/workspace`` storage - -The orchestrator creates/deletes Terminal CRs; this operator does the rest. - -Ported from the ``kubernetes-controller`` branch with the ABC-compatible -``openwebui.com`` API group retained for extensibility. -""" - -import base64 -import logging -import secrets -import string -from datetime import datetime, timezone - -import kopf -import kubernetes -from kubernetes import client as k8s - -log = logging.getLogger(__name__) - -GROUP = "openwebui.com" -VERSION = "v1alpha1" -PLURAL = "terminals" - - -# --------------------------------------------------------------------------- -# Startup -# --------------------------------------------------------------------------- - - -@kopf.on.startup() -def configure(settings: kopf.OperatorSettings, **_): - """Load K8s config and configure kopf settings.""" - try: - kubernetes.config.load_incluster_config() - except kubernetes.config.ConfigException: - kubernetes.config.load_kube_config() - settings.posting.level = logging.WARNING - settings.persistence.finalizer = "terminals.openwebui.com/finalizer" - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _generate_api_key(length: int = 48) -> str: - alphabet = string.ascii_letters + string.digits - return "sk-" + "".join(secrets.choice(alphabet) for _ in range(length)) - - -def _resource_name(name: str, suffix: str) -> str: - """Derive child resource names from the Terminal CR name.""" - return f"{name}-{suffix}" - - -def _now_iso() -> str: - return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - - -def _owner_ref(body: dict) -> dict: - """Build a single ownerReference dict for garbage collection.""" - return { - "apiVersion": f"{GROUP}/{VERSION}", - "kind": "Terminal", - "name": body["metadata"]["name"], - "uid": body["metadata"]["uid"], - "controller": True, - "blockOwnerDeletion": True, - } - - -def _labels(name: str, user_id: str = "") -> dict[str, str]: - labels = { - "app.kubernetes.io/name": "open-terminal", - "app.kubernetes.io/instance": name, - "app.kubernetes.io/managed-by": "terminals", - "app.kubernetes.io/part-of": "open-terminal", - "openwebui.com/terminal": name, - } - if user_id: - labels["openwebui.com/user-id"] = user_id - return labels - - -def _set_condition( - status: dict, - cond_type: str, - cond_status: str, - reason: str = "", - message: str = "", -) -> list: - """Create or update a condition in the conditions list.""" - conditions = list(status.get("conditions") or []) - for c in conditions: - if c["type"] == cond_type: - c["status"] = cond_status - c["lastTransitionTime"] = _now_iso() - c["reason"] = reason - c["message"] = message - return conditions - conditions.append( - { - "type": cond_type, - "status": cond_status, - "lastTransitionTime": _now_iso(), - "reason": reason, - "message": message, - } - ) - return conditions - - -# --------------------------------------------------------------------------- -# Manifest builders -# --------------------------------------------------------------------------- - - -def _build_pod_manifest( - name: str, - namespace: str, - spec: dict, - api_key: str, - owner_ref: dict, - pvc_name: str | None, - user_id: str = "", -) -> dict: - """Build the Pod manifest for an Open Terminal instance.""" - image = spec.get("image", "ghcr.io/open-webui/open-terminal:latest") - resources_spec = spec.get("resources", {}) - packages = spec.get("packages", []) - pip_packages = spec.get("pipPackages", []) - - env = [ - {"name": "OPEN_TERMINAL_API_KEY", "value": api_key}, - {"name": "OPEN_TERMINAL_HOST", "value": "0.0.0.0"}, - {"name": "OPEN_TERMINAL_PORT", "value": "8000"}, - ] - if packages: - env.append({"name": "OPEN_TERMINAL_PACKAGES", "value": " ".join(packages)}) - if pip_packages: - env.append({"name": "OPEN_TERMINAL_PIP_PACKAGES", "value": " ".join(pip_packages)}) - - volume_mounts = [] - volumes = [] - if pvc_name: - volume_mounts.append({"name": "workspace", "mountPath": "/workspace"}) - volumes.append( - {"name": "workspace", "persistentVolumeClaim": {"claimName": pvc_name}} - ) - - container = { - "name": "open-terminal", - "image": image, - "ports": [{"containerPort": 8000, "name": "http", "protocol": "TCP"}], - "env": env, - "volumeMounts": volume_mounts, - "readinessProbe": { - "httpGet": {"path": "/health", "port": 8000}, - "initialDelaySeconds": 3, - "periodSeconds": 5, - }, - "livenessProbe": { - "httpGet": {"path": "/health", "port": 8000}, - "initialDelaySeconds": 10, - "periodSeconds": 15, - }, - } - - requests = resources_spec.get("requests", {}) - limits = resources_spec.get("limits", {}) - if requests or limits: - container["resources"] = {} - if requests: - container["resources"]["requests"] = requests - if limits: - container["resources"]["limits"] = limits - - pod_labels = _labels(name, user_id) - - return { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "name": _resource_name(name, "pod"), - "namespace": namespace, - "labels": pod_labels, - "ownerReferences": [owner_ref], - }, - "spec": { - "containers": [container], - "volumes": volumes, - "restartPolicy": "Always", - "enableServiceLinks": False, - "automountServiceAccountToken": False, - }, - } - - -def _build_service_manifest( - name: str, namespace: str, owner_ref: dict, user_id: str = "" -) -> dict: - return { - "apiVersion": "v1", - "kind": "Service", - "metadata": { - "name": _resource_name(name, "svc"), - "namespace": namespace, - "labels": _labels(name, user_id), - "ownerReferences": [owner_ref], - }, - "spec": { - "type": "ClusterIP", - "selector": {"openwebui.com/terminal": name}, - "ports": [ - {"name": "http", "port": 8000, "targetPort": 8000, "protocol": "TCP"} - ], - }, - } - - -def _build_secret_manifest( - name: str, namespace: str, api_key: str, owner_ref: dict, - user_id: str = "", -) -> dict: - return { - "apiVersion": "v1", - "kind": "Secret", - "metadata": { - "name": _resource_name(name, "apikey"), - "namespace": namespace, - "labels": _labels(name, user_id), - "ownerReferences": [owner_ref], - }, - "type": "Opaque", - "data": { - "api-key": base64.b64encode(api_key.encode()).decode(), - }, - } - - -def _build_pvc_manifest( - name: str, namespace: str, spec: dict, owner_ref: dict, - user_id: str = "", -) -> dict: - persistence = spec.get("persistence", {}) - size = persistence.get("size", "1Gi") - storage_class = persistence.get("storageClass", "") - - # NOTE: PVCs intentionally have NO ownerReference so they survive - # Terminal CR deletion. User workspace data must persist across - # idle-reap / re-provision cycles. - pvc = { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "name": _resource_name(name, "pvc"), - "namespace": namespace, - "labels": _labels(name, user_id), - }, - "spec": { - "accessModes": ["ReadWriteOnce"], - "resources": {"requests": {"storage": size}}, - }, - } - if storage_class: - pvc["spec"]["storageClassName"] = storage_class - return pvc - - -# --------------------------------------------------------------------------- -# Create handler -# --------------------------------------------------------------------------- - - -@kopf.on.create(GROUP, VERSION, PLURAL) -async def on_create(body, spec, name, namespace, patch, **_): - """Create all child resources for a new Terminal CR.""" - log.info("Creating terminal %s/%s for user %s", namespace, name, spec.get("userId")) - - user_id = spec.get("userId", "") - owner_ref = _owner_ref(body) - api_key = _generate_api_key() - core_v1 = k8s.CoreV1Api() - - # -- Status: Provisioning - patch.status["phase"] = "Provisioning" - patch.status["lastActivityAt"] = _now_iso() - patch.status["conditions"] = _set_condition( - {}, "Ready", "False", "Provisioning", "Creating child resources" - ) - - # -- PVC (if persistence enabled) - persistence = spec.get("persistence", {}) - pvc_name = None - if persistence.get("enabled", True): - pvc_name = _resource_name(name, "pvc") - pvc_manifest = _build_pvc_manifest(name, namespace, spec, owner_ref, user_id=user_id) - try: - core_v1.create_namespaced_persistent_volume_claim( - namespace=namespace, body=pvc_manifest - ) - log.info("Created PVC %s/%s", namespace, pvc_name) - except k8s.exceptions.ApiException as e: - if e.status == 409: - log.info("PVC %s/%s already exists", namespace, pvc_name) - else: - raise - - # -- Secret (API key) - secret_name = _resource_name(name, "apikey") - secret_manifest = _build_secret_manifest(name, namespace, api_key, owner_ref, user_id=user_id) - try: - core_v1.create_namespaced_secret(namespace=namespace, body=secret_manifest) - log.info("Created Secret %s/%s", namespace, secret_name) - except k8s.exceptions.ApiException as e: - if e.status == 409: - log.info("Secret %s/%s already exists, reading existing key", namespace, secret_name) - existing = core_v1.read_namespaced_secret(secret_name, namespace) - api_key = base64.b64decode(existing.data["api-key"]).decode() - else: - raise - - # -- Service - svc_name = _resource_name(name, "svc") - svc_manifest = _build_service_manifest(name, namespace, owner_ref, user_id=user_id) - try: - core_v1.create_namespaced_service(namespace=namespace, body=svc_manifest) - log.info("Created Service %s/%s", namespace, svc_name) - except k8s.exceptions.ApiException as e: - if e.status == 409: - log.info("Service %s/%s already exists", namespace, svc_name) - else: - raise - - # -- Pod - pod_name = _resource_name(name, "pod") - pod_manifest = _build_pod_manifest( - name, namespace, spec, api_key, owner_ref, pvc_name, user_id=user_id - ) - try: - core_v1.create_namespaced_pod(namespace=namespace, body=pod_manifest) - log.info("Created Pod %s/%s", namespace, pod_name) - except k8s.exceptions.ApiException as e: - if e.status == 409: - log.info("Pod %s/%s already exists", namespace, pod_name) - else: - raise - - # -- Update status - service_url = f"http://{svc_name}.{namespace}.svc:8000" - patch.status["podName"] = pod_name - patch.status["serviceName"] = svc_name - patch.status["serviceUrl"] = service_url - patch.status["apiKeySecret"] = secret_name - patch.status["phase"] = "Pending" - patch.status["conditions"] = _set_condition( - {"conditions": patch.status.get("conditions", [])}, - "Ready", - "False", - "PodNotReady", - "Waiting for pod to become ready", - ) - - -# --------------------------------------------------------------------------- -# Delete handler (cleanup is automatic via ownerReferences, but log it) -# --------------------------------------------------------------------------- - - -@kopf.on.delete(GROUP, VERSION, PLURAL) -async def on_delete(name, namespace, **_): - """Log deletion — child resources are cleaned up via ownerReferences.""" - log.info( - "Terminal %s/%s deleted. Child resources will be garbage-collected.", - namespace, - name, - ) - - -# --------------------------------------------------------------------------- -# Pod watcher — update Terminal status when pod phase changes -# --------------------------------------------------------------------------- - - -@kopf.on.event("v1", "pods", labels={"app.kubernetes.io/managed-by": "terminals"}) -async def on_pod_event(event, body, **_): - """Watch terminal pods and reflect readiness back into the Terminal CR status.""" - pod = body - labels = pod.get("metadata", {}).get("labels", {}) - terminal_name = labels.get("openwebui.com/terminal") - if not terminal_name: - return - - namespace = pod["metadata"]["namespace"] - pod_phase = (pod.get("status") or {}).get("phase", "Unknown") - - # Check container readiness - container_statuses = (pod.get("status") or {}).get("containerStatuses", []) - is_ready = any(cs.get("ready", False) for cs in container_statuses) - - custom_api = k8s.CustomObjectsApi() - try: - terminal = custom_api.get_namespaced_custom_object( - group=GROUP, - version=VERSION, - namespace=namespace, - plural=PLURAL, - name=terminal_name, - ) - except k8s.exceptions.ApiException as e: - if e.status == 404: - return - raise - - current_status = terminal.get("status", {}) - current_phase = current_status.get("phase") - - # Don't update if terminal is being torn down - if current_phase in ("Idle",): - return - - new_phase = current_phase - if is_ready and pod_phase == "Running": - new_phase = "Running" - elif pod_phase in ("Pending",): - new_phase = "Pending" - elif pod_phase in ("Failed", "Unknown"): - new_phase = "Error" - - if new_phase == current_phase and current_phase == "Running" and is_ready: - return # No change needed - - conditions = _set_condition( - current_status, - "Ready", - "True" if is_ready else "False", - "PodReady" if is_ready else "PodNotReady", - f"Pod phase: {pod_phase}", - ) - - status_patch = { - "status": { - "phase": new_phase, - "conditions": conditions, - } - } - - if is_ready and new_phase == "Running": - status_patch["status"]["lastActivityAt"] = _now_iso() - - try: - custom_api.patch_namespaced_custom_object_status( - group=GROUP, - version=VERSION, - namespace=namespace, - plural=PLURAL, - name=terminal_name, - body=status_patch, - ) - except k8s.exceptions.ApiException as e: - if e.status == 404: - return - log.warning("Failed to patch Terminal %s/%s status: %s", namespace, terminal_name, e) - - -# --------------------------------------------------------------------------- -# Idle timeout timer -# --------------------------------------------------------------------------- - - -@kopf.timer(GROUP, VERSION, PLURAL, interval=60, idle=60) -async def idle_check(spec, status, name, namespace, **_): - """Periodically check if a terminal has exceeded its idle timeout.""" - phase = (status or {}).get("phase") - if phase not in ("Running", "Idle"): - return - - last_activity = (status or {}).get("lastActivityAt") - if not last_activity: - return - - timeout_minutes = spec.get("idleTimeoutMinutes", 30) - try: - last_dt = datetime.fromisoformat(last_activity.replace("Z", "+00:00")) - except (ValueError, TypeError): - return - - elapsed = (datetime.now(timezone.utc) - last_dt).total_seconds() / 60 - - if elapsed < timeout_minutes: - return - - log.info( - "Terminal %s/%s idle for %.1f min (timeout=%d). Deleting pod.", - namespace, - name, - elapsed, - timeout_minutes, - ) - - pod_name = (status or {}).get("podName") - if not pod_name: - return - - # Delete the pod to free resources; the PVC, Secret, and CRD remain - core_v1 = k8s.CoreV1Api() - try: - core_v1.delete_namespaced_pod(name=pod_name, namespace=namespace) - except k8s.exceptions.ApiException as e: - if e.status == 404: - log.info("Pod %s/%s already gone", namespace, pod_name) - else: - raise - - # Update status to Idle - custom_api = k8s.CustomObjectsApi() - try: - custom_api.patch_namespaced_custom_object_status( - group=GROUP, - version=VERSION, - namespace=namespace, - plural=PLURAL, - name=name, - body={ - "status": { - "phase": "Idle", - "conditions": _set_condition( - status, - "Ready", - "False", - "IdleTimeout", - f"Pod deleted after {elapsed:.0f} min of inactivity", - ), - } - }, - ) - except k8s.exceptions.ApiException: - pass diff --git a/terminals/backends/__init__.py b/terminals/backends/__init__.py index 21c5742..30a469f 100644 --- a/terminals/backends/__init__.py +++ b/terminals/backends/__init__.py @@ -14,8 +14,8 @@ def create_backend() -> Backend: from terminals.backends.kubernetes import KubernetesBackend return KubernetesBackend() - elif settings.backend == "kubernetes-operator": - from terminals.backends.kubernetes_operator import KubernetesOperatorBackend + elif settings.backend == "kubernetes-sandbox": + from terminals.backends.kubernetes_sandbox import KubernetesSandboxBackend - return KubernetesOperatorBackend() + return KubernetesSandboxBackend() raise ValueError(f"Unknown backend: {settings.backend!r}") diff --git a/terminals/backends/kubernetes_operator.py b/terminals/backends/kubernetes_operator.py deleted file mode 100644 index 99d033c..0000000 --- a/terminals/backends/kubernetes_operator.py +++ /dev/null @@ -1,618 +0,0 @@ -"""Kubernetes Operator backend — manages Terminals via CRDs. - -Instead of creating Pods/Services directly, this backend creates and manages -``Terminal`` custom resources. A separate Kopf-based operator watches these -CRs and reconciles the underlying Pods, Services, Secrets, and PVCs. - -The operator generates API keys and stores them in Kubernetes Secrets. -This backend reads the key from the Secret referenced in ``status.apiKeySecret``. -""" - -import asyncio -import base64 -import hashlib -import logging -import re -from typing import Optional - -from kubernetes_asyncio import client, config -from kubernetes_asyncio.client import ApiClient - -from terminals.backends.base import Backend -from terminals.config import settings - -log = logging.getLogger(__name__) - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -_DNS_SAFE = re.compile(r"[^a-z0-9-]") - - -def _sanitize_name(user_id: str, policy_id: str = "default") -> str: - """Deterministic, DNS-safe Terminal CR name from a user ID + policy.""" - short = hashlib.sha256(user_id.encode()).hexdigest()[:12] - if policy_id == "default": - return f"terminal-{short}" - policy_slug = _DNS_SAFE.sub("-", policy_id.lower()).strip("-")[:20] - return f"terminal-{short}-{policy_slug}" - - -class KubernetesOperatorBackend(Backend): - """Manage terminal instances via Terminal CRDs. - - The backend creates/deletes ``Terminal`` custom resources in the - configured namespace. A Kopf operator running in the cluster watches - these resources and manages the actual Pods, Services, Secrets, and PVCs. - """ - - def __init__(self) -> None: - super().__init__() - self._api_client: Optional[ApiClient] = None - - async def _ensure_client(self) -> ApiClient: - if self._api_client is None: - if settings.kubernetes_kubeconfig: - await config.load_kube_config( - config_file=settings.kubernetes_kubeconfig - ) - else: - config.load_incluster_config() - self._api_client = ApiClient() - return self._api_client - - @property - def _group(self) -> str: - return settings.kubernetes_crd_group - - @property - def _version(self) -> str: - return settings.kubernetes_crd_version - - @property - def _plural(self) -> str: - return "terminals" - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - - async def _read_api_key_from_secret(self, secret_name: str) -> Optional[str]: - """Read the API key from a Kubernetes Secret.""" - api_client = await self._ensure_client() - core = client.CoreV1Api(api_client) - ns = settings.kubernetes_namespace - try: - secret = await core.read_namespaced_secret(secret_name, ns) - raw = secret.data.get("api-key", "") - return base64.b64decode(raw).decode() if raw else None - except client.exceptions.ApiException as e: - if e.status == 404: - return None - raise - - async def _get_terminal_cr( - self, user_id: str, policy_id: str = "default" - ) -> Optional[dict]: - """Get the Terminal CR for a user+policy, or None if it doesn't exist.""" - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - name = _sanitize_name(user_id, policy_id) - ns = settings.kubernetes_namespace - try: - return await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - except client.exceptions.ApiException as e: - if e.status == 404: - return None - raise - - async def _create_terminal_cr( - self, - user_id: str, - policy_id: str = "default", - spec: dict | None = None, - ) -> dict: - """Create a Terminal CR for a user+policy and return it.""" - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - name = _sanitize_name(user_id, policy_id) - ns = settings.kubernetes_namespace - s = spec or {} - - image = s.get("image", settings.kubernetes_image) - storage_size = s.get("storage") # absent = ephemeral - - # Build flat CRD spec aligned with policy schema - cr_spec: dict = { - "userId": user_id, - "image": image, - } - - # CPU / memory limits (flat — no nesting) - if s.get("cpu_limit"): - cr_spec["cpuLimit"] = s["cpu_limit"] - if s.get("memory_limit"): - cr_spec["memoryLimit"] = s["memory_limit"] - - # Storage: present = persistent, absent = ephemeral - if storage_size: - cr_spec["storage"] = storage_size - if settings.kubernetes_storage_class: - cr_spec["storageClass"] = settings.kubernetes_storage_class - # Storage mode (per-user, shared, shared-rwo) - storage_mode = s.get("storage_mode", settings.kubernetes_storage_mode) - cr_spec["storageMode"] = storage_mode - - # Env vars - env = s.get("env", {}) - if env: - cr_spec["env"] = env - - # Idle timeout - idle_timeout = s.get("idle_timeout_minutes", settings.idle_timeout_minutes) - if idle_timeout and idle_timeout > 0: - cr_spec["idleTimeoutMinutes"] = idle_timeout - - policy_slug = _DNS_SAFE.sub("-", policy_id.lower()).strip("-")[:20] - - cr = { - "apiVersion": f"{self._group}/{self._version}", - "kind": "Terminal", - "metadata": { - "name": name, - "namespace": ns, - "labels": { - "app.kubernetes.io/managed-by": "terminals", - "app.kubernetes.io/part-of": "open-terminal", - "openwebui.com/user-id": user_id, - "openwebui.com/policy": policy_slug, - }, - }, - "spec": cr_spec, - } - - try: - return await custom.create_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - body=cr, - ) - except client.exceptions.ApiException as exc: - if exc.status == 409: - # Already exists — but may be mid-deletion (finalizer pending). - existing = await self._get_terminal_cr(user_id, policy_id) - if existing and existing.get("metadata", {}).get("deletionTimestamp"): - # CR is being deleted; wait for it to vanish, then retry create - await self._wait_for_deletion(user_id, policy_id, timeout=60) - return await custom.create_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - body=cr, - ) - if existing: - return existing - # Gone between the 409 and our GET — safe to retry - return await custom.create_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - body=cr, - ) - raise - - async def _delete_terminal_cr( - self, - user_id: str, - policy_id: str = "default", - wait: bool = True, - timeout: int = 60, - ) -> bool: - """Delete the Terminal CR for a user+policy and optionally wait for it to be gone. - - When *wait* is True (default), polls until the CR returns 404 so that - a subsequent create won't collide with the kopf finalizer. - """ - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - name = _sanitize_name(user_id, policy_id) - ns = settings.kubernetes_namespace - try: - await custom.delete_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - except client.exceptions.ApiException as e: - if e.status == 404: - return False - raise - - if not wait: - return True - - # Poll until the CR is fully removed (finalizer may delay deletion) - deadline = asyncio.get_event_loop().time() + timeout - while asyncio.get_event_loop().time() < deadline: - try: - await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - except client.exceptions.ApiException as e: - if e.status == 404: - return True - raise - await asyncio.sleep(1) - - log.warning("Terminal CR %s not fully deleted after %ds", name, timeout) - return True - - async def _wait_for_deletion( - self, user_id: str, policy_id: str = "default", timeout: int = 60 - ) -> None: - """Poll until a Terminal CR no longer exists (404).""" - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - name = _sanitize_name(user_id, policy_id) - ns = settings.kubernetes_namespace - - deadline = asyncio.get_event_loop().time() + timeout - while asyncio.get_event_loop().time() < deadline: - try: - await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - except client.exceptions.ApiException as e: - if e.status == 404: - return - raise - await asyncio.sleep(1) - - log.warning("Terminal CR %s still exists after %ds wait", name, timeout) - - async def _wait_for_ready( - self, name: str, namespace: str, timeout: int = 120 - ) -> Optional[dict]: - """Poll the CR status until Running with serviceUrl and apiKeySecret. - - Returns a dict with ``service_url`` and ``api_key``, or None on timeout. - """ - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - - deadline = asyncio.get_event_loop().time() + timeout - while asyncio.get_event_loop().time() < deadline: - try: - cr = await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=namespace, - plural=self._plural, - name=name, - ) - status = cr.get("status", {}) - if ( - status.get("phase") == "Running" - and status.get("serviceUrl") - and status.get("apiKeySecret") - ): - api_key = await self._read_api_key_from_secret(status["apiKeySecret"]) - if api_key: - return { - "service_url": status["serviceUrl"], - "api_key": api_key, - } - except client.exceptions.ApiException: - pass - await asyncio.sleep(2) - - log.warning( - "Terminal CR %s did not reach Running in %ds", - name, - timeout, - ) - return None - - async def _name_from_uid(self, uid: str) -> Optional[str]: - """Look up a Terminal CR name by its UID.""" - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - ns = settings.kubernetes_namespace - - try: - result = await custom.list_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - label_selector="app.kubernetes.io/managed-by=terminals", - ) - for item in result.get("items", []): - if item["metadata"]["uid"] == uid: - return item["metadata"]["name"] - except client.exceptions.ApiException: - pass - return None - - def _parse_service_url(self, service_url: str) -> tuple[str, int]: - """Extract host and port from a service URL like http://svc:8000.""" - url = service_url.rstrip("/") - if "://" in url: - url = url.split("://", 1)[1] - if ":" in url: - host, port_str = url.rsplit(":", 1) - return host, int(port_str) - return url, 8000 - - # ------------------------------------------------------------------ - # Backend interface - # ------------------------------------------------------------------ - - async def provision( - self, - user_id: str, - policy_id: str = "default", - spec: dict | None = None, - ) -> Optional[dict]: - """Create a Terminal CR and wait for it to become ready. - - Returns connection info dict or ``None`` on timeout. - """ - cr = await self._create_terminal_cr(user_id, policy_id=policy_id, spec=spec) - name = cr["metadata"]["name"] - ns = settings.kubernetes_namespace - - ready = await self._wait_for_ready(name, ns, timeout=120) - if ready: - host, port = self._parse_service_url(ready["service_url"]) - return { - "instance_id": cr["metadata"]["uid"], - "instance_name": name, - "api_key": ready["api_key"], - "host": host, - "port": port, - } - - return None - - async def start(self, instance_id: str) -> bool: - """For idle terminals, delete and re-create the CR.""" - name = await self._name_from_uid(instance_id) - if name is None: - return False - - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - ns = settings.kubernetes_namespace - - try: - cr = await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - except client.exceptions.ApiException: - return False - - phase = cr.get("status", {}).get("phase") - if phase == "Running": - return True - if phase in ("Pending", "Provisioning"): - return True # still coming up - - # Idle or Error — delete and let the caller re-provision - return False - - async def teardown(self, instance_id: str) -> None: - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - ns = settings.kubernetes_namespace - - name = await self._name_from_uid(instance_id) - if name is None: - log.warning("No Terminal CR found for UID %s", instance_id) - return - - try: - await custom.delete_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - log.info("Deleted Terminal CR %s", name) - except client.exceptions.ApiException: - log.warning( - "Could not delete Terminal CR %s (may already be gone)", name - ) - - async def status(self, instance_id: str) -> str: - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - ns = settings.kubernetes_namespace - - name = await self._name_from_uid(instance_id) - if name is None: - return "missing" - - try: - cr = await custom.get_namespaced_custom_object( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - ) - phase = cr.get("status", {}).get("phase", "Unknown") - if phase == "Running": - return "running" - if phase in ("Provisioning", "Pending"): - return "running" # still coming up - if phase == "Idle": - return "stopped" - return "stopped" - except client.exceptions.ApiException: - return "missing" - - async def close(self) -> None: - if self._api_client is not None: - await self._api_client.close() - self._api_client = None - - # ------------------------------------------------------------------ - # Operator-aware ensure_terminal - # ------------------------------------------------------------------ - - async def ensure_terminal( - self, - user_id: str, - policy_id: str = "default", - spec: Optional[dict] = None, - ) -> Optional[dict]: - """Get or create a terminal, resolving from K8s CRDs. - - Uses a per-key lock so concurrent requests for the same user+policy - don't race to create the same CR. - - Returns a dict with ``api_key``, ``host``, ``port`` or ``None``. - """ - key = self._key(user_id, policy_id) - - # Fast path — check if CR is already Running (no lock needed). - cr = await self._get_terminal_cr(user_id, policy_id) - if cr: - status = cr.get("status") or {} - phase = status.get("phase") - if phase == "Running" and status.get("serviceUrl") and status.get("apiKeySecret"): - api_key = await self._read_api_key_from_secret(status["apiKeySecret"]) - if api_key: - host, port = self._parse_service_url(status["serviceUrl"]) - return { - "instance_id": cr["metadata"]["uid"], - "instance_name": cr["metadata"]["name"], - "api_key": api_key, - "host": host, - "port": port, - } - - # Serialise provisioning per key. - if key not in self._locks: - self._locks[key] = asyncio.Lock() - - async with self._locks[key]: - # Re-check after acquiring lock. - cr = await self._get_terminal_cr(user_id, policy_id) - - if cr is None: - return await self.provision(user_id, policy_id=policy_id, spec=spec) - - status = cr.get("status") or {} - phase = status.get("phase") - - if phase in ("Idle", "Error"): - log.info( - "Terminal CR %s in phase %s — deleting and re-provisioning", - cr["metadata"]["name"], - phase, - ) - await self._delete_terminal_cr(user_id, policy_id) - return await self.provision(user_id, policy_id=policy_id, spec=spec) - - if phase == "Running" and status.get("serviceUrl") and status.get("apiKeySecret"): - api_key = await self._read_api_key_from_secret(status["apiKeySecret"]) - if api_key: - host, port = self._parse_service_url(status["serviceUrl"]) - return { - "instance_id": cr["metadata"]["uid"], - "instance_name": cr["metadata"]["name"], - "api_key": api_key, - "host": host, - "port": port, - } - - # Still provisioning — wait for the operator to bring it up - name = cr["metadata"]["name"] - ns = settings.kubernetes_namespace - ready = await self._wait_for_ready(name, ns, timeout=120) - if ready: - host, port = self._parse_service_url(ready["service_url"]) - return { - "instance_id": cr["metadata"]["uid"], - "instance_name": cr["metadata"]["name"], - "api_key": ready["api_key"], - "host": host, - "port": port, - } - - return None - - async def get_terminal_info(self, user_id: str) -> Optional[dict]: - """Look up an existing terminal from the K8s CRD without creating one.""" - cr = await self._get_terminal_cr(user_id) - if cr is None: - return None - - status = cr.get("status") or {} - phase = status.get("phase") - - if phase == "Running" and status.get("serviceUrl") and status.get("apiKeySecret"): - api_key = await self._read_api_key_from_secret(status["apiKeySecret"]) - if api_key: - host, port = self._parse_service_url(status["serviceUrl"]) - return { - "instance_id": cr["metadata"]["uid"], - "instance_name": cr["metadata"]["name"], - "api_key": api_key, - "host": host, - "port": port, - } - - return None - - async def touch_activity( - self, user_id: str, policy_id: str = "default" - ) -> None: - """Update lastActivityAt on the Terminal CR to prevent idle culling.""" - api_client = await self._ensure_client() - custom = client.CustomObjectsApi(api_client) - name = _sanitize_name(user_id, policy_id) - ns = settings.kubernetes_namespace - from datetime import datetime, timezone - now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - try: - await custom.patch_namespaced_custom_object_status( - group=self._group, - version=self._version, - namespace=ns, - plural=self._plural, - name=name, - body={"status": {"lastActivityAt": now}}, - _content_type="application/merge-patch+json", - ) - except client.exceptions.ApiException as e: - if e.status != 404: - log.warning("Failed to touch activity for %s: %s", name, e) diff --git a/terminals/backends/kubernetes_sandbox.py b/terminals/backends/kubernetes_sandbox.py new file mode 100644 index 0000000..11bff43 --- /dev/null +++ b/terminals/backends/kubernetes_sandbox.py @@ -0,0 +1,498 @@ +"""Kubernetes Agent Sandbox backend — manages terminals via the upstream +`agent-sandbox `_ ``Sandbox`` CRD. + +Each user+policy maps to a single ``Sandbox`` (``agents.x-k8s.io/v1beta1``): the +agent-sandbox controller reconciles it into a Pod, a headless Service (giving a +stable ``serviceFQDN``), and — when a workspace is requested — a PersistentVolume. +This backend only creates/patches/deletes ``Sandbox`` objects; the controller +(installed separately) does the rest. + +Lifecycle uses the Sandbox ``operatingMode`` field for idle handling: on idle we +patch the Sandbox to ``operatingMode: Suspended`` (scale-to-zero, identity + +workspace preserved); on the next request we patch it back to ``operatingMode: +Running``. Teardown deletes the Sandbox. + +The per-user Open Terminal API key is generated here and baked into the Sandbox's +pod template env (``OPEN_TERMINAL_API_KEY``); it is read back from the Sandbox object +when resolving connection info, so the backend stays stateless across restarts. + +Targets agent-sandbox ``v1beta1`` (v0.5.x). The capabilities this backend still +self-manages — deciding *when* to suspend based on request activity, and generating +the per-user key — are on the upstream roadmap (``Auto Suspend/Resume``, ``Scale to +Zero``, ``Sandbox/Pod Identity Association``); aligning to v1beta1 positions us to +drop those shims as the controller gains them. The v0.4.x API used a ``replicas`` +field instead of ``operatingMode``; pin ``TERMINALS_SANDBOX_VERSION=v1alpha1`` and +adjust :meth:`_set_operating_mode` if you must target the older controller. +""" + +import asyncio +import hashlib +import logging +import re +import secrets +import string +import time +from typing import Optional + +from kubernetes_asyncio import client, config +from kubernetes_asyncio.client import ApiClient + +from terminals.backends.base import Backend +from terminals.config import settings + +log = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_DNS_SAFE = re.compile(r"[^a-z0-9-]") + +_PLURAL_SANDBOX = "sandboxes" +_API_KEY_ENV = "OPEN_TERMINAL_API_KEY" +_CONTAINER_NAME = "open-terminal" + + +def _policy_slug(policy_id: str) -> str: + """DNS-safe slug for a policy id.""" + return _DNS_SAFE.sub("-", policy_id.lower()).strip("-")[:20] or "default" + + +def _sandbox_name(user_id: str, policy_id: str = "default") -> str: + """Deterministic, DNS-safe Sandbox name from a user id + policy. + + Mirrors the previous operator backend's scheme so names stay stable and + within the 63-character DNS label limit. + """ + short = hashlib.sha256(user_id.encode()).hexdigest()[:12] + if policy_id == "default": + return f"term-{short}" + return f"term-{short}-{_policy_slug(policy_id)}" + + +def _generate_api_key(length: int = 48) -> str: + alphabet = string.ascii_letters + string.digits + return "sk-" + "".join(secrets.choice(alphabet) for _ in range(length)) + + +def _labels(user_id: str = "", policy_id: str = "default") -> dict[str, str]: + labels = { + "app.kubernetes.io/name": "open-terminal", + "app.kubernetes.io/managed-by": "terminals", + "app.kubernetes.io/part-of": "open-terminal", + "openwebui.com/policy": _policy_slug(policy_id), + } + if user_id: + labels["openwebui.com/user-id"] = user_id + return labels + + +class KubernetesSandboxBackend(Backend): + """Manage terminal instances via Agent Sandbox ``Sandbox`` objects.""" + + def __init__(self) -> None: + super().__init__() + self._api_client: Optional[ApiClient] = None + + # ------------------------------------------------------------------ + # Client / config plumbing + # ------------------------------------------------------------------ + + async def _ensure_client(self) -> ApiClient: + if self._api_client is None: + if settings.kubernetes_kubeconfig: + await config.load_kube_config(config_file=settings.kubernetes_kubeconfig) + else: + config.load_incluster_config() + self._api_client = ApiClient() + return self._api_client + + async def _custom(self) -> client.CustomObjectsApi: + return client.CustomObjectsApi(await self._ensure_client()) + + @property + def _ns(self) -> str: + return settings.kubernetes_namespace + + @property + def _group(self) -> str: + return settings.sandbox_core_group + + @property + def _version(self) -> str: + return settings.sandbox_version + + # ------------------------------------------------------------------ + # Manifest builders + # ------------------------------------------------------------------ + + def _build_pod_template(self, spec: dict, api_key: str) -> dict: + """Build the Sandbox ``podTemplate`` from a policy spec + per-user key.""" + image = spec.get("image", settings.kubernetes_image) + port = settings.sandbox_port + + env = [ + {"name": _API_KEY_ENV, "value": api_key}, + {"name": "OPEN_TERMINAL_HOST", "value": "0.0.0.0"}, + {"name": "OPEN_TERMINAL_PORT", "value": str(port)}, + ] + for k, v in (spec.get("env") or {}).items(): + env.append({"name": k, "value": str(v)}) + + container: dict = { + "name": _CONTAINER_NAME, + "image": image, + "ports": [{"containerPort": port, "name": "http", "protocol": "TCP"}], + "env": env, + "readinessProbe": { + "httpGet": {"path": "/health", "port": port}, + "initialDelaySeconds": 3, + "periodSeconds": 5, + }, + "livenessProbe": { + "httpGet": {"path": "/health", "port": port}, + "initialDelaySeconds": 10, + "periodSeconds": 15, + }, + } + + requests = {"cpu": "100m", "memory": "256Mi"} + limits = {} + if spec.get("cpu_limit"): + limits["cpu"] = spec["cpu_limit"] + if spec.get("memory_limit"): + limits["memory"] = spec["memory_limit"] + container["resources"] = {"requests": requests} + if limits: + container["resources"]["limits"] = limits + + if spec.get("storage"): + container["volumeMounts"] = [ + {"name": "workspace", "mountPath": "/workspace"} + ] + + pod_spec: dict = { + "containers": [container], + "restartPolicy": "Always", + "enableServiceLinks": False, + "automountServiceAccountToken": False, + } + if settings.sandbox_runtime_class: + pod_spec["runtimeClassName"] = settings.sandbox_runtime_class + + return {"spec": pod_spec} + + def _build_volume_claim_templates(self, spec: dict) -> list[dict]: + """Build ``volumeClaimTemplates`` (empty when storage is ephemeral).""" + size = spec.get("storage") + if not size: + return [] + claim_spec: dict = { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": size}}, + } + storage_class = spec.get("storage_class") or settings.kubernetes_storage_class + if storage_class: + claim_spec["storageClassName"] = storage_class + return [{"metadata": {"name": "workspace"}, "spec": claim_spec}] + + def _build_sandbox( + self, user_id: str, policy_id: str, spec: dict, api_key: str + ) -> dict: + body: dict = { + "apiVersion": f"{self._group}/{self._version}", + "kind": "Sandbox", + "metadata": { + "name": _sandbox_name(user_id, policy_id), + "namespace": self._ns, + "labels": _labels(user_id, policy_id), + }, + "spec": { + "service": True, # headless Service → stable serviceFQDN + "podTemplate": self._build_pod_template(spec, api_key), + }, + } + vct = self._build_volume_claim_templates(spec) + if vct: + body["spec"]["volumeClaimTemplates"] = vct + return body + + # ------------------------------------------------------------------ + # Custom-object CRUD + # ------------------------------------------------------------------ + + async def _get_sandbox(self, name: str) -> Optional[dict]: + custom = await self._custom() + try: + return await custom.get_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._ns, + plural=_PLURAL_SANDBOX, + name=name, + ) + except client.exceptions.ApiException as e: + if e.status == 404: + return None + raise + + async def _create_sandbox(self, body: dict) -> Optional[dict]: + custom = await self._custom() + try: + return await custom.create_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._ns, + plural=_PLURAL_SANDBOX, + body=body, + ) + except client.exceptions.ApiException as e: + if e.status == 409: + return await self._get_sandbox(body["metadata"]["name"]) + raise + + async def _set_operating_mode(self, name: str, mode: str) -> None: + """Patch ``spec.operatingMode`` ("Running" or "Suspended").""" + custom = await self._custom() + try: + await custom.patch_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._ns, + plural=_PLURAL_SANDBOX, + name=name, + body={"spec": {"operatingMode": mode}}, + _content_type="application/merge-patch+json", + ) + log.info("Set Sandbox %s operatingMode=%s", name, mode) + except client.exceptions.ApiException as e: + if e.status != 404: + raise + + # ------------------------------------------------------------------ + # Status / connection helpers + # ------------------------------------------------------------------ + + @staticmethod + def _is_suspended(sandbox: dict) -> bool: + """True when the Sandbox's desired state is Suspended.""" + return (sandbox.get("spec") or {}).get("operatingMode", "Running") == "Suspended" + + @staticmethod + def _sandbox_running(sandbox: dict) -> bool: + """True when the Sandbox desires Running and is *freshly* Ready. + + The controller's ``Ready`` condition is stale immediately after a spec + change (e.g. resuming from a suspend): it still reads ``True`` from before + the pod was torn down. We guard against that by requiring the condition's + ``observedGeneration`` to have caught up to ``metadata.generation`` — i.e. + the controller has reconciled the *current* spec — so we never report a + not-yet-recreated pod as ready. + """ + if KubernetesSandboxBackend._is_suspended(sandbox): + return False + generation = (sandbox.get("metadata") or {}).get("generation") + status = sandbox.get("status") or {} + for c in status.get("conditions", []): + if c.get("type") == "Ready": + if c.get("status") != "True": + return False + obs = c.get("observedGeneration") + if generation is not None and obs is not None and obs < generation: + return False # stale — controller hasn't reconciled new spec + return True + return False + + @staticmethod + def _api_key_from_sandbox(sandbox: dict) -> Optional[str]: + containers = ( + ((sandbox.get("spec") or {}).get("podTemplate") or {}).get("spec") or {} + ).get("containers") or [] + for c in containers: + for ev in c.get("env") or []: + if ev.get("name") == _API_KEY_ENV: + return ev.get("value") + return None + + def _connection_info(self, sandbox: dict) -> Optional[dict]: + """Build the connection dict from a (ready) Sandbox, or None.""" + status = sandbox.get("status") or {} + host = status.get("serviceFQDN") + if not host: + pod_ips = status.get("podIPs") or [] + host = pod_ips[0] if pod_ips else None + if not host: + return None + api_key = self._api_key_from_sandbox(sandbox) + if not api_key: + return None + name = sandbox["metadata"]["name"] + return { + "instance_id": name, + "instance_name": name, + "api_key": api_key, + "host": host, + "port": settings.sandbox_port, + } + + async def _wait_until_ready( + self, name: str, timeout: int = 120 + ) -> Optional[dict]: + """Poll the Sandbox until it is Running with a connectable endpoint.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + sandbox = await self._get_sandbox(name) + if sandbox is None: + return None + if self._sandbox_running(sandbox): + info = self._connection_info(sandbox) + if info: + return info + await asyncio.sleep(2) + log.warning("Sandbox %s not ready within %ds", name, timeout) + return None + + # ------------------------------------------------------------------ + # Backend interface + # ------------------------------------------------------------------ + + async def provision( + self, + user_id: str, + policy_id: str = "default", + spec: Optional[dict] = None, + ) -> Optional[dict]: + """Create a Sandbox for the user+policy and wait until it is ready.""" + spec = spec or {} + api_key = _generate_api_key() + body = self._build_sandbox(user_id, policy_id, spec, api_key) + await self._create_sandbox(body) + return await self._wait_until_ready(_sandbox_name(user_id, policy_id)) + + async def start(self, instance_id: str) -> bool: + """Resume a suspended sandbox; idempotent if already running.""" + sandbox = await self._get_sandbox(instance_id) + if sandbox is None: + return False + if self._is_suspended(sandbox): + await self._set_operating_mode(instance_id, "Running") + return True + + async def teardown(self, instance_id: str) -> None: + """Delete the Sandbox (and its controller-managed Pod/Service/PVC).""" + custom = await self._custom() + try: + await custom.delete_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=self._ns, + plural=_PLURAL_SANDBOX, + name=instance_id, + ) + log.info("Deleted Sandbox %s", instance_id) + except client.exceptions.ApiException as e: + if e.status != 404: + log.warning("Could not delete Sandbox %s: %s", instance_id, e) + + async def _suspend(self, instance_id: str) -> None: + """Suspend the Sandbox (scale-to-zero, identity + workspace preserved).""" + await self._set_operating_mode(instance_id, "Suspended") + + async def status(self, instance_id: str) -> str: + sandbox = await self._get_sandbox(instance_id) + if sandbox is None: + return "missing" + return "running" if self._sandbox_running(sandbox) else "stopped" + + async def close(self) -> None: + if self._api_client is not None: + await self._api_client.close() + self._api_client = None + + # ------------------------------------------------------------------ + # Sandbox-aware ensure_terminal (get-or-create, resume if suspended) + # ------------------------------------------------------------------ + + async def ensure_terminal( + self, + user_id: str, + policy_id: str = "default", + spec: Optional[dict] = None, + ) -> Optional[dict]: + key = self._key(user_id, policy_id) + name = _sandbox_name(user_id, policy_id) + + # Fast path — sandbox exists and is Running. + sandbox = await self._get_sandbox(name) + if sandbox is not None and self._sandbox_running(sandbox): + info = self._connection_info(sandbox) + if info: + self._track(key, info, spec) + return info + + if key not in self._locks: + self._locks[key] = asyncio.Lock() + + async with self._locks[key]: + sandbox = await self._get_sandbox(name) + if sandbox is None: + info = await self.provision(user_id, policy_id=policy_id, spec=spec) + if info: + self._track(key, info, spec) + return info + + # Exists — resume if suspended, then wait until ready. + if self._is_suspended(sandbox): + await self._set_operating_mode(name, "Running") + info = await self._wait_until_ready(name) + if info: + self._track(key, info, spec) + return info + + def _track(self, key: str, info: dict, spec: Optional[dict]) -> None: + self._instances[key] = info + self._specs[key] = spec or {} + self._activity[key] = time.monotonic() + + async def get_terminal_info(self, user_id: str) -> Optional[dict]: + sandbox = await self._get_sandbox(_sandbox_name(user_id)) + if sandbox is None or not self._sandbox_running(sandbox): + return None + return self._connection_info(sandbox) + + async def touch_activity( + self, user_id: str, policy_id: str = "default" + ) -> None: + # Activity is tracked in-memory; the idle reaper suspends idle sandboxes. + self._activity[self._key(user_id, policy_id)] = time.monotonic() + + # ------------------------------------------------------------------ + # Idle reaper — suspend (operatingMode Suspended) instead of tear down + # ------------------------------------------------------------------ + + async def _reap_idle(self) -> None: + """Suspend (not delete) sandboxes idle past their timeout.""" + now = time.monotonic() + for key in list(self._instances): + info = self._instances.get(key) + if info is None: + continue + spec = self._specs.get(key, {}) + timeout_min = spec.get( + "idle_timeout_minutes", settings.idle_timeout_minutes + ) + if not timeout_min or timeout_min <= 0: + continue + if now - self._activity.get(key, now) < timeout_min * 60: + continue + + log.info("Suspending idle sandbox %s (key=%s)", info.get("instance_name"), key) + try: + await self._suspend(info["instance_id"]) + except Exception: + log.exception("Failed to suspend %s", key) + # Drop from active tracking; a returning request will resume it. + self._instances.pop(key, None) + self._specs.pop(key, None) + self._activity.pop(key, None) + self._locks.pop(key, None) diff --git a/terminals/config.py b/terminals/config.py index 4d99e3f..d6c47f0 100644 --- a/terminals/config.py +++ b/terminals/config.py @@ -19,7 +19,7 @@ class Settings(BaseSettings): # Database database_url: str = f"sqlite+aiosqlite:///{_DEFAULT_DATA_DIR}/terminals.db" - # Backend selection: "docker", "kubernetes", or "kubernetes-operator" + # Backend selection: "docker", "kubernetes", or "kubernetes-sandbox" backend: str = "docker" # Docker settings @@ -41,9 +41,13 @@ class Settings(BaseSettings): kubernetes_kubeconfig: str = "" # empty = in-cluster config kubernetes_labels: str = "" # extra labels as "k=v,k2=v2" - # Operator-specific settings - kubernetes_crd_group: str = "openwebui.com" - kubernetes_crd_version: str = "v1alpha1" + # Agent Sandbox settings (TERMINALS_BACKEND=kubernetes-sandbox) + # Upstream project: https://github.com/kubernetes-sigs/agent-sandbox + sandbox_core_group: str = "agents.x-k8s.io" # Sandbox CRD group + sandbox_ext_group: str = "extensions.agents.x-k8s.io" # Template/WarmPool/Claim group + sandbox_version: str = "v1beta1" + sandbox_port: int = 8000 # open-terminal container port + sandbox_runtime_class: str = "" # e.g. "gvisor" or "kata-qemu" for isolation # Idle reaper — tear down terminals after N minutes of inactivity (0 = disabled) idle_timeout_minutes: int = 0