Skip to content

Commit 359072a

Browse files
authored
Merge pull request #101 from Project-OMOTES/98-add-job-priority
add job priority
2 parents 963980b + a58454f commit 359072a

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ classifiers = [
2525

2626
dependencies = [
2727
"aio-pika ~= 9.4, < 9.5",
28-
"omotes-sdk-protocol ~= 1.0",
28+
"omotes-sdk-protocol ~= 1.1",
2929
"pyesdl ~= 24.2",
3030
"pamqp ~= 3.3",
3131
"celery ~= 5.3",

src/omotes_sdk/internal/worker/worker.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,16 @@ def start(self) -> None:
316316
)
317317

318318
# Config of celery app
319-
self.celery_app.conf.task_queues = (
320-
KombuQueue(WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE),
321-
) # Tell the worker to listen to a specific queue for 1 workflow type.
319+
self.celery_app.conf.task_queues = [KombuQueue(
320+
WORKER_TASK_TYPE, routing_key=WORKER_TASK_TYPE, queue_arguments={"x-max-priority": 10}
321+
)] # Tell the worker to listen to a specific queue for 1 workflow type.
322322
self.celery_app.conf.task_acks_late = True
323323
self.celery_app.conf.task_reject_on_worker_lost = True
324324
self.celery_app.conf.task_acks_on_failure_or_timeout = False
325325
self.celery_app.conf.worker_prefetch_multiplier = 1
326+
self.celery_app.conf.broker_transport_options = {
327+
"priority_step": 1
328+
} # Prioritize higher numbers
326329
self.celery_app.conf.broker_connection_retry_on_startup = True
327330
# app.conf.worker_send_task_events = True # Tell the worker to send task events.
328331
self.celery_app.conf.worker_hijack_root_logger = False

src/omotes_sdk/omotes_interface.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from __future__ import annotations
12
import logging
23
import threading
34
import uuid
@@ -303,6 +304,7 @@ def submit_job(
303304
auto_disconnect_on_result: bool,
304305
job_reference: Optional[str] = None,
305306
auto_cleanup_after_ttl: Optional[timedelta] = JOB_QUEUES_TTL,
307+
job_priority: Union[JobSubmission.JobPriority, int] = JobSubmission.JobPriority.MEDIUM,
306308
) -> Job:
307309
"""Submit a new job and connect to progress and status updates and the job result.
308310
@@ -322,6 +324,7 @@ def submit_job(
322324
`callback_on_finished`.
323325
:param job_reference: An optional reference to the submitted job which is used in the
324326
name of the output ESDL as well as in internal logging of OMOTES.
327+
:param job_priority: An optional priority value for the job used in celery.
325328
:param auto_cleanup_after_ttl: When erroneous situations occur (e.g. client is offline),
326329
all queues pertaining to this job will be removed after the given TTL.
327330
Default to 48 hours if unset. Set to `None` to turn off auto clean up,
@@ -362,6 +365,7 @@ def submit_job(
362365
esdl=esdl,
363366
params_dict=convert_params_dict_to_struct(workflow_type, params_dict),
364367
job_reference=job_reference,
368+
job_priority=job_priority, # type: ignore [arg-type]
365369
)
366370
self.broker_if.send_message_to(
367371
exchange_name=OmotesQueueNames.omotes_exchange_name(),

0 commit comments

Comments
 (0)