From fc07888b37ad02c7d5aa98afed213887b4b60e5b Mon Sep 17 00:00:00 2001 From: jlnav Date: Fri, 22 May 2026 16:58:59 -0500 Subject: [PATCH 1/2] Claude-directed Flux extensions for MPIExecutor and standalone FluxExecutor for inside container. Basically, if Flux is found and FLUX_URI is set, this is sufficient to submit jobs to that scheduler. --- libensemble/executors/__init__.py | 9 +- libensemble/executors/flux_executor.py | 466 ++++++++++++++++++++++ libensemble/executors/mpi_runner.py | 37 ++ libensemble/resources/env_resources.py | 99 ++++- libensemble/resources/mpi_resources.py | 89 +++-- libensemble/resources/platforms.py | 17 +- libensemble/tests/unit_tests/test_flux.py | 374 +++++++++++++++++ libensemble/utils/validators.py | 11 +- pyproject.toml | 7 + 9 files changed, 1049 insertions(+), 60 deletions(-) create mode 100644 libensemble/executors/flux_executor.py create mode 100644 libensemble/tests/unit_tests/test_flux.py diff --git a/libensemble/executors/__init__.py b/libensemble/executors/__init__.py index 563fa33525..13c7d851d5 100644 --- a/libensemble/executors/__init__.py +++ b/libensemble/executors/__init__.py @@ -1,4 +1,11 @@ from libensemble.executors.executor import Executor from libensemble.executors.mpi_executor import MPIExecutor -__all__ = ["Executor", "MPIExecutor"] +# FluxExecutor is optional - requires flux-core Python bindings +try: + from libensemble.executors.flux_executor import FluxExecutor # noqa: F401 + + __all__ = ["Executor", "MPIExecutor", "FluxExecutor"] +except ImportError: + # flux-core not available - FluxExecutor won't be importable + __all__ = ["Executor", "MPIExecutor"] diff --git a/libensemble/executors/flux_executor.py b/libensemble/executors/flux_executor.py new file mode 100644 index 0000000000..b5b4f4dbc4 --- /dev/null +++ b/libensemble/executors/flux_executor.py @@ -0,0 +1,466 @@ +""" +This module provides a native Flux executor using the Flux Python API. 
+ +The FluxExecutor submits jobs directly to Flux using its Python bindings, +rather than wrapping `flux run` as a subprocess. This provides better +integration with Flux's job lifecycle management and is particularly +useful when running inside containers where MPI runners may not be available. + +Usage:: + + from libensemble.executors.flux_executor import FluxExecutor + + exctr = FluxExecutor() + exctr.register_app(full_path="/path/to/my_app.x", app_name="my_app") + + # In your sim function: + task = exctr.submit(app_name="my_app", num_procs=4, num_nodes=1) + task.wait() + +Requirements: + - flux-core Python bindings must be installed + - Must be running inside a Flux instance (FLUX_URI must be set) +""" + +import logging +import os +import time + +from libensemble.executors.executor import ( + Application, + Executor, + ExecutorException, + Task, + jassert, +) + +logger = logging.getLogger(__name__) + +# Try to import flux - it's optional +try: + import flux + import flux.job + from flux.job import JobspecV1 + + FLUX_AVAILABLE = True +except ImportError: + FLUX_AVAILABLE = False + flux = None + JobspecV1 = None + + +class FluxTask(Task): + """ + Task subclass for Flux jobs using native Flux Python API. + + Overrides poll() and kill() to use Flux job management instead + of subprocess operations. 
+ """ + + def __init__( + self, + app=None, + app_args=None, + workdir=None, + stdout=None, + stderr=None, + workerid=None, + dry_run=False, + ) -> None: + super().__init__(app, app_args, workdir, stdout, stderr, workerid, dry_run) + self.flux_handle = None + self.flux_jobid = None + self.flux_future = None + + def reset(self) -> None: + super().reset() + self.flux_jobid = None + self.flux_future = None + + def _check_poll(self) -> bool: + """Check whether polling this task makes sense.""" + jassert( + self.flux_jobid is not None, + f"task {self.name} has no Flux job ID - check task has been launched", + ) + if self.finished: + logger.debug(f"Polled task {self.name} has already finished. Not re-polling. Status is {self.state}") + return False + return True + + def poll(self) -> None: + """Polls and updates the status attributes of the task using Flux job state.""" + if self.dry_run: + self._set_complete() + return + + if not self._check_poll(): + return + + try: + # Get job info to check state + info = flux.job.job_info(self.flux_handle, self.flux_jobid).get_info() + state = info.get("state", "UNKNOWN") + + # Map Flux states to libEnsemble states + # Flux states: DEPEND, PRIORITY, SCHED, RUN, CLEANUP, INACTIVE + if state in ("DEPEND", "PRIORITY", "SCHED"): + self.state = "WAITING" + elif state == "RUN": + self.state = "RUNNING" + self.runtime = self.timer.elapsed + elif state in ("CLEANUP", "INACTIVE"): + # Job has finished - check if successful + self._handle_completion(info) + else: + self.state = "UNKNOWN" + self.runtime = self.timer.elapsed + + except Exception as e: + logger.warning(f"Error polling Flux job {self.flux_jobid}: {e}") + self.state = "UNKNOWN" + self.runtime = self.timer.elapsed + + def _handle_completion(self, info: dict) -> None: + """Handle job completion and determine success/failure.""" + self.finished = True + self.calc_task_timing() + + # Check result/exit status + result = info.get("result", "") + success = info.get("success", False) + + 
if success or result == "COMPLETED": + self.success = True + self.state = "FINISHED" + self.errcode = 0 + else: + self.success = False + self.state = "FAILED" + # Try to get exit code from result + self.errcode = info.get("returncode", 1) + + logger.info(f"Task {self.name} finished with state {self.state} (result={result})") + + def _set_complete(self) -> None: + """Set task as complete (used for dry_run).""" + self.finished = True + if self.dry_run: + self.success = True + self.state = "FINISHED" + else: + self.calc_task_timing() + self.success = self.errcode == 0 + self.state = "FINISHED" if self.success else "FAILED" + logger.info(f"Task {self.name} finished with errcode {self.errcode} ({self.state})") + + def wait(self, timeout: float | None = None) -> None: + """Waits on completion of the Flux job or raises TimeoutExpired exception.""" + from libensemble.executors.executor import TimeoutExpired + + if self.dry_run: + self._set_complete() + return + + if not self._check_poll(): + return + + try: + # Wait for job to complete + start_time = time.time() + while True: + self.poll() + if self.finished: + break + + if timeout is not None: + elapsed = time.time() - start_time + if elapsed >= timeout: + raise TimeoutExpired(self.name, timeout) + + time.sleep(0.1) + + except TimeoutExpired: + raise + except Exception as e: + logger.warning(f"Error waiting for Flux job {self.flux_jobid}: {e}") + self.state = "FAILED" + self.finished = True + + def kill(self, wait_time: int | None = 60) -> None: + """Kills/cancels the Flux job. + + Parameters + ---------- + wait_time: int, Optional + Time in seconds to wait for cancellation. + Note: Flux handles job cancellation internally. + """ + self.poll() + if self.dry_run: + return + + if self.finished: + logger.warning(f"Trying to kill task that is no longer running. 
Task {self.name}: Status is {self.state}") + return + + if self.flux_jobid is None: + logger.warning(f"Task {self.name} has no Flux job ID - cannot kill") + return + + logger.info(f"Canceling Flux job {self.flux_jobid} for task {self.name}") + + try: + # Cancel the job using Flux API + flux.job.cancel(self.flux_handle, self.flux_jobid) + + # Wait briefly for cancellation to take effect + if wait_time: + deadline = time.time() + min(wait_time, 5) # Don't wait too long + while time.time() < deadline: + self.poll() + if self.finished: + break + time.sleep(0.1) + + except Exception as e: + logger.warning(f"Error canceling Flux job {self.flux_jobid}: {e}") + + self.state = "USER_KILLED" + self.finished = True + self.calc_task_timing() + + +class FluxExecutor(Executor): + """ + Native Flux executor using the Flux Python API. + + This executor submits jobs directly to Flux rather than wrapping + `flux run` as a subprocess. It provides better integration with + Flux's job lifecycle and is suitable for container-based workflows. + + Parameters + ---------- + None + + Raises + ------ + ExecutorException + If flux Python bindings are not available or FLUX_URI is not set. + + Example + ------- + :: + + from libensemble.executors.flux_executor import FluxExecutor + + exctr = FluxExecutor() + exctr.register_app(full_path="/path/to/sim.x", app_name="sim") + + # In sim function: + task = exctr.submit(app_name="sim", num_procs=4) + task.wait() + """ + + def __init__(self) -> None: + """Instantiate a new FluxExecutor instance.""" + if not FLUX_AVAILABLE: + raise ExecutorException( + "Flux Python bindings not available. " + "Install flux-core or use MPIExecutor with mpi_runner='flux' instead." + ) + + if not os.environ.get("FLUX_URI"): + raise ExecutorException( + "FLUX_URI environment variable not set. " "FluxExecutor must be used inside a Flux instance." 
+ ) + + super().__init__() + + # Connect to the Flux instance + try: + self.flux_handle = flux.Flux() + except Exception as e: + raise ExecutorException(f"Failed to connect to Flux instance: {e}") + + self.resources = None + self.platform_info: dict = {} + + def set_resources(self, resources) -> None: + """Set resources for the executor.""" + self.resources = resources + + def add_platform_info(self, platform_info: dict | None = None) -> None: + """Add platform info to the executor.""" + self.platform_info = platform_info or {} + + def submit( + self, + calc_type: str | None = None, + app_name: str | None = None, + num_procs: int | None = None, + num_nodes: int | None = None, + procs_per_node: int | None = None, + num_gpus: int | None = None, + app_args: str | None = None, + stdout: str | None = None, + stderr: str | None = None, + dry_run: bool = False, + wait_on_start: bool = False, + extra_args: str | None = None, + ) -> FluxTask: + """Submit a job to Flux. + + Returns :class:`FluxTask` object. + + Parameters + ---------- + calc_type: str, Optional + The calculation type: 'sim' or 'gen' + + app_name: str, Optional + The application name. 
+ + num_procs: int, Optional + The total number of processes (MPI ranks) + + num_nodes: int, Optional + The number of nodes + + procs_per_node: int, Optional + The processes per node + + num_gpus: int, Optional + The total number of GPUs + + app_args: str, Optional + Application arguments + + stdout: str, Optional + Standard output filename + + stderr: str, Optional + Standard error filename + + dry_run: bool, Optional + If True, don't actually submit the job + + wait_on_start: bool, Optional + Whether to wait for job to start running + + extra_args: str, Optional + Additional arguments (currently not used for native Flux) + + Returns + ------- + task: FluxTask + The submitted task object + """ + app: Application | None = None + if app_name is not None: + app = self.get_app(app_name) + elif calc_type is not None: + app = self.default_app(calc_type) + else: + raise ExecutorException("Either app_name or calc_type must be set") + + assert app is not None + + default_workdir = os.getcwd() + task = FluxTask(app, app_args, default_workdir, stdout, stderr, self.workerID, dry_run) + task.flux_handle = self.flux_handle + + if not dry_run: + self._check_app_exists(task.app) + + # Build the command + command = [app.full_path] + if app.precedent: + command = app.precedent.split() + command + if app_args: + command.extend(app_args.split()) + + # Determine resource configuration + if num_procs is None: + num_procs = 1 + if num_nodes is None: + if procs_per_node and num_procs: + num_nodes = max(1, num_procs // procs_per_node) + else: + num_nodes = 1 + if procs_per_node is None: + procs_per_node = max(1, num_procs // num_nodes) + + task.runline = " ".join(command) + + if dry_run: + logger.info(f"Test (No submit) Command: {task.runline}") + logger.info(f" num_procs={num_procs}, num_nodes={num_nodes}, procs_per_node={procs_per_node}") + task._set_complete() + else: + # Create Flux jobspec + try: + jobspec = JobspecV1.from_command( + command, + num_tasks=num_procs, + 
num_nodes=num_nodes, + cores_per_task=1, + ) + + # Set working directory + jobspec.cwd = task.workdir + + # Set output files + if stdout: + jobspec.stdout = os.path.join(task.workdir, stdout) + if stderr: + jobspec.stderr = os.path.join(task.workdir, stderr) + + # Add GPU resources if requested + if num_gpus and num_gpus > 0: + jobspec.setattr_shell_option("gpu-affinity", "per-task") + # Note: GPU binding in Flux may vary by version + try: + jobspec.setattr("system.resources.gpus", num_gpus) + except Exception: + # Fallback for older Flux versions + pass + + # Submit the job + logger.info(f"Submitting Flux job for task {task.name}: {task.runline}") + task.flux_jobid = flux.job.submit(self.flux_handle, jobspec) + logger.info(f"Task {task.name} submitted with Flux job ID {task.flux_jobid}") + + task.timer.start() + task.submit_time = task.timer.tstart + + if wait_on_start: + self._wait_on_start(task) + + except Exception as e: + logger.error(f"Failed to submit Flux job: {e}") + task.state = "FAILED_TO_START" + task.finished = True + raise ExecutorException(f"Failed to submit Flux job: {e}") + + self.list_of_tasks.append(task) + return task + + def _wait_on_start(self, task: FluxTask, timeout: float = 60.0) -> None: + """Wait for a task to start running.""" + start = time.time() + task.timer.start() + task.submit_time = task.timer.tstart + + while task.state in ("CREATED", "WAITING"): + time.sleep(0.1) + task.poll() + if time.time() - start > timeout: + logger.warning(f"Timeout waiting for task {task.name} to start") + break + + if not task.finished: + task.timer.start() + task.submit_time = task.timer.tstart + + logger.debug(f"Task {task.name} polled as {task.state} after {time.time() - start:.2f} seconds") diff --git a/libensemble/executors/mpi_runner.py b/libensemble/executors/mpi_runner.py index 48953cc3c9..7cca15d12d 100644 --- a/libensemble/executors/mpi_runner.py +++ b/libensemble/executors/mpi_runner.py @@ -19,6 +19,7 @@ def get_runner(mpi_runner_type, 
runner_name=None, platform_info=None): "srun": SRUN_MPIRunner, "jsrun": JSRUN_MPIRunner, "msmpi": MSMPI_MPIRunner, + "flux": FLUX_MPIRunner, "custom": MPIRunner, } runner = None @@ -519,3 +520,39 @@ def __init__(self, run_command="jsrun", platform_info=None): def express_spec(self, task, nprocs, nnodes, ppn, machinefile, hyperthreads, extra_args, resources, workerID): """Returns None, None as jsrun uses neither hostlist or machinefile""" return None, None + + +class FLUX_MPIRunner(MPIRunner): + """MPI Runner for Flux Framework (flux run). + + Flux provides flexible resource management and job scheduling. + See https://flux-framework.org/ for details. + """ + + def __init__(self, run_command="flux", platform_info=None): + self.run_command = run_command + self.subgroup_launch = False # Flux manages job lifecycle + self.mfile_support = False + self.arg_nprocs = ("-n", "--ntasks") + self.arg_nnodes = ("-N", "--nodes") + self.arg_ppn = ("--tasks-per-node",) + self.default_mpi_options = None + self.default_gpu_arg_type = "option_gpus_per_task" + self.default_gpu_args = {"option_gpus_per_task": "-g", "option_gpus_per_node": "--gpus-per-node"} + self.platform_info = platform_info + self.rm_rpn = False + + # flux run command template + # Note: "run" subcommand is added after the base command + self.mpi_command = [ + self.run_command, + "run", + "-N {num_nodes}", + "-n {num_procs}", + "--tasks-per-node {procs_per_node}", + "{extra_args}", + ] + + def express_spec(self, task, nprocs, nnodes, ppn, machinefile, hyperthreads, extra_args, resources, workerID): + """Returns None, None as flux manages resources internally""" + return None, None diff --git a/libensemble/resources/env_resources.py b/libensemble/resources/env_resources.py index 47b3d78624..4d7de9c368 100644 --- a/libensemble/resources/env_resources.py +++ b/libensemble/resources/env_resources.py @@ -35,6 +35,7 @@ class EnvResources: default_nodelist_env_pbs = "PBS_NODEFILE" default_nodelist_env_lsf = "LSB_HOSTS" 
default_nodelist_env_lsf_shortform = "LSB_MCPU_HOSTS" + default_nodelist_env_flux = "FLUX_URI" def __init__( self, @@ -43,6 +44,7 @@ def __init__( nodelist_env_pbs: str | None = None, nodelist_env_lsf: str | None = None, nodelist_env_lsf_shortform: str | None = None, + nodelist_env_flux: str | None = None, ) -> None: """Initializes a new EnvResources instance @@ -71,10 +73,17 @@ def __init__( nodelist_env_lsf_shortform: String, optional The environment variable giving a node list in LSF short-form format (Default: uses LSB_MCPU_HOSTS). Note: This is queried only if a node_list file is not provided. + + nodelist_env_flux: String, optional + The environment variable indicating a Flux instance (Default: uses FLUX_URI). + When present, the nodelist is obtained via `flux resource list`. + Note: This is queried only if a node_list file is not provided. """ self.scheduler = None self.nodelists = {} + # Check Flux first - it may run inside Slurm but should take precedence + self.nodelists["Flux"] = nodelist_env_flux or EnvResources.default_nodelist_env_flux self.nodelists["Slurm"] = nodelist_env_slurm or EnvResources.default_nodelist_env_slurm self.nodelists["Cobalt"] = nodelist_env_cobalt or EnvResources.default_nodelist_env_cobalt self.nodelists["PBS"] = nodelist_env_pbs or EnvResources.default_nodelist_env_pbs @@ -82,6 +91,7 @@ def __init__( self.nodelists["LSF_shortform"] = nodelist_env_lsf_shortform or EnvResources.default_nodelist_env_lsf_shortform self.ndlist_funcs = {} + self.ndlist_funcs["Flux"] = EnvResources.get_flux_nodelist self.ndlist_funcs["Slurm"] = EnvResources.get_slurm_nodelist self.ndlist_funcs["Cobalt"] = EnvResources.get_cobalt_nodelist self.ndlist_funcs["PBS"] = EnvResources.get_pbs_nodelist @@ -105,7 +115,7 @@ def get_nodelist(self) -> list[str | Any]: return [] @staticmethod - def abbrev_nodenames(node_list: list[str], prefix: str = None) -> list[str]: + def abbrev_nodenames(node_list: list[str], prefix: str | None = None) -> list[str]: """Returns 
nodelist with only string up to first dot""" newlist = [s.split(".", 1)[0] for s in node_list] return newlist @@ -151,25 +161,7 @@ def _noderange_append(prefix: str, nidstr: str, suffix: str) -> list[str]: def get_slurm_nodelist(node_list_env: str) -> list[str | Any]: """Gets global libEnsemble nodelist from the Slurm environment""" fullstr = os.environ[node_list_env] - if not fullstr: - return [] - # Split at commas outside of square brackets - r = re.compile(r"(?:[^,\[]|\[[^\]]*\])+") - part_splitstr = r.findall(fullstr) - nidlst = [] - for i in range(len(part_splitstr)): - part = part_splitstr[i] - splitstr = part.split("[", 1) - if len(splitstr) == 1: - nidlst.append(splitstr[0]) - else: - prefix = splitstr[0] - remainder = splitstr[1] - splitstr = remainder.split("]", 1) - nidstr = splitstr[0] - suffix = splitstr[1] - nidlst.extend(EnvResources._noderange_append(prefix, nidstr, suffix)) - return sorted(nidlst) + return EnvResources.get_slurm_nodelist_from_string(fullstr) @staticmethod def get_cobalt_nodelist(node_list_env: str) -> list[str | Any]: @@ -220,3 +212,70 @@ def get_lsf_nodelist_frm_shortform(node_list_env: str) -> list[str | Any]: nodes_with_count = [n for n in zipped_list if "batch" not in n[0]] nodes = [n[0] for n in nodes_with_count] return nodes + + @staticmethod + def get_flux_nodelist(node_list_env: str) -> list[str | Any]: + """Gets global libEnsemble nodelist from a Flux instance. + + Uses `flux resource list` to obtain the list of available nodes. + The node_list_env parameter (FLUX_URI) is used to detect Flux presence + but the actual nodelist comes from the flux command. 
+ """ + import subprocess + + try: + # flux resource list with format to get just hostnames + # -n: no header, -o: output format + result = subprocess.run( + ["flux", "resource", "list", "-n", "-o", "{nodelist}"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + logger.warning(f"flux resource list failed: {result.stderr}") + return [] + + nodelist_str = result.stdout.strip() + if not nodelist_str: + return [] + + # Parse the nodelist - Flux uses similar format to Slurm (e.g., "node[1-4]") + # We can reuse the Slurm parser for bracket notation + return EnvResources.get_slurm_nodelist_from_string(nodelist_str) + + except subprocess.TimeoutExpired: + logger.warning("flux resource list timed out") + return [] + except FileNotFoundError: + logger.warning("flux command not found") + return [] + except Exception as e: + logger.warning(f"Error getting Flux nodelist: {e}") + return [] + + @staticmethod + def get_slurm_nodelist_from_string(fullstr: str) -> list[str | Any]: + """Parses a nodelist string in Slurm format (also used by Flux). + + This is extracted from get_slurm_nodelist to allow reuse with Flux. 
+ """ + if not fullstr: + return [] + # Split at commas outside of square brackets + r = re.compile(r"(?:[^,\[]|\[[^\]]*\])+") + part_splitstr = r.findall(fullstr) + nidlst = [] + for i in range(len(part_splitstr)): + part = part_splitstr[i] + splitstr = part.split("[", 1) + if len(splitstr) == 1: + nidlst.append(splitstr[0]) + else: + prefix = splitstr[0] + remainder = splitstr[1] + splitstr = remainder.split("]", 1) + nidstr = splitstr[0] + suffix = splitstr[1] + nidlst.extend(EnvResources._noderange_append(prefix, nidstr, suffix)) + return sorted(nidlst) diff --git a/libensemble/resources/mpi_resources.py b/libensemble/resources/mpi_resources.py index 33b62ce3c4..0ee1dc155a 100644 --- a/libensemble/resources/mpi_resources.py +++ b/libensemble/resources/mpi_resources.py @@ -29,16 +29,24 @@ def rassert(test: int | bool | None, *args) -> None: # logger.setLevel(logging.DEBUG) -def get_MPI_variant() -> str: +def get_MPI_variant() -> str | None: """Returns MPI base implementation Returns ------- - mpi_variant: str - MPI variant 'aprun' or 'jsrun' or 'msmpi' or 'mpich' or 'openmpi' or 'srun' + mpi_variant: str | None + MPI variant 'aprun' or 'jsrun' or 'msmpi' or 'mpich' or 'openmpi' or 'srun' or 'flux', or None if not found """ + # Check for Flux first via FLUX_URI environment variable (indicates running inside a Flux instance) + if os.environ.get("FLUX_URI"): + try: + subprocess.check_call(["flux", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return "flux" + except Exception: + pass + try: subprocess.check_call(["aprun", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return "aprun" @@ -85,7 +93,7 @@ def get_MPI_variant() -> str: return None -def get_MPI_runner(mpi_runner=None) -> str: +def get_MPI_runner(mpi_runner: str | None = None) -> str | None: """Return whether ``mpirun`` is openmpi or mpich""" var = mpi_runner or get_MPI_variant() if var in ["mpich", "openmpi"]: @@ -102,29 +110,31 @@ def task_partition( """ # Convert to int 
if string is provided - num_procs = int(num_procs) if num_procs else None - num_nodes = int(num_nodes) if num_nodes else None - procs_per_node = int(procs_per_node) if procs_per_node else None + num_procs_int: int | None = int(num_procs) if num_procs else None + num_nodes_int: int | None = int(num_nodes) if num_nodes else None + procs_per_node_int: int | None = int(procs_per_node) if procs_per_node else None # If machinefile is provided - ignore everything else if machinefile: - if num_procs or num_nodes or procs_per_node: + if num_procs_int or num_nodes_int or procs_per_node_int: logger.warning("Machinefile provided - overriding " "procs/nodes/procs_per_node") return None, None, None - if not num_procs: - rassert(num_nodes and procs_per_node, "Need num_procs, num_nodes/procs_per_node, or machinefile") - num_procs = num_nodes * procs_per_node + if not num_procs_int: + rassert(num_nodes_int and procs_per_node_int, "Need num_procs, num_nodes/procs_per_node, or machinefile") + assert num_nodes_int is not None and procs_per_node_int is not None # for mypy + num_procs_int = num_nodes_int * procs_per_node_int - elif not num_nodes: - procs_per_node = procs_per_node or num_procs - num_nodes = num_procs // procs_per_node + elif not num_nodes_int: + procs_per_node_int = procs_per_node_int or num_procs_int + num_nodes_int = num_procs_int // procs_per_node_int - elif not procs_per_node: - procs_per_node = num_procs // num_nodes + elif not procs_per_node_int: + procs_per_node_int = num_procs_int // num_nodes_int - rassert(num_procs == num_nodes * procs_per_node, "num_procs does not equal num_nodes*procs_per_node") - return num_procs, num_nodes, procs_per_node + assert num_nodes_int is not None and procs_per_node_int is not None # for mypy + rassert(num_procs_int == num_nodes_int * procs_per_node_int, "num_procs does not equal num_nodes*procs_per_node") + return num_procs_int, num_nodes_int, procs_per_node_int def _max_rsets_per_node(worker_resources: WorkerResources) -> int: @@ 
-137,9 +147,9 @@ def _max_rsets_per_node(worker_resources: WorkerResources) -> int: def get_resources( resources: Resources, - num_procs: int = None, - num_nodes: int = None, - procs_per_node: int = None, + num_procs: int | None = None, + num_nodes: int | None = None, + procs_per_node: int | None = None, hyperthreads: bool = False, ) -> tuple[int, int, int]: """Reconciles user-supplied options with available worker @@ -190,7 +200,7 @@ def get_resources( f"Nodes: {num_nodes} procs_per_node {procs_per_node}" ) elif not num_nodes and not procs_per_node: - if num_procs <= cores_avail_per_node_per_worker: + if num_procs is not None and num_procs <= cores_avail_per_node_per_worker: num_nodes = 1 else: num_nodes = local_node_count @@ -198,49 +208,54 @@ def get_resources( num_nodes = local_node_count # Checks config is consistent and sufficient to express - num_procs, num_nodes, procs_per_node = task_partition(num_procs, num_nodes, procs_per_node) + result = task_partition(num_procs, num_nodes, procs_per_node) + if result == (None, None, None): + raise MPIResourcesException("task_partition returned all None unexpectedly") + num_procs_out, num_nodes_out, procs_per_node_out = result + # Type narrowing for mypy + assert num_procs_out is not None and num_nodes_out is not None and procs_per_node_out is not None rassert( - num_nodes <= local_node_count, - "Not enough nodes to honor arguments. " f"Requested {num_nodes}. Only {local_node_count} available", + num_nodes_out <= local_node_count, + "Not enough nodes to honor arguments. " f"Requested {num_nodes_out}. Only {local_node_count} available", ) if gresources.enforce_worker_core_bounds: rassert( - procs_per_node <= cores_avail_per_node, + procs_per_node_out <= cores_avail_per_node, "Not enough processors on a node to honor arguments. " - f"Requested {procs_per_node}. Only {cores_avail_per_node} available", + f"Requested {procs_per_node_out}. 
Only {cores_avail_per_node} available", ) rassert( - procs_per_node <= cores_avail_per_node_per_worker, + procs_per_node_out <= cores_avail_per_node_per_worker, "Not enough processors per worker to honor arguments. " - f"Requested {procs_per_node}. Only {cores_avail_per_node_per_worker} available", + f"Requested {procs_per_node_out}. Only {cores_avail_per_node_per_worker} available", ) rassert( - num_procs <= (cores_avail_per_node * local_node_count), + num_procs_out <= (cores_avail_per_node * local_node_count), "Not enough procs to honor arguments. " - f"Requested {num_procs}. Only {cores_avail_per_node * local_node_count} available", + f"Requested {num_procs_out}. Only {cores_avail_per_node * local_node_count} available", ) - if num_nodes < local_node_count: + if num_nodes_out < local_node_count: logger.debug( "User constraints mean fewer nodes being used " - f"than available. {num_nodes} nodes used. {local_node_count} nodes available" + f"than available. {num_nodes_out} nodes used. {local_node_count} nodes available" ) - return num_procs, num_nodes, procs_per_node + return num_procs_out, num_nodes_out, procs_per_node_out def create_machinefile( resources: Resources, machinefile: str | None = None, - num_procs: int = None, + num_procs: int | None = None, num_nodes: int | None = None, procs_per_node: int | None = None, hyperthreads: bool = False, -) -> tuple[bool, None, int, int]: +) -> tuple[bool, int | None, int | None, int | None]: """Creates a machinefile based on user-supplied config options, completed by detected machine resources """ @@ -257,7 +272,7 @@ def create_machinefile( with open(machinefile, "w") as f: for node in node_list[:num_nodes]: - f.write((node + "\n") * procs_per_node) + f.write((node + "\n") * (procs_per_node or 1)) built_mfile = os.path.isfile(machinefile) and os.path.getsize(machinefile) > 0 return built_mfile, num_procs, num_nodes, procs_per_node diff --git a/libensemble/resources/platforms.py b/libensemble/resources/platforms.py index 
69c36242fc..7f1e3962ab 100644 --- a/libensemble/resources/platforms.py +++ b/libensemble/resources/platforms.py @@ -60,7 +60,7 @@ def check_logical_cores(self): mpi_runner: str | None = None """MPI runner: One of ``"mpich"``, ``"openmpi"``, ``"aprun"``, - ``"srun"``, ``"jsrun"``, ``"msmpi"``, ``"custom"`` """ + ``"srun"``, ``"jsrun"``, ``"msmpi"``, ``"flux"``, ``"custom"`` """ runner_name: str | None = None """Literal string of MPI runner command. Only needed if different to the default @@ -230,6 +230,20 @@ class Polaris(Platform): scheduler_match_slots: bool = True +class FluxAllocation(Platform): + """Platform configuration for running inside a Flux allocation. + + This is a generic Flux configuration. For specific systems using Flux + (e.g., LLNL El Capitan), you may want to create a more specific platform + class with cores_per_node and gpus_per_node set appropriately. + """ + + mpi_runner: str = "flux" + runner_name: str = "flux" + gpu_setting_type: str = "runner_default" + scheduler_match_slots: bool = False + + class Known_platforms(BaseModel): """A list of platforms with known configurations. @@ -271,6 +285,7 @@ class Known_platforms(BaseModel): aurora: Aurora = Aurora() generic_rocm: GenericROCm = GenericROCm() frontier: Frontier = Frontier() + flux: FluxAllocation = FluxAllocation() lumi: Lumi = Lumi() lumi_g: LumiGPU = LumiGPU() perlmutter: Perlmutter = Perlmutter() diff --git a/libensemble/tests/unit_tests/test_flux.py b/libensemble/tests/unit_tests/test_flux.py new file mode 100644 index 0000000000..78dd9f952c --- /dev/null +++ b/libensemble/tests/unit_tests/test_flux.py @@ -0,0 +1,374 @@ +""" +Unit tests for Flux Framework integration in libEnsemble. 
+ +Tests cover: +- FLUX_MPIRunner command generation +- Flux nodelist parsing (via slurm-style bracket notation) +- Flux MPI variant detection +- FluxAllocation platform configuration +- FluxExecutor (when flux bindings available) +""" + +import os +import subprocess +from unittest import mock + +import pytest + +from libensemble.executors.mpi_runner import FLUX_MPIRunner, MPIRunner +from libensemble.resources.env_resources import EnvResources +from libensemble.resources.platforms import FluxAllocation, Known_platforms +from libensemble.utils.validators import check_mpi_runner_type + +# ======================================================================================== +# Tests for FLUX_MPIRunner +# ======================================================================================== + + +def test_flux_runner_factory_registration(): + """Test that flux runner is registered in the factory""" + runner = MPIRunner.get_runner("flux") + assert runner is not None + assert isinstance(runner, FLUX_MPIRunner) + + +def test_flux_runner_default_command(): + """Test default flux run command""" + runner = FLUX_MPIRunner() + assert runner.run_command == "flux" + assert runner.subgroup_launch is False + assert runner.mfile_support is False + + +def test_flux_runner_mpi_command_template(): + """Test the MPI command template for flux""" + runner = FLUX_MPIRunner() + expected = ["flux", "run", "-N {num_nodes}", "-n {num_procs}", "--tasks-per-node {procs_per_node}", "{extra_args}"] + assert runner.mpi_command == expected + + +def test_flux_runner_arg_parsing(): + """Test argument parsing configuration""" + runner = FLUX_MPIRunner() + assert "-n" in runner.arg_nprocs + assert "--ntasks" in runner.arg_nprocs + assert "-N" in runner.arg_nnodes + assert "--nodes" in runner.arg_nnodes + assert "--tasks-per-node" in runner.arg_ppn + + +def test_flux_runner_gpu_settings(): + """Test GPU argument configuration""" + runner = FLUX_MPIRunner() + assert runner.default_gpu_arg_type == 
"option_gpus_per_task" + assert runner.default_gpu_args["option_gpus_per_task"] == "-g" + assert runner.default_gpu_args["option_gpus_per_node"] == "--gpus-per-node" + + +def test_flux_runner_express_spec(): + """Test that express_spec returns None for both hostlist and machinefile""" + runner = FLUX_MPIRunner() + hostlist, machinefile = runner.express_spec( + task=None, + nprocs=4, + nnodes=2, + ppn=2, + machinefile=None, + hyperthreads=False, + extra_args=None, + resources=None, + workerID=1, + ) + assert hostlist is None + assert machinefile is None + + +def test_flux_runner_custom_command(): + """Test custom run command override""" + runner = FLUX_MPIRunner(run_command="/custom/flux") + assert runner.run_command == "/custom/flux" + assert runner.mpi_command[0] == "/custom/flux" + + +# ======================================================================================== +# Tests for Flux nodelist parsing +# ======================================================================================== + + +def test_flux_nodelist_from_string_empty(): + """Test parsing empty nodelist string""" + result = EnvResources.get_slurm_nodelist_from_string("") + assert result == [] + + +def test_flux_nodelist_from_string_single(): + """Test parsing single node""" + result = EnvResources.get_slurm_nodelist_from_string("node001") + assert result == ["node001"] + + +def test_flux_nodelist_from_string_range(): + """Test parsing node range (flux uses slurm-style notation)""" + result = EnvResources.get_slurm_nodelist_from_string("node[001-004]") + assert result == ["node001", "node002", "node003", "node004"] + + +def test_flux_nodelist_from_string_mixed(): + """Test parsing mixed single nodes and ranges""" + result = EnvResources.get_slurm_nodelist_from_string("node[001-002,005],other[010-011]") + assert "node001" in result + assert "node002" in result + assert "node005" in result + assert "other010" in result + assert "other011" in result + + +@mock.patch("subprocess.run") +def 
test_flux_nodelist_success(mock_run): + """Test getting nodelist from flux resource list""" + mock_run.return_value = mock.Mock(returncode=0, stdout="node[001-004]\n", stderr="") + + result = EnvResources.get_flux_nodelist("FLUX_URI") + + mock_run.assert_called_once() + assert "flux" in mock_run.call_args[0][0] + assert "resource" in mock_run.call_args[0][0] + assert result == ["node001", "node002", "node003", "node004"] + + +@mock.patch("subprocess.run") +def test_flux_nodelist_command_failure(mock_run): + """Test handling flux command failure""" + mock_run.return_value = mock.Mock(returncode=1, stdout="", stderr="error message") + + result = EnvResources.get_flux_nodelist("FLUX_URI") + assert result == [] + + +@mock.patch("subprocess.run") +def test_flux_nodelist_timeout(mock_run): + """Test handling flux command timeout""" + mock_run.side_effect = subprocess.TimeoutExpired(cmd="flux", timeout=10) + + result = EnvResources.get_flux_nodelist("FLUX_URI") + assert result == [] + + +@mock.patch("subprocess.run") +def test_flux_nodelist_not_found(mock_run): + """Test handling flux command not found""" + mock_run.side_effect = FileNotFoundError() + + result = EnvResources.get_flux_nodelist("FLUX_URI") + assert result == [] + + +def test_env_resources_flux_detection(): + """Test that EnvResources detects Flux when FLUX_URI is set""" + # Save current env + old_flux_uri = os.environ.get("FLUX_URI") + old_slurm = os.environ.get("SLURM_NODELIST") + + try: + # Clear conflicting env vars + if "SLURM_NODELIST" in os.environ: + del os.environ["SLURM_NODELIST"] + + # Set FLUX_URI + os.environ["FLUX_URI"] = "local:///tmp/flux-test" + + env_resources = EnvResources() + assert env_resources.scheduler == "Flux" + assert "Flux" in env_resources.nodelists + assert "Flux" in env_resources.ndlist_funcs + + finally: + # Restore env + if old_flux_uri: + os.environ["FLUX_URI"] = old_flux_uri + elif "FLUX_URI" in os.environ: + del os.environ["FLUX_URI"] + + if old_slurm: + 
os.environ["SLURM_NODELIST"] = old_slurm + + +def test_env_resources_flux_env_variable(): + """Test default Flux environment variable""" + assert EnvResources.default_nodelist_env_flux == "FLUX_URI" + + +# ======================================================================================== +# Tests for Flux MPI variant detection +# ======================================================================================== + + +@mock.patch.dict(os.environ, {"FLUX_URI": "local:///tmp/flux-test"}) +@mock.patch("subprocess.check_call") +def test_get_mpi_variant_flux(mock_check_call): + """Test MPI variant detection returns flux when in Flux instance""" + from libensemble.resources.mpi_resources import get_MPI_variant + + mock_check_call.return_value = 0 # flux --version succeeds + + result = get_MPI_variant() + assert result == "flux" + + # Verify flux --version was called + mock_check_call.assert_called_once() + assert mock_check_call.call_args[0][0] == ["flux", "--version"] + + +@mock.patch.dict(os.environ, {}, clear=True) +@mock.patch("subprocess.check_call") +@mock.patch("subprocess.Popen") +def test_get_mpi_variant_no_flux_uri(mock_popen, mock_check_call): + """Test MPI variant detection skips flux when FLUX_URI not set""" + from libensemble.resources.mpi_resources import get_MPI_variant + + # Make all checks fail + mock_check_call.side_effect = Exception("not found") + mock_popen.side_effect = FileNotFoundError() + + result = get_MPI_variant() + + # Should not be flux since FLUX_URI not set + assert result != "flux" or result is None + + +# ======================================================================================== +# Tests for FluxAllocation platform +# ======================================================================================== + + +def test_flux_allocation_platform(): + """Test FluxAllocation platform configuration""" + platform = FluxAllocation() + assert platform.mpi_runner == "flux" + assert platform.runner_name == "flux" + assert 
platform.gpu_setting_type == "runner_default" + assert platform.scheduler_match_slots is False + + +def test_flux_in_known_platforms(): + """Test that flux is registered in known platforms""" + platforms = Known_platforms() + assert hasattr(platforms, "flux") + assert isinstance(platforms.flux, FluxAllocation) + + +# ======================================================================================== +# Tests for validator +# ======================================================================================== + + +def test_validator_accepts_flux(): + """Test that flux is accepted by the MPI runner validator""" + + class MockCls: + pass + + result = check_mpi_runner_type(MockCls, "flux") + assert result == "flux" + + +def test_validator_accepts_all_runners(): + """Test all valid runner names are accepted""" + + class MockCls: + pass + + valid_runners = ["mpich", "openmpi", "aprun", "srun", "jsrun", "msmpi", "flux", "custom"] + for runner in valid_runners: + result = check_mpi_runner_type(MockCls, runner) + assert result == runner + + +def test_validator_rejects_invalid(): + """Test invalid runner names are rejected""" + + class MockCls: + pass + + with pytest.raises(AssertionError): + check_mpi_runner_type(MockCls, "invalid_runner") + + +# ======================================================================================== +# Tests for FluxExecutor (conditional on flux availability) +# ======================================================================================== + + +def test_flux_executor_import_without_flux(): + """Test FluxExecutor handles missing flux gracefully""" + # This test just verifies the module can be imported + # even when flux is not available + try: + from libensemble.executors import flux_executor + + # FLUX_AVAILABLE should be False if flux not installed + # This is fine - we just want to ensure import doesn't crash + assert hasattr(flux_executor, "FLUX_AVAILABLE") + except ImportError: + pytest.skip("flux_executor module not 
available") + + +def test_flux_executor_requires_flux_uri(): + """Test FluxExecutor raises error when FLUX_URI not set""" + try: + from libensemble.executors.flux_executor import FLUX_AVAILABLE, FluxExecutor + + if not FLUX_AVAILABLE: + pytest.skip("Flux Python bindings not available") + + # Save and clear FLUX_URI + old_uri = os.environ.get("FLUX_URI") + if "FLUX_URI" in os.environ: + del os.environ["FLUX_URI"] + + try: + from libensemble.executors.executor import ExecutorException + + with pytest.raises(ExecutorException, match="FLUX_URI"): + FluxExecutor() + finally: + if old_uri: + os.environ["FLUX_URI"] = old_uri + + except ImportError: + pytest.skip("flux_executor module not available") + + +# ======================================================================================== +# Test runner standalone execution +# ======================================================================================== + + +if __name__ == "__main__": + # FLUX_MPIRunner tests + test_flux_runner_factory_registration() + test_flux_runner_default_command() + test_flux_runner_mpi_command_template() + test_flux_runner_arg_parsing() + test_flux_runner_gpu_settings() + test_flux_runner_express_spec() + test_flux_runner_custom_command() + + # Nodelist parsing tests + test_flux_nodelist_from_string_empty() + test_flux_nodelist_from_string_single() + test_flux_nodelist_from_string_range() + test_flux_nodelist_from_string_mixed() + + # Validator tests + test_validator_accepts_flux() + test_validator_accepts_all_runners() + + # Platform tests + test_flux_allocation_platform() + test_flux_in_known_platforms() + + # EnvResources tests + test_env_resources_flux_env_variable() + + print("All standalone tests passed!") diff --git a/libensemble/utils/validators.py b/libensemble/utils/validators.py index 58cddd4adc..c1fee67ac8 100644 --- a/libensemble/utils/validators.py +++ b/libensemble/utils/validators.py @@ -96,7 +96,16 @@ def check_gpu_setting_type(cls, value): def 
check_mpi_runner_type(cls, value): if value is not None: - assert value in ["mpich", "openmpi", "aprun", "srun", "jsrun", "msmpi", "custom"], "Invalid MPI runner name" + assert value in [ + "mpich", + "openmpi", + "aprun", + "srun", + "jsrun", + "msmpi", + "flux", + "custom", + ], "Invalid MPI runner name" return value diff --git a/pyproject.toml b/pyproject.toml index 45bb84e22f..7a010b5576 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -216,6 +216,13 @@ extra = [ dev = ["wat>=0.7.0,<0.8"] docs = ["pyenchant", "enchant>=0.0.1,<0.0.2", "sphinx-lfs-content>=1.1.10,<2"] +# Note: For FluxExecutor (native Flux Python API), the flux-core Python bindings +# are required. These are typically installed via: +# - conda: conda install -c conda-forge flux-core +# - spack: spack install flux-core +# - system package manager on supported systems +# The MPIExecutor with mpi_runner="flux" works without Python bindings. + # Various config from here onward [tool.black] line-length = 120 From 4d3b7a8d6fb51e8c8d23549028adfb3fbb81916a Mon Sep 17 00:00:00 2001 From: jlnav Date: Wed, 27 May 2026 14:45:23 -0500 Subject: [PATCH 2/2] GPT-5.5 review of Claude implementation. Safer job-polling, more error-conditions for FluxExecutor, don't use --tasks-per-node. append "-c 1" as extra Flux-run arg. propagate environment. 
additional tests --- libensemble/executors/flux_executor.py | 68 ++++++------- libensemble/executors/mpi_runner.py | 42 +++++++- libensemble/resources/resources.py | 2 + libensemble/tests/unit_tests/test_flux.py | 113 +++++++++++++++++++++- 4 files changed, 187 insertions(+), 38 deletions(-) diff --git a/libensemble/executors/flux_executor.py b/libensemble/executors/flux_executor.py index b5b4f4dbc4..e2b1aa2e33 100644 --- a/libensemble/executors/flux_executor.py +++ b/libensemble/executors/flux_executor.py @@ -24,6 +24,7 @@ import logging import os +import shlex import time from libensemble.executors.executor import ( @@ -98,9 +99,9 @@ def poll(self) -> None: return try: - # Get job info to check state - info = flux.job.job_info(self.flux_handle, self.flux_jobid).get_info() - state = info.get("state", "UNKNOWN") + info = flux.job.get_job(self.flux_handle, self.flux_jobid) + jassert(info is not None, f"Flux job {self.flux_jobid} was not found") + state = str(info.get("state", "UNKNOWN")).upper() # Map Flux states to libEnsemble states # Flux states: DEPEND, PRIORITY, SCHED, RUN, CLEANUP, INACTIVE @@ -127,10 +128,10 @@ def _handle_completion(self, info: dict) -> None: self.calc_task_timing() # Check result/exit status - result = info.get("result", "") - success = info.get("success", False) + result = str(info.get("result", "")).upper() + success = result == "COMPLETED" or info.get("returncode", 1) == 0 - if success or result == "COMPLETED": + if success: self.success = True self.state = "FINISHED" self.errcode = 0 @@ -373,24 +374,25 @@ def submit( if not dry_run: self._check_app_exists(task.app) - # Build the command - command = [app.full_path] - if app.precedent: - command = app.precedent.split() + command - if app_args: - command.extend(app_args.split()) + if extra_args: + raise ExecutorException("extra_args is not supported by FluxExecutor") - # Determine resource configuration - if num_procs is None: - num_procs = 1 + num_procs = num_procs or 1 if num_nodes is 
None: - if procs_per_node and num_procs: - num_nodes = max(1, num_procs // procs_per_node) + if procs_per_node is not None: + if num_procs % procs_per_node != 0: + raise ExecutorException("num_procs must be divisible by procs_per_node for FluxExecutor") + num_nodes = num_procs // procs_per_node else: num_nodes = 1 - if procs_per_node is None: - procs_per_node = max(1, num_procs // num_nodes) + elif procs_per_node is not None and num_procs != num_nodes * procs_per_node: + raise ExecutorException("num_procs must equal num_nodes * procs_per_node for FluxExecutor") + command = shlex.split(task.app.app_cmd) + if task.app_args: + command.extend(shlex.split(task.app_args)) + + command = self._set_sim_dir_env(task, command) task.runline = " ".join(command) if dry_run: @@ -400,33 +402,31 @@ def submit( else: # Create Flux jobspec try: + gpus_per_task = None + if num_gpus is not None: + if num_gpus < 0: + raise ExecutorException("num_gpus must be non-negative") + if num_gpus and num_gpus % num_procs != 0: + raise ExecutorException("num_gpus must be divisible by num_procs for FluxExecutor") + gpus_per_task = num_gpus // num_procs if num_gpus else 0 + jobspec = JobspecV1.from_command( command, num_tasks=num_procs, num_nodes=num_nodes, cores_per_task=1, + gpus_per_task=gpus_per_task, + cwd=task.workdir, + environment=dict(os.environ), ) - # Set working directory - jobspec.cwd = task.workdir - - # Set output files if stdout: jobspec.stdout = os.path.join(task.workdir, stdout) if stderr: jobspec.stderr = os.path.join(task.workdir, stderr) - - # Add GPU resources if requested - if num_gpus and num_gpus > 0: + if gpus_per_task: jobspec.setattr_shell_option("gpu-affinity", "per-task") - # Note: GPU binding in Flux may vary by version - try: - jobspec.setattr("system.resources.gpus", num_gpus) - except Exception: - # Fallback for older Flux versions - pass - - # Submit the job + logger.info(f"Submitting Flux job for task {task.name}: {task.runline}") task.flux_jobid = 
flux.job.submit(self.flux_handle, jobspec) logger.info(f"Task {task.name} submitted with Flux job ID {task.flux_jobid}") diff --git a/libensemble/executors/mpi_runner.py b/libensemble/executors/mpi_runner.py index 7cca15d12d..d44152aff1 100644 --- a/libensemble/executors/mpi_runner.py +++ b/libensemble/executors/mpi_runner.py @@ -542,17 +542,53 @@ def __init__(self, run_command="flux", platform_info=None): self.platform_info = platform_info self.rm_rpn = False - # flux run command template - # Note: "run" subcommand is added after the base command + # Flux's per-resource options are mutually exclusive with -n/--ntasks, + # so express layouts with nodes plus per-task resources. self.mpi_command = [ self.run_command, "run", "-N {num_nodes}", "-n {num_procs}", - "--tasks-per-node {procs_per_node}", "{extra_args}", ] + def get_mpi_specs( + self, + task, + nprocs, + nnodes, + ppn, + ngpus, + machinefile, + hyperthreads, + extra_args, + auto_assign_gpus, + match_procs_to_gpus, + resources, + workerID, + ): + specs = super().get_mpi_specs( + task, + nprocs, + nnodes, + ppn, + ngpus, + machinefile, + hyperthreads, + extra_args, + auto_assign_gpus, + match_procs_to_gpus, + resources, + workerID, + ) + + ppn = specs["procs_per_node"] + if ppn: + extra_args = self._append_to_extra_args(specs["extra_args"], "-c 1") + specs["extra_args"] = extra_args + specs["procs_per_node"] = None + return specs + def express_spec(self, task, nprocs, nnodes, ppn, machinefile, hyperthreads, extra_args, resources, workerID): """Returns None, None as flux manages resources internally""" return None, None diff --git a/libensemble/resources/resources.py b/libensemble/resources/resources.py index 19e246fb6f..9b68e43f04 100644 --- a/libensemble/resources/resources.py +++ b/libensemble/resources/resources.py @@ -182,12 +182,14 @@ def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: st nodelist_env_cobalt = resource_info.get("nodelist_env_cobalt", None) nodelist_env_lsf = 
resource_info.get("nodelist_env_lsf", None) nodelist_env_lsf_shortform = resource_info.get("nodelist_env_lsf_shortform", None) + nodelist_env_flux = resource_info.get("nodelist_env_flux", None) self.env_resources = EnvResources( nodelist_env_slurm=nodelist_env_slurm, nodelist_env_cobalt=nodelist_env_cobalt, nodelist_env_lsf=nodelist_env_lsf, nodelist_env_lsf_shortform=nodelist_env_lsf_shortform, + nodelist_env_flux=nodelist_env_flux, ) if node_file is None: diff --git a/libensemble/tests/unit_tests/test_flux.py b/libensemble/tests/unit_tests/test_flux.py index 78dd9f952c..cc8b2a2ecf 100644 --- a/libensemble/tests/unit_tests/test_flux.py +++ b/libensemble/tests/unit_tests/test_flux.py @@ -11,13 +11,16 @@ import os import subprocess +from types import SimpleNamespace from unittest import mock import pytest +from libensemble.executors import flux_executor from libensemble.executors.mpi_runner import FLUX_MPIRunner, MPIRunner from libensemble.resources.env_resources import EnvResources from libensemble.resources.platforms import FluxAllocation, Known_platforms +from libensemble.utils import launcher from libensemble.utils.validators import check_mpi_runner_type # ======================================================================================== @@ -43,10 +46,32 @@ def test_flux_runner_default_command(): def test_flux_runner_mpi_command_template(): """Test the MPI command template for flux""" runner = FLUX_MPIRunner() - expected = ["flux", "run", "-N {num_nodes}", "-n {num_procs}", "--tasks-per-node {procs_per_node}", "{extra_args}"] + expected = ["flux", "run", "-N {num_nodes}", "-n {num_procs}", "{extra_args}"] assert runner.mpi_command == expected +def test_flux_runner_forms_valid_runline_without_tasks_per_node(): + """Flux run should avoid mixing per-resource and per-task options""" + runner = FLUX_MPIRunner() + specs = runner.get_mpi_specs( + task=SimpleNamespace(env={}, _add_to_env=lambda *args: None, ngpus_req=0), + nprocs=4, + nnodes=2, + ppn=2, + 
ngpus=None, + machinefile=None, + hyperthreads=False, + extra_args=None, + auto_assign_gpus=False, + match_procs_to_gpus=False, + resources=None, + workerID=1, + ) + + runline = launcher.form_command(runner.mpi_command, specs) + assert runline == ["flux", "run", "-N", "2", "-n", "4", "-c", "1"] + + def test_flux_runner_arg_parsing(): """Test argument parsing configuration""" runner = FLUX_MPIRunner() @@ -339,6 +364,91 @@ def test_flux_executor_requires_flux_uri(): pytest.skip("flux_executor module not available") +def test_flux_task_poll_uses_get_job(): + """Test FluxTask polls using Flux's get_job helper""" + if not flux_executor.FLUX_AVAILABLE: + pytest.skip("Flux Python bindings not available") + + task = flux_executor.FluxTask( + app=SimpleNamespace(name="app"), + app_args=None, + workdir=os.getcwd(), + stdout="out.txt", + stderr="err.txt", + workerid=1, + dry_run=False, + ) + task.flux_handle = object() + task.flux_jobid = 123 + task.timer.start() + task.submit_time = task.timer.tstart + + with mock.patch.object(flux_executor.flux.job, "get_job", return_value={"state": "RUN"}) as mock_get_job: + task.poll() + + mock_get_job.assert_called_once_with(task.flux_handle, task.flux_jobid) + assert task.state == "RUNNING" + + +def test_flux_executor_submit_builds_jobspec_with_environment_and_gpus(): + """Test FluxExecutor submit passes environment and GPU resources via jobspec""" + if not flux_executor.FLUX_AVAILABLE: + pytest.skip("Flux Python bindings not available") + + executor = object.__new__(flux_executor.FluxExecutor) + executor.flux_handle = object() + executor.resources = None + executor.platform_info = {} + executor.workerID = 7 + executor.list_of_tasks = [] + executor.apps = {} + executor.default_apps = {"sim": None, "gen": None} + executor.base_dir = os.getcwd() + + app = SimpleNamespace( + name="sim", full_path="/path/to/sim.x", app_cmd="fluxwrap /path/to/sim.x", precedent="fluxwrap" + ) + executor.get_app = lambda app_name: app + executor.default_app = 
lambda calc_type: app + executor._check_app_exists = lambda app_obj: None + + old_env = os.environ.get("TEST_FLUX_ENV") + os.environ["TEST_FLUX_ENV"] = "present" + + jobspec = SimpleNamespace(stdout=None, stderr=None) + submit_calls = [] + + def fake_from_command(command, **kwargs): + submit_calls.append((command, kwargs)) + jobspec.cwd = kwargs.get("cwd") + jobspec.environment = kwargs.get("environment") + jobspec.setattr_shell_option = mock.Mock() + return jobspec + + try: + with ( + mock.patch.object(flux_executor.JobspecV1, "from_command", side_effect=fake_from_command), + mock.patch.object(flux_executor.flux.job, "submit", return_value=42), + ): + task = executor.submit(app_name="sim", num_procs=4, num_nodes=2, num_gpus=4, app_args="--flag value") + finally: + if old_env is None: + del os.environ["TEST_FLUX_ENV"] + else: + os.environ["TEST_FLUX_ENV"] = old_env + + command, kwargs = submit_calls[0] + assert command[:2] == ["fluxwrap", "/path/to/sim.x"] + assert command[-2:] == ["--flag", "value"] + assert kwargs["num_tasks"] == 4 + assert kwargs["num_nodes"] == 2 + assert kwargs["gpus_per_task"] == 1 + assert kwargs["environment"]["TEST_FLUX_ENV"] == "present" + assert kwargs["environment"]["LIBENSEMBLE_SIM_DIR"] == "." + jobspec.setattr_shell_option.assert_called_once_with("gpu-affinity", "per-task") + assert task.flux_jobid == 42 + + # ======================================================================================== # Test runner standalone execution # ======================================================================================== @@ -349,6 +459,7 @@ def test_flux_executor_requires_flux_uri(): test_flux_runner_factory_registration() test_flux_runner_default_command() test_flux_runner_mpi_command_template() + test_flux_runner_forms_valid_runline_without_tasks_per_node() test_flux_runner_arg_parsing() test_flux_runner_gpu_settings() test_flux_runner_express_spec()