diff --git a/.gitignore b/.gitignore index 7653983d..e765ed48 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,7 @@ volumes/ *.key *.pem .jwks + +# Python +__pycache__/ +*.pyc diff --git a/front_end/Dockerfile b/front_end/Dockerfile index fe9aa567..cb18bd44 100644 --- a/front_end/Dockerfile +++ b/front_end/Dockerfile @@ -25,7 +25,7 @@ RUN apt-get update && apt-get install -y libmcrypt-dev \ && docker-php-ext-install gd RUN apt-get update && apt-get install -y gnupg curl -RUN curl -fsSL https://www.mongodb.org/static/pgp/server-8.0.asc |gpg -o /usr/share/keyrings/mongodb-server-8.0.gpg --dearmor +RUN curl -fsSL https://www.mongodb.org/static/pgp/server-8.0.asc | gpg -o /usr/share/keyrings/mongodb-server-8.0.gpg --dearmor RUN echo "deb [ signed-by=/usr/share/keyrings/mongodb-server-8.0.gpg ] http://repo.mongodb.org/apt/debian bookworm/mongodb-org/8.0 main" | tee /etc/apt/sources.list.d/mongodb-org-8.0.list RUN apt-get update RUN apt-get install -y mongodb-org @@ -39,9 +39,7 @@ WORKDIR /var/www/html ADD openVRE /var/www/html/openVRE RUN cd openVRE WORKDIR /var/www/html/openVRE -RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer -RUN composer self-update -RUN composer update --ignore-platform-req=ext-mongodb --ignore-platform-req=ext-mongodb +# Dependencies are pre-seeded in ./openVRE/vendor for offline/reproducible builds. 
RUN mkdir logs RUN touch logs/application.log RUN chmod -R 777 logs/application.log diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php new file mode 100755 index 00000000..df2ef212 --- /dev/null +++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php @@ -0,0 +1,603 @@ + "RUNNING", + "Pending" => "PENDING", + "Succeeded" => "FINISHING", + "Failed" => "ERROR", + "NotFound" => "FINISHING", + ); + + private Logger $logger; + + // --------------------------------------------------------------- + // Constructor + // --------------------------------------------------------------- + + /** + * @param string|false $cl Path to submission script (false = status-only instance) + * @param string $workDir Working directory path inside the pod + * @param string $queue Queue name (unused for K8s, kept for interface compatibility with ProcessSGE) + * @param string $jobname Human-readable job name + * @param int $cpu CPU cores requested/limited + * @param int $mem Memory in GB (0 = default 4Gi) + * @param string $logFile Stdout log filename (unused for K8s, kept for interface compatibility) + * @param string $errFile Stderr log filename (unused for K8s, kept for interface compatibility) + * @param array $jobOptions Options from Tooljob: "image" overrides container image, + * "env" provides extra environment variables for the pod + */ + public function __construct($cl = false, $workDir = "", $queue = "", $jobname = "", $cpu = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobOptions = array()) + { + $this->logger = LoggerFactory::getLogger("Process K8s interface"); + + // Read all configuration from environment variables (set via ConfigMap/Secret). 
+ $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "bsctre-v2"; + $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: ""; + $this->sharedPvc = getenv("OPENVRE_K8S_SHARED_PVC") ?: "dashboard-frontend-sgecore-shareddata"; + $this->toolsPvc = getenv("OPENVRE_K8S_TOOLS_PVC") ?: "dashboard-frontend-tools"; + $this->schedulerUrl = rtrim(getenv("OPENVRE_K8S_SCHEDULER_URL") ?: "", "/"); + $this->schedulerToken = getenv("OPENVRE_K8S_SCHEDULER_TOKEN") ?: ""; + $this->runAsUid = (int)(getenv("OPENVRE_K8S_RUN_AS_UID") ?: 1000); + $this->runAsGid = (int)(getenv("OPENVRE_K8S_RUN_AS_GID") ?: 1000); + + // Allow per-tool overrides: the tool's Mongo document can specify a custom + // container image and extra environment variables via $jobOptions. + if (is_array($jobOptions)) { + if (!empty($jobOptions["image"])) { + $this->jobImage = (string)$jobOptions["image"]; + } + if (!empty($jobOptions["env"]) && is_array($jobOptions["env"])) { + $this->jobEnv = $jobOptions["env"]; + } + } + + // If a submission script path was provided, immediately create and submit + // the Kubernetes Job. When $cl is false, this instance is used only for + // status queries (getRunningJobInfo, status, stop). + if ($cl !== false) { + $this->workDir = $workDir; + $this->command = $cl; + $this->jobname = $jobname ? $jobname : basename($cl); + $this->runCom($cpu, $mem); + } + } + + // --------------------------------------------------------------- + // Job name generation + // --------------------------------------------------------------- + + /** + * Converts a human-readable job name into a Kubernetes-safe DNS name. + * Kubernetes Job names must be lowercase, alphanumeric + hyphens, max 63 chars. + * Appends a random 8-char suffix to guarantee uniqueness across runs. 
+ */ + private function sanitizeName($name) + { + $name = strtolower($name); + $name = preg_replace('/[^a-z0-9-]+/', '-', $name); + $name = trim($name, '-'); + if ($name === "") { + $name = "openvre-job"; + } + if (strlen($name) > 40) { + $name = substr($name, 0, 40); + $name = rtrim($name, '-'); + } + return $name . "-" . substr(md5(uniqid("", true)), 0, 8); + } + + // --------------------------------------------------------------- + // Job creation and submission + // --------------------------------------------------------------- + + /** + * Builds a Kubernetes Job manifest and submits it to the scheduler. + * + * @param int $cpu Number of CPU cores (request and limit) + * @param int $mem Memory in GB (0 = default 4Gi) + */ + private function runCom($cpu, $mem) + { + // Abort if no container image is configured — there's nothing to run. + if ($this->jobImage === "") { + $this->stderr = "OPENVRE_K8S_JOB_IMAGE is not set"; + $_SESSION['errorData']['Error'][] = $this->stderr; + return; + } + + // Abort if scheduler URL is not configured — we cannot submit jobs. + if ($this->schedulerUrl === "") { + $this->stderr = "OPENVRE_K8S_SCHEDULER_URL is not set"; + $_SESSION['errorData']['Error'][] = $this->stderr; + return; + } + + // Generate a unique, K8s-safe Job name and use it as the OpenVRE "pid". + $jobName = $this->sanitizeName($this->jobname); + $this->pid = $jobName; + + $scriptPath = $this->command; + $workDir = $this->workDir; + $cpuRequest = max(1, (int)$cpu); + $cpuLimit = max(1, (int)$cpu); + $memLimit = ((int)$mem > 0 ? ((int)$mem . "Gi") : "4Gi"); + + // Build the full Kubernetes Job manifest as a PHP associative array. + // This will be serialized to YAML before submission to the scheduler. 
+        $manifest = array(
+            "apiVersion" => "batch/v1",
+            "kind" => "Job",
+            "metadata" => array(
+                "name" => $jobName,
+                "namespace" => $this->namespace,
+                "labels" => array(
+                    "app.kubernetes.io/managed-by" => "openvre",
+                    "openvre-job-id" => $jobName
+                )
+            ),
+            "spec" => array(
+                // Auto-delete the Job object N seconds after it finishes (success or failure).
+                "ttlSecondsAfterFinished" => (int)(getenv("OPENVRE_K8S_JOB_TTL") ?: 120),
+
+                // Kill the Job if it runs longer than this (prevents stuck/hung jobs).
+                "activeDeadlineSeconds" => (int)(getenv("OPENVRE_K8S_JOB_DEADLINE") ?: 86400),
+
+                // Do not retry on failure — mark as Failed immediately.
+                "backoffLimit" => 0,
+
+                "template" => array(
+                    "spec" => array(
+                        // Pod should not restart — one attempt only.
+                        "restartPolicy" => "Never",
+
+                        "containers" => array(
+                            array(
+                                "name" => "tool-runner",
+                                "image" => $this->jobImage,
+
+                                // The pod runs the submission script (generated by Tooljob)
+                                // from the working directory. Both are passed as env vars.
+                                "command" => array("bash", "-lc", "cd \"\$OPENVRE_WORKDIR\" && bash \"\$OPENVRE_SUBMIT_SCRIPT\""),
+
+                                "env" => array(
+                                    array("name" => "OPENVRE_WORKDIR", "value" => $workDir),
+                                    array("name" => "OPENVRE_SUBMIT_SCRIPT", "value" => $scriptPath),
+                                ),
+
+                                "resources" => array(
+                                    "requests" => array("cpu" => (string)$cpuRequest),
+                                    "limits" => array("cpu" => (string)$cpuLimit, "memory" => $memLimit)
+                                ),
+
+                                "securityContext" => array(
+                                    "allowPrivilegeEscalation" => false,
+                                ),
+
+                                // Mount shared storage so the Job can read inputs and write outputs
+                                // to the same PVCs used by the frontend and SGE pods.
+                                "volumeMounts" => array(
+                                    array("name" => "shared-data", "mountPath" => "/shared_data"),
+                                    array("name" => "tools", "mountPath" => "/var/www/html/openVRE/public/tools")
+                                )
+                            )
+                        ),
+
+                        // Run the container as a non-root user with a fixed UID/GID
+                        // to match file ownership on the shared PVCs.
+                        "securityContext" => array(
+                            "runAsUser" => max(1, $this->runAsUid),
+                            "runAsGroup" => max(1, $this->runAsGid),
+                            "fsGroup" => max(1, $this->runAsGid)
+                        ),
+
+                        // Attach the two shared PVCs as volumes.
+                        "volumes" => array(
+                            array("name" => "shared-data", "persistentVolumeClaim" => array("claimName" => $this->sharedPvc)),
+                            array("name" => "tools", "persistentVolumeClaim" => array("claimName" => $this->toolsPvc))
+                        )
+                    )
+                )
+            )
+        );
+
+        // Inject tool-specific environment variables (e.g. FEM_ACCESS_TOKEN, FEM_API_PREFIX)
+        // from the tool's Mongo definition into the Job pod's container env list.
+        if (!empty($this->jobEnv)) {
+            $envList =& $manifest["spec"]["template"]["spec"]["containers"][0]["env"];
+            if (is_array($envList)) {
+                foreach ($this->jobEnv as $k => $v) {
+                    if (!is_string($k) || $k === "") {
+                        continue;
+                    }
+                    if ($v === null || $v === "") {
+                        continue;
+                    }
+                    $envList[] = array(
+                        "name" => (string)$k,
+                        "value" => (string)$v
+                    );
+                }
+            }
+        }
+
+        // Serialize the manifest array to YAML for submission to the scheduler.
+        $yaml = $this->arrayToYaml($manifest);
+
+        // Submit Job via the scheduler service.
+        $this->fullcommand = "POST " . $this->schedulerUrl . "/jobs";
+        $this->logger->debug("K8s job submission via scheduler '" . $this->fullcommand . "'");
+        $response = $this->schedulerRequest("POST", "/jobs", array(
+            "namespace" => $this->namespace,
+            "manifest" => $yaml
+        ));
+
+        // Handle submission result.
+        if ($response["ok"] !== true) {
+            $this->stderr = $response["error"];
+            $this->stdout = "";
+            $this->pid = "";
+            $msg = "K8s job submission failed: " . trim($this->stderr);
+            $this->logger->error($msg);
+            throw new UnexpectedValueException($msg);
+        } else {
+            $this->stdout = isset($response["data"]["stdout"]) ? (string)$response["data"]["stdout"] : "";
+            $this->stderr = isset($response["data"]["stderr"]) ? (string)$response["data"]["stderr"] : "";
+            $this->logger->debug("K8s job submitted: " . $this->pid . ". Output: " . trim($this->stdout));
+        }
+    }
+
+    // ---------------------------------------------------------------
+    // HTTP client: scheduler service
+    // ---------------------------------------------------------------
+
+    /**
+     * Sends an HTTP request to the scheduler service.
+     * This is the sole communication channel with Kubernetes.
+     *
+     * @param string $method HTTP method (GET, POST, DELETE)
+     * @param string $path API path (e.g. "/jobs", "/jobs/{name}")
+     * @param array|null $payload JSON-serializable body (for POST)
+     * @return array ["ok" => bool, "error" => string, "data" => array]
+     */
+    private function schedulerRequest($method, $path, $payload = null)
+    {
+        if ($this->schedulerUrl === "") {
+            return array("ok" => false, "error" => "OPENVRE_K8S_SCHEDULER_URL is not set");
+        }
+        if (!function_exists("curl_init")) {
+            return array("ok" => false, "error" => "PHP curl extension is required");
+        }
+
+        $url = $this->schedulerUrl . $path;
+        $ch = curl_init($url);
+        $headers = array("Content-Type: application/json");
+
+        // Authenticate to the scheduler with a Bearer token.
+        if ($this->schedulerToken !== "") {
+            $headers[] = "Authorization: Bearer " . $this->schedulerToken;
+        }
+
+        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
+        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
+        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
+        curl_setopt($ch, CURLOPT_TIMEOUT, 30);
+        if ($payload !== null) {
+            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
+        }
+
+        $raw = curl_exec($ch);
+        if ($raw === false) {
+            $err = curl_error($ch);
+            curl_close($ch);
+            return array("ok" => false, "error" => "Scheduler request failed: " . $err);
+        }
+        $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        curl_close($ch);
+
+        $json = json_decode($raw, true);
+
+        // Non-2xx status from the scheduler indicates an error.
+        if ($code < 200 || $code >= 300) {
+            $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from scheduler");
+            return array("ok" => false, "error" => $msg);
+        }
+        if (!is_array($json)) {
+            return array("ok" => false, "error" => "Invalid JSON response from scheduler");
+        }
+        // The scheduler returns {"ok": false, "error": "..."} on application-level errors.
+        if (isset($json["ok"]) && !$json["ok"]) {
+            return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "Scheduler error");
+        }
+        return array("ok" => true, "data" => $json);
+    }
+
+    // ---------------------------------------------------------------
+    // YAML serializer (PHP array -> YAML string)
+    // ---------------------------------------------------------------
+
+    /**
+     * Converts a nested PHP associative array into a YAML string.
+     * Used to serialize the Job manifest before sending it to the scheduler.
+     * This avoids requiring the Symfony YAML or ext-yaml extension.
+     */
+    private function arrayToYaml($data, $indent = 0)
+    {
+        $yaml = "";
+        $spaces = str_repeat(" ", $indent);
+        $isAssoc = function ($arr) {
+            if (!is_array($arr)) return false;
+            return array_keys($arr) !== range(0, count($arr) - 1);
+        };
+
+        foreach ($data as $key => $value) {
+            if (is_array($value)) {
+                if ($isAssoc($value)) {
+                    // Associative array → nested YAML object.
+                    $yaml .= $spaces . $key . ":\n" . $this->arrayToYaml($value, $indent + 1);
+                } else {
+                    // Sequential array → YAML list with "- " prefix.
+                    $yaml .= $spaces . $key . ":\n";
+                    foreach ($value as $item) {
+                        if (is_array($item)) {
+                            $yaml .= $spaces . " -\n" . $this->arrayToYaml($item, $indent + 2);
+                        } else {
+                            $yaml .= $spaces . " - " . $this->yamlScalar($item) . "\n";
+                        }
+                    }
+                }
+            } else {
+                // Scalar value.
+                $yaml .= $spaces . $key . ": " . $this->yamlScalar($value) . "\n";
+            }
+        }
+        return $yaml;
+    }
+
+    /**
+     * Formats a single scalar value for YAML output: booleans become true/false, real ints/floats stay bare.
+     * Strings are always double-quoted and escaped, even numeric ones, since fields like env "value" must stay strings.
+     */
+    private function yamlScalar($value)
+    {
+        if (is_bool($value)) return $value ? "true" : "false";
+        if (is_int($value) || is_float($value)) return (string)$value;
+        $escaped = strtr((string)$value, array("\\" => "\\\\", '"' => '\\"', "\n" => '\\n', "\r" => '\\r', "\t" => '\\t'));
+        return '"' . $escaped . '"';
+    }
+
+    // ---------------------------------------------------------------
+    // Job status and lifecycle queries
+    // ---------------------------------------------------------------
+
+    /**
+     * Retrieves the current state of a running Kubernetes Job by its name (pid).
+     *
+     * Called by OpenVRE's workspace polling loop to determine whether a tool
+     * execution is still in progress. Returns an array with "pid", "state",
+     * and "job_name" if the job is still active, or an empty array if the
+     * job is finished/deleted (which tells OpenVRE to finalize outputs).
+     *
+     * @param string $pid Kubernetes Job name
+     * @return array Job info array, or empty if job is done/not found
+     */
+    public function getRunningJobInfo($pid)
+    {
+        $job = array();
+        if (!$pid) return $job;
+
+        // Query job status via the scheduler.
+        $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace));
+
+        if ($response["ok"] !== true) {
+            return array();
+        }
+
+        $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false;
+        if (!$exists) {
+            return array();
+        }
+
+        // Parse the Job JSON to determine its current condition.
+        $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : "";
+        $json = json_decode($jsonRaw, true);
+        if (!is_array($json)) {
+            return array();
+        }
+
+        $status = $json['status'] ?? array();
+        $state = "Pending";
+        if (!empty($status['active'])) {
+            $state = "Running";
+        } elseif (!empty($status['succeeded'])) {
+            $state = "Succeeded";
+        } elseif (!empty($status['failed'])) {
+            $state = "Failed";
+        }
+
+        // Once the Job is finished (succeeded or failed), return empty so
+        // OpenVRE's workspace loop finalizes outputs and stops polling.
+ if ($state === "Succeeded" || $state === "Failed") { + return array(); + } + + $job['pid'] = $pid; + $job['state'] = $this->jobState[$state]; + $job['job_name'] = $pid; + return $job; + } + + // --------------------------------------------------------------- + // Simple accessors (ProcessSGE interface compatibility) + // --------------------------------------------------------------- + + /** Returns the full API URL used for the last submission (for debug logging). */ + public function getFullCommand() + { + return $this->fullcommand; + } + + /** Returns the Kubernetes Job name (used as OpenVRE's "pid"). */ + public function getPid() + { + return $this->pid; + } + + /** Returns a combined error string if submission failed, or null on success. */ + public function getErr() + { + if ($this->stderr) { + return trim($this->stdout . " " . $this->stderr); + } + return null; + } + + /** + * Checks whether the Job associated with this instance is still active. + * Returns true if the Job exists and is running/pending, false otherwise. + * Used by OpenVRE to decide whether to keep polling. + */ + public function status() + { + if (!$this->pid) return false; + + // Query job status via the scheduler. + $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace)); + + if ($response["ok"] !== true) { + return false; + } + + $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false; + if (!$exists) { + return false; + } + + $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : ""; + $json = json_decode($jsonRaw, true); + if (!is_array($json)) { + return true; + } + + $status = isset($json["status"]) && is_array($json["status"]) ? $json["status"] : array(); + // Job is no longer active if it has succeeded or failed. 
+        if (!empty($status["succeeded"]) || !empty($status["failed"])) {
+            return false;
+        }
+
+        return true;
+    }
+
+    // ---------------------------------------------------------------
+    // Job cancellation / deletion
+    // ---------------------------------------------------------------
+
+    /**
+     * Deletes a Kubernetes Job (and its pod) by name via the scheduler.
+     * The scheduler uses "Background" propagation policy so the API call
+     * returns immediately and Kubernetes garbage-collects the pod asynchronously.
+     *
+     * @param string|null $pid Kubernetes Job name to delete
+     * @return array [bool success, string message]
+     */
+    public function stop($pid = null)
+    {
+        if (!$pid) {
+            return array(false, "No job id '$pid' given");
+        }
+
+        $response = $this->schedulerRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace));
+
+        if ($response["ok"] === true) {
+            $res = isset($response["data"]["stdout"]) ? trim((string)$response["data"]["stdout"]) : "";
+            return array(true, $res);
+        }
+        return array(false, $response["error"] ?: "Failed to delete kubernetes job");
+    }
+}
diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php
index 1c9cdd00..ef288d59 100644
--- a/front_end/openVRE/public/phplib/classes/Tooljob.php
+++ b/front_end/openVRE/public/phplib/classes/Tooljob.php
@@ -106,6 +106,7 @@ public function __construct($tool, $execution = "", $project = "", $descrip = ""
         switch ($this->launcher) {
             case "SGE":
             case "docker_SGE":
+            case "kubernetes_native":
                 $this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id'];
                 $this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'];
                 $this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual'];
@@ -978,6 +979,7 @@ public function prepareExecution($tool, $metadata, $dataLocations = [], $metadat
 
         switch ($this->launcher) {
             case "SGE":
+            case "kubernetes_native":
                 $cmd = $this->setBashCmd_SGE($tool);
                 $this->createSubmitFile_SGE($cmd);
 
@@ -1208,6 +1210,10 @@ protected function setBashCommandDockerSge($tool)
             " --out_metadata " . $this->stageout_file_virtual .
             " --log_file " . $this->log_file_virtual;
 
+        if (isset($this->launcher) && $this->launcher === "kubernetes_native") {
+            return $cmd_vre;
+        }
+
         $cmd = "docker run --privileged -v /var/run/docker.sock:/var/run/docker.sock -d" .
             " " .
             $cmd_envs .
@@ -1438,7 +1444,7 @@ public function submit($tool)
             case "SGE":
             case "ega_demo":
             case "docker_SGE":
-                return $this->enqueue($tool);
+            case "kubernetes_native":
             case "Slurm_Singularity":
                 return $this->enqueue($tool);
             default:
@@ -1461,8 +1467,22 @@ protected function enqueue($tool)
         $cpus = $launcherInfo['cpus'] ?? $tool['infrastructure']['cpus'];
         $queue = $launcherInfo['queue'] ?? $tool['infrastructure']['clouds'][$this->cloudName]['queue'];
         $this->logger->info("Resolved Parameters: Queue=$queue, CPUs=$cpus, Memory=$memory");
+        $jobOptions = array();
+        if ($jobManager === "kubernetes_native") {
+            $jobOptions["image"] = $tool['infrastructure']['container_image'] ?? "";
+            if ($jobOptions["image"] === "") {
+                $_SESSION['errorData']['Error'][] = "Missing infrastructure.container_image for kubernetes_native launcher.";
+                return 0;
+            }
+            $jobOptions["env"] = array();
+            if (isset($tool['infrastructure']['container_env']) && is_array($tool['infrastructure']['container_env'])) {
+                foreach ($tool['infrastructure']['container_env'] as $env_key => $env_value) {
+                    $jobOptions["env"][$env_key] = (string)$env_value;
+                }
+            }
+        }
 
-        $pid = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager);
+        $pid = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager, $this->toolId, $jobOptions);
         $this->logger->info("Tool job submitted to SGE queue '$queue' (PID=$pid)");
         $this->pid = $pid;
 
diff --git a/front_end/openVRE/public/phplib/processJob.inc.php b/front_end/openVRE/public/phplib/processJob.inc.php
index 92580c3d..0d9be4de 100644
--- a/front_end/openVRE/public/phplib/processJob.inc.php
+++ b/front_end/openVRE/public/phplib/processJob.inc.php
@@ -2,6 +2,7 @@
 
 use OpenVRE\LoggerFactory;
 use OpenVRE\NotFoundException;
+use OpenVRE\ProcessK8s;
 use OpenVRE\ProcessSGE;
 use OpenVRE\ProcessSlurm;
 
@@ -18,7 +19,7 @@ function getJobProcessLogger()
 }
 
 
-function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE")
+function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE", $toolId = "", $jobOptions = array())
 {
     getJobProcessLogger()->info("Start job submission via SGE");
 
@@ -55,6 +56,23 @@ function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE"
             getJobProcessLogger()->info("Submitting job via Slurm_Singularity to $remote_system. Parameters: shFile=$shFile, workDir=$workDir, logFile=$logFile, errFile=$errFile");
             $process = new ProcessSlurm($shFile, $workDir, $logFile, $errFile, $remote_system);
             break;
+        case "kubernetes_native":
+            $schedUrl = getenv("OPENVRE_K8S_SCHEDULER_URL") ?: "";
+            $schedHost = $schedUrl !== ""
+                ? (string)(parse_url($schedUrl, PHP_URL_HOST) ?: "(parse_failed)")
+                : "(not_set)";
+            $k8sNs = getenv("OPENVRE_K8S_NAMESPACE") ?: "(env_unset)";
+            $jobOptKeys = is_array($jobOptions) && count($jobOptions)
+                ? implode(",", array_keys($jobOptions))
+                : "(none)";
+            error_log(
+                "DEBUG: Submitting job via kubernetes_native. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, "
+                . "jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile, "
+                . "namespace=$k8sNs, scheduler_host=$schedHost, jobOptions_keys=$jobOptKeys"
+            );
+            require_once __DIR__ . "/classes/ProcessK8s.php";
+            $process = new ProcessK8s($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile, $jobOptions);
+            break;
         default:
             $process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile);
             break;
@@ -82,15 +100,26 @@ function getRunningJobInfo($pid, $launcherType = null)
 
     if (is_null($launcherType) && is_numeric($pid)) {
         $launcherType = "SGE";
+    } elseif (is_null($launcherType) && strpos((string)$pid, "-") !== false) {
+        $launcherType = "kubernetes_native";
     }
 
-    if (!in_array($launcherType, array("SGE", "docker_SGE", "Slurm_Singularity"))) {
+    if ($launcherType == "SGE" || $launcherType == "docker_SGE") {
+        $process = new ProcessSGE();
+        $job = $process->getRunningJobInfo($pid);
+    } elseif ($launcherType == "kubernetes_native") {
+        require_once __DIR__ . "/classes/ProcessK8s.php";
+        $process = new ProcessK8s();
+        $job = $process->getRunningJobInfo($pid);
+    } elseif ($launcherType == "Slurm_Singularity") {
+        $process = new ProcessSlurm();
+        $job = $process->getRunningJobInfo($pid);
+    } else {
         getJobProcessLogger()->error("Cannot monitor job '$pid' of type '$launcherType'. Launcher not implemented.");
         throw new UnexpectedValueException("Cannot monitor job '$pid' of type '$launcherType'. Launcher not implemented.");
     }
 
-    $process = new ProcessSGE();
-    return $process->getRunningJobInfo($pid);
+    return $job;
 }
 
 
@@ -190,6 +219,8 @@ function delJob($pid, $launcherType = null, $login = null)
     // guess launcher
     if (!$launcherType && is_numeric($pid)) {
         $launcherType = "docker_SGE";
+    } elseif (!$launcherType && strpos((string)$pid, "-") !== false) {
+        $launcherType = "kubernetes_native";
     }
 
     // cancel job
@@ -197,6 +228,15 @@ function delJob($pid, $launcherType = null, $login = null)
     if ($launcherType == "SGE" || $launcherType == "docker_SGE") {
         $processSGE = new ProcessSGE();
         list($r_sge, $msg_sge) = $processSGE->stop($pid);
+    } elseif ($launcherType == "kubernetes_native") {
+        getJobProcessLogger()->debug("delJob kubernetes_native pid=$pid calling ProcessK8s::stop");
+        require_once __DIR__ . "/classes/ProcessK8s.php";
+        $processK8s = new ProcessK8s();
+        list($r_sge, $msg_sge) = $processK8s->stop($pid);
+        getJobProcessLogger()->debug(
+            "delJob kubernetes_native pid=$pid stop_ok=" . ($r_sge ? "1" : "0")
+            . " msg=" . $msg_sge
+        );
     } else {
         getJobProcessLogger()->error("Cannot delete job of type '$launcherType' [id = $pid]. Launcher not implemented.");
         throw new UnexpectedValueException("Cannot delete job of type '$launcherType' [id = $pid]. Launcher not implemented.");
diff --git a/front_end/openVRE/public/phplib/projects.inc.php b/front_end/openVRE/public/phplib/projects.inc.php
index b677e181..09b3fd0a 100644
--- a/front_end/openVRE/public/phplib/projects.inc.php
+++ b/front_end/openVRE/public/phplib/projects.inc.php
@@ -1908,7 +1908,7 @@ function resolvePath_toLocalAbsolutePath($path, $job)
     // path is an absolute path
     if (preg_match('/^\//', $path)) {
         if (preg_match('/^' . preg_quote($job['root_dir_virtual'], '/') . '/', $path)) {
-            if ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE") {
+            if ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE" || $job['launcher'] == "kubernetes_native") {
                 $rfn = str_replace($job['root_dir_mug'], $GLOBALS['dataDir'], $path);
             }
             // direct from path
@@ -1936,6 +1936,10 @@ function resolvePath_toLocalAbsolutePath($path, $job)
         }
     }
     //clean slashes
+    if ($rfn === "") {
+        // Keep original absolute path instead of returning empty when launcher mapping is missing.
+        $rfn = $path;
+    }
     $rfn = preg_replace('#/+#', '/', $rfn);
 
     //return absolute path
diff --git a/scheduler/.dockerignore b/scheduler/.dockerignore
new file mode 100644
index 00000000..b765ce2f
--- /dev/null
+++ b/scheduler/.dockerignore
@@ -0,0 +1,12 @@
+.git
+.gitignore
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+dist/
+build/
+*.egg-info/
diff --git a/scheduler/Dockerfile b/scheduler/Dockerfile
new file mode 100644
index 00000000..0e7e69a9
--- /dev/null
+++ b/scheduler/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.11-alpine
+
+WORKDIR /app
+
+COPY app.py /app/app.py
+
+EXPOSE 8080
+
+USER 65534:65534
+
+CMD ["python", "/app/app.py"]
diff --git a/scheduler/app.py b/scheduler/app.py
new file mode 100644
index 00000000..5288be69
--- /dev/null
+++ b/scheduler/app.py
@@ -0,0 +1,163 @@
+import hmac
+import json
+import os
+import re
+import ssl
+import urllib.error
+import urllib.parse
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+
+TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+API_HOST = os.environ.get("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc")
+API_PORT = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
+DEFAULT_NAMESPACE = os.environ.get("DEFAULT_NAMESPACE", "default")
+SCHEDULER_AUTH_TOKEN = os.environ.get("SCHEDULER_AUTH_TOKEN", "")
+# Namespace and Job names are interpolated into API server paths, so only
+# RFC 1123 style names (lowercase alphanumerics, '-', '.') are accepted.
+K8S_NAME_RE = re.compile(r"[a-z0-9]([-a-z0-9.]{0,251}[a-z0-9])?")
+
+with open(TOKEN_PATH, "r", encoding="utf-8") as f:
+    SA_TOKEN = f.read().strip()
+
+SSL_CTX = ssl.create_default_context(cafile=CA_PATH)
+
+
+def valid_k8s_name(value):
+    """Return True when value is safe to place in a Kubernetes API path."""
+    return isinstance(value, str) and K8S_NAME_RE.fullmatch(value) is not None
+
+
+def k8s_request(method, path, body=None, content_type="application/json"):
+    """Call the Kubernetes API server; returns (status_code, body_text)."""
+    url = f"https://{API_HOST}:{API_PORT}{path}"
+    data = body.encode("utf-8") if body is not None else None
+    req = urllib.request.Request(url, method=method, data=data)
+    req.add_header("Authorization", f"Bearer {SA_TOKEN}")
+    if body is not None:
+        req.add_header("Content-Type", content_type)
+    try:
+        with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as resp:
+            raw = resp.read().decode("utf-8")
+            return resp.getcode(), raw
+    except urllib.error.HTTPError as e:
+        raw = e.read().decode("utf-8") if e.fp else str(e)
+        return e.code, raw
+    except OSError as e:
+        # Connection/TLS/timeout failure: report it as a gateway error
+        # instead of letting the exception kill the request handler.
+        return 502, str(e)
+
+
+class Handler(BaseHTTPRequestHandler):
+    def _authorized(self):
+        if SCHEDULER_AUTH_TOKEN == "":
+            return False
+        auth = self.headers.get("Authorization", "")
+        expected = f"Bearer {SCHEDULER_AUTH_TOKEN}"
+        # Constant-time comparison so the token cannot be probed via timing.
+        return hmac.compare_digest(auth.encode("utf-8"), expected.encode("utf-8"))
+
+    def _json(self, code, payload):
+        out = json.dumps(payload).encode("utf-8")
+        self.send_response(code)
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Content-Length", str(len(out)))
+        self.end_headers()
+        self.wfile.write(out)
+
+    def _read_json(self):
+        length = int(self.headers.get("Content-Length", "0"))
+        if length == 0:
+            return {}
+        raw = self.rfile.read(length).decode("utf-8")
+        return json.loads(raw)
+
+    def _job_target(self):
+        """Split '/jobs/<name>?namespace=<ns>' into (ns, name); None if invalid."""
+        name_q = self.path[len("/jobs/"):]
+        name, _, query = name_q.partition("?")
+        name = urllib.parse.unquote(name)
+        params = urllib.parse.parse_qs(query)
+        ns = params.get("namespace", [DEFAULT_NAMESPACE])[0]
+        if not valid_k8s_name(ns) or not valid_k8s_name(name):
+            return None
+        return ns, name
+
+    def do_GET(self):
+        if self.path == "/healthz":
+            return self._json(200, {"ok": True})
+        if not self._authorized():
+            return self._json(401, {"ok": False, "error": "unauthorized"})
+        if self.path.startswith("/jobs/"):
+            target = self._job_target()
+            if target is None:
+                return self._json(400, {"ok": False, "error": "invalid namespace or job name"})
+            ns, name = target
+            code, raw = k8s_request("GET", f"/apis/batch/v1/namespaces/{ns}/jobs/{name}")
+            if code == 404:
+                return self._json(200, {"ok": True, "exists": False, "job": ""})
+            if code >= 300:
+                return self._json(500, {"ok": False, "error": raw})
+            return self._json(200, {"ok": True, "exists": True, "job": raw})
+        return self._json(404, {"ok": False, "error": "not found"})
+
+    def do_POST(self):
+        if not self._authorized():
+            return self._json(401, {"ok": False, "error": "unauthorized"})
+        if self.path != "/jobs":
+            return self._json(404, {"ok": False, "error": "not found"})
+        try:
+            body = self._read_json()
+            ns = body.get("namespace") or DEFAULT_NAMESPACE
+            manifest = body.get("manifest")
+            if not valid_k8s_name(ns):
+                return self._json(400, {"ok": False, "error": "invalid namespace"})
+            if not manifest or not isinstance(manifest, str):
+                return self._json(400, {"ok": False, "error": "manifest is required and must be a string"})
+            code, raw = k8s_request(
+                "POST",
+                f"/apis/batch/v1/namespaces/{ns}/jobs",
+                body=manifest,
+                content_type="application/yaml",
+            )
+            if code >= 300:
+                return self._json(500, {"ok": False, "error": raw})
+            return self._json(200, {"ok": True, "stdout": raw, "stderr": ""})
+        except Exception as e:
+            return self._json(500, {"ok": False, "error": str(e)})
+
+    def do_DELETE(self):
+        if not self._authorized():
+            return self._json(401, {"ok": False, "error": "unauthorized"})
+        if not self.path.startswith("/jobs/"):
+            return self._json(404, {"ok": False, "error": "not found"})
+        target = self._job_target()
+        if target is None:
+            return self._json(400, {"ok": False, "error": "invalid namespace or job name"})
+        ns, name = target
+        delete_opts = '{"apiVersion":"batch/v1","kind":"DeleteOptions","propagationPolicy":"Background"}'
+        code, raw = k8s_request(
+            "DELETE",
+            f"/apis/batch/v1/namespaces/{ns}/jobs/{name}",
+            body=delete_opts,
+            content_type="application/json",
+        )
+        if code == 404:
+            return self._json(
+                200,
+                {"ok": True, "stdout": f"job.batch/{name} already deleted", "stderr": ""},
+            )
+        if code >= 300:
+            return self._json(500, {"ok": False, "error": raw})
+        return self._json(200, {"ok": True, "stdout": raw, "stderr": ""})
+
+    def log_message(self, fmt, *args):
+        return
+
+
+if __name__ == "__main__":
+    server = HTTPServer(("0.0.0.0", 8080), Handler)
server.serve_forever() diff --git a/sge/Dockerfile b/sge/Dockerfile index bd3ad34d..05056e67 100644 --- a/sge/Dockerfile +++ b/sge/Dockerfile @@ -54,9 +54,19 @@ RUN useradd -m application \ && echo "application:application" | chpasswd # Make sure host and container share the same GID for group 'docker', bc it has reading permissions to the socket file -ARG DOCKER_GROUP -RUN groupmod -g $DOCKER_GROUP docker -RUN usermod -aG docker application + +#ARG DOCKER_GROUP +#RUN groupmod -g $DOCKER_GROUP docker +#RUN usermod -aG docker application + +ARG DOCKER_GROUP=1002 +RUN set -eux; \ + if getent group docker >/dev/null 2>&1; then \ + groupmod -g "${DOCKER_GROUP}" docker; \ + else \ + groupadd -g "${DOCKER_GROUP}" docker; \ + fi; \ + usermod -aG docker application # Add setup script and set permissions ADD setup_gridengine.sh /usr/local/bin/setup_gridengine.sh