diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 2d31cb8e84..be2f3ef92e 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -146,14 +146,14 @@ def createJobs(self, newJob: JobTuple) -> bool: self.boss.config.max_jobs ): activity = True - jobID, cpu, memory, command, jobName, environment, gpus = ( + jobID, cpu, memory, walltime, command, jobName, environment, gpus = ( self.waitingJobs.pop(0) ) if self.boss.config.memory_is_product and cpu > 1: memory = memory // cpu # prepare job submission command subLine = self.prepareSubmission( - cpu, memory, jobID, command, jobName, environment, gpus + cpu, memory, walltime, jobID, command, jobName, environment, gpus ) logger.debug("Running %r", subLine) batchJobID = self.boss.with_retries(self.submitJob, subLine) @@ -364,6 +364,7 @@ def prepareSubmission( self, cpu: int, memory: int, + walltime: int, jobID: int, command: str, jobName: str, @@ -500,6 +501,7 @@ def issueBatchJob( job_id, job_desc.cores, job_desc.memory, + job_desc.walltime, command, get_job_kind(job_desc.get_names()), job_environment, diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 32213940fe..400731ba42 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -352,6 +352,7 @@ def prepareSubmission( self, cpu: int, memory: int, + walltime: int, jobID: int, command: str, jobName: str, @@ -361,7 +362,7 @@ def prepareSubmission( # Make sure to use exec so we can get Slurm's signals in the Toil # worker instead of having an intervening Bash return self.prepareSbatch( - cpu, memory, jobID, jobName, job_environment, gpus + cpu, memory, walltime, jobID, jobName, job_environment, gpus ) + [f"--wrap=exec {command}"] def submitJob(self, subLine: list[str]) -> int: @@ -856,6 +857,7 @@ def prepareSbatch( self, cpu: int, mem: int, + walltime: int, jobID: int, jobName: str, job_environment: dict[str, str] | None, @@ -907,8 +909,8 @@ def prepareSbatch( # --export=[ALL,] export_all = True - export_list = [] # Some items here may be multiple comma-separated values - time_limit: int | None = self.boss.config.slurm_time # type: ignore[attr-defined] + export_list = [] # Some items here may be multiple comma-separated values + time_limit: int | None = self.boss.config.slurm_time or walltime # type: ignore[attr-defined] partition: str | None = None qos: str | None = None @@ -1051,7 +1053,7 @@ def prepareSbatch( sbatch_line.append(f"--mem={math.ceil(mem / 2 ** 20)}") if cpu is not None: sbatch_line.append(f"--cpus-per-task={math.ceil(cpu)}") - if time_limit is not None: + if time_limit and time_limit > 0: # Put all the seconds in the seconds slot sbatch_line.append(f"--time=0:{time_limit}") @@ -1101,6 +1103,7 @@ def issueBatchJob( job_id, job_desc.cores, memory, + job_desc.walltime, command, get_job_kind(job_desc.get_names()), job_environment, diff --git a/src/toil/common.py b/src/toil/common.py index bf0e75762d..0ba9a87542 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -217,6 +217,7 @@ class Config: defaultCores: float | int defaultDisk: int defaultPreemptible: bool + defaultWalltime: int # TODO: These names are generated programmatically in # Requirer._fetchRequirement so we can't use snake_case until we fix # that (and add compatibility getters/setters?) @@ -405,6 +406,7 @@ def set_option(option_name: str, old_names: list[str] | None = None) -> None: set_option("maxMemory") set_option("maxDisk") set_option("defaultPreemptible") + set_option("defaultWalltime") # Retrying/rescuing jobs set_option("retryCount") diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index d25c1794c5..ab8ea5beb1 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -2204,6 +2204,7 @@ def __init__( disk: int | str | None = "1MiB", accelerators: list[AcceleratorRequirement] | None = None, preemptible: bool | None = None, + walltime: int | None = 0, tool_id: str | None = None, parent_name: str | None = None, subjob_name: str | None = None, @@ -2250,6 +2251,7 @@ def __init__( disk=disk, accelerators=accelerators, preemptible=preemptible, + walltime=walltime, unitName=self.task_path, displayName=display_name, local=local, @@ -2582,6 +2584,17 @@ def __init__( # Note: if the job is using the toil default memory, it won't be increased memory = max(memory, min_ram) + # Check if the tool has set a time limit. If yes, use it. Otherwise, + # use a None requirement to use the Toil default. + tool_max_walltime = tool.get_requirement("ToolTimeLimit")[0] or {} + if ( + "timelimit" in tool_max_walltime + and (limit_val := tool_max_walltime["timelimit"]) is not None + ): + walltime = cast(int, self.builder.do_eval(limit_val)) + else: + walltime = None + accelerators: list[AcceleratorRequirement] | None = None if req.get("cudaDeviceCount", 0) > 0: # There's a CUDARequirement, which cwltool processed for us @@ -2650,6 +2663,7 @@ def __init__( disk=int(total_disk), accelerators=accelerators, preemptible=preemptible, + walltime=walltime, tool_id=self.cwltool.tool["id"], parent_name=parent_name, local=isinstance(tool, cwltool.command_line_tool.ExpressionTool), diff --git a/src/toil/job.py b/src/toil/job.py index ca39f92500..aeca657004 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -68,7 +68,7 @@ from toil.deferred import DeferredFunction from toil.fileStores import FileID from toil.lib.compatibility import deprecated -from toil.lib.conversions import bytes2human, human2bytes +from toil.lib.conversions import bytes2human, human2bytes, seconds_to_dhms from toil.lib.exceptions import UnimplementedURLException from toil.lib.expando import Expando from toil.lib.resources import ResourceMonitor @@ -418,10 +418,11 @@ class RequirementsDict(TypedDict): disk: NotRequired[int] accelerators: NotRequired[list[AcceleratorRequirement]] preemptible: NotRequired[bool] + walltime: NotRequired[int] # These must be all the key names in RequirementsDict -REQUIREMENT_NAMES = ["disk", "memory", "cores", "accelerators", "preemptible"] +REQUIREMENT_NAMES = ["disk", "memory", "cores", "accelerators", "preemptible", "walltime"] # This is the supertype of all value types in RequirementsDict ParsedRequirement = Union[int, float, bool, list[AcceleratorRequirement]] @@ -454,7 +455,7 @@ class Requirer: """ Base class implementing the storage and presentation of requirements. - Has cores, memory, disk, and preemptability as properties. + Has cores, memory, disk, preemptability, and walltime as properties. """ _requirementOverrides: RequirementsDict @@ -465,7 +466,7 @@ def __init__(self, requirements: Mapping[str, ParseableRequirement | None]) -> N :param dict requirements: Dict from string to value describing a set of resource requirments. 'cores', 'memory', - 'disk', 'preemptible', and 'accelerators' fields, if set, are + 'disk', 'preemptible', 'accelerators', and 'walltime' fields, if set, are parsed and broken out into properties. If unset, the relevant property will be unspecified, and will be pulled from the assigned Config object if queried (see @@ -545,7 +546,7 @@ def __deepcopy__(self, memo: Any) -> Requirer: @overload @staticmethod def _parseResource( - name: Literal["memory"] | Literal["disk"], + name: Literal["memory"] | Literal["disk"] | Literal["walltime"], value: ParseableIndivisibleResource, ) -> int: ... @@ -610,7 +611,7 @@ def _parseResource( # Anything can be None. return value - if name in ("memory", "disk", "cores"): + if name in ("memory", "disk", "cores", "walltime"): # These should be numbers that accept things like "5G". if isinstance(value, bytes): value = value.decode("utf-8") @@ -722,6 +723,15 @@ def memory(self) -> int: def memory(self, val: ParseableIndivisibleResource) -> None: self._requirementOverrides["memory"] = Requirer._parseResource("memory", val) + @property + def walltime(self) -> int: + """Get the maximum walltime in seconds allowed.""" + return cast(int, self._fetchRequirement("walltime")) + + @walltime.setter + def walltime(self, val: ParseableIndivisibleResource) -> None: + self._requirementOverrides["walltime"] = Requirer._parseResource("walltime", val) + @property def cores(self) -> int | float: """Get the number of CPU cores required.""" @@ -791,7 +801,11 @@ def requirements_string(self) -> str: for k in REQUIREMENT_NAMES: v: str | ParsedRequirement | None = self._fetchRequirement(k) if v is not None: - if isinstance(v, (int, float)) and v > 1000: + if k == "walltime": + if v == 0: + continue + v = seconds_to_dhms(cast(int, v)) + elif isinstance(v, (int, float)) and v > 1000: # Make large numbers readable v = bytes2human(v) parts.append(f"{k}: {v}") @@ -843,7 +857,7 @@ def __init__( :param requirements: Dict from string to number, string, or bool describing the resource requirements of the job. 'cores', 'memory', - 'disk', and 'preemptible' fields, if set, are parsed and broken out + 'disk', 'preemptible', and 'walltime' fields, if set, are parsed and broken out into properties. If unset, the relevant property will be unspecified, and will be pulled from the assigned Config object if queried (see :meth:`toil.job.Requirer.assignConfig`). @@ -1744,6 +1758,7 @@ def __init__( accelerators: ParseableAcceleratorRequirement | None = None, preemptible: ParseableFlag | None = None, preemptable: ParseableFlag | None = None, + walltime: ParseableIndivisibleResource | None = None, unitName: str | None = "", checkpoint: bool | None = False, displayName: str | None = "", @@ -1762,6 +1777,7 @@ def __init__( :param accelerators: the computational accelerators required by the job. If a string, can be a string of a number, or a string specifying a model, brand, or API (with optional colon-delimited count). :param preemptible: if the job can be run on a preemptible node. :param preemptable: legacy preemptible parameter, for backwards compatibility with workflows not using the preemptible keyword + :param walltime: the maximum walltime in seconds that the job is allowed to run. :param unitName: Human-readable name for this instance of the job. :param checkpoint: if any of this job's successor jobs completely fails, exhausting all their retries, remove any successor jobs and rerun this job to restart the @@ -1777,6 +1793,7 @@ def __init__( :type disk: int or string convertible by toil.lib.conversions.human2bytes to an int :type accelerators: int, string, dict, or list of those. Strings and dicts must be parseable by parse_accelerator. :type preemptible: bool, int in {0, 1}, or string in {'false', 'true'} in any case + :type walltime: int :type unitName: str :type checkpoint: bool :type displayName: str @@ -1799,6 +1816,7 @@ def __init__( "disk": disk, "accelerators": accelerators, "preemptible": preemptible, + "walltime": walltime, } if descriptionClass is None: if checkpoint: @@ -1941,7 +1959,16 @@ def preemptible(self) -> bool: @preemptible.setter def preemptible(self, val: bool) -> None: self.description.preemptible = val - + + @property + def walltime(self) -> int: + """The maximum walltime in seconds that the job is allowed to run.""" + return self.description.walltime + + @walltime.setter + def walltime(self, val: int) -> None: + self.description.walltime = val + # Note that unless the two halves of a property are *immediately* adjacent, # MyPy throws an error. So the old version has to come later. @deprecated(new_function_name="preemptible") @@ -2693,10 +2720,11 @@ def __init__( disk: ParseableIndivisibleResource | None = None, accelerators: ParseableAcceleratorRequirement | None = None, preemptible: ParseableFlag | None = None, + walltime: ParseableIndivisibleResource | None = None, unitName: str | None = "", ) -> None: """ - Memory, core and disk requirements are specified identically to as in \ + Memory, core, disk and walltime requirements are specified identically to as in \ :func:`toil.job.Job.__init__`. """ # Save the requirements in ourselves so they are visible on `self` to user code. @@ -2707,6 +2735,7 @@ def __init__( "disk": disk, "accelerators": accelerators, "preemptible": preemptible, + "walltime": walltime, } ) @@ -3487,7 +3516,7 @@ def __init__( ``**kwargs`` as arguments. The keywords ``memory``, ``cores``, ``disk``, ``accelerators`, - ``preemptible`` and ``checkpoint`` are reserved keyword arguments that + ``preemptible``, ``walltime``, and ``checkpoint`` are reserved keyword arguments that if specified will be used to determine the resources required for the job, as :func:`toil.job.Job.__init__`. If they are keyword arguments to the function they will be extracted from the function definition, but @@ -3526,6 +3555,7 @@ def resolve(key: str, default: Any | None = None, dehumanize: bool = False) -> A disk=resolve("disk", dehumanize=True), accelerators=resolve("accelerators"), preemptible=resolve("preemptible"), + walltime=resolve("walltime"), checkpoint=resolve("checkpoint", default=False), unitName=resolve("name", default=None), ) @@ -3587,10 +3617,11 @@ class JobFunctionWrappingJob(FunctionWrappingJob): - cores - accelerators - preemptible + - walltime For example to wrap a function into a job we would call:: - Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1) + Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1, walltime=0) """ @@ -3620,6 +3651,7 @@ def __init__(self, userFunction: Callable[..., Any], *args: Any, **kwargs: Any) disk="1M", memory="32M", cores=0.1, + walltime=0, accelerators=[], preemptible=True, preemptable=True, @@ -3721,6 +3753,7 @@ def __init__(self, job: Job | None, unitName: str | None = None) -> None: disk="100M", memory="512M", cores=0.1, + walltime=0, unitName=None if unitName is None else unitName + "-followOn", ) Job.addFollowOn(self, self.encapsulatedFollowOn) diff --git a/src/toil/lib/conversions.py b/src/toil/lib/conversions.py index 50c64e34d4..10ac133b33 100644 --- a/src/toil/lib/conversions.py +++ b/src/toil/lib/conversions.py @@ -179,6 +179,24 @@ def hms_duration_to_seconds(hms: str) -> float: return seconds +def seconds_to_dhms(seconds: int) -> str: + """ + Convert seconds to a days-hours:minutes:seconds string. + """ + if seconds < 0: + raise ValueError("Invalid Time, negative value") + + # A time interval in seconds can be parametrized as + # seconds = a * 60*60*24 + b * 60*60 + c * 60 + d, with + # a in days, b in hours, c in minutes, d in seconds. + a = seconds // (60*60*24) + b = (seconds % (60*60*24)) // (60*60) + c = (seconds % (60*60)) // 60 + d = seconds % 60 + + return f"{a:02}-{b:02}:{c:02}:{d:02}" + + def strtobool(val: str) -> bool: """ Make a human-readable string into a bool. diff --git a/src/toil/options/common.py b/src/toil/options/common.py index 04f0f089da..ebd2b42130 100644 --- a/src/toil/options/common.py +++ b/src/toil/options/common.py @@ -758,6 +758,7 @@ def __call__( ) cpu_note = "Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]" disk_mem_note = "Standard suffixes like K, Ki, M, Mi, G or Gi are supported" + walltime_note = "Values are assumed to be in seconds. A value of 0 does not limit the walltime" accelerators_note = ( "Each accelerator specification can have a type (gpu [default], nvidia, amd, cuda, rocm, opencl, " "or a specific model like nvidia-tesla-k80), and a count [default: 1]. If both a type and a count " @@ -848,6 +849,16 @@ def __call__( "max", "disk", disk_mem_note, bytes2human(SYS_MAX_SIZE) ), ) + resource_options.add_argument( + "--defaultWalltime", + dest="defaultWalltime", + default="0", + type=int, + action=make_open_interval_action(0), + help=resource_help_msg.format( + "default", "walltime", walltime_note, str(0) + ), + ) # Retrying/rescuing jobs job_options = parser.add_argument_group( diff --git a/src/toil/test/batchSystems/batchSystemTest.py b/src/toil/test/batchSystems/batchSystemTest.py index f9d3a168bd..f7bef3d88e 100644 --- a/src/toil/test/batchSystems/batchSystemTest.py +++ b/src/toil/test/batchSystems/batchSystemTest.py @@ -86,7 +86,7 @@ # Since we aren't always attaching the config to the jobs for these tests, we # need to use fully specified requirements. defaultRequirements = dict( - memory=int(100e6), cores=1, disk=1000, preemptible=preemptible, accelerators=[] + memory=int(100e6), cores=1, disk=1000, preemptible=preemptible, accelerators=[], walltime=0 ) @@ -918,7 +918,6 @@ def testHidingProcessEscape(self) -> None: Test to make sure that child processes and their descendants go away when the Toil workflow stops, even if the job process stops and leaves children. """ - self.testProcessEscape(hide=True) @@ -1029,6 +1028,7 @@ def test(self) -> None: disk=1, accelerators=[], preemptible=preemptible, + walltime=0, ), jobName=str(i), unitName="", diff --git a/src/toil/test/batchSystems/test_slurm.py b/src/toil/test/batchSystems/test_slurm.py index 081233e4c9..8d6054d31d 100644 --- a/src/toil/test/batchSystems/test_slurm.py +++ b/src/toil/test/batchSystems/test_slurm.py @@ -626,14 +626,14 @@ def test_prepareSbatch_partition(self): # Without a partition override in the environment, we should get the # "short" partition for this job - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=short" in command # With a partition override, we should not. But the override will be rewritten. self.worker.boss.config.slurm_args = ( "--something --partition foo --somethingElse" ) - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=short" not in command assert "--partition=foo" in command @@ -641,27 +641,27 @@ def test_prepareSbatch_partition(self): self.worker.boss.config.slurm_args = ( "--something --partition=foo --somethingElse" ) - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=short" not in command assert "--partition=foo" in command # And short options self.worker.boss.config.slurm_args = "--something -p foo --somethingElse" - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=short" not in command assert "--partition=foo" in command # Partition settings from the config should override automatic selection self.worker.boss.config.slurm_partition = "foobar" self.worker.boss.config.slurm_args = "--something --somethingElse" - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=foobar" in command # But they should be overridden by the argument overrides self.worker.boss.config.slurm_args = ( "--something --partition=baz --somethingElse" ) - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--partition=baz" in command def test_prepareSbatch_time(self): @@ -673,7 +673,7 @@ def test_prepareSbatch_time(self): # Without a time override in the environment, we should use the normal # time and the "short" partition - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) logger.debug("Command: %s", command) assert "--time=0:30" in command assert "--partition=short" in command @@ -683,7 +683,7 @@ def test_prepareSbatch_time(self): self.worker.boss.config.slurm_args = ( "--something --time 10:00:00 --somethingElse" ) - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) logger.debug("Command: %s", command) assert "--partition=medium" in command assert "--time=0:36000" in command @@ -692,14 +692,14 @@ def test_prepareSbatch_time(self): self.worker.boss.config.slurm_args = ( "--something --time=10:00:00 --somethingElse" ) - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) logger.debug("Command: %s", command) assert "--partition=medium" in command assert "--time=0:36000" in command # And short options self.worker.boss.config.slurm_args = "--something -t 10:00:00 --somethingElse" - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) logger.debug("Command: %s", command) assert "--partition=medium" in command assert "--time=0:36000" in command @@ -710,17 +710,17 @@ def test_prepareSbatch_export(self): self.worker.boss.partitions = ps # Without any overrides, we need --export=ALL - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--export=ALL" in command # With overrides, we don't get --export=ALL self.worker.boss.config.slurm_args = "--export=foo" - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--export=ALL" not in command # With --export-file, we don't get --export=ALL as documented. self.worker.boss.config.slurm_args = "--export-file=./thefile.txt" - command = self.worker.prepareSbatch(1, 100, 5, "job5", None, None) + command = self.worker.prepareSbatch(1, 100, 5, 0, "job5", None, None) assert "--export=ALL" not in command def test_option_detector(self): diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index f990e4ab3f..712d553eec 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -1920,6 +1920,22 @@ def test_workflow_echo_string_scatter_capture_stdout(tmp_path: Path) -> None: assert p.returncode == 0 +@needs_cwl +@pytest.mark.cwl +@pytest.mark.cwl_small +def test_timelimit_expression(tmp_path: Path) -> None: + with get_data("test/cwl/timelimit.cwl") as cwl_file: + cmd = [ + "toil-cwl-runner", + f"--jobStore=file:{tmp_path / 'jobStore'}", + str(cwl_file), + ] + p = subprocess.run(cmd, capture_output=True, text=True) + assert len(p.stdout) > 0 + assert "Finished toil run successfully" in p.stderr + assert p.returncode == 0 + + @needs_cwl @pytest.mark.cwl @pytest.mark.cwl_small diff --git a/src/toil/test/cwl/timelimit.cwl b/src/toil/test/cwl/timelimit.cwl new file mode 100644 index 0000000000..f73a047780 --- /dev/null +++ b/src/toil/test/cwl/timelimit.cwl @@ -0,0 +1,11 @@ +class: CommandLineTool +cwlVersion: v1.2 +inputs: [] +outputs: [] +requirements: + InlineJavascriptRequirement: {} + ToolTimeLimit: + timelimit: $(3*4) + WorkReuse: + enableReuse: false +baseCommand: [sleep, "3"]