diff --git a/.gitignore b/.gitignore
index 7653983d..e765ed48 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,7 @@ volumes/
*.key
*.pem
.jwks
+
+# Python
+__pycache__/
+*.pyc
diff --git a/front_end/Dockerfile b/front_end/Dockerfile
index ba1cdfb4..ee90ec5c 100644
--- a/front_end/Dockerfile
+++ b/front_end/Dockerfile
@@ -25,7 +25,7 @@ RUN apt-get update && apt-get install -y libmcrypt-dev \
&& docker-php-ext-install gd
RUN apt-get update && apt-get install gnupg curl
-RUN curl -fsSL https://www.mongodb.org/static/pgp/server-7.0.asc |gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor
+RUN curl -fsSL https://pgp.mongodb.com/server-7.0.asc | gpg -o /usr/share/keyrings/mongodb-server-7.0.gpg --dearmor
RUN echo "deb [ signed-by=/usr/share/keyrings/mongodb-server-7.0.gpg ] http://repo.mongodb.org/apt/debian bookworm/mongodb-org/7.0 main" | tee /etc/apt/sources.list.d/mongodb-org-7.0.list
RUN apt-get update
RUN apt-get install -y mongodb-org
@@ -39,9 +39,7 @@ WORKDIR /var/www/html
ADD openVRE /var/www/html/openVRE
RUN cd openVRE
WORKDIR /var/www/html/openVRE
-RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer
-RUN composer self-update
-RUN composer update --ignore-platform-req=ext-mongodb --ignore-platform-req=ext-mongodb
+# Dependencies are pre-seeded in ./openVRE/vendor for offline/reproducible builds.
RUN mkdir logs
RUN touch logs/application.log
RUN chmod -R 777 logs/application.log
diff --git a/front_end/openVRE/public/phplib/classes/ProcessK8s.php b/front_end/openVRE/public/phplib/classes/ProcessK8s.php
new file mode 100755
index 00000000..58b5b982
--- /dev/null
+++ b/front_end/openVRE/public/phplib/classes/ProcessK8s.php
@@ -0,0 +1,594 @@
+ "RUNNING",
+ "Pending" => "PENDING",
+ "Succeeded" => "FINISHING",
+ "Failed" => "ERROR",
+ "NotFound" => "FINISHING",
+ );
+
+ // ---------------------------------------------------------------
+ // Constructor
+ // ---------------------------------------------------------------
+
+ /**
+ * @param string|false $cl Path to submission script (false = status-only instance)
+ * @param string $workDir Working directory path inside the pod
+ * @param string $queue Queue name (unused for K8s, kept for interface compatibility with ProcessSGE)
+ * @param string $jobname Human-readable job name
+ * @param int $cpu CPU cores requested/limited
+ * @param int $mem Memory in GB (0 = default 4Gi)
+ * @param string $logFile Stdout log filename (unused for K8s, kept for interface compatibility)
+ * @param string $errFile Stderr log filename (unused for K8s, kept for interface compatibility)
+ * @param array $jobOptions Options from Tooljob: "image" overrides container image,
+ * "env" provides extra environment variables for the pod
+ */
+ public function __construct($cl = false, $workDir = "", $queue = "", $jobname = "", $cpu = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobOptions = array())
+ {
+ // Read all configuration from environment variables (set via ConfigMap/Secret).
+ $this->namespace = getenv("OPENVRE_K8S_NAMESPACE") ?: "bsctre-v2";
+ $this->jobImage = getenv("OPENVRE_K8S_JOB_IMAGE") ?: "";
+ $this->sharedPvc = getenv("OPENVRE_K8S_SHARED_PVC") ?: "dashboard-frontend-sgecore-shareddata";
+ $this->toolsPvc = getenv("OPENVRE_K8S_TOOLS_PVC") ?: "dashboard-frontend-tools";
+ $this->schedulerUrl = rtrim(getenv("OPENVRE_K8S_SCHEDULER_URL") ?: "", "/");
+ $this->schedulerToken = getenv("OPENVRE_K8S_SCHEDULER_TOKEN") ?: "";
+ $this->runAsUid = (int)(getenv("OPENVRE_K8S_RUN_AS_UID") ?: 1000);
+ $this->runAsGid = (int)(getenv("OPENVRE_K8S_RUN_AS_GID") ?: 1000);
+
+ // Allow per-tool overrides: the tool's Mongo document can specify a custom
+ // container image and extra environment variables via $jobOptions.
+ if (is_array($jobOptions)) {
+ if (!empty($jobOptions["image"])) {
+ $this->jobImage = (string)$jobOptions["image"];
+ }
+ if (!empty($jobOptions["env"]) && is_array($jobOptions["env"])) {
+ $this->jobEnv = $jobOptions["env"];
+ }
+ }
+
+ // If a submission script path was provided, immediately create and submit
+ // the Kubernetes Job. When $cl is false, this instance is used only for
+ // status queries (getRunningJobInfo, status, stop).
+ if ($cl !== false) {
+ $this->workDir = $workDir;
+ $this->command = $cl;
+ $this->jobname = $jobname ? $jobname : basename($cl);
+ $this->runCom($cpu, $mem);
+ }
+ }
+
+ // ---------------------------------------------------------------
+ // Job name generation
+ // ---------------------------------------------------------------
+
+ /**
+ * Converts a human-readable job name into a Kubernetes-safe DNS name.
+ * Kubernetes Job names must be lowercase, alphanumeric + hyphens, max 63 chars.
+ * Appends a random 8-char suffix to guarantee uniqueness across runs.
+ */
+ private function sanitizeName($name)
+ {
+ $name = strtolower($name);
+ $name = preg_replace('/[^a-z0-9-]+/', '-', $name);
+ $name = trim($name, '-');
+ if ($name === "") {
+ $name = "openvre-job";
+ }
+ if (strlen($name) > 40) {
+ $name = substr($name, 0, 40);
+ $name = rtrim($name, '-');
+ }
+ return $name . "-" . substr(md5(uniqid("", true)), 0, 8);
+ }
+
+ // ---------------------------------------------------------------
+ // Job creation and submission
+ // ---------------------------------------------------------------
+
+ /**
+ * Builds a Kubernetes Job manifest and submits it to the scheduler.
+ *
+ * @param int $cpu Number of CPU cores (request and limit)
+ * @param int $mem Memory in GB (0 = default 4Gi)
+ */
+ private function runCom($cpu, $mem)
+ {
+ // Abort if no container image is configured — there's nothing to run.
+ if ($this->jobImage === "") {
+ $this->stderr = "OPENVRE_K8S_JOB_IMAGE is not set";
+ $_SESSION['errorData']['Error'][] = $this->stderr;
+ return;
+ }
+
+ // Abort if scheduler URL is not configured — we cannot submit jobs.
+ if ($this->schedulerUrl === "") {
+ $this->stderr = "OPENVRE_K8S_SCHEDULER_URL is not set";
+ $_SESSION['errorData']['Error'][] = $this->stderr;
+ return;
+ }
+
+ // Generate a unique, K8s-safe Job name and use it as the OpenVRE "pid".
+ $jobName = $this->sanitizeName($this->jobname);
+ $this->pid = $jobName;
+
+ $scriptPath = $this->command;
+ $workDir = $this->workDir;
+ $cpuRequest = max(1, (int)$cpu);
+ $cpuLimit = max(1, (int)$cpu);
+ $memLimit = ((int)$mem > 0 ? ((int)$mem . "Gi") : "4Gi");
+
+ // Build the full Kubernetes Job manifest as a PHP associative array.
+ // This will be serialized to YAML before submission to the scheduler.
+ $manifest = array(
+ "apiVersion" => "batch/v1",
+ "kind" => "Job",
+ "metadata" => array(
+ "name" => $jobName,
+ "namespace" => $this->namespace,
+ "labels" => array(
+ "app.kubernetes.io/managed-by" => "openvre",
+ "openvre-job-id" => $jobName
+ )
+ ),
+ "spec" => array(
+ // Auto-delete the Job object N seconds after it finishes (success or failure).
+ "ttlSecondsAfterFinished" => (int)(getenv("OPENVRE_K8S_JOB_TTL") ?: 120),
+
+ // Kill the Job if it runs longer than this (prevents stuck/hung jobs).
+ "activeDeadlineSeconds" => (int)(getenv("OPENVRE_K8S_JOB_DEADLINE") ?: 86400),
+
+ // Do not retry on failure — mark as Failed immediately.
+ "backoffLimit" => 0,
+
+ "template" => array(
+ "spec" => array(
+ // Pod should not restart — one attempt only.
+ "restartPolicy" => "Never",
+
+ "containers" => array(
+ array(
+ "name" => "tool-runner",
+ "image" => $this->jobImage,
+
+ // The pod runs the submission script (generated by Tooljob)
+ // from the working directory. Both are passed as env vars.
+ "command" => array("bash", "-lc", "cd \"\$OPENVRE_WORKDIR\" && bash \"\$OPENVRE_SUBMIT_SCRIPT\""),
+
+ "env" => array(
+ array("name" => "OPENVRE_WORKDIR", "value" => $workDir),
+ array("name" => "OPENVRE_SUBMIT_SCRIPT", "value" => $scriptPath),
+ ),
+
+ "resources" => array(
+ "requests" => array("cpu" => (string)$cpuRequest),
+ "limits" => array("cpu" => (string)$cpuLimit, "memory" => $memLimit)
+ ),
+
+ "securityContext" => array(
+ "allowPrivilegeEscalation" => false,
+ ),
+
+ // Mount shared storage so the Job can read inputs and write outputs
+ // to the same PVCs used by the frontend and SGE pods.
+ "volumeMounts" => array(
+ array("name" => "shared-data", "mountPath" => "/shared_data"),
+ array("name" => "tools", "mountPath" => "/var/www/html/openVRE/public/tools")
+ )
+ )
+ ),
+
+ // Run the container as a non-root user with a fixed UID/GID
+ // to match file ownership on the shared PVCs.
+ "securityContext" => array(
+ "runAsUser" => max(1, $this->runAsUid),
+ "runAsGroup" => max(1, $this->runAsGid),
+ "fsGroup" => max(1, $this->runAsGid)
+ ),
+
+ // Attach the two shared PVCs as volumes.
+ "volumes" => array(
+ array("name" => "shared-data", "persistentVolumeClaim" => array("claimName" => $this->sharedPvc)),
+ array("name" => "tools", "persistentVolumeClaim" => array("claimName" => $this->toolsPvc))
+ )
+ )
+ )
+ )
+ );
+
+ // Inject tool-specific environment variables (e.g. FEM_ACCESS_TOKEN, FEM_API_PREFIX)
+ // from the tool's Mongo definition into the Job pod's container env list.
+ if (!empty($this->jobEnv)) {
+ $envList =& $manifest["spec"]["template"]["spec"]["containers"][0]["env"];
+ if (is_array($envList)) {
+ foreach ($this->jobEnv as $k => $v) {
+ if (!is_string($k) || $k === "") {
+ continue;
+ }
+ if ($v === null || $v === "") {
+ continue;
+ }
+ $envList[] = array(
+ "name" => (string)$k,
+ "value" => (string)$v
+ );
+ }
+ }
+ }
+
+ // Serialize the manifest array to YAML for submission to the scheduler.
+ $yaml = $this->arrayToYaml($manifest);
+
+ // Submit Job via the scheduler service.
+ $this->fullcommand = "POST " . $this->schedulerUrl . "/jobs";
+ logger("K8s job submission via scheduler '" . $this->fullcommand . "'");
+ $response = $this->schedulerRequest("POST", "/jobs", array(
+ "namespace" => $this->namespace,
+ "manifest" => $yaml
+ ));
+
+ // Handle submission result.
+ if ($response["ok"] !== true) {
+ $this->stderr = $response["error"];
+ $this->stdout = "";
+ $msg = "K8s job submission failed: " . trim($this->stderr);
+ logger($msg);
+ $_SESSION['errorData']['Error'][] = $msg;
+ $this->pid = "";
+ } else {
+ $this->stdout = isset($response["data"]["stdout"]) ? (string)$response["data"]["stdout"] : "";
+ $this->stderr = isset($response["data"]["stderr"]) ? (string)$response["data"]["stderr"] : "";
+ logger("K8s job submitted: " . $this->pid . ". Output: " . trim($this->stdout));
+ }
+ }
+
+ // ---------------------------------------------------------------
+ // HTTP client: scheduler service
+ // ---------------------------------------------------------------
+
+ /**
+ * Sends an HTTP request to the scheduler service.
+ * This is the sole communication channel with Kubernetes.
+ *
+ * @param string $method HTTP method (GET, POST, DELETE)
+ * @param string $path API path (e.g. "/jobs", "/jobs/{name}")
+ * @param array|null $payload JSON-serializable body (for POST)
+ * @return array ["ok" => bool, "error" => string, "data" => array]
+ */
+ private function schedulerRequest($method, $path, $payload = null)
+ {
+ if ($this->schedulerUrl === "") {
+ return array("ok" => false, "error" => "OPENVRE_K8S_SCHEDULER_URL is not set");
+ }
+ if (!function_exists("curl_init")) {
+ return array("ok" => false, "error" => "PHP curl extension is required");
+ }
+
+ $url = $this->schedulerUrl . $path;
+ $ch = curl_init($url);
+ $headers = array("Content-Type: application/json");
+
+ // Authenticate to the scheduler with a Bearer token.
+ if ($this->schedulerToken !== "") {
+ $headers[] = "Authorization: Bearer " . $this->schedulerToken;
+ }
+
+ curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
+ curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
+ curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
+ curl_setopt($ch, CURLOPT_TIMEOUT, 30);
+ if ($payload !== null) {
+ curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
+ }
+
+ $raw = curl_exec($ch);
+ if ($raw === false) {
+ $err = curl_error($ch);
+ curl_close($ch);
+ return array("ok" => false, "error" => "Scheduler request failed: " . $err);
+ }
+ $code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+ curl_close($ch);
+
+ $json = json_decode($raw, true);
+
+ // Non-2xx status from the scheduler indicates an error.
+ if ($code < 200 || $code >= 300) {
+ $msg = is_array($json) && isset($json["error"]) ? $json["error"] : ("HTTP " . $code . " from scheduler");
+ return array("ok" => false, "error" => $msg);
+ }
+ if (!is_array($json)) {
+ return array("ok" => false, "error" => "Invalid JSON response from scheduler");
+ }
+ // The scheduler returns {"ok": false, "error": "..."} on application-level errors.
+ if (isset($json["ok"]) && !$json["ok"]) {
+ return array("ok" => false, "error" => isset($json["error"]) ? $json["error"] : "Scheduler error");
+ }
+ return array("ok" => true, "data" => $json);
+ }
+
+ // ---------------------------------------------------------------
+ // YAML serializer (PHP array -> YAML string)
+ // ---------------------------------------------------------------
+
+ /**
+ * Converts a nested PHP associative array into a YAML string.
+ * Used to serialize the Job manifest before sending it to the scheduler.
+ * This avoids requiring the Symfony YAML or ext-yaml extension.
+ */
+ private function arrayToYaml($data, $indent = 0)
+ {
+ $yaml = "";
+ $spaces = str_repeat(" ", $indent);
+ $isAssoc = function ($arr) {
+ if (!is_array($arr)) return false;
+ return array_keys($arr) !== range(0, count($arr) - 1);
+ };
+
+ foreach ($data as $key => $value) {
+ if (is_array($value)) {
+ if ($isAssoc($value)) {
+ // Associative array → nested YAML object.
+ $yaml .= $spaces . $key . ":\n" . $this->arrayToYaml($value, $indent + 1);
+ } else {
+ // Sequential array → YAML list with "- " prefix.
+ $yaml .= $spaces . $key . ":\n";
+ foreach ($value as $item) {
+ if (is_array($item)) {
+ $yaml .= $spaces . " -\n" . $this->arrayToYaml($item, $indent + 2);
+ } else {
+ $yaml .= $spaces . " - " . $this->yamlScalar($item) . "\n";
+ }
+ }
+ }
+ } else {
+ // Scalar value.
+ $yaml .= $spaces . $key . ": " . $this->yamlScalar($value) . "\n";
+ }
+ }
+ return $yaml;
+ }
+
+ /**
+ * Formats a single scalar value for YAML output.
+ * Booleans become true/false, numbers stay numeric, strings are double-quoted.
+ */
+ private function yamlScalar($value)
+ {
+ if (is_bool($value)) return $value ? "true" : "false";
+ if (is_numeric($value)) return (string)$value;
+ $escaped = str_replace('"', '\"', (string)$value);
+ return '"' . $escaped . '"';
+ }
+
+ // ---------------------------------------------------------------
+ // Job status and lifecycle queries
+ // ---------------------------------------------------------------
+
+ /**
+ * Retrieves the current state of a running Kubernetes Job by its name (pid).
+ *
+ * Called by OpenVRE's workspace polling loop to determine whether a tool
+ * execution is still in progress. Returns an array with "pid", "state",
+ * and "job_name" if the job is still active, or an empty array if the
+ * job is finished/deleted (which tells OpenVRE to finalize outputs).
+ *
+ * @param string $pid Kubernetes Job name
+ * @return array Job info array, or empty if job is done/not found
+ */
+ public function getRunningJobInfo($pid)
+ {
+ $job = array();
+ if (!$pid) return $job;
+
+ // Query job status via the scheduler.
+ $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace));
+
+ if ($response["ok"] !== true) {
+ return array();
+ }
+
+ $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false;
+ if (!$exists) {
+ return array();
+ }
+
+ // Parse the Job JSON to determine its current condition.
+ $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : "";
+ $json = json_decode($jsonRaw, true);
+ if (!is_array($json)) {
+ return array();
+ }
+
+ $status = $json['status'] ?? array();
+ $state = "Pending";
+ if (!empty($status['active'])) {
+ $state = "Running";
+ } elseif (!empty($status['succeeded'])) {
+ $state = "Succeeded";
+ } elseif (!empty($status['failed'])) {
+ $state = "Failed";
+ }
+
+ // Once the Job is finished (succeeded or failed), return empty so
+ // OpenVRE's workspace loop finalizes outputs and stops polling.
+ if ($state === "Succeeded" || $state === "Failed") {
+ return array();
+ }
+
+ $job['pid'] = $pid;
+ $job['state'] = $this->jobState[$state];
+ $job['job_name'] = $pid;
+ return $job;
+ }
+
+ // ---------------------------------------------------------------
+ // Simple accessors (ProcessSGE interface compatibility)
+ // ---------------------------------------------------------------
+
+ /** Returns the full API URL used for the last submission (for debug logging). */
+ public function getFullCommand()
+ {
+ return $this->fullcommand;
+ }
+
+ /** Returns the Kubernetes Job name (used as OpenVRE's "pid"). */
+ public function getPid()
+ {
+ return $this->pid;
+ }
+
+ /** Returns a combined error string if submission failed, or null on success. */
+ public function getErr()
+ {
+ if ($this->stderr) {
+ return trim($this->stdout . " " . $this->stderr);
+ }
+ return null;
+ }
+
+ /**
+ * Checks whether the Job associated with this instance is still active.
+ * Returns true if the Job exists and is running/pending, false otherwise.
+ * Used by OpenVRE to decide whether to keep polling.
+ */
+ public function status()
+ {
+ if (!$this->pid) return false;
+
+ // Query job status via the scheduler.
+ $response = $this->schedulerRequest("GET", "/jobs/" . rawurlencode($this->pid) . "?namespace=" . rawurlencode($this->namespace));
+
+ if ($response["ok"] !== true) {
+ return false;
+ }
+
+ $exists = isset($response["data"]["exists"]) ? (bool)$response["data"]["exists"] : false;
+ if (!$exists) {
+ return false;
+ }
+
+ $jsonRaw = isset($response["data"]["job"]) ? $response["data"]["job"] : "";
+ $json = json_decode($jsonRaw, true);
+ if (!is_array($json)) {
+ return true;
+ }
+
+ $status = isset($json["status"]) && is_array($json["status"]) ? $json["status"] : array();
+ // Job is no longer active if it has succeeded or failed.
+ if (!empty($status["succeeded"]) || !empty($status["failed"])) {
+ return false;
+ }
+
+ return true;
+ }
+
+ // ---------------------------------------------------------------
+ // Job cancellation / deletion
+ // ---------------------------------------------------------------
+
+ /**
+ * Deletes a Kubernetes Job (and its pod) by name via the scheduler.
+ * The scheduler uses "Background" propagation policy so the API call
+ * returns immediately and Kubernetes garbage-collects the pod asynchronously.
+ *
+ * @param string|null $pid Kubernetes Job name to delete
+ * @return array [bool success, string message]
+ */
+ public function stop($pid = null)
+ {
+ if (!$pid) {
+ return array(false, "No job id '$pid' given");
+ }
+
+ $response = $this->schedulerRequest("DELETE", "/jobs/" . rawurlencode($pid) . "?namespace=" . rawurlencode($this->namespace));
+
+ if ($response["ok"] === true) {
+ $res = isset($response["data"]["stdout"]) ? trim((string)$response["data"]["stdout"]) : "";
+ return array(true, $res);
+ }
+ return array(false, $response["error"] ?: "Failed to delete kubernetes job");
+ }
+}
diff --git a/front_end/openVRE/public/phplib/classes/Tooljob.php b/front_end/openVRE/public/phplib/classes/Tooljob.php
index 283d378f..4c1b6c8e 100644
--- a/front_end/openVRE/public/phplib/classes/Tooljob.php
+++ b/front_end/openVRE/public/phplib/classes/Tooljob.php
@@ -98,6 +98,7 @@ public function __construct($tool, $execution = "", $project = "", $descrip = ""
switch ($this->launcher) {
case "SGE":
case "docker_SGE":
+ case "kubernetes_native":
$this->root_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'] . "/" . $_SESSION['User']['id'];
$this->root_dir_mug = $GLOBALS['clouds'][$this->cloudName]['dataDir_virtual'];
$this->pub_dir_virtual = $GLOBALS['clouds'][$this->cloudName]['pubDir_virtual'];
@@ -1116,6 +1117,19 @@ public function prepareExecution($tool, $metadata, $metadata_pub = [])
break;
+ case "kubernetes_native":
+ $cmd = $this->setBashCmd_SGE($tool);
+ if (!$cmd) {
+ return 0;
+ }
+
+ $submissionFilename = $this->createSubmitFile_SGE($cmd);
+ if (!is_file($submissionFilename)) {
+ return 0;
+ }
+
+ break;
+
case "PMES":
$json_data = $this->setPMESrequest($tool);
if (!$json_data) {
@@ -1395,6 +1409,10 @@ protected function setBashCommandDockerSge($tool)
" --out_metadata " . $this->stageout_file_virtual .
" --log_file " . $this->log_file_virtual;
+ if (isset($this->launcher) && $this->launcher === "kubernetes_native") {
+ return $cmd_vre;
+ }
+
$cmd = "docker run --privileged -v /var/run/docker.sock:/var/run/docker.sock -d" .
" " . $cmd_envs .
@@ -1788,6 +1806,7 @@ public function submit($tool)
case "SGE":
case "ega_demo":
case "docker_SGE":
+ case "kubernetes_native":
return $this->enqueue($tool);
case "Slurm_Singularity":
return $this->enqueue($tool);
@@ -1808,21 +1827,32 @@ protected function enqueue($tool)
return 0;
}
$jobManager = $launcherInfo['launcher']['job_manager']
- ?? $tool['infrastructure']['clouds'][$this->cloudName]['launcher'];
+ ?? $tool['infrastructure']['clouds'][$this->cloudName]['launcher'];
$memory = $launcherInfo['memory'] ?? $tool['infrastructure']['memory'];
$cpus = $launcherInfo['cpus'] ?? $tool['infrastructure']['cpus'];
$queue = $launcherInfo['queue'] ?? $tool['infrastructure']['clouds'][$this->cloudName]['queue'];
- // error_log("Resolved Parameters: Queue=$queue, CPUs=$cpus, Memory=$memory, jobManager=$jobManager");
+ $jobOptions = array();
+ if ($jobManager === "kubernetes_native") {
+ $jobOptions["image"] = $tool['infrastructure']['container_image'] ?? "";
+ if ($jobOptions["image"] === "") {
+ $_SESSION['errorData']['Error'][] = "Missing infrastructure.container_image for kubernetes_native launcher.";
+ return 0;
+ }
+ $jobOptions["env"] = array();
+ if (isset($tool['infrastructure']['container_env']) && is_array($tool['infrastructure']['container_env'])) {
+ foreach ($tool['infrastructure']['container_env'] as $env_key => $env_value) {
+ $jobOptions["env"][$env_key] = (string)$env_value;
+ }
+ }
+ }
- list($pid, $errMesg) = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager);
+ list($pid, $errMesg) = execJob($this->working_dir, $this->submission_file, $queue, $cpus, $memory, $this->stdout_file, $this->stderr_file, $jobManager, $this->toolId, $jobOptions);
if (!$pid) {
- //error_log($pid, $errMesg, NULL, $this->toolId, $this->cloudName, "SGE", $cpus, $memory);
- error_log("Error: $errMesg");
+ log_addError($pid, $errMesg, NULL, $this->toolId, $this->cloudName, $jobManager, $cpus, $memory);
$_SESSION['errorData']['Error'][] = "Internal error. Cannot enqueue job.";
return 0;
}
- #logger("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:SGE, TOOL:" . $this->toolId . ", PID:$pid");
- //error_log("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:SGE, TOOL:" . $this->toolId . ", PID:$pid");
+ logger("USER:" . $_SESSION['User']['_id'] . ", ID:" . $_SESSION['User']['id'] . ", LAUNCHER:" . $jobManager . ", TOOL:" . $this->toolId . ", PID:$pid");
log_addSubmission($pid, $this->toolId, $this->cloudName, $jobManager, $cpus, $memory, $this->working_dir);
$this->pid = $pid;
diff --git a/front_end/openVRE/public/phplib/processJob.inc.php b/front_end/openVRE/public/phplib/processJob.inc.php
index 13893615..7d7d06ec 100644
--- a/front_end/openVRE/public/phplib/processJob.inc.php
+++ b/front_end/openVRE/public/phplib/processJob.inc.php
@@ -5,7 +5,7 @@
#
-function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE")
+function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job_output.log", $errFile = "job_error.log", $jobManager = "docker_SGE", $toolId = "", $jobOptions = array())
{
logger("Start job submission via $jobManager");
error_log("DEBUG- execJob: Start job submission via $jobManager");
@@ -33,15 +33,11 @@ function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job
$_SESSION['errorData']['Error'][] = "Queue not provided.";
return [0, "Queue not provided."];
}
-
+
$queue = (isset($queue) ? $queue : $GLOBALS['queueTask']);
$jobname = $_SESSION['User']['id'] . "#" . basename($shFile);
- //
- // Start SGE process
- //$process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile);
-
switch ($jobManager) {
case "docker_SGE":
error_log("DEBUG: Submitting job via docker_SGE. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile");
@@ -52,14 +48,31 @@ function execJob($workDir, $shFile, $queue, $cpus = 1, $mem = 0, $logFile = "job
error_log("DEBUG: Submitting job via Slurm to $remote_system. Parameters: shFile=$shFile, workDir=$workDir, logFile=$logFile, errFile=$errFile");
$process = new ProcessSlurm($shFile, $workDir, $logFile, $errFile, $remote_system);
break;
+ case "kubernetes_native":
+ $schedUrl = getenv("OPENVRE_K8S_SCHEDULER_URL") ?: "";
+ $schedHost = $schedUrl !== ""
+ ? (string)(parse_url($schedUrl, PHP_URL_HOST) ?: "(parse_failed)")
+ : "(not_set)";
+ $k8sNs = getenv("OPENVRE_K8S_NAMESPACE") ?: "(env_unset)";
+ $jobOptKeys = is_array($jobOptions) && count($jobOptions)
+ ? implode(",", array_keys($jobOptions))
+ : "(none)";
+ error_log(
+ "DEBUG: Submitting job via kubernetes_native. Parameters: shFile=$shFile, workDir=$workDir, queue=$queue, "
+ . "jobname=$jobname, cpus=$cpus, mem=$mem, logFile=$logFile, errFile=$errFile, "
+ . "namespace=$k8sNs, scheduler_host=$schedHost, jobOptions_keys=$jobOptKeys"
+ );
+ require_once __DIR__ . "/classes/ProcessK8s.php";
+ $process = new ProcessK8s($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile, $jobOptions);
+ break;
default:
$process = new ProcessSGE($shFile, $workDir, $queue, $jobname, $cpus, $mem, $logFile, $errFile);
- break;
+ break;
}
-
+
if (!$process->status()) {
$_SESSION['errorData']['Error'][] = "Job submission failed.
" . $process->getFullCommand() . "
" . $process->getErr();
- $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand() . "'. ErrorSGE: '" . $process->getErr() . "'";
+ $errMesg = "ERROR: Job submission failed. FullCommand: '" . $process->getFullCommand() . "'. ErrorDetail: '" . $process->getErr() . "'";
logger($errMesg);
return array(0, $errMesg);
}
@@ -140,18 +153,30 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local")
// guess launcher
if (!$launcherType) {
- if (is_numeric($pid))
+ if (is_numeric($pid)) {
$launcherType = "SGE";
- else
+ } elseif (strpos((string)$pid, "-") !== false) {
+ $launcherType = "kubernetes_native";
+ } else {
$launcherType = "PMES";
+ }
}
-
logger("getRunningJobInfo: launcherType = $launcherType");
// create new jobProcess
if ($launcherType == "SGE" || $launcherType == "docker_SGE") {
$process = new ProcessSGE();
$job = $process->getRunningJobInfo($pid);
+ } elseif ($launcherType == "kubernetes_native") {
+ require_once __DIR__ . "/classes/ProcessK8s.php";
+ $process = new ProcessK8s();
+ $job = $process->getRunningJobInfo($pid);
+ $k8sSummary = empty($job)
+ ? "empty (job finished or unknown to scheduler)"
+ : ("keys=" . implode(",", array_keys($job))
+ . (isset($job['state']) ? " state=" . $job['state'] : ""));
+ error_log("DEBUG: getRunningJobInfo kubernetes_native pid=$pid $k8sSummary");
+ logger("getRunningJobInfo (kubernetes_native): pid=$pid $k8sSummary");
} elseif ($launcherType == "PMES") {
$process = new ProcessPMES($cloudName);
$job = $process->getRunningJobInfo($pid);
@@ -164,9 +189,7 @@ function getRunningJobInfo($pid, $launcherType = NULL, $cloudName = "local")
$_SESSION['errorData']['Error'][] = "Cannot monitor job '$pid' of type '$launcherType'. Launcher not implemented.";
return $job;
}
-
logger("getRunningJobInfo: end processing $pid");
-
// return job info
return $job;
}
@@ -304,6 +327,8 @@ function delJob($pid, $launcherType = NULL, $cloudName = "local", $login = NULL)
if (!$launcherType) {
if (is_numeric($pid)) {
$launcherType = "docker_SGE";
+ } elseif (strpos((string)$pid, "-") !== false) {
+ $launcherType = "kubernetes_native";
} else {
$launcherType = "PMES";
}
@@ -321,6 +346,16 @@ function delJob($pid, $launcherType = NULL, $cloudName = "local", $login = NULL)
updateLogFromJobInfo($jobInfo['log'], $pid, $launcherType, $cloudName);
// Add any other file redirection logic here
}
+ } elseif ($launcherType == "kubernetes_native") {
+ error_log("DEBUG: delJob kubernetes_native pid=$pid calling ProcessK8s::stop");
+ require_once __DIR__ . "/classes/ProcessK8s.php";
+ $processK8s = new ProcessK8s();
+ list($r_sge, $msg_sge) = $processK8s->stop($pid);
+ error_log(
+ "DEBUG: delJob kubernetes_native pid=$pid stop_ok=" . ($r_sge ? "1" : "0")
+ . " msg=" . $msg_sge
+ );
+ logger("delJob (kubernetes_native): pid=$pid stop_ok=" . ($r_sge ? "1" : "0") . " msg=" . $msg_sge);
} elseif ($launcherType == "PMES") {
$process = new ProcessPMES();
$r = $process->stop($pid);
diff --git a/front_end/openVRE/public/phplib/projects.inc.php b/front_end/openVRE/public/phplib/projects.inc.php
index ca951a32..73741ab4 100644
--- a/front_end/openVRE/public/phplib/projects.inc.php
+++ b/front_end/openVRE/public/phplib/projects.inc.php
@@ -2347,7 +2347,7 @@ function resolvePath_toLocalAbsolutePath($path, $job)
$rfn = str_replace($job['root_dir_virtual'], $GLOBALS['dataDir'] . $_SESSION['User']['id'], $path);
//SGE finds mounted dataDir as root_dir_virtual
- } elseif ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE") {
+ } elseif ($job['launcher'] == "SGE" || $job['launcher'] == "ega_demo" || $job['launcher'] == "docker_SGE" || $job['launcher'] == "kubernetes_native") {
$rfn = str_replace($job['root_dir_mug'], $GLOBALS['dataDir'], $path);
}
// direct from file_path
@@ -2380,6 +2380,10 @@ function resolvePath_toLocalAbsolutePath($path, $job)
}
}
//clean slashes
+ if ($rfn === "") {
+ // Keep original absolute path instead of returning empty when launcher mapping is missing.
+ $rfn = $path;
+ }
$rfn = preg_replace('#/+#', '/', $rfn);
//return absolute path
diff --git a/scheduler/.dockerignore b/scheduler/.dockerignore
new file mode 100644
index 00000000..b765ce2f
--- /dev/null
+++ b/scheduler/.dockerignore
@@ -0,0 +1,12 @@
+.git
+.gitignore
+__pycache__/
+*.pyc
+*.pyo
+*.pyd
+.pytest_cache/
+.mypy_cache/
+.ruff_cache/
+dist/
+build/
+*.egg-info/
diff --git a/scheduler/Dockerfile b/scheduler/Dockerfile
new file mode 100644
index 00000000..0e7e69a9
--- /dev/null
+++ b/scheduler/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.11-alpine
+
+WORKDIR /app
+
+COPY app.py /app/app.py
+
+EXPOSE 8080
+
+USER 65534:65534
+
+CMD ["python", "/app/app.py"]
diff --git a/scheduler/app.py b/scheduler/app.py
new file mode 100644
index 00000000..5288be69
--- /dev/null
+++ b/scheduler/app.py
@@ -0,0 +1,133 @@
+import json
+import os
+import ssl
+import urllib.error
+import urllib.parse
+import urllib.request
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+
+TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
+API_HOST = os.environ.get("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc")
+API_PORT = os.environ.get("KUBERNETES_SERVICE_PORT", "443")
+DEFAULT_NAMESPACE = os.environ.get("DEFAULT_NAMESPACE", "default")
+SCHEDULER_AUTH_TOKEN = os.environ.get("SCHEDULER_AUTH_TOKEN", "")
+
+with open(TOKEN_PATH, "r", encoding="utf-8") as f:
+ SA_TOKEN = f.read().strip()
+
+SSL_CTX = ssl.create_default_context(cafile=CA_PATH)
+
+
+def k8s_request(method, path, body=None, content_type="application/json"):
+ url = f"https://{API_HOST}:{API_PORT}{path}"
+ data = body.encode("utf-8") if body is not None else None
+ req = urllib.request.Request(url, method=method, data=data)
+ req.add_header("Authorization", f"Bearer {SA_TOKEN}")
+ if body is not None:
+ req.add_header("Content-Type", content_type)
+ try:
+ with urllib.request.urlopen(req, context=SSL_CTX, timeout=30) as resp:
+ raw = resp.read().decode("utf-8")
+ return resp.getcode(), raw
+ except urllib.error.HTTPError as e:
+ raw = e.read().decode("utf-8") if e.fp else str(e)
+ return e.code, raw
+
+
+class Handler(BaseHTTPRequestHandler):
+ def _authorized(self):
+ if SCHEDULER_AUTH_TOKEN == "":
+ return False
+ auth = self.headers.get("Authorization", "")
+ return auth == f"Bearer {SCHEDULER_AUTH_TOKEN}"
+
+ def _json(self, code, payload):
+ out = json.dumps(payload).encode("utf-8")
+ self.send_response(code)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Length", str(len(out)))
+ self.end_headers()
+ self.wfile.write(out)
+
+ def _read_json(self):
+ length = int(self.headers.get("Content-Length", "0"))
+ if length == 0:
+ return {}
+ raw = self.rfile.read(length).decode("utf-8")
+ return json.loads(raw)
+
+ def do_GET(self):
+ if self.path == "/healthz":
+ return self._json(200, {"ok": True})
+ if not self._authorized():
+ return self._json(401, {"ok": False, "error": "unauthorized"})
+ if self.path.startswith("/jobs/"):
+ name_q = self.path[len("/jobs/"):]
+ name, _, query = name_q.partition("?")
+ params = urllib.parse.parse_qs(query)
+ ns = params.get("namespace", [DEFAULT_NAMESPACE])[0]
+ code, raw = k8s_request("GET", f"/apis/batch/v1/namespaces/{ns}/jobs/{name}")
+ if code == 404:
+ return self._json(200, {"ok": True, "exists": False, "job": ""})
+ if code >= 300:
+ return self._json(500, {"ok": False, "error": raw})
+ return self._json(200, {"ok": True, "exists": True, "job": raw})
+ return self._json(404, {"ok": False, "error": "not found"})
+
+ def do_POST(self):
+ if not self._authorized():
+ return self._json(401, {"ok": False, "error": "unauthorized"})
+ if self.path != "/jobs":
+ return self._json(404, {"ok": False, "error": "not found"})
+ try:
+ body = self._read_json()
+ ns = body.get("namespace") or DEFAULT_NAMESPACE
+ manifest = body.get("manifest")
+ if not manifest:
+ return self._json(400, {"ok": False, "error": "manifest is required"})
+ code, raw = k8s_request(
+ "POST",
+ f"/apis/batch/v1/namespaces/{ns}/jobs",
+ body=manifest,
+ content_type="application/yaml",
+ )
+ if code >= 300:
+ return self._json(500, {"ok": False, "error": raw})
+ return self._json(200, {"ok": True, "stdout": raw, "stderr": ""})
+ except Exception as e:
+ return self._json(500, {"ok": False, "error": str(e)})
+
+ def do_DELETE(self):
+ if not self._authorized():
+ return self._json(401, {"ok": False, "error": "unauthorized"})
+ if not self.path.startswith("/jobs/"):
+ return self._json(404, {"ok": False, "error": "not found"})
+ name_q = self.path[len("/jobs/"):]
+ name, _, query = name_q.partition("?")
+ params = urllib.parse.parse_qs(query)
+ ns = params.get("namespace", [DEFAULT_NAMESPACE])[0]
+ delete_opts = '{"apiVersion":"batch/v1","kind":"DeleteOptions","propagationPolicy":"Background"}'
+ code, raw = k8s_request(
+ "DELETE",
+ f"/apis/batch/v1/namespaces/{ns}/jobs/{name}",
+ body=delete_opts,
+ content_type="application/json",
+ )
+ if code == 404:
+ return self._json(
+ 200,
+ {"ok": True, "stdout": f"job.batch/{name} already deleted", "stderr": ""},
+ )
+ if code >= 300:
+ return self._json(500, {"ok": False, "error": raw})
+ return self._json(200, {"ok": True, "stdout": raw, "stderr": ""})
+
+ def log_message(self, fmt, *args):
+ return
+
+
+if __name__ == "__main__":
+ server = HTTPServer(("0.0.0.0", 8080), Handler)
+ server.serve_forever()
diff --git a/sge/Dockerfile b/sge/Dockerfile
index 73b7b688..7b2391b3 100644
--- a/sge/Dockerfile
+++ b/sge/Dockerfile
@@ -52,9 +52,19 @@ RUN useradd -m application \
&& echo "application:application" | chpasswd
# Make sure host and container share the same GID for group 'docker', bc it has reading permissions to the socket file
-ARG DOCKER_GROUP
-RUN groupmod -g $DOCKER_GROUP docker
-RUN usermod -aG docker application
+
+#ARG DOCKER_GROUP
+#RUN groupmod -g $DOCKER_GROUP docker
+#RUN usermod -aG docker application
+
+ARG DOCKER_GROUP=1002
+RUN set -eux; \
+ if getent group docker >/dev/null 2>&1; then \
+ groupmod -g "${DOCKER_GROUP}" docker; \
+ else \
+ groupadd -g "${DOCKER_GROUP}" docker; \
+ fi; \
+ usermod -aG docker application
# Add setup script and set permissions
ADD setup_gridengine.sh /usr/local/bin/setup_gridengine.sh