From c29a8fe54a0b3d013ec90c5e312c8785228c4726 Mon Sep 17 00:00:00 2001 From: Yasir Date: Wed, 6 May 2026 10:12:05 +0000 Subject: [PATCH 01/11] changes for kubernetes --- front_end/Dockerfile | 6 +- .../public/phplib/classes/ProcessK8s.php | 490 ++++++++++++++++++ .../openVRE/public/phplib/classes/Tooljob.php | 45 +- .../openVRE/public/phplib/processJob.inc.php | 73 ++- 4 files changed, 562 insertions(+), 52 deletions(-) create mode 100755 front_end/openVRE/public/phplib/classes/ProcessK8s.php diff --git a/front_end/Dockerfile b/front_end/Dockerfile index ba1cdfb4..fb6af438 100644 --- a/front_end/Dockerfile +++ b/front_end/Dockerfile @@ -4,7 +4,7 @@ FROM webdevops/php-apache:8.3 ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update -RUN a2enmod proxy proxy_http +RUN a2enmod proxy proxy_http ssl RUN apt install -y nodejs RUN apt-get install -y autoconf pkg-config libssl-dev RUN docker-php-ext-install bcmath @@ -25,7 +25,7 @@ RUN apt-get update && apt-get install -y libmcrypt-dev \ && docker-php-ext-install gd RUN apt-get update && apt-get install gnupg curl -RUN curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc |gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor +RUN curl -fsSL https://pgp.mongodb.com/server-7.0.asc |gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor RUN echo "deb [ signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] http://repo.mongodb.org/apt/debian bookworm/mongodb-org/7.0 main" | tee /etc/apt/sources.list.d/mongodb-org-7.0.list RUN apt-get update RUN apt-get install -y mongodb-org @@ -51,6 +51,6 @@ USER application WORKDIR /var/www/html/openVRE/public RUN pwd -EXPOSE 88 443 +EXPOSE 443 CMD setup.sh && /opt/docker/bin/entrypoint.sh supervisord diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php new file mode 100755 index 00000000..4e4341db --- /dev/null +++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php @@ -0,0 +1,490 @@ + "RUNNING", + "Pending" => "PENDING", + "Succeeded" => "FINISHING", + "Failed" => "ERROR", + "NotFound" => "FINISHING", + ); + + public function __construct($cl = false, $workDir = "", $queue = "", $jobname = "", $cpu = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobOptions = array()) + { + $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "fedcomp"; + $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: ""; + $this->sharedPvc = getenv("OPENVRE_K8S_SHARED_PVC") ?: "dashboard-frontend-sgecore-shareddata"; + $this->toolsPvc = getenv("OPENVRE_K8S_TOOLS_PVC") ?: "dashboard-frontend-tools"; + $this->launcherUrl = rtrim(getenv("OPENVRE_K8S_LAUNCHER_URL") ?: "", "/"); + $this->launcherToken = getenv("OPENVRE_K8S_LAUNCHER_TOKEN") ?: ""; + $this->runAsUid = (int)(getenv("OPENVRE_K8S_RUN_AS_UID") ?: 1000); + $this->runAsGid = (int)(getenv("OPENVRE_K8S_RUN_AS_GID") ?: 1000); + $k8sHost = getenv("KUBERNETES_SERVICE_HOST") ?: ""; + $k8sPort = getenv("KUBERNETES_SERVICE_PORT_HTTPS") ?: (getenv("KUBERNETES_SERVICE_PORT") ?: "443"); + if ($k8sHost !== "") { + $this->k8sApiServer = "https://" . $k8sHost . ":" . $k8sPort; + } + $tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"; + if (is_readable($tokenPath)) { + $this->k8sApiToken = trim((string)file_get_contents($tokenPath)); + } + + if (is_array($jobOptions)) { + if (!empty($jobOptions["image"])) { + $this->jobImage = (string)$jobOptions["image"]; + } + if (!empty($jobOptions["env"]) && is_array($jobOptions["env"])) { + $this->jobEnv = $jobOptions["env"]; + } + } + + if ($cl !== false) { + $this->workDir = $workDir; + $this->command = $cl; + $this->jobname = $jobname ? $jobname : basename($cl); + $this->runCom($cpu, $mem); + } + } + + private function sanitizeName($name) + { + $name = strtolower($name); + $name = preg_replace('/[^a-z0-9-]+/', '-', $name); + $name = trim($name, '-'); + if ($name === "") { + $name = "openvre-job"; + } + if (strlen($name) > 40) { + $name = substr($name, 0, 40); + $name = rtrim($name, '-'); + } + return $name . "-" . substr(md5(uniqid("", true)), 0, 8); + } + + private function runCom($cpu, $mem) + { + if ($this->jobImage === "") { + $this->stderr = "OPENVRE_K8S_JOB_IMAGE is not set"; + $_SESSION['errorData']['Error'][] = $this->stderr; + return; + } + + $jobName = $this->sanitizeName($this->jobname); + $this->pid = $jobName; + + $scriptPath = $this->command; + $workDir = $this->workDir; + $cpuRequest = max(1, (int)$cpu); + $cpuLimit = max(1, (int)$cpu); + $memLimit = ((int)$mem > 0 ? ((int)$mem . "Gi") : "4Gi"); + + $manifest = array( + "apiVersion" => "batch/v1", + "kind" => "Job", + "metadata" => array( + "name" => $jobName, + "namespace" => $this->namespace, + "labels" => array( + "app.kubernetes.io/managed-by" => "openvre", + "openvre-job-id" => $jobName + ) + ), + "spec" => array( + "ttlSecondsAfterFinished" => (int)(getenv("OPENVRE_K8S_JOB_TTL") ?: 120), + "backoffLimit" => 0, + "template" => array( + "spec" => array( + "restartPolicy" => "Never", + "containers" => array( + array( + "name" => "tool-runner", + "image" => $this->jobImage, + "command" => array("bash", "-lc", "cd \"\$OPENVRE_WORKDIR\" && bash \"\$OPENVRE_SUBMIT_SCRIPT\""), + "env" => array( + array("name" => "OPENVRE_WORKDIR", "value" => $workDir), + array("name" => "OPENVRE_SUBMIT_SCRIPT", "value" => $scriptPath), + ), + "resources" => array( + "requests" => array("cpu" => (string)$cpuRequest), + "limits" => array("cpu" => (string)$cpuLimit, "memory" => $memLimit) + ), + "securityContext" => array( + "allowPrivilegeEscalation" => false, + ), + "volumeMounts" => array( + array("name" => "shared-data", "mountPath" => "/shared_data"), + array("name" => "tools", "mountPath" => "/var/www/html/openVRE/public/tools") + ) + ) + ), + "securityContext" => array( + "runAsUser" => max(1, $this->runAsUid), + "runAsGroup" => max(1, $this->runAsGid), + "fsGroup" => max(1, $this->runAsGid) + ), + "volumes" => array( + array("name" => "shared-data", "persistentVolumeClaim" => array("claimName" => $this->sharedPvc)), + array("name" => "tools", "persistentVolumeClaim" => array("claimName" => $this->toolsPvc)) + ) + ) + ) + ) + ); + + // Inject tool runtime environment variables (e.g. FEM_ACCESS_TOKEN) into the pod. + // This is critical for kubernetes_native where we execute directly from the tool image. + if (!empty($this->jobEnv)) { + $envList =& $manifest["spec"]["template"]["spec"]["containers"][0]["env"]; + if (is_array($envList)) { + foreach ($this->jobEnv as $k => $v) { + if (!is_string($k) || $k === "") { + continue; + } + if ($v === null || $v === "") { + continue; + } + $envList[] = array( + "name" => (string)$k, + "value" => (string)$v + ); + } + } + } + + $yaml = $this->arrayToYaml($manifest); + if ($this->launcherUrl !== "") { + $this->fullcommand = "POST " . $this->launcherUrl . "/jobs"; + logger("K8s job submission via launcher endpoint '" . $this->fullcommand . "'"); + $response = $this->launcherRequest("POST", "/jobs", array( + "namespace" => $this->namespace, + "manifest" => $yaml + )); + } else { + $this->fullcommand = "POST " . $this->k8sApiServer . "/apis/batch/v1/namespaces/" . $this->namespace . "/jobs"; + logger("K8s job submission via in-cluster API '" . $this->fullcommand . "'"); + $response = $this->k8sRequest("POST", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs", $yaml, "application/yaml"); + } + if ($response["ok"] !== true) { + $this->stderr = $response["error"]; + $this->stdout = ""; + $msg = "K8s job submission failed: " . trim($this->stderr); + logger($msg); + $_SESSION['errorData']['Error'][] = $msg; + $this->pid = ""; + } else { + $this->stdout = isset($response["data"]["stdout"]) ? (string)$response["data"]["stdout"] : ""; + $this->stderr = isset($response["data"]["stderr"]) ? (string)$response["data"]["stderr"] : ""; + logger("K8s job submitted: " . $this->pid . ". Output: " . trim($this->stdout)); + } + } + + private function launcherRequest($method, $path, $payload = null) + { + if ($this->launcherUrl === "") { + return array("ok" => false, "error" => "OPENVRE_K8S_LAUNCHER_URL is not set"); + } + if (!function_exists("curl_init")) { + return array("ok" => false, "error" => "PHP curl extension is required"); + } + + $url = $this->launcherUrl . $path; + $ch = curl_init($url); + $headers = array("Content-Type: application/json"); + if ($this->launcherToken !== "") { + $headers[] = "Authorization: Bearer " . $this->launcherToken; + } + + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + curl_setopt($ch, CURLOPT_TIMEOUT, 30); + if ($payload !== null) { + curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload)); + } + + $raw = curl_exec($ch); + if ($raw === false) { + $err = curl_error($ch); + curl_close($ch); + return array("ok" => false, "error" => "Launcher request failed: " . $err); + } + $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); + curl_close($ch); + + $json = json_decode($raw, true); + if ($code < 200 || $code >= 300) { + $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from launcher"); + return array("ok" => false, "error" => $msg); + } + if (!is_array($json)) { + return array("ok" => false, "error" => "Invalid JSON response from launcher"); + } + if (isset($json["ok"]) && !$json["ok"]) { + return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "Launcher error"); + } + return array("ok" => true, "data" => $json); + } + + private function k8sRequest($method, $path, $payload = null, $contentType = "application/json") + { + // Fallback refresh: in some PHP-FPM request contexts the constructor may + // run before projected service-account files/env are ready. Re-read just + // before Kubernetes API calls. + if ($this->k8sApiServer === "") { + $k8sHost = getenv("KUBERNETES_SERVICE_HOST") ?: ""; + $k8sPort = getenv("KUBERNETES_SERVICE_PORT_HTTPS") ?: (getenv("KUBERNETES_SERVICE_PORT") ?: "443"); + if ($k8sHost !== "") { + $this->k8sApiServer = "https://" . $k8sHost . ":" . $k8sPort; + } + } + if ($this->k8sApiToken === "") { + $tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"; + if (is_readable($tokenPath)) { + $this->k8sApiToken = trim((string)file_get_contents($tokenPath)); + } + } + if ($this->k8sApiServer === "" || $this->k8sApiToken === "") { + return array("ok" => false, "error" => "Kubernetes in-cluster API/token not available"); + } + if (!function_exists("curl_init")) { + return array("ok" => false, "error" => "PHP curl extension is required"); + } + + $url = $this->k8sApiServer . $path; + $ch = curl_init($url); + $headers = array( + "Authorization: Bearer " . $this->k8sApiToken + ); + if ($payload !== null) { + $headers[] = "Content-Type: " . $contentType; + } + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); + curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); + curl_setopt($ch, CURLOPT_TIMEOUT, 30); + if (is_file($this->k8sApiCaFile)) { + curl_setopt($ch, CURLOPT_CAINFO, $this->k8sApiCaFile); + } + if ($payload !== null) { + curl_setopt($ch, CURLOPT_POSTFIELDS, $payload); + } + + $raw = curl_exec($ch); + if ($raw === false) { + $err = curl_error($ch); + curl_close($ch); + return array("ok" => false, "error" => "Kubernetes API request failed: " . $err); + } + $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); + curl_close($ch); + + $json = json_decode($raw, true); + if ($code < 200 || $code >= 300) { + $msg = ""; + if (is_array($json) && isset($json["message"])) { + $msg = (string)$json["message"]; + } + if ($msg === "") { + $msg = "HTTP " . $code . " from Kubernetes API"; + } + return array("ok" => false, "error" => $msg); + } + + return array("ok" => true, "data" => array( + "stdout" => $raw, + "stderr" => "", + "job" => $raw, + "exists" => true + )); + } + + private function arrayToYaml($data, $indent = 0) + { + $yaml = ""; + $spaces = str_repeat(" ", $indent); + $isAssoc = function ($arr) { + if (!is_array($arr)) return false; + return array_keys($arr) !== range(0, count($arr) - 1); + }; + + foreach ($data as $key => $value) { + if (is_array($value)) { + if ($isAssoc($value)) { + $yaml .= $spaces . $key . ":\n" . $this->arrayToYaml($value, $indent + 1); + } else { + $yaml .= $spaces . $key . ":\n"; + foreach ($value as $item) { + if (is_array($item)) { + $yaml .= $spaces . " -\n" . $this->arrayToYaml($item, $indent + 2); + } else { + $yaml .= $spaces . " - " . $this->yamlScalar($item) . "\n"; + } + } + } + } else { + $yaml .= $spaces . $key . ": " . $this->yamlScalar($value) . "\n"; + } + } + return $yaml; + } + + private function yamlScalar($value) + { + if (is_bool($value)) return $value ? "true" : "false"; + if (is_numeric($value)) return (string)$value; + $escaped = str_replace('"', '\"', (string)$value); + return '"' . $escaped . '"'; + } + + public function getRunningJobInfo($pid) + { + $job = array(); + if (!$pid) return $job; + if ($this->launcherUrl !== "") { + $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + } else { + $response = $this->k8sRequest("GET", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($pid), null); + if ($response["ok"] !== true && strpos((string)$response["error"], "not found") !== false) { + return array(); + } + if ($response["ok"] === true) { + $response["data"]["exists"] = true; + $response["data"]["job"] = isset($response["data"]["stdout"]) ? $response["data"]["stdout"] : ""; + } + } + if ($response["ok"] !== true) { + // Treat lookup errors as "not running" to avoid infinite FINISHING state. + return array(); + } + $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false; + if (!$exists) { + // Job already removed (e.g. TTL cleanup) => finalize in OpenVRE. + return array(); + } + $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : ""; + $json = json_decode($jsonRaw, true); + if (!is_array($json)) { + return array(); + } + $status = $json['status'] ?? array(); + $state = "Pending"; + if (!empty($status['active'])) { + $state = "Running"; + } elseif (!empty($status['succeeded'])) { + $state = "Succeeded"; + } elseif (!empty($status['failed'])) { + $state = "Failed"; + } + + // Important: OpenVRE's workspace polling treats "empty job info" as + // "job not running anymore" (to finalize outputs). For Kubernetes, + // return an empty array once the Job is finished. + if ($state === "Succeeded" || $state === "Failed") { + return array(); + } + + $job['pid'] = $pid; + $job['state'] = $this->jobState[$state]; + $job['job_name'] = $pid; + return $job; + } + + public function getFullCommand() + { + return $this->fullcommand; + } + + public function getPid() + { + return $this->pid; + } + + public function getErr() + { + if ($this->stderr) { + return trim($this->stdout . " " . $this->stderr); + } + return null; + } + + public function status() + { + if (!$this->pid) return false; + if ($this->launcherUrl !== "") { + $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); + } else { + $response = $this->k8sRequest("GET", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($this->pid), null); + if ($response["ok"] !== true && strpos((string)$response["error"], "not found") !== false) { + return false; + } + if ($response["ok"] === true) { + $response["data"]["exists"] = true; + $response["data"]["job"] = isset($response["data"]["stdout"]) ? $response["data"]["stdout"] : ""; + } + } + if ($response["ok"] !== true) { + return false; + } + $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false; + if (!$exists) { + return false; + } + + $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : ""; + $json = json_decode($jsonRaw, true); + if (!is_array($json)) { + return true; + } + + $status = isset($json["status"]) && is_array($json["status"]) ? $json["status"] : array(); + if (!empty($status["succeeded"]) || !empty($status["failed"])) { + return false; + } + + return true; + } + + public function stop($pid = null) + { + if (!$pid) { + return array(false, "No job id '$pid' given"); + } + if ($this->launcherUrl !== "") { + $response = $this->launcherRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + } else { + $deletePayload = json_encode(array( + "apiVersion" => "batch/v1", + "kind" => "DeleteOptions", + "propagationPolicy" => "Background" + )); + $response = $this->k8sRequest("DELETE", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($pid), $deletePayload, "application/json"); + } + if ($response["ok"] === true) { + $res = isset($response["data"]["stdout"]) ? trim((string)$response["data"]["stdout"]) : ""; + return array(true, $res); + } + return array(false, $response["error"] ?: "Failed to delete kubernetes job"); + } +} + diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index 283d378f..99a09fa6 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -98,6 +98,7 @@ public function __construct($tool, $execution = "", $project = "", $descrip = "" switch ($this->launcher) { case "SGE": case "docker_SGE": + case "kubernetes_native": $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual']; @@ -1116,6 +1117,19 @@ public function prepareExecution($tool, $metadata, $metadata_pub = []) break; + case "kubernetes_native": + $cmd = $this->setBashCmd_SGE($tool); + if (!$cmd) { + return 0; + } + + $submissionFilename = $this->createSubmitFile_SGE($cmd); + if (!is_file($submissionFilename)) { + return 0; + } + + break; + case "PMES": $json_data = $this->setPMESrequest($tool); if (!$json_data) { @@ -1395,6 +1409,10 @@ protected function setBashCommandDockerSge($tool) " --out_metadata " . $this->stageout_file_virtual . " --log_file " . $this->log_file_virtual; + if (isset($this->launcher) && $this->launcher === "kubernetes_native") { + return $cmd_vre; + } + $cmd = "docker run --privileged -v /var/run/docker.sock:/var/run/docker.sock -d" . " " . $cmd_envs . @@ -1788,6 +1806,7 @@ public function submit($tool) case "SGE": case "ega_demo": case "docker_SGE": + case "kubernetes_native": return $this->enqueue($tool); case "Slurm_Singularity": return $this->enqueue($tool); @@ -1808,21 +1827,33 @@ protected function enqueue($tool) return 0; } $jobManager = $launcherInfo['launcher']['job_manager'] - ?? $tool['infrastructure']['clouds'][$this->cloudName]['launcher']; + ?? $tool['infrastructure']['clouds'][$this->cloudName]['launcher']; $memory = $launcherInfo['memory'] ?? $tool['infrastructure']['memory']; $cpus = $launcherInfo['cpus'] ?? $tool['infrastructure']['cpus']; $queue = $launcherInfo['queue'] ?? $tool['infrastructure']['clouds'][$this->cloudName]['queue']; - // error_log("Resolved Parameters: Queue=$queue, CPUs=$cpus, Memory=$memory, jobManager=$jobManager"); + $jobOptions = array(); + if ($jobManager === "kubernetes_native") { + $jobOptions["mode"] = "native"; + $jobOptions["image"] = $tool['infrastructure']['container_image'] ?? ""; + if ($jobOptions["image"] === "") { + $_SESSION['errorData']['Error'][] = "Missing infrastructure.container_image for kubernetes_native launcher."; + return 0; + } + $jobOptions["env"] = array(); + if (isset($tool['infrastructure']['container_env']) && is_array($tool['infrastructure']['container_env'])) { + foreach ($tool['infrastructure']['container_env'] as $env_key => $env_value) { + $jobOptions["env"][$env_key] = (string)$env_value; + } + } + } - list($pid, $errMesg) = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager); + list($pid, $errMesg) = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager, $this->toolId, $jobOptions); if (!$pid) { - //error_log($pid, $errMesg, NULL, $this->toolId, $this->cloudName, "SGE", $cpus, $memory); - error_log("Error: $errMesg"); + log_addError($pid, $errMesg, NULL, $this->toolId, $this->cloudName, $jobManager, $cpus, $memory); $_SESSION['errorData']['Error'][] = "Internal error. Cannot enqueue job."; return 0; } - #logger("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:SGE, TOOL:" . $this->toolId . ", PID:$pid"); - //error_log("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:SGE, TOOL:" . $this->toolId . ", PID:$pid"); + logger("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:" . $jobManager . ", TOOL:" . $this->toolId . ", PID:$pid"); log_addSubmission($pid, $this->toolId, $this->cloudName, $jobManager, $cpus, $memory, $this->working_dir); $this->pid = $pid; diff --git a/front_end/openVRE/public/phplib/processJob.inc.php b/front_end/openVRE/public/phplib/processJob.inc.php index 13893615..229abdd7 100644 --- a/front_end/openVRE/public/phplib/processJob.inc.php +++ b/front_end/openVRE/public/phplib/processJob.inc.php @@ -5,10 +5,9 @@ # -function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE") +function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $launcherType = "SGE", $toolId = "", $jobOptions = array()) { - logger("Start job submission via $jobManager"); - error_log("DEBUG- execJob: Start job submission via $jobManager"); + logger("Start job submission via " . $launcherType); if (!isset($_SESSION['User']['id'])) { $_SESSION['errorData']['Error'][] = "User ID not found in session."; @@ -29,42 +28,31 @@ function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job // Validate queue $queue = $queue ?: ($GLOBALS['queueTask'] ?? null); - if (!$queue && strtoupper($jobManager) === "SGE") { + if (!$queue) { $_SESSION['errorData']['Error'][] = "Queue not provided."; return [0, "Queue not provided."]; } $queue = (isset($queue) ? $queue : $GLOBALS['queueTask']); - $jobname = $_SESSION['User']['id'] . "#" . basename($shFile); - - // - // Start SGE process - //$process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); - - switch ($jobManager) { - case "docker_SGE": - error_log("DEBUG: Submitting job via docker_SGE. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile"); - $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); - break; - case "Slurm_Singularity": - $remote_system = $_REQUEST['sites']['site_list'][0]; - error_log("DEBUG: Submitting job via Slurm to $remote_system. Parameters: shFile=$shFile, workDir=$workDir, logFile=$logFile, errFile=$errFile"); - $process = new ProcessSlurm($shFile, $workDir, $logFile, $errFile, $remote_system); - break; - default: - $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); - break; + $jobname = $_SESSION['User']['id'] . "#" . ($toolId ?: basename($shFile)); + + if ($launcherType === "kubernetes_native") { + require_once __DIR__ . "/classes/ProcessK8s.php"; + $process = new ProcessK8s($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile, $jobOptions); + } else { + $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); } - + + $pid = $process->getPid(); + if (!$process->status()) { - $_SESSION['errorData']['Error'][] = "Job submission failed.
" . $process->getFullCommand() . "
" . $process->getErr(); - $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand() . "'. ErrorSGE: '" . $process->getErr() . "'"; + $_SESSION['errorData']['Error'][] = "Job submission failed.
" . $process->getFullCommand . "
" . $process->getErr(); + $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand . "'. ErrorSGE: '" . $process->getErr() . "'"; logger($errMesg); return array(0, $errMesg); } - $pid = $process->getPid(); error_log("Process started successfully: PID = $pid"); logger("The process is currently running PID = $pid"); return array($pid, ""); @@ -136,37 +124,32 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local") if (! $pid) return $job; - logger("getRunningJobInfo: start processing $pid"); - // guess launcher if (!$launcherType) { - if (is_numeric($pid)) + if (is_numeric($pid)) { $launcherType = "SGE"; - else + } elseif (strpos((string)$pid, "-") !== false) { + $launcherType = "kubernetes_native"; + } else { $launcherType = "PMES"; + } } - logger("getRunningJobInfo: launcherType = $launcherType"); - // create new jobProcess if ($launcherType == "SGE" || $launcherType == "docker_SGE") { $process = new ProcessSGE(); $job = $process->getRunningJobInfo($pid); + } elseif ($launcherType == "kubernetes_native") { + require_once __DIR__ . "/classes/ProcessK8s.php"; + $process = new ProcessK8s(); + $job = $process->getRunningJobInfo($pid); } elseif ($launcherType == "PMES") { $process = new ProcessPMES($cloudName); $job = $process->getRunningJobInfo($pid); - } elseif ($launcherType == "Slurm_Singularity") { - $process = new ProcessSlurm(); - $job = $process->getRunningJobInfo($pid); - logger("getRunningJobInfo: $job"); } else { - logger("getRunningJobInfo: error due to unknown launcher type '$launcherType'"); - $_SESSION['errorData']['Error'][] = "Cannot monitor job '$pid' of type '$launcherType'. Launcher not implemented."; + $_SESSION['errorData']['Error'][] = "Cannot monitor job '$pid' of type '$launcher'. Launcher not implemented."; return $job; } - - logger("getRunningJobInfo: end processing $pid"); - // return job info return $job; } @@ -304,6 +287,8 @@ function delJob($pid, $launcherType = NULL, $cloudName = "local", $login = NULL) if (!$launcherType) { if (is_numeric($pid)) { $launcherType = "docker_SGE"; + } elseif (strpos((string)$pid, "-") !== false) { + $launcherType = "kubernetes_native"; } else { $launcherType = "PMES"; } @@ -321,6 +306,10 @@ function delJob($pid, $launcherType = NULL, $cloudName = "local", $login = NULL) updateLogFromJobInfo($jobInfo['log'], $pid, $launcherType, $cloudName); // Add any other file redirection logic here } + } elseif ($launcherType == "kubernetes_native") { + require_once __DIR__ . "/classes/ProcessK8s.php"; + $processK8s = new ProcessK8s(); + list($r_sge, $msg_sge) = $processK8s->stop($pid); } elseif ($launcherType == "PMES") { $process = new ProcessPMES(); $r = $process->stop($pid); From 8a552d49da73b884ee806bf376c60e178254a05c Mon Sep 17 00:00:00 2001 From: Yasir Date: Tue, 12 May 2026 12:46:52 +0000 Subject: [PATCH 02/11] Removed extra code from Tooljob and ProcessK8s --- .../public/phplib/classes/ProcessK8s.php | 400 +++++++++++------- .../openVRE/public/phplib/classes/Tooljob.php | 1 - sge/Dockerfile | 16 +- 3 files changed, 265 insertions(+), 152 deletions(-) diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php index 4e4341db..021f1433 100755 --- a/front_end/openVRE/public/phplib/classes/ProcessK8s.php +++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php @@ -1,38 +1,115 @@ "RUNNING", - "Pending" => "PENDING", + "Running" => "RUNNING", + "Pending" => "PENDING", "Succeeded" => "FINISHING", - "Failed" => "ERROR", - "NotFound" => "FINISHING", + "Failed" => "ERROR", + "NotFound" => "FINISHING", ); + // --------------------------------------------------------------- + // Constructor + // --------------------------------------------------------------- + + /** + * @param string|false $cl Path to submission script (false = status-only instance) + * @param string $workDir Working directory path inside the pod + * @param string $queue Queue name (unused for K8s, kept for interface compatibility with ProcessSGE) + * @param string $jobname Human-readable job name + * @param int $cpu CPU cores requested/limited + * @param int $mem Memory in GB (0 = default 4Gi) + * @param string $logFile Stdout log filename (unused for K8s, kept for interface compatibility) + * @param string $errFile Stderr log filename (unused for K8s, kept for interface compatibility) + * @param array $jobOptions Options from Tooljob: "image" overrides container image, + * "env" provides extra environment variables for the pod + */ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = "", $cpu = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobOptions = array()) { - $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "fedcomp"; + // Read all configuration from environment variables (set via ConfigMap/Secret). + $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "bsctre-v2"; $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: ""; $this->sharedPvc = getenv("OPENVRE_K8S_SHARED_PVC") ?: "dashboard-frontend-sgecore-shareddata"; $this->toolsPvc = getenv("OPENVRE_K8S_TOOLS_PVC") ?: "dashboard-frontend-tools"; @@ -40,16 +117,9 @@ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = $this->launcherToken = getenv("OPENVRE_K8S_LAUNCHER_TOKEN") ?: ""; $this->runAsUid = (int)(getenv("OPENVRE_K8S_RUN_AS_UID") ?: 1000); $this->runAsGid = (int)(getenv("OPENVRE_K8S_RUN_AS_GID") ?: 1000); - $k8sHost = getenv("KUBERNETES_SERVICE_HOST") ?: ""; - $k8sPort = getenv("KUBERNETES_SERVICE_PORT_HTTPS") ?: (getenv("KUBERNETES_SERVICE_PORT") ?: "443"); - if ($k8sHost !== "") { - $this->k8sApiServer = "https://" . $k8sHost . ":" . $k8sPort; - } - $tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"; - if (is_readable($tokenPath)) { - $this->k8sApiToken = trim((string)file_get_contents($tokenPath)); - } + // Allow per-tool overrides: the tool's Mongo document can specify a custom + // container image and extra environment variables via $jobOptions. if (is_array($jobOptions)) { if (!empty($jobOptions["image"])) { $this->jobImage = (string)$jobOptions["image"]; @@ -59,6 +129,9 @@ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = } } + // If a submission script path was provided, immediately create and submit + // the Kubernetes Job. When $cl is false, this instance is used only for + // status queries (getRunningJobInfo, status, stop). if ($cl !== false) { $this->workDir = $workDir; $this->command = $cl; @@ -67,6 +140,15 @@ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = } } + // --------------------------------------------------------------- + // Job name generation + // --------------------------------------------------------------- + + /** + * Converts a human-readable job name into a Kubernetes-safe DNS name. + * Kubernetes Job names must be lowercase, alphanumeric + hyphens, max 63 chars. + * Appends a random 8-char suffix to guarantee uniqueness across runs. + */ private function sanitizeName($name) { $name = strtolower($name); @@ -82,14 +164,33 @@ private function sanitizeName($name) return $name . "-" . substr(md5(uniqid("", true)), 0, 8); } + // --------------------------------------------------------------- + // Job creation and submission + // --------------------------------------------------------------- + + /** + * Builds a Kubernetes Job manifest and submits it to kubectl-runner. + * + * @param int $cpu Number of CPU cores (request and limit) + * @param int $mem Memory in GB (0 = default 4Gi) + */ private function runCom($cpu, $mem) { + // Abort if no container image is configured — there's nothing to run. if ($this->jobImage === "") { $this->stderr = "OPENVRE_K8S_JOB_IMAGE is not set"; $_SESSION['errorData']['Error'][] = $this->stderr; return; } + // Abort if kubectl-runner URL is not configured — we cannot submit jobs. + if ($this->launcherUrl === "") { + $this->stderr = "OPENVRE_K8S_LAUNCHER_URL is not set"; + $_SESSION['errorData']['Error'][] = $this->stderr; + return; + } + + // Generate a unique, K8s-safe Job name and use it as the OpenVRE "pid". $jobName = $this->sanitizeName($this->jobname); $this->pid = $jobName; @@ -99,6 +200,8 @@ private function runCom($cpu, $mem) $cpuLimit = max(1, (int)$cpu); $memLimit = ((int)$mem > 0 ? ((int)$mem . "Gi") : "4Gi"); + // Build the full Kubernetes Job manifest as a PHP associative array. + // This will be serialized to YAML before submission to kubectl-runner. $manifest = array( "apiVersion" => "batch/v1", "kind" => "Job", @@ -111,38 +214,61 @@ private function runCom($cpu, $mem) ) ), "spec" => array( + // Auto-delete the Job object N seconds after it finishes (success or failure). "ttlSecondsAfterFinished" => (int)(getenv("OPENVRE_K8S_JOB_TTL") ?: 120), + + // Kill the Job if it runs longer than this (prevents stuck/hung jobs). + "activeDeadlineSeconds" => (int)(getenv("OPENVRE_K8S_JOB_DEADLINE") ?: 86400), + + // Do not retry on failure — mark as Failed immediately. "backoffLimit" => 0, + "template" => array( "spec" => array( + // Pod should not restart — one attempt only. "restartPolicy" => "Never", + "containers" => array( array( "name" => "tool-runner", "image" => $this->jobImage, + + // The pod runs the submission script (generated by Tooljob) + // from the working directory. Both are passed as env vars. "command" => array("bash", "-lc", "cd \"\$OPENVRE_WORKDIR\" && bash \"\$OPENVRE_SUBMIT_SCRIPT\""), + "env" => array( array("name" => "OPENVRE_WORKDIR", "value" => $workDir), array("name" => "OPENVRE_SUBMIT_SCRIPT", "value" => $scriptPath), ), + "resources" => array( "requests" => array("cpu" => (string)$cpuRequest), "limits" => array("cpu" => (string)$cpuLimit, "memory" => $memLimit) ), + "securityContext" => array( "allowPrivilegeEscalation" => false, ), + + // Mount shared storage so the Job can read inputs and write outputs + // to the same PVCs used by the frontend and SGE pods. "volumeMounts" => array( array("name" => "shared-data", "mountPath" => "/shared_data"), array("name" => "tools", "mountPath" => "/var/www/html/openVRE/public/tools") ) ) ), + + // Run the container as a non-root user with a fixed UID/GID + // to match file ownership on the shared PVCs. "securityContext" => array( "runAsUser" => max(1, $this->runAsUid), "runAsGroup" => max(1, $this->runAsGid), "fsGroup" => max(1, $this->runAsGid) ), + + // Attach the two shared PVCs as volumes. "volumes" => array( array("name" => "shared-data", "persistentVolumeClaim" => array("claimName" => $this->sharedPvc)), array("name" => "tools", "persistentVolumeClaim" => array("claimName" => $this->toolsPvc)) @@ -152,8 +278,8 @@ private function runCom($cpu, $mem) ) ); - // Inject tool runtime environment variables (e.g. FEM_ACCESS_TOKEN) into the pod. - // This is critical for kubernetes_native where we execute directly from the tool image. + // Inject tool-specific environment variables (e.g. FEM_ACCESS_TOKEN, FEM_API_PREFIX) + // from the tool's Mongo definition into the Job pod's container env list. if (!empty($this->jobEnv)) { $envList =& $manifest["spec"]["template"]["spec"]["containers"][0]["env"]; if (is_array($envList)) { @@ -172,19 +298,18 @@ private function runCom($cpu, $mem) } } + // Serialize the manifest array to YAML for submission to kubectl-runner. $yaml = $this->arrayToYaml($manifest); - if ($this->launcherUrl !== "") { - $this->fullcommand = "POST " . $this->launcherUrl . "/jobs"; - logger("K8s job submission via launcher endpoint '" . $this->fullcommand . "'"); - $response = $this->launcherRequest("POST", "/jobs", array( - "namespace" => $this->namespace, - "manifest" => $yaml - )); - } else { - $this->fullcommand = "POST " . $this->k8sApiServer . "/apis/batch/v1/namespaces/" . $this->namespace . "/jobs"; - logger("K8s job submission via in-cluster API '" . $this->fullcommand . "'"); - $response = $this->k8sRequest("POST", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs", $yaml, "application/yaml"); - } + + // Submit Job via kubectl-runner proxy. + $this->fullcommand = "POST " . $this->launcherUrl . "/jobs"; + logger("K8s job submission via kubectl-runner '" . $this->fullcommand . "'"); + $response = $this->launcherRequest("POST", "/jobs", array( + "namespace" => $this->namespace, + "manifest" => $yaml + )); + + // Handle submission result. if ($response["ok"] !== true) { $this->stderr = $response["error"]; $this->stdout = ""; @@ -199,6 +324,19 @@ private function runCom($cpu, $mem) } } + // --------------------------------------------------------------- + // HTTP client: kubectl-runner proxy + // --------------------------------------------------------------- + + /** + * Sends an HTTP request to the kubectl-runner proxy service. + * This is the sole communication channel with Kubernetes. + * + * @param string $method HTTP method (GET, POST, DELETE) + * @param string $path API path (e.g. "/jobs", "/jobs/{name}") + * @param array|null $payload JSON-serializable body (for POST) + * @return array ["ok" => bool, "error" => string, "data" => array] + */ private function launcherRequest($method, $path, $payload = null) { if ($this->launcherUrl === "") { @@ -211,6 +349,8 @@ private function launcherRequest($method, $path, $payload = null) $url = $this->launcherUrl . $path; $ch = curl_init($url); $headers = array("Content-Type: application/json"); + + // Authenticate to kubectl-runner with a Bearer token. if ($this->launcherToken !== "") { $headers[] = "Authorization: Bearer " . $this->launcherToken; } @@ -227,98 +367,37 @@ private function launcherRequest($method, $path, $payload = null) if ($raw === false) { $err = curl_error($ch); curl_close($ch); - return array("ok" => false, "error" => "Launcher request failed: " . $err); + return array("ok" => false, "error" => "kubectl-runner request failed: " . $err); } $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); $json = json_decode($raw, true); + + // Non-2xx status from kubectl-runner indicates an error. if ($code < 200 || $code >= 300) { - $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from launcher"); + $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from kubectl-runner"); return array("ok" => false, "error" => $msg); } if (!is_array($json)) { - return array("ok" => false, "error" => "Invalid JSON response from launcher"); + return array("ok" => false, "error" => "Invalid JSON response from kubectl-runner"); } + // kubectl-runner returns {"ok": false, "error": "..."} on application-level errors. if (isset($json["ok"]) && !$json["ok"]) { - return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "Launcher error"); + return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "kubectl-runner error"); } return array("ok" => true, "data" => $json); } - private function k8sRequest($method, $path, $payload = null, $contentType = "application/json") - { - // Fallback refresh: in some PHP-FPM request contexts the constructor may - // run before projected service-account files/env are ready. Re-read just - // before Kubernetes API calls. - if ($this->k8sApiServer === "") { - $k8sHost = getenv("KUBERNETES_SERVICE_HOST") ?: ""; - $k8sPort = getenv("KUBERNETES_SERVICE_PORT_HTTPS") ?: (getenv("KUBERNETES_SERVICE_PORT") ?: "443"); - if ($k8sHost !== "") { - $this->k8sApiServer = "https://" . $k8sHost . ":" . $k8sPort; - } - } - if ($this->k8sApiToken === "") { - $tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"; - if (is_readable($tokenPath)) { - $this->k8sApiToken = trim((string)file_get_contents($tokenPath)); - } - } - if ($this->k8sApiServer === "" || $this->k8sApiToken === "") { - return array("ok" => false, "error" => "Kubernetes in-cluster API/token not available"); - } - if (!function_exists("curl_init")) { - return array("ok" => false, "error" => "PHP curl extension is required"); - } - - $url = $this->k8sApiServer . $path; - $ch = curl_init($url); - $headers = array( - "Authorization: Bearer " . $this->k8sApiToken - ); - if ($payload !== null) { - $headers[] = "Content-Type: " . $contentType; - } - curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); - curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method); - curl_setopt($ch, CURLOPT_HTTPHEADER, $headers); - curl_setopt($ch, CURLOPT_TIMEOUT, 30); - if (is_file($this->k8sApiCaFile)) { - curl_setopt($ch, CURLOPT_CAINFO, $this->k8sApiCaFile); - } - if ($payload !== null) { - curl_setopt($ch, CURLOPT_POSTFIELDS, $payload); - } - - $raw = curl_exec($ch); - if ($raw === false) { - $err = curl_error($ch); - curl_close($ch); - return array("ok" => false, "error" => "Kubernetes API request failed: " . $err); - } - $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); - curl_close($ch); - - $json = json_decode($raw, true); - if ($code < 200 || $code >= 300) { - $msg = ""; - if (is_array($json) && isset($json["message"])) { - $msg = (string)$json["message"]; - } - if ($msg === "") { - $msg = "HTTP " . $code . " from Kubernetes API"; - } - return array("ok" => false, "error" => $msg); - } - - return array("ok" => true, "data" => array( - "stdout" => $raw, - "stderr" => "", - "job" => $raw, - "exists" => true - )); - } + // --------------------------------------------------------------- + // YAML serializer (PHP array -> YAML string) + // --------------------------------------------------------------- + /** + * Converts a nested PHP associative array into a YAML string. + * Used to serialize the Job manifest before sending it to kubectl-runner. + * This avoids requiring the Symfony YAML or ext-yaml extension. + */ private function arrayToYaml($data, $indent = 0) { $yaml = ""; @@ -331,8 +410,10 @@ private function arrayToYaml($data, $indent = 0) foreach ($data as $key => $value) { if (is_array($value)) { if ($isAssoc($value)) { + // Associative array → nested YAML object. $yaml .= $spaces . $key . ":\n" . $this->arrayToYaml($value, $indent + 1); } else { + // Sequential array → YAML list with "- " prefix. $yaml .= $spaces . $key . ":\n"; foreach ($value as $item) { if (is_array($item)) { @@ -343,12 +424,17 @@ private function arrayToYaml($data, $indent = 0) } } } else { + // Scalar value. $yaml .= $spaces . $key . ": " . $this->yamlScalar($value) . "\n"; } } return $yaml; } + /** + * Formats a single scalar value for YAML output. + * Booleans become true/false, numbers stay numeric, strings are double-quoted. + */ private function yamlScalar($value) { if (is_bool($value)) return $value ? "true" : "false"; @@ -357,36 +443,45 @@ private function yamlScalar($value) return '"' . $escaped . '"'; } + // --------------------------------------------------------------- + // Job status and lifecycle queries + // --------------------------------------------------------------- + + /** + * Retrieves the current state of a running Kubernetes Job by its name (pid). + * + * Called by OpenVRE's workspace polling loop to determine whether a tool + * execution is still in progress. Returns an array with "pid", "state", + * and "job_name" if the job is still active, or an empty array if the + * job is finished/deleted (which tells OpenVRE to finalize outputs). + * + * @param string $pid Kubernetes Job name + * @return array Job info array, or empty if job is done/not found + */ public function getRunningJobInfo($pid) { $job = array(); if (!$pid) return $job; - if ($this->launcherUrl !== "") { - $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); - } else { - $response = $this->k8sRequest("GET", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($pid), null); - if ($response["ok"] !== true && strpos((string)$response["error"], "not found") !== false) { - return array(); - } - if ($response["ok"] === true) { - $response["data"]["exists"] = true; - $response["data"]["job"] = isset($response["data"]["stdout"]) ? $response["data"]["stdout"] : ""; - } - } + + // Query job status via kubectl-runner. + $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + if ($response["ok"] !== true) { - // Treat lookup errors as "not running" to avoid infinite FINISHING state. return array(); } + $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false; if (!$exists) { - // Job already removed (e.g. TTL cleanup) => finalize in OpenVRE. return array(); } + + // Parse the Job JSON to determine its current condition. $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : ""; $json = json_decode($jsonRaw, true); if (!is_array($json)) { return array(); } + $status = $json['status'] ?? array(); $state = "Pending"; if (!empty($status['active'])) { @@ -397,9 +492,8 @@ public function getRunningJobInfo($pid) $state = "Failed"; } - // Important: OpenVRE's workspace polling treats "empty job info" as - // "job not running anymore" (to finalize outputs). For Kubernetes, - // return an empty array once the Job is finished. + // Once the Job is finished (succeeded or failed), return empty so + // OpenVRE's workspace loop finalizes outputs and stops polling. if ($state === "Succeeded" || $state === "Failed") { return array(); } @@ -410,16 +504,23 @@ public function getRunningJobInfo($pid) return $job; } + // --------------------------------------------------------------- + // Simple accessors (ProcessSGE interface compatibility) + // --------------------------------------------------------------- + + /** Returns the full API URL used for the last submission (for debug logging). */ public function getFullCommand() { return $this->fullcommand; } + /** Returns the Kubernetes Job name (used as OpenVRE's "pid"). */ public function getPid() { return $this->pid; } + /** Returns a combined error string if submission failed, or null on success. */ public function getErr() { if ($this->stderr) { @@ -428,24 +529,22 @@ public function getErr() return null; } + /** + * Checks whether the Job associated with this instance is still active. + * Returns true if the Job exists and is running/pending, false otherwise. + * Used by OpenVRE to decide whether to keep polling. + */ public function status() { if (!$this->pid) return false; - if ($this->launcherUrl !== "") { - $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); - } else { - $response = $this->k8sRequest("GET", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($this->pid), null); - if ($response["ok"] !== true && strpos((string)$response["error"], "not found") !== false) { - return false; - } - if ($response["ok"] === true) { - $response["data"]["exists"] = true; - $response["data"]["job"] = isset($response["data"]["stdout"]) ? $response["data"]["stdout"] : ""; - } - } + + // Query job status via kubectl-runner. + $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); + if ($response["ok"] !== true) { return false; } + $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false; if (!$exists) { return false; @@ -458,6 +557,7 @@ public function status() } $status = isset($json["status"]) && is_array($json["status"]) ? $json["status"] : array(); + // Job is no longer active if it has succeeded or failed. if (!empty($status["succeeded"]) || !empty($status["failed"])) { return false; } @@ -465,21 +565,26 @@ public function status() return true; } + // --------------------------------------------------------------- + // Job cancellation / deletion + // --------------------------------------------------------------- + + /** + * Deletes a Kubernetes Job (and its pod) by name via kubectl-runner. + * kubectl-runner uses "Background" propagation policy so the API call + * returns immediately and Kubernetes garbage-collects the pod asynchronously. + * + * @param string|null $pid Kubernetes Job name to delete + * @return array [bool success, string message] + */ public function stop($pid = null) { if (!$pid) { return array(false, "No job id '$pid' given"); } - if ($this->launcherUrl !== "") { - $response = $this->launcherRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); - } else { - $deletePayload = json_encode(array( - "apiVersion" => "batch/v1", - "kind" => "DeleteOptions", - "propagationPolicy" => "Background" - )); - $response = $this->k8sRequest("DELETE", "/apis/batch/v1/namespaces/" . rawurlencode($this->namespace) . "/jobs/" . rawurlencode($pid), $deletePayload, "application/json"); - } + + $response = $this->launcherRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + if ($response["ok"] === true) { $res = isset($response["data"]["stdout"]) ? trim((string)$response["data"]["stdout"]) : ""; return array(true, $res); @@ -487,4 +592,3 @@ public function stop($pid = null) return array(false, $response["error"] ?: "Failed to delete kubernetes job"); } } - diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index 99a09fa6..4c1b6c8e 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -1833,7 +1833,6 @@ protected function enqueue($tool) $queue = $launcherInfo['queue'] ?? $tool['infrastructure']['clouds'][$this->cloudName]['queue']; $jobOptions = array(); if ($jobManager === "kubernetes_native") { - $jobOptions["mode"] = "native"; $jobOptions["image"] = $tool['infrastructure']['container_image'] ?? ""; if ($jobOptions["image"] === "") { $_SESSION['errorData']['Error'][] = "Missing infrastructure.container_image for kubernetes_native launcher."; diff --git a/sge/Dockerfile b/sge/Dockerfile index 73b7b688..7b2391b3 100644 --- a/sge/Dockerfile +++ b/sge/Dockerfile @@ -52,9 +52,19 @@ RUN useradd -m application \ && echo "application:application" | chpasswd # Make sure host and container share the same GID for group 'docker', bc it has reading permissions to the socket file -ARG DOCKER_GROUP -RUN groupmod -g $DOCKER_GROUP docker -RUN usermod -aG docker application + +#ARG DOCKER_GROUP +#RUN groupmod -g $DOCKER_GROUP docker +#RUN usermod -aG docker application + +ARG DOCKER_GROUP=1002 +RUN set -eux; \ + if getent group docker >/dev/null 2>&1; then \ + groupmod -g "${DOCKER_GROUP}" docker; \ + else \ + groupadd -g "${DOCKER_GROUP}" docker; \ + fi; \ + usermod -aG docker application # Add setup script and set permissions ADD setup_gridengine.sh /usr/local/bin/setup_gridengine.sh From 46bc86070e046d54ac27a8d32969a4ae3cd404cd Mon Sep 17 00:00:00 2001 From: Yasir Date: Tue, 12 May 2026 13:12:55 +0000 Subject: [PATCH 03/11] replaced kubectl-runner with scheduler - naming --- .../public/phplib/classes/ProcessK8s.php | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php index 021f1433..58b5b982 100755 --- a/front_end/openVRE/public/phplib/classes/ProcessK8s.php +++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php @@ -6,19 +6,19 @@ * This class is responsible for the full lifecycle of Kubernetes Jobs that run * OpenVRE tools: creation, status polling, and deletion/cancellation. * - * All communication with Kubernetes goes through the kubectl-runner proxy — a - * dedicated HTTP microservice deployed as a sidecar or separate pod that holds - * the Kubernetes service-account credentials. The frontend pod itself does NOT - * need a mounted service-account token or direct K8s API access, which improves - * security by limiting the blast radius of a frontend compromise. + * All communication with Kubernetes goes through the scheduler — a dedicated + * HTTP microservice deployed as a separate pod that holds the Kubernetes + * service-account credentials. The frontend pod itself does NOT need a mounted + * service-account token or direct K8s API access, which improves security by + * limiting the blast radius of a frontend compromise. * * Environment variables consumed (all optional, with defaults): * OPENVRE_K8S_NAMESPACE — target namespace for Jobs (default: "bsctre-v2") * OPENVRE_K8S_JOB_IMAGE — fallback container image for the Job pod * OPENVRE_K8S_SHARED_PVC — PVC name for /shared_data volume * OPENVRE_K8S_TOOLS_PVC — PVC name for /var/www/html/openVRE/public/tools volume - * OPENVRE_K8S_LAUNCHER_URL — base URL of kubectl-runner (REQUIRED) - * OPENVRE_K8S_LAUNCHER_TOKEN — Bearer token for kubectl-runner authentication + * OPENVRE_K8S_SCHEDULER_URL — base URL of the scheduler service (REQUIRED) + * OPENVRE_K8S_SCHEDULER_TOKEN — Bearer token for scheduler authentication * OPENVRE_K8S_RUN_AS_UID — UID the Job pod runs as (default: 1000) * OPENVRE_K8S_RUN_AS_GID — GID / fsGroup for the Job pod (default: 1000) * OPENVRE_K8S_JOB_TTL — seconds to keep a finished Job before auto-deletion (default: 120) @@ -63,11 +63,11 @@ class ProcessK8s /** @var string PVC name mounted at the tools directory inside the Job pod */ private $toolsPvc = "dashboard-frontend-tools"; - /** @var string Base URL of the kubectl-runner HTTP proxy (REQUIRED) */ - private $launcherUrl = ""; + /** @var string Base URL of the scheduler HTTP service (REQUIRED) */ + private $schedulerUrl = ""; - /** @var string Bearer token sent to kubectl-runner for authentication */ - private $launcherToken = ""; + /** @var string Bearer token sent to the scheduler for authentication */ + private $schedulerToken = ""; /** @var int UID the Job pod container runs as */ private $runAsUid = 1000; @@ -113,8 +113,8 @@ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: ""; $this->sharedPvc = getenv("OPENVRE_K8S_SHARED_PVC") ?: "dashboard-frontend-sgecore-shareddata"; $this->toolsPvc = getenv("OPENVRE_K8S_TOOLS_PVC") ?: "dashboard-frontend-tools"; - $this->launcherUrl = rtrim(getenv("OPENVRE_K8S_LAUNCHER_URL") ?: "", "/"); - $this->launcherToken = getenv("OPENVRE_K8S_LAUNCHER_TOKEN") ?: ""; + $this->schedulerUrl = rtrim(getenv("OPENVRE_K8S_SCHEDULER_URL") ?: "", "/"); + $this->schedulerToken = getenv("OPENVRE_K8S_SCHEDULER_TOKEN") ?: ""; $this->runAsUid = (int)(getenv("OPENVRE_K8S_RUN_AS_UID") ?: 1000); $this->runAsGid = (int)(getenv("OPENVRE_K8S_RUN_AS_GID") ?: 1000); @@ -169,7 +169,7 @@ private function sanitizeName($name) // --------------------------------------------------------------- /** - * Builds a Kubernetes Job manifest and submits it to kubectl-runner. + * Builds a Kubernetes Job manifest and submits it to the scheduler. * * @param int $cpu Number of CPU cores (request and limit) * @param int $mem Memory in GB (0 = default 4Gi) @@ -183,9 +183,9 @@ private function runCom($cpu, $mem) return; } - // Abort if kubectl-runner URL is not configured — we cannot submit jobs. - if ($this->launcherUrl === "") { - $this->stderr = "OPENVRE_K8S_LAUNCHER_URL is not set"; + // Abort if scheduler URL is not configured — we cannot submit jobs. + if ($this->schedulerUrl === "") { + $this->stderr = "OPENVRE_K8S_SCHEDULER_URL is not set"; $_SESSION['errorData']['Error'][] = $this->stderr; return; } @@ -201,7 +201,7 @@ private function runCom($cpu, $mem) $memLimit = ((int)$mem > 0 ? ((int)$mem . "Gi") : "4Gi"); // Build the full Kubernetes Job manifest as a PHP associative array. - // This will be serialized to YAML before submission to kubectl-runner. + // This will be serialized to YAML before submission to the scheduler. $manifest = array( "apiVersion" => "batch/v1", "kind" => "Job", @@ -298,13 +298,13 @@ private function runCom($cpu, $mem) } } - // Serialize the manifest array to YAML for submission to kubectl-runner. + // Serialize the manifest array to YAML for submission to the scheduler. $yaml = $this->arrayToYaml($manifest); - // Submit Job via kubectl-runner proxy. - $this->fullcommand = "POST " . $this->launcherUrl . "/jobs"; - logger("K8s job submission via kubectl-runner '" . $this->fullcommand . "'"); - $response = $this->launcherRequest("POST", "/jobs", array( + // Submit Job via the scheduler service. + $this->fullcommand = "POST " . $this->schedulerUrl . "/jobs"; + logger("K8s job submission via scheduler '" . $this->fullcommand . "'"); + $response = $this->schedulerRequest("POST", "/jobs", array( "namespace" => $this->namespace, "manifest" => $yaml )); @@ -325,11 +325,11 @@ private function runCom($cpu, $mem) } // --------------------------------------------------------------- - // HTTP client: kubectl-runner proxy + // HTTP client: scheduler service // --------------------------------------------------------------- /** - * Sends an HTTP request to the kubectl-runner proxy service. + * Sends an HTTP request to the scheduler service. * This is the sole communication channel with Kubernetes. * * @param string $method HTTP method (GET, POST, DELETE) @@ -337,22 +337,22 @@ private function runCom($cpu, $mem) * @param array|null $payload JSON-serializable body (for POST) * @return array ["ok" => bool, "error" => string, "data" => array] */ - private function launcherRequest($method, $path, $payload = null) + private function schedulerRequest($method, $path, $payload = null) { - if ($this->launcherUrl === "") { - return array("ok" => false, "error" => "OPENVRE_K8S_LAUNCHER_URL is not set"); + if ($this->schedulerUrl === "") { + return array("ok" => false, "error" => "OPENVRE_K8S_SCHEDULER_URL is not set"); } if (!function_exists("curl_init")) { return array("ok" => false, "error" => "PHP curl extension is required"); } - $url = $this->launcherUrl . $path; + $url = $this->schedulerUrl . $path; $ch = curl_init($url); $headers = array("Content-Type: application/json"); - // Authenticate to kubectl-runner with a Bearer token. - if ($this->launcherToken !== "") { - $headers[] = "Authorization: Bearer " . $this->launcherToken; + // Authenticate to the scheduler with a Bearer token. + if ($this->schedulerToken !== "") { + $headers[] = "Authorization: Bearer " . $this->schedulerToken; } curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); @@ -367,24 +367,24 @@ private function launcherRequest($method, $path, $payload = null) if ($raw === false) { $err = curl_error($ch); curl_close($ch); - return array("ok" => false, "error" => "kubectl-runner request failed: " . $err); + return array("ok" => false, "error" => "Scheduler request failed: " . $err); } $code = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch); $json = json_decode($raw, true); - // Non-2xx status from kubectl-runner indicates an error. + // Non-2xx status from the scheduler indicates an error. if ($code < 200 || $code >= 300) { - $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from kubectl-runner"); + $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from scheduler"); return array("ok" => false, "error" => $msg); } if (!is_array($json)) { - return array("ok" => false, "error" => "Invalid JSON response from kubectl-runner"); + return array("ok" => false, "error" => "Invalid JSON response from scheduler"); } - // kubectl-runner returns {"ok": false, "error": "..."} on application-level errors. + // The scheduler returns {"ok": false, "error": "..."} on application-level errors. if (isset($json["ok"]) && !$json["ok"]) { - return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "kubectl-runner error"); + return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "Scheduler error"); } return array("ok" => true, "data" => $json); } @@ -395,7 +395,7 @@ private function launcherRequest($method, $path, $payload = null) /** * Converts a nested PHP associative array into a YAML string. - * Used to serialize the Job manifest before sending it to kubectl-runner. + * Used to serialize the Job manifest before sending it to the scheduler. * This avoids requiring the Symfony YAML or ext-yaml extension. */ private function arrayToYaml($data, $indent = 0) @@ -463,8 +463,8 @@ public function getRunningJobInfo($pid) $job = array(); if (!$pid) return $job; - // Query job status via kubectl-runner. - $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + // Query job status via the scheduler. + $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); if ($response["ok"] !== true) { return array(); @@ -538,8 +538,8 @@ public function status() { if (!$this->pid) return false; - // Query job status via kubectl-runner. - $response = $this->launcherRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); + // Query job status via the scheduler. + $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); if ($response["ok"] !== true) { return false; @@ -570,8 +570,8 @@ public function status() // --------------------------------------------------------------- /** - * Deletes a Kubernetes Job (and its pod) by name via kubectl-runner. - * kubectl-runner uses "Background" propagation policy so the API call + * Deletes a Kubernetes Job (and its pod) by name via the scheduler. + * The scheduler uses "Background" propagation policy so the API call * returns immediately and Kubernetes garbage-collects the pod asynchronously. * * @param string|null $pid Kubernetes Job name to delete @@ -583,7 +583,7 @@ public function stop($pid = null) return array(false, "No job id '$pid' given"); } - $response = $this->launcherRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); + $response = $this->schedulerRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace)); if ($response["ok"] === true) { $res = isset($response["data"]["stdout"]) ? trim((string)$response["data"]["stdout"]) : ""; From b48dafb450af15be6731b3e4aeaf67182de2cec2 Mon Sep 17 00:00:00 2001 From: Yasir Date: Wed, 13 May 2026 09:49:47 +0000 Subject: [PATCH 04/11] fixing projects.inc, processJob --- .../openVRE/public/phplib/classes/Tooljob.php | 6 ++ .../openVRE/public/phplib/processJob.inc.php | 78 +++++++++++++++---- .../openVRE/public/phplib/projects.inc.php | 11 ++- 3 files changed, 78 insertions(+), 17 deletions(-) diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index 4c1b6c8e..5067934d 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -98,6 +98,12 @@ public function __construct($tool, $execution = "", $project = "", $descrip = "" switch ($this->launcher) { case "SGE": case "docker_SGE": + $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; + $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; + $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual']; + $this->pub_dir_volumes = $GLOBALS['clouds'][$this->cloudName]['pubDir_host']; + $this->root_dir_volumes = $GLOBALS['clouds'][$this->cloudName]['dataDir_host'] . "/" . $_SESSION['User']['id']; + $this->pub_dir_intern = rtrim($this->pub_dir_virtual, "/") . "_tmp"; case "kubernetes_native": $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; diff --git a/front_end/openVRE/public/phplib/processJob.inc.php b/front_end/openVRE/public/phplib/processJob.inc.php index 229abdd7..7d7d06ec 100644 --- a/front_end/openVRE/public/phplib/processJob.inc.php +++ b/front_end/openVRE/public/phplib/processJob.inc.php @@ -5,9 +5,10 @@ # -function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $launcherType = "SGE", $toolId = "", $jobOptions = array()) +function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE", $toolId = "", $jobOptions = array()) { - logger("Start job submission via " . $launcherType); + logger("Start job submission via $jobManager"); + error_log("DEBUG- execJob: Start job submission via $jobManager"); if (!isset($_SESSION['User']['id'])) { $_SESSION['errorData']['Error'][] = "User ID not found in session."; @@ -28,31 +29,55 @@ function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job // Validate queue $queue = $queue ?: ($GLOBALS['queueTask'] ?? null); - if (!$queue) { + if (!$queue && strtoupper($jobManager) === "SGE") { $_SESSION['errorData']['Error'][] = "Queue not provided."; return [0, "Queue not provided."]; } - + $queue = (isset($queue) ? $queue : $GLOBALS['queueTask']); - $jobname = $_SESSION['User']['id'] . "#" . ($toolId ?: basename($shFile)); - - if ($launcherType === "kubernetes_native") { - require_once __DIR__ . "/classes/ProcessK8s.php"; - $process = new ProcessK8s($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile, $jobOptions); - } else { - $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); + $jobname = $_SESSION['User']['id'] . "#" . basename($shFile); + + switch ($jobManager) { + case "docker_SGE": + error_log("DEBUG: Submitting job via docker_SGE. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile"); + $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); + break; + case "Slurm_Singularity": + $remote_system = $_REQUEST['sites']['site_list'][0]; + error_log("DEBUG: Submitting job via Slurm to $remote_system. Parameters: shFile=$shFile, workDir=$workDir, logFile=$logFile, errFile=$errFile"); + $process = new ProcessSlurm($shFile, $workDir, $logFile, $errFile, $remote_system); + break; + case "kubernetes_native": + $schedUrl = getenv("OPENVRE_K8S_SCHEDULER_URL") ?: ""; + $schedHost = $schedUrl !== "" + ? (string)(parse_url($schedUrl, PHP_URL_HOST) ?: "(parse_failed)") + : "(not_set)"; + $k8sNs = getenv("OPENVRE_K8S_NAMESPACE") ?: "(env_unset)"; + $jobOptKeys = is_array($jobOptions) && count($jobOptions) + ? implode(",", array_keys($jobOptions)) + : "(none)"; + error_log( + "DEBUG: Submitting job via kubernetes_native. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, " + . "jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile, " + . "namespace=$k8sNs, scheduler_host=$schedHost, jobOptions_keys=$jobOptKeys" + ); + require_once __DIR__ . "/classes/ProcessK8s.php"; + $process = new ProcessK8s($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile, $jobOptions); + break; + default: + $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile); + break; } - $pid = $process->getPid(); - if (!$process->status()) { - $_SESSION['errorData']['Error'][] = "Job submission failed.
" . $process->getFullCommand . "
" . $process->getErr(); - $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand . "'. ErrorSGE: '" . $process->getErr() . "'"; + $_SESSION['errorData']['Error'][] = "Job submission failed.
" . $process->getFullCommand() . "
" . $process->getErr(); + $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand() . "'. ErrorDetail: '" . $process->getErr() . "'"; logger($errMesg); return array(0, $errMesg); } + $pid = $process->getPid(); error_log("Process started successfully: PID = $pid"); logger("The process is currently running PID = $pid"); return array($pid, ""); @@ -124,6 +149,8 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local") if (! $pid) return $job; + logger("getRunningJobInfo: start processing $pid"); + // guess launcher if (!$launcherType) { if (is_numeric($pid)) { @@ -134,6 +161,7 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local") $launcherType = "PMES"; } } + logger("getRunningJobInfo: launcherType = $launcherType"); // create new jobProcess if ($launcherType == "SGE" || $launcherType == "docker_SGE") { @@ -143,13 +171,25 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local") require_once __DIR__ . "/classes/ProcessK8s.php"; $process = new ProcessK8s(); $job = $process->getRunningJobInfo($pid); + $k8sSummary = empty($job) + ? "empty (job finished or unknown to scheduler)" + : ("keys=" . implode(",", array_keys($job)) + . (isset($job['state']) ? " state=" . $job['state'] : "")); + error_log("DEBUG: getRunningJobInfo kubernetes_native pid=$pid $k8sSummary"); + logger("getRunningJobInfo (kubernetes_native): pid=$pid $k8sSummary"); } elseif ($launcherType == "PMES") { $process = new ProcessPMES($cloudName); $job = $process->getRunningJobInfo($pid); + } elseif ($launcherType == "Slurm_Singularity") { + $process = new ProcessSlurm(); + $job = $process->getRunningJobInfo($pid); + logger("getRunningJobInfo: $job"); } else { - $_SESSION['errorData']['Error'][] = "Cannot monitor job '$pid' of type '$launcher'. Launcher not implemented."; + logger("getRunningJobInfo: error due to unknown launcher type '$launcherType'"); + $_SESSION['errorData']['Error'][] = "Cannot monitor job '$pid' of type '$launcherType'. Launcher not implemented."; return $job; } + logger("getRunningJobInfo: end processing $pid"); // return job info return $job; } @@ -307,9 +347,15 @@ function delJob($pid, $launcherType = NULL, $cloudName = "local", $login = NULL) // Add any other file redirection logic here } } elseif ($launcherType == "kubernetes_native") { + error_log("DEBUG: delJob kubernetes_native pid=$pid calling ProcessK8s::stop"); require_once __DIR__ . "/classes/ProcessK8s.php"; $processK8s = new ProcessK8s(); list($r_sge, $msg_sge) = $processK8s->stop($pid); + error_log( + "DEBUG: delJob kubernetes_native pid=$pid stop_ok=" . ($r_sge ? "1" : "0") + . " msg=" . $msg_sge + ); + logger("delJob (kubernetes_native): pid=$pid stop_ok=" . ($r_sge ? "1" : "0") . " msg=" . $msg_sge); } elseif ($launcherType == "PMES") { $process = new ProcessPMES(); $r = $process->stop($pid); diff --git a/front_end/openVRE/public/phplib/projects.inc.php b/front_end/openVRE/public/phplib/projects.inc.php index ca951a32..7dc8834a 100644 --- a/front_end/openVRE/public/phplib/projects.inc.php +++ b/front_end/openVRE/public/phplib/projects.inc.php @@ -2347,7 +2347,12 @@ function resolvePath_toLocalAbsolutePath($path, $job) $rfn = str_replace($job['root_dir_virtual'], $GLOBALS['dataDir'] . $_SESSION['User']['id'], $path); //SGE finds mounted dataDir as root_dir_virtual - } elseif ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE") { + } elseif ( + $job['launcher'] == "SGE" || + $job['launcher'] == "ega_demo" || + $job['launcher'] == "docker_SGE" || + $job['launcher'] == "kubernetes_native" + ) { $rfn = str_replace($job['root_dir_mug'], $GLOBALS['dataDir'], $path); } // direct from file_path @@ -2380,6 +2385,10 @@ function resolvePath_toLocalAbsolutePath($path, $job) } } //clean slashes + if ($rfn === "") { + // Keep original absolute path instead of returning empty when launcher mapping is missing. + $rfn = $path; + } $rfn = preg_replace('#/+#', '/', $rfn); //return absolute path From 2b8a6aec0297d8f9161f7c5ecff58982c4695c2a Mon Sep 17 00:00:00 2001 From: Yasir Date: Wed, 13 May 2026 09:57:39 +0000 Subject: [PATCH 05/11] Tooljob fix only line --- front_end/openVRE/public/phplib/classes/Tooljob.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index 5067934d..6d380e85 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -98,7 +98,7 @@ public function __construct($tool, $execution = "", $project = "", $descrip = "" switch ($this->launcher) { case "SGE": case "docker_SGE": - $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; + $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual']; $this->pub_dir_volumes = $GLOBALS['clouds'][$this->cloudName]['pubDir_host']; From 556871143a8af7e50f00fc8f2a7a0510c6ea1e25 Mon Sep 17 00:00:00 2001 From: Yasir Date: Mon, 18 May 2026 09:42:56 +0000 Subject: [PATCH 06/11] reverting dockerfile to original --- front_end/Dockerfile | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/front_end/Dockerfile b/front_end/Dockerfile index fb6af438..ee90ec5c 100644 --- a/front_end/Dockerfile +++ b/front_end/Dockerfile @@ -4,7 +4,7 @@ FROM webdevops/php-apache:8.3 ARG DEBIAN_FRONTEND=noninteractive RUN apt-get update -RUN a2enmod proxy proxy_http ssl +RUN a2enmod proxy proxy_http RUN apt install -y nodejs RUN apt-get install -y autoconf pkg-config libssl-dev RUN docker-php-ext-install bcmath @@ -25,7 +25,7 @@ RUN apt-get update && apt-get install -y libmcrypt-dev \ && docker-php-ext-install gd RUN apt-get update && apt-get install gnupg curl -RUN curl -fsSL https://pgp.mongodb.com/server-7.0.asc |gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor +RUN curl -fsSL https://pgp.mongodb.com/server-7.0.asc | gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor RUN echo "deb [ signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] http://repo.mongodb.org/apt/debian bookworm/mongodb-org/7.0 main" | tee /etc/apt/sources.list.d/mongodb-org-7.0.list RUN apt-get update RUN apt-get install -y mongodb-org @@ -39,9 +39,7 @@ WORKDIR /var/www/html ADD openVRE /var/www/html/openVRE RUN cd openVRE WORKDIR /var/www/html/openVRE -RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer -RUN composer self-update -RUN composer update --ignore-platform-req=ext-mongodb --ignore-platform-req=ext-mongodb +# Dependencies are pre-seeded in ./openVRE/vendor for offline/reproducible builds. RUN mkdir logs RUN touch logs/application.log RUN chmod -R 777 logs/application.log @@ -51,6 +49,6 @@ USER application WORKDIR /var/www/html/openVRE/public RUN pwd -EXPOSE 443 +EXPOSE 88 443 CMD setup.sh && /opt/docker/bin/entrypoint.sh supervisord From 01308816394a6710ae601ffd5b0cd4624e88e59c Mon Sep 17 00:00:00 2001 From: Yasir Date: Mon, 18 May 2026 09:45:21 +0000 Subject: [PATCH 07/11] keeping same structure for elseif in projects.inc --- front_end/openVRE/public/phplib/projects.inc.php | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/front_end/openVRE/public/phplib/projects.inc.php b/front_end/openVRE/public/phplib/projects.inc.php index 7dc8834a..73741ab4 100644 --- a/front_end/openVRE/public/phplib/projects.inc.php +++ b/front_end/openVRE/public/phplib/projects.inc.php @@ -2347,12 +2347,7 @@ function resolvePath_toLocalAbsolutePath($path, $job) $rfn = str_replace($job['root_dir_virtual'], $GLOBALS['dataDir'] . $_SESSION['User']['id'], $path); //SGE finds mounted dataDir as root_dir_virtual - } elseif ( - $job['launcher'] == "SGE" || - $job['launcher'] == "ega_demo" || - $job['launcher'] == "docker_SGE" || - $job['launcher'] == "kubernetes_native" - ) { + } elseif ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE" || $job['launcher'] == "kubernetes_native") { $rfn = str_replace($job['root_dir_mug'], $GLOBALS['dataDir'], $path); } // direct from file_path From 8e9d86cd1fec19923ae1c6c8044289a397011268 Mon Sep 17 00:00:00 2001 From: Yasir Date: Mon, 18 May 2026 09:47:13 +0000 Subject: [PATCH 08/11] keeping switch aligned with original tooljob --- front_end/openVRE/public/phplib/classes/Tooljob.php | 6 ------ 1 file changed, 6 deletions(-) diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index 6d380e85..4c1b6c8e 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -98,12 +98,6 @@ public function __construct($tool, $execution = "", $project = "", $descrip = "" switch ($this->launcher) { case "SGE": case "docker_SGE": - $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; - $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; - $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual']; - $this->pub_dir_volumes = $GLOBALS['clouds'][$this->cloudName]['pubDir_host']; - $this->root_dir_volumes = $GLOBALS['clouds'][$this->cloudName]['dataDir_host'] . "/" . $_SESSION['User']['id']; - $this->pub_dir_intern = rtrim($this->pub_dir_virtual, "/") . "_tmp"; case "kubernetes_native": $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id']; $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual']; From 9e172e3e8e2ae8b7361574e2ce29afe4b0162ae0 Mon Sep 17 00:00:00 2001 From: Yasir Date: Mon, 18 May 2026 19:45:35 +0000 Subject: [PATCH 09/11] scheduler for kubernetes job creation --- scheduler/.dockerignore | 12 ++++ scheduler/Dockerfile | 11 ++++ scheduler/app.py | 133 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 scheduler/.dockerignore create mode 100644 scheduler/Dockerfile create mode 100644 scheduler/app.py diff --git a/scheduler/.dockerignore b/scheduler/.dockerignore new file mode 100644 index 00000000..b765ce2f --- /dev/null +++ b/scheduler/.dockerignore @@ -0,0 +1,12 @@ +.git +.gitignore +__pycache__/ +*.pyc +*.pyo +*.pyd +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +dist/ +build/ +*.egg-info/ diff --git a/scheduler/Dockerfile b/scheduler/Dockerfile new file mode 100644 index 00000000..0e7e69a9 --- /dev/null +++ b/scheduler/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.11-alpine + +WORKDIR /app + +COPY app.py /app/app.py + +EXPOSE 8080 + +USER 65534:65534 + +CMD ["python", "/app/app.py"] diff --git a/scheduler/app.py b/scheduler/app.py new file mode 100644 index 00000000..5288be69 --- /dev/null +++ b/scheduler/app.py @@ -0,0 +1,133 @@ +import json +import os +import ssl +import urllib.error +import urllib.parse +import urllib.request +from http.server import BaseHTTPRequestHandler, HTTPServer + + +TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" +CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +API_HOST = os.environ.get("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc") +API_PORT = os.environ.get("KUBERNETES_SERVICE_PORT", "443") +DEFAULT_NAMESPACE = os.environ.get("DEFAULT_NAMESPACE", "default") +SCHEDULER_AUTH_TOKEN = os.environ.get("SCHEDULER_AUTH_TOKEN", "") + +with open(TOKEN_PATH, "r", encoding="utf-8") as f: + SA_TOKEN = f.read().strip() + +SSL_CTX = ssl.create_default_context(cafile=CA_PATH) + + +def k8s_request(method, path, body=None, content_type="application/json"): + url = f"https://{API_HOST}:{API_PORT}{path}" + data = body.encode("utf-8") if body is not None else None + req = urllib.request.Request(url, method=method, data=data) + req.add_header("Authorization", f"Bearer {SA_TOKEN}") + if body is not None: + req.add_header("Content-Type", content_type) + try: + with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as resp: + raw = resp.read().decode("utf-8") + return resp.getcode(), raw + except urllib.error.HTTPError as e: + raw = e.read().decode("utf-8") if e.fp else str(e) + return e.code, raw + + +class Handler(BaseHTTPRequestHandler): + def _authorized(self): + if SCHEDULER_AUTH_TOKEN == "": + return False + auth = self.headers.get("Authorization", "") + return auth == f"Bearer {SCHEDULER_AUTH_TOKEN}" + + def _json(self, code, payload): + out = json.dumps(payload).encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(out))) + self.end_headers() + self.wfile.write(out) + + def _read_json(self): + length = int(self.headers.get("Content-Length", "0")) + if length == 0: + return {} + raw = self.rfile.read(length).decode("utf-8") + return json.loads(raw) + + def do_GET(self): + if self.path == "/healthz": + return self._json(200, {"ok": True}) + if not self._authorized(): + return self._json(401, {"ok": False, "error": "unauthorized"}) + if self.path.startswith("/jobs/"): + name_q = self.path[len("/jobs/"):] + name, _, query = name_q.partition("?") + params = urllib.parse.parse_qs(query) + ns = params.get("namespace", [DEFAULT_NAMESPACE])[0] + code, raw = k8s_request("GET", f"/apis/batch/v1/namespaces/{ns}/jobs/{name}") + if code == 404: + return self._json(200, {"ok": True, "exists": False, "job": ""}) + if code >= 300: + return self._json(500, {"ok": False, "error": raw}) + return self._json(200, {"ok": True, "exists": True, "job": raw}) + return self._json(404, {"ok": False, "error": "not found"}) + + def do_POST(self): + if not self._authorized(): + return self._json(401, {"ok": False, "error": "unauthorized"}) + if self.path != "/jobs": + return self._json(404, {"ok": False, "error": "not found"}) + try: + body = self._read_json() + ns = body.get("namespace") or DEFAULT_NAMESPACE + manifest = body.get("manifest") + if not manifest: + return self._json(400, {"ok": False, "error": "manifest is required"}) + code, raw = k8s_request( + "POST", + f"/apis/batch/v1/namespaces/{ns}/jobs", + body=manifest, + content_type="application/yaml", + ) + if code >= 300: + return self._json(500, {"ok": False, "error": raw}) + return self._json(200, {"ok": True, "stdout": raw, "stderr": ""}) + except Exception as e: + return self._json(500, {"ok": False, "error": str(e)}) + + def do_DELETE(self): + if not self._authorized(): + return self._json(401, {"ok": False, "error": "unauthorized"}) + if not self.path.startswith("/jobs/"): + return self._json(404, {"ok": False, "error": "not found"}) + name_q = self.path[len("/jobs/"):] + name, _, query = name_q.partition("?") + params = urllib.parse.parse_qs(query) + ns = params.get("namespace", [DEFAULT_NAMESPACE])[0] + delete_opts = '{"apiVersion":"batch/v1","kind":"DeleteOptions","propagationPolicy":"Background"}' + code, raw = k8s_request( + "DELETE", + f"/apis/batch/v1/namespaces/{ns}/jobs/{name}", + body=delete_opts, + content_type="application/json", + ) + if code == 404: + return self._json( + 200, + {"ok": True, "stdout": f"job.batch/{name} already deleted", "stderr": ""}, + ) + if code >= 300: + return self._json(500, {"ok": False, "error": raw}) + return self._json(200, {"ok": True, "stdout": raw, "stderr": ""}) + + def log_message(self, fmt, *args): + return + + +if __name__ == "__main__": + server = HTTPServer(("0.0.0.0", 8080), Handler) + server.serve_forever() From 98c1f7d2351db06e1c89f56dd10b78dcc699f594 Mon Sep 17 00:00:00 2001 From: Yasir Date: Mon, 18 May 2026 19:48:06 +0000 Subject: [PATCH 10/11] scheduler for kubernetes job creation --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 7653983d..e765ed48 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,7 @@ volumes/ *.key *.pem .jwks + +# Python +__pycache__/ +*.pyc From 385c2ce020af8136efc14e07878df9d4642c92d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20D=C3=ADaz=20Roussel?= Date: Fri, 29 May 2026 14:29:46 +0200 Subject: [PATCH 11/11] refactor: minor updates on logger and other details --- .../public/phplib/classes/ProcessK8s.php | 19 ++++++++++++++----- .../openVRE/public/phplib/classes/Tooljob.php | 1 - .../openVRE/public/phplib/processJob.inc.php | 1 + 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php index 58b5b982..df2ef212 100755 --- a/front_end/openVRE/public/phplib/classes/ProcessK8s.php +++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php @@ -1,5 +1,10 @@ "FINISHING", ); + private Logger $logger; + // --------------------------------------------------------------- // Constructor // --------------------------------------------------------------- @@ -108,6 +115,8 @@ class ProcessK8s */ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = "", $cpu = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobOptions = array()) { + $this->logger = LoggerFactory::getLogger("Process K8s interface"); + // Read all configuration from environment variables (set via ConfigMap/Secret). $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "bsctre-v2"; $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: ""; @@ -303,7 +312,7 @@ private function runCom($cpu, $mem) // Submit Job via the scheduler service. $this->fullcommand = "POST " . $this->schedulerUrl . "/jobs"; - logger("K8s job submission via scheduler '" . $this->fullcommand . "'"); + $this->logger->debug("K8s job submission via scheduler '" . $this->fullcommand . "'"); $response = $this->schedulerRequest("POST", "/jobs", array( "namespace" => $this->namespace, "manifest" => $yaml @@ -313,14 +322,14 @@ private function runCom($cpu, $mem) if ($response["ok"] !== true) { $this->stderr = $response["error"]; $this->stdout = ""; - $msg = "K8s job submission failed: " . trim($this->stderr); - logger($msg); - $_SESSION['errorData']['Error'][] = $msg; $this->pid = ""; + $msg = "K8s job submission failed: " . trim($this->stderr); + $this->logger->error($msg); + throw new UnexpectedValueException($msg); } else { $this->stdout = isset($response["data"]["stdout"]) ? (string)$response["data"]["stdout"] : ""; $this->stderr = isset($response["data"]["stderr"]) ? (string)$response["data"]["stderr"] : ""; - logger("K8s job submitted: " . $this->pid . ". Output: " . trim($this->stdout)); + $this->logger->debug("K8s job submitted: " . $this->pid . ". Output: " . trim($this->stdout)); } } diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php index abac1f5b..ef288d59 100644 --- a/front_end/openVRE/public/phplib/classes/Tooljob.php +++ b/front_end/openVRE/public/phplib/classes/Tooljob.php @@ -1445,7 +1445,6 @@ public function submit($tool) case "ega_demo": case "docker_SGE": case "kubernetes_native": - return $this->enqueue($tool); case "Slurm_Singularity": return $this->enqueue($tool); default: diff --git a/front_end/openVRE/public/phplib/processJob.inc.php b/front_end/openVRE/public/phplib/processJob.inc.php index 2cb926c6..0d9be4de 100644 --- a/front_end/openVRE/public/phplib/processJob.inc.php +++ b/front_end/openVRE/public/phplib/processJob.inc.php @@ -2,6 +2,7 @@ use OpenVRE\LoggerFactory; use OpenVRE\NotFoundException; +use OpenVRE\ProcessK8s; use OpenVRE\ProcessSGE; use OpenVRE\ProcessSlurm;