From 797d38d95dd9f39f256a334157b2c028a1c9ccc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Bartosi=C5=84ski?= Date: Fri, 9 Dec 2022 15:59:41 +0100 Subject: [PATCH] Reduce polling frequency on empty queues --- tasktiger/worker.py | 44 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 4897ecf3..df63d8db 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -212,10 +212,39 @@ def _poll_for_queues(self) -> None: This is only used when using polling to get queues with queued tasks. """ - if not self._did_work: - time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"]) + + self._wait_to_refresh_queue_set() self._refresh_queue_set() + def _wait_to_refresh_queue_set(self): + interval = self.config["POLL_TASK_QUEUES_INTERVAL"] + + if self._did_work: + self.log.info("Queue poll: No delay") + return + + def throttle_queue_poll(): + return self.connection.exists( + self._key("throttle_queue_poll", self.worker_group_name) + ) + + lock = self.connection.lock( + self._key("lockv2", "queue_poll", self.worker_group_name), + timeout=interval * 0.5, + ) + + while True: + if not throttle_queue_poll(): + self.log.info(f"Queue poll: Sleeping {interval}s") + time.sleep(interval) + return + + if lock.acquire(blocking=False): + self.log.info("Queue poll: Acquired lock") + return + + time.sleep(interval * 0.1) + def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None: """ Check activity channel for new queues and wait as necessary. @@ -1155,6 +1184,17 @@ def _refresh_queue_set(self) -> None: self._filter_queues(self._retrieve_queues(self._key(QUEUED))) ) + self.log.info("Queue poll: Done") + + throttle_key = self._key("throttle_queue_poll", self.worker_group_name) + interval = self.config["POLL_TASK_QUEUES_INTERVAL"] + + if interval: + if self._queue_set: + self.connection.delete(throttle_key) + else: + self.connection.set(throttle_key, 1, px=int(interval * 1000)) + def _retrieve_queues(self, key) -> Set[str]: if len(self.only_queues) != 1: return self.connection.smembers(key)