Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ def createJobs(self, newJob: JobTuple) -> bool:
self.boss.config.max_jobs
):
activity = True
jobID, cpu, memory, command, jobName, environment, gpus = (
jobID, cpu, memory, walltime, command, jobName, environment, gpus = (
self.waitingJobs.pop(0)
)
if self.boss.config.memory_is_product and cpu > 1:
memory = memory // cpu
# prepare job submission command
subLine = self.prepareSubmission(
cpu, memory, jobID, command, jobName, environment, gpus
cpu, memory, walltime, jobID, command, jobName, environment, gpus
)
logger.debug("Running %r", subLine)
batchJobID = self.boss.with_retries(self.submitJob, subLine)
Expand Down Expand Up @@ -364,6 +364,7 @@ def prepareSubmission(
self,
cpu: int,
memory: int,
walltime: int,
jobID: int,
command: str,
jobName: str,
Expand Down Expand Up @@ -500,6 +501,7 @@ def issueBatchJob(
job_id,
job_desc.cores,
job_desc.memory,
job_desc.walltime,
command,
get_job_kind(job_desc.get_names()),
job_environment,
Expand Down
11 changes: 7 additions & 4 deletions src/toil/batchSystems/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def prepareSubmission(
self,
cpu: int,
memory: int,
walltime: int,
Comment thread
mr-c marked this conversation as resolved.
jobID: int,
command: str,
jobName: str,
Expand All @@ -361,7 +362,7 @@ def prepareSubmission(
# Make sure to use exec so we can get Slurm's signals in the Toil
# worker instead of having an intervening Bash
return self.prepareSbatch(
cpu, memory, jobID, jobName, job_environment, gpus
cpu, memory, walltime, jobID, jobName, job_environment, gpus
) + [f"--wrap=exec {command}"]

def submitJob(self, subLine: list[str]) -> int:
Expand Down Expand Up @@ -856,6 +857,7 @@ def prepareSbatch(
self,
cpu: int,
mem: int,
walltime: int,
jobID: int,
jobName: str,
job_environment: dict[str, str] | None,
Expand Down Expand Up @@ -907,8 +909,8 @@ def prepareSbatch(

# --export=[ALL,]<environment_toil_variables>
export_all = True
export_list = [] # Some items here may be multiple comma-separated values
time_limit: int | None = self.boss.config.slurm_time # type: ignore[attr-defined]
export_list = [] # Some items here may be multiple comma-separated values
time_limit: int | None = self.boss.config.slurm_time or walltime # type: ignore[attr-defined]
partition: str | None = None
qos: str | None = None

Expand Down Expand Up @@ -1051,7 +1053,7 @@ def prepareSbatch(
sbatch_line.append(f"--mem={math.ceil(mem / 2 ** 20)}")
if cpu is not None:
sbatch_line.append(f"--cpus-per-task={math.ceil(cpu)}")
if time_limit is not None:
if time_limit and time_limit > 0:
# Put all the seconds in the seconds slot
sbatch_line.append(f"--time=0:{time_limit}")

Expand Down Expand Up @@ -1101,6 +1103,7 @@ def issueBatchJob(
job_id,
job_desc.cores,
memory,
job_desc.walltime,
command,
get_job_kind(job_desc.get_names()),
job_environment,
Expand Down
2 changes: 2 additions & 0 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class Config:
defaultCores: float | int
defaultDisk: int
defaultPreemptible: bool
defaultWalltime: int
# TODO: These names are generated programmatically in
# Requirer._fetchRequirement so we can't use snake_case until we fix
# that (and add compatibility getters/setters?)
Expand Down Expand Up @@ -405,6 +406,7 @@ def set_option(option_name: str, old_names: list[str] | None = None) -> None:
set_option("maxMemory")
set_option("maxDisk")
set_option("defaultPreemptible")
set_option("defaultWalltime")

# Retrying/rescuing jobs
set_option("retryCount")
Expand Down
14 changes: 14 additions & 0 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,7 @@ def __init__(
disk: int | str | None = "1MiB",
accelerators: list[AcceleratorRequirement] | None = None,
preemptible: bool | None = None,
walltime: int | None = 0,
tool_id: str | None = None,
parent_name: str | None = None,
subjob_name: str | None = None,
Expand Down Expand Up @@ -2250,6 +2251,7 @@ def __init__(
disk=disk,
accelerators=accelerators,
preemptible=preemptible,
walltime=walltime,
unitName=self.task_path,
displayName=display_name,
local=local,
Expand Down Expand Up @@ -2582,6 +2584,17 @@ def __init__(
# Note: if the job is using the toil default memory, it won't be increased
memory = max(memory, min_ram)

# Check if the tool has set a time limit. If yes, use it. Otherwise,
# use a None requirement to use the Toil default.
tool_max_walltime = tool.get_requirement("ToolTimeLimit")[0] or {}
if (
"timelimit" in tool_max_walltime
and (limit_val := tool_max_walltime["timelimit"]) is not None
):
walltime = cast(int, self.builder.do_eval(limit_val))
else:
walltime = None

accelerators: list[AcceleratorRequirement] | None = None
if req.get("cudaDeviceCount", 0) > 0:
# There's a CUDARequirement, which cwltool processed for us
Expand Down Expand Up @@ -2650,6 +2663,7 @@ def __init__(
disk=int(total_disk),
accelerators=accelerators,
preemptible=preemptible,
walltime=walltime,
tool_id=self.cwltool.tool["id"],
parent_name=parent_name,
local=isinstance(tool, cwltool.command_line_tool.ExpressionTool),
Expand Down
57 changes: 45 additions & 12 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from toil.deferred import DeferredFunction
from toil.fileStores import FileID
from toil.lib.compatibility import deprecated
from toil.lib.conversions import bytes2human, human2bytes
from toil.lib.conversions import bytes2human, human2bytes, seconds_to_dhms
from toil.lib.exceptions import UnimplementedURLException
from toil.lib.expando import Expando
from toil.lib.resources import ResourceMonitor
Expand Down Expand Up @@ -418,10 +418,11 @@ class RequirementsDict(TypedDict):
disk: NotRequired[int]
accelerators: NotRequired[list[AcceleratorRequirement]]
preemptible: NotRequired[bool]
walltime: NotRequired[int]


# These must be all the key names in RequirementsDict
REQUIREMENT_NAMES = ["disk", "memory", "cores", "accelerators", "preemptible"]
REQUIREMENT_NAMES = ["disk", "memory", "cores", "accelerators", "preemptible", "walltime"]

# This is the supertype of all value types in RequirementsDict
ParsedRequirement = Union[int, float, bool, list[AcceleratorRequirement]]
Expand Down Expand Up @@ -454,7 +455,7 @@ class Requirer:
"""
Base class implementing the storage and presentation of requirements.

Has cores, memory, disk, and preemptability as properties.
Has cores, memory, disk, preemptability, and walltime as properties.
"""

_requirementOverrides: RequirementsDict
Expand All @@ -465,7 +466,7 @@ def __init__(self, requirements: Mapping[str, ParseableRequirement | None]) -> N

:param dict requirements: Dict from string to value
describing a set of resource requirements. 'cores', 'memory',
'disk', 'preemptible', and 'accelerators' fields, if set, are
'disk', 'preemptible', 'accelerators', and 'walltime' fields, if set, are
parsed and broken out into properties. If unset, the relevant
property will be unspecified, and will be pulled from the assigned
Config object if queried (see
Expand Down Expand Up @@ -545,7 +546,7 @@ def __deepcopy__(self, memo: Any) -> Requirer:
@overload
@staticmethod
def _parseResource(
name: Literal["memory"] | Literal["disk"],
name: Literal["memory"] | Literal["disk"] | Literal["walltime"],
value: ParseableIndivisibleResource,
) -> int: ...

Expand Down Expand Up @@ -610,7 +611,7 @@ def _parseResource(
# Anything can be None.
return value

if name in ("memory", "disk", "cores"):
if name in ("memory", "disk", "cores", "walltime"):
# These should be numbers that accept things like "5G".
if isinstance(value, bytes):
value = value.decode("utf-8")
Expand Down Expand Up @@ -722,6 +723,15 @@ def memory(self) -> int:
def memory(self, val: ParseableIndivisibleResource) -> None:
self._requirementOverrides["memory"] = Requirer._parseResource("memory", val)

@property
def walltime(self) -> int:
    """
    Get the maximum walltime allowed, in seconds.

    The value is looked up under the ``"walltime"`` requirement name.
    """
    limit = self._fetchRequirement("walltime")
    return cast(int, limit)

@walltime.setter
def walltime(self, val: ParseableIndivisibleResource) -> None:
    # Parse first, then record the parsed value as this object's override.
    parsed = Requirer._parseResource("walltime", val)
    self._requirementOverrides["walltime"] = parsed

@property
def cores(self) -> int | float:
"""Get the number of CPU cores required."""
Expand Down Expand Up @@ -791,7 +801,11 @@ def requirements_string(self) -> str:
for k in REQUIREMENT_NAMES:
v: str | ParsedRequirement | None = self._fetchRequirement(k)
if v is not None:
if isinstance(v, (int, float)) and v > 1000:
if k == "walltime":
if v == 0:
continue
v = seconds_to_dhms(cast(int, v))
elif isinstance(v, (int, float)) and v > 1000:
# Make large numbers readable
v = bytes2human(v)
parts.append(f"{k}: {v}")
Expand Down Expand Up @@ -843,7 +857,7 @@ def __init__(

:param requirements: Dict from string to number, string, or bool
describing the resource requirements of the job. 'cores', 'memory',
'disk', and 'preemptible' fields, if set, are parsed and broken out
'disk', 'preemptible', and 'walltime' fields, if set, are parsed and broken out
into properties. If unset, the relevant property will be
unspecified, and will be pulled from the assigned Config object if
queried (see :meth:`toil.job.Requirer.assignConfig`).
Expand Down Expand Up @@ -1744,6 +1758,7 @@ def __init__(
accelerators: ParseableAcceleratorRequirement | None = None,
preemptible: ParseableFlag | None = None,
preemptable: ParseableFlag | None = None,
walltime: ParseableIndivisibleResource | None = None,
unitName: str | None = "",
checkpoint: bool | None = False,
displayName: str | None = "",
Expand All @@ -1762,6 +1777,7 @@ def __init__(
:param accelerators: the computational accelerators required by the job. If a string, can be a string of a number, or a string specifying a model, brand, or API (with optional colon-delimited count).
:param preemptible: if the job can be run on a preemptible node.
:param preemptable: legacy preemptible parameter, for backwards compatibility with workflows not using the preemptible keyword
:param walltime: the maximum walltime in seconds that the job is allowed to run.
:param unitName: Human-readable name for this instance of the job.
:param checkpoint: if any of this job's successor jobs completely fails,
exhausting all their retries, remove any successor jobs and rerun this job to restart the
Expand All @@ -1777,6 +1793,7 @@ def __init__(
:type disk: int or string convertible by toil.lib.conversions.human2bytes to an int
:type accelerators: int, string, dict, or list of those. Strings and dicts must be parseable by parse_accelerator.
:type preemptible: bool, int in {0, 1}, or string in {'false', 'true'} in any case
:type walltime: int
:type unitName: str
:type checkpoint: bool
:type displayName: str
Expand All @@ -1799,6 +1816,7 @@ def __init__(
"disk": disk,
"accelerators": accelerators,
"preemptible": preemptible,
"walltime": walltime,
}
if descriptionClass is None:
if checkpoint:
Expand Down Expand Up @@ -1941,7 +1959,16 @@ def preemptible(self) -> bool:
@preemptible.setter
def preemptible(self, val: bool) -> None:
self.description.preemptible = val


@property
def walltime(self) -> int:
    """Maximum time, in seconds of walltime, that the job is allowed to run."""
    description = self.description
    return description.walltime

@walltime.setter
def walltime(self, val: int) -> None:
    # The value lives on the job's description, not on the job itself.
    description = self.description
    description.walltime = val

# Note that unless the two halves of a property are *immediately* adjacent,
# MyPy throws an error. So the old version has to come later.
@deprecated(new_function_name="preemptible")
Expand Down Expand Up @@ -2693,10 +2720,11 @@ def __init__(
disk: ParseableIndivisibleResource | None = None,
accelerators: ParseableAcceleratorRequirement | None = None,
preemptible: ParseableFlag | None = None,
walltime: ParseableIndivisibleResource | None = None,
unitName: str | None = "",
) -> None:
"""
Memory, core and disk requirements are specified identically to as in \
Memory, core, disk and walltime requirements are specified identically to \
:func:`toil.job.Job.__init__`.
"""
# Save the requirements in ourselves so they are visible on `self` to user code.
Expand All @@ -2707,6 +2735,7 @@ def __init__(
"disk": disk,
"accelerators": accelerators,
"preemptible": preemptible,
"walltime": walltime,
}
)

Expand Down Expand Up @@ -3487,7 +3516,7 @@ def __init__(
``**kwargs`` as arguments.

The keywords ``memory``, ``cores``, ``disk``, ``accelerators``,
``preemptible`` and ``checkpoint`` are reserved keyword arguments that
``preemptible``, ``walltime``, and ``checkpoint`` are reserved keyword arguments that
if specified will be used to determine the resources required for the
job, as :func:`toil.job.Job.__init__`. If they are keyword arguments to
the function they will be extracted from the function definition, but
Expand Down Expand Up @@ -3526,6 +3555,7 @@ def resolve(key: str, default: Any | None = None, dehumanize: bool = False) -> A
disk=resolve("disk", dehumanize=True),
accelerators=resolve("accelerators"),
preemptible=resolve("preemptible"),
walltime=resolve("walltime"),
checkpoint=resolve("checkpoint", default=False),
unitName=resolve("name", default=None),
)
Expand Down Expand Up @@ -3587,10 +3617,11 @@ class JobFunctionWrappingJob(FunctionWrappingJob):
- cores
- accelerators
- preemptible
- walltime

For example to wrap a function into a job we would call::

Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)
Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1, walltime=0)

"""

Expand Down Expand Up @@ -3620,6 +3651,7 @@ def __init__(self, userFunction: Callable[..., Any], *args: Any, **kwargs: Any)
disk="1M",
memory="32M",
cores=0.1,
walltime=0,
accelerators=[],
preemptible=True,
preemptable=True,
Expand Down Expand Up @@ -3721,6 +3753,7 @@ def __init__(self, job: Job | None, unitName: str | None = None) -> None:
disk="100M",
memory="512M",
cores=0.1,
walltime=0,
unitName=None if unitName is None else unitName + "-followOn",
)
Job.addFollowOn(self, self.encapsulatedFollowOn)
Expand Down
18 changes: 18 additions & 0 deletions src/toil/lib/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,24 @@ def hms_duration_to_seconds(hms: str) -> float:
return seconds


def seconds_to_dhms(seconds: int) -> str:
    """
    Render a duration in seconds as a ``days-hours:minutes:seconds`` string.

    :param seconds: Non-negative duration, in seconds.
    :returns: The duration with each of the four fields zero-padded to at
        least two digits, e.g. ``"01-01:01:01"`` for 90061 seconds.
    :raises ValueError: If ``seconds`` is negative.
    """
    if seconds < 0:
        raise ValueError("Invalid Time, negative value")

    # Peel off the largest unit first and carry the remainder down to the
    # next smaller unit.
    days, remainder = divmod(seconds, 24 * 60 * 60)
    hours, remainder = divmod(remainder, 60 * 60)
    minutes, secs = divmod(remainder, 60)

    return f"{days:02}-{hours:02}:{minutes:02}:{secs:02}"


def strtobool(val: str) -> bool:
"""
Make a human-readable string into a bool.
Expand Down
Loading