-
Notifications
You must be signed in to change notification settings - Fork 3
Add Kubernetes native launcher #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
c29a8fe
8a552d4
46bc860
b48dafb
2b8a6ae
5568711
0130881
8e9d86c
9e172e3
98c1f7d
f3aa634
385c2ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,3 +15,7 @@ volumes/ | |
| *.key | ||
| *.pem | ||
| .jwks | ||
|
|
||
| # Python | ||
| __pycache__/ | ||
| *.pyc | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In all the functions with parameters: add the types to the contruct function and remove from the comments |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,6 +106,7 @@ public function __construct($tool, $execution = "", $project = "", $descrip = "" | |
| switch ($this->launcher) { | ||
| case "SGE": | ||
| case "docker_SGE": | ||
| case "kubernetes_native": | ||
| $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; | ||
| $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; | ||
| $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual']; | ||
|
|
@@ -978,6 +979,7 @@ public function prepareExecution($tool, $metadata, $dataLocations = [], $metadat | |
|
|
||
| switch ($this->launcher) { | ||
| case "SGE": | ||
| case "kubernetes_native": | ||
| $cmd = $this->setBashCmd_SGE($tool); | ||
| $this->createSubmitFile_SGE($cmd); | ||
|
|
||
|
|
@@ -1208,6 +1210,10 @@ protected function setBashCommandDockerSge($tool) | |
| " --out_metadata " . $this->stageout_file_virtual . | ||
| " --log_file " . $this->log_file_virtual; | ||
|
|
||
| if (isset($this->launcher) && $this->launcher === "kubernetes_native") { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The launcher |
||
| return $cmd_vre; | ||
| } | ||
|
|
||
|
|
||
| $cmd = "docker run --privileged -v /var/run/docker.sock:/var/run/docker.sock -d" . | ||
| " " . $cmd_envs . | ||
|
|
@@ -1438,7 +1444,7 @@ public function submit($tool) | |
| case "SGE": | ||
| case "ega_demo": | ||
| case "docker_SGE": | ||
| return $this->enqueue($tool); | ||
| case "kubernetes_native": | ||
| case "Slurm_Singularity": | ||
| return $this->enqueue($tool); | ||
| default: | ||
|
|
@@ -1461,8 +1467,22 @@ protected function enqueue($tool) | |
| $cpus = $launcherInfo['cpus'] ?? $tool['infrastructure']['cpus']; | ||
| $queue = $launcherInfo['queue'] ?? $tool['infrastructure']['clouds'][$this->cloudName]['queue']; | ||
| $this->logger->info("Resolved Parameters: Queue=$queue, CPUs=$cpus, Memory=$memory"); | ||
| $jobOptions = array(); | ||
| if ($jobManager === "kubernetes_native") { | ||
| $jobOptions["image"] = $tool['infrastructure']['container_image'] ?? ""; | ||
| if ($jobOptions["image"] === "") { | ||
| $_SESSION['errorData']['Error'][] = "Missing infrastructure.container_image for kubernetes_native launcher."; | ||
| return 0; | ||
| } | ||
| $jobOptions["env"] = array(); | ||
| if (isset($tool['infrastructure']['container_env']) && is_array($tool['infrastructure']['container_env'])) { | ||
| foreach ($tool['infrastructure']['container_env'] as $env_key => $env_value) { | ||
| $jobOptions["env"][$env_key] = (string)$env_value; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| $pid = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager); | ||
| $pid = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager, $this->toolId, $jobOptions); | ||
| $this->logger->info("Tool job submitted to SGE queue '$queue' (PID=$pid)"); | ||
|
|
||
| $this->pid = $pid; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| .git | ||
| .gitignore | ||
| __pycache__/ | ||
| *.pyc | ||
| *.pyo | ||
| *.pyd | ||
| .pytest_cache/ | ||
| .mypy_cache/ | ||
| .ruff_cache/ | ||
| dist/ | ||
| build/ | ||
| *.egg-info/ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| FROM python:3.11-alpine | ||
|
|
||
| WORKDIR /app | ||
|
|
||
| COPY app.py /app/app.py | ||
|
|
||
| EXPOSE 8080 | ||
|
|
||
| USER 65534:65534 | ||
|
|
||
| CMD ["python", "/app/app.py"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| import json | ||
| import os | ||
| import ssl | ||
| import urllib.error | ||
| import urllib.parse | ||
| import urllib.request | ||
| from http.server import BaseHTTPRequestHandler, HTTPServer | ||
|
|
||
|
|
||
| TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" | ||
| CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" | ||
| API_HOST = os.environ.get("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc") | ||
| API_PORT = os.environ.get("KUBERNETES_SERVICE_PORT", "443") | ||
| DEFAULT_NAMESPACE = os.environ.get("DEFAULT_NAMESPACE", "default") | ||
| SCHEDULER_AUTH_TOKEN = os.environ.get("SCHEDULER_AUTH_TOKEN", "") | ||
|
|
||
| with open(TOKEN_PATH, "r", encoding="utf-8") as f: | ||
| SA_TOKEN = f.read().strip() | ||
|
|
||
| SSL_CTX = ssl.create_default_context(cafile=CA_PATH) | ||
|
|
||
|
|
||
| def k8s_request(method, path, body=None, content_type="application/json"): | ||
| url = f"https://{API_HOST}:{API_PORT}{path}" | ||
| data = body.encode("utf-8") if body is not None else None | ||
| req = urllib.request.Request(url, method=method, data=data) | ||
| req.add_header("Authorization", f"Bearer {SA_TOKEN}") | ||
| if body is not None: | ||
| req.add_header("Content-Type", content_type) | ||
| try: | ||
| with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as resp: | ||
| raw = resp.read().decode("utf-8") | ||
| return resp.getcode(), raw | ||
| except urllib.error.HTTPError as e: | ||
| raw = e.read().decode("utf-8") if e.fp else str(e) | ||
| return e.code, raw | ||
|
|
||
|
|
||
| class Handler(BaseHTTPRequestHandler): | ||
| def _authorized(self): | ||
| if SCHEDULER_AUTH_TOKEN == "": | ||
| return False | ||
| auth = self.headers.get("Authorization", "") | ||
| return auth == f"Bearer {SCHEDULER_AUTH_TOKEN}" | ||
|
|
||
| def _json(self, code, payload): | ||
| out = json.dumps(payload).encode("utf-8") | ||
| self.send_response(code) | ||
| self.send_header("Content-Type", "application/json") | ||
| self.send_header("Content-Length", str(len(out))) | ||
| self.end_headers() | ||
| self.wfile.write(out) | ||
|
|
||
| def _read_json(self): | ||
| length = int(self.headers.get("Content-Length", "0")) | ||
| if length == 0: | ||
| return {} | ||
| raw = self.rfile.read(length).decode("utf-8") | ||
| return json.loads(raw) | ||
|
|
||
| def do_GET(self): | ||
| if self.path == "/healthz": | ||
| return self._json(200, {"ok": True}) | ||
| if not self._authorized(): | ||
| return self._json(401, {"ok": False, "error": "unauthorized"}) | ||
| if self.path.startswith("/jobs/"): | ||
| name_q = self.path[len("/jobs/"):] | ||
| name, _, query = name_q.partition("?") | ||
| params = urllib.parse.parse_qs(query) | ||
| ns = params.get("namespace", [DEFAULT_NAMESPACE])[0] | ||
| code, raw = k8s_request("GET", f"/apis/batch/v1/namespaces/{ns}/jobs/{name}") | ||
| if code == 404: | ||
| return self._json(200, {"ok": True, "exists": False, "job": ""}) | ||
| if code >= 300: | ||
| return self._json(500, {"ok": False, "error": raw}) | ||
| return self._json(200, {"ok": True, "exists": True, "job": raw}) | ||
| return self._json(404, {"ok": False, "error": "not found"}) | ||
|
|
||
| def do_POST(self): | ||
| if not self._authorized(): | ||
| return self._json(401, {"ok": False, "error": "unauthorized"}) | ||
| if self.path != "/jobs": | ||
| return self._json(404, {"ok": False, "error": "not found"}) | ||
| try: | ||
| body = self._read_json() | ||
| ns = body.get("namespace") or DEFAULT_NAMESPACE | ||
| manifest = body.get("manifest") | ||
| if not manifest: | ||
| return self._json(400, {"ok": False, "error": "manifest is required"}) | ||
| code, raw = k8s_request( | ||
| "POST", | ||
| f"/apis/batch/v1/namespaces/{ns}/jobs", | ||
| body=manifest, | ||
| content_type="application/yaml", | ||
| ) | ||
| if code >= 300: | ||
| return self._json(500, {"ok": False, "error": raw}) | ||
| return self._json(200, {"ok": True, "stdout": raw, "stderr": ""}) | ||
| except Exception as e: | ||
| return self._json(500, {"ok": False, "error": str(e)}) | ||
|
|
||
| def do_DELETE(self): | ||
| if not self._authorized(): | ||
| return self._json(401, {"ok": False, "error": "unauthorized"}) | ||
| if not self.path.startswith("/jobs/"): | ||
| return self._json(404, {"ok": False, "error": "not found"}) | ||
| name_q = self.path[len("/jobs/"):] | ||
| name, _, query = name_q.partition("?") | ||
| params = urllib.parse.parse_qs(query) | ||
| ns = params.get("namespace", [DEFAULT_NAMESPACE])[0] | ||
| delete_opts = '{"apiVersion":"batch/v1","kind":"DeleteOptions","propagationPolicy":"Background"}' | ||
| code, raw = k8s_request( | ||
| "DELETE", | ||
| f"/apis/batch/v1/namespaces/{ns}/jobs/{name}", | ||
| body=delete_opts, | ||
| content_type="application/json", | ||
| ) | ||
| if code == 404: | ||
| return self._json( | ||
| 200, | ||
| {"ok": True, "stdout": f"job.batch/{name} already deleted", "stderr": ""}, | ||
| ) | ||
| if code >= 300: | ||
| return self._json(500, {"ok": False, "error": raw}) | ||
| return self._json(200, {"ok": True, "stdout": raw, "stderr": ""}) | ||
|
|
||
| def log_message(self, fmt, *args): | ||
| return | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| server = HTTPServer(("0.0.0.0", 8080), Handler) | ||
| server.serve_forever() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,9 +54,19 @@ RUN useradd -m application \ | |
| && echo "application:application" | chpasswd | ||
|
|
||
| # Make sure host and container share the same GID for group 'docker', bc it has reading permissions to the socket file | ||
| ARG DOCKER_GROUP | ||
| RUN groupmod -g $DOCKER_GROUP docker | ||
| RUN usermod -aG docker application | ||
|
|
||
| #ARG DOCKER_GROUP | ||
| #RUN groupmod -g $DOCKER_GROUP docker | ||
| #RUN usermod -aG docker application | ||
|
|
||
| ARG DOCKER_GROUP=1002 | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is already in the .env file |
||
| RUN set -eux; \ | ||
| if getent group docker >/dev/null 2>&1; then \ | ||
| groupmod -g "${DOCKER_GROUP}" docker; \ | ||
| else \ | ||
| groupadd -g "${DOCKER_GROUP}" docker; \ | ||
| fi; \ | ||
| usermod -aG docker application | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The docker group already exists at this point so I would keep the old code |
||
|
|
||
| # Add setup script and set permissions | ||
| ADD setup_gridengine.sh /usr/local/bin/setup_gridengine.sh | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why removing this? The dependencies are defined in composer.json and are versioned, so the builds should be reproducible