diff --git a/README.md b/README.md index 8393c93..8757277 100644 --- a/README.md +++ b/README.md @@ -39,14 +39,14 @@ broker = AioPikaBroker( ) ``` -After that you have to specify delay label. You can do it with `task` decorator, or by using kicker. +After that you have to specify x_delay label. You can do it with `task` decorator, or by using kicker. In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example: ```python broker = AioPikaBroker(...) -@broker.task(delay=3) +@broker.task(x_delay=3) async def delayed_task() -> int: return 1 @@ -57,11 +57,11 @@ async def main(): await delayed_task.kiq() # This message is going to be received after the delay in 4 seconds. - # Since we overridden the `delay` label using kicker. - await delayed_task.kicker().with_labels(delay=4).kiq() + # Since we overridden the `x_delay` label using kicker. + await delayed_task.kicker().with_labels(x_delay=4).kiq() # This message is going to be send immediately. Since we deleted the label. - await delayed_task.kicker().with_labels(delay=None).kiq() + await delayed_task.kicker().with_labels(x_delay=None).kiq() # Of course the delay is managed by rabbitmq, so you don't # have to wait delay period before message is going to be sent. @@ -80,7 +80,7 @@ broker = AioPikaBroker( delayed_message_exchange_plugin=True, ) -@broker.task(delay=3) +@broker.task(x_delay=3) async def delayed_task() -> int: return 1 @@ -91,8 +91,8 @@ async def main(): await delayed_task.kiq() # This message is going to be received after the delay in 4 seconds. - # Since we overridden the `delay` label using kicker. - await delayed_task.kicker().with_labels(delay=4).kiq() + # Since we overridden the `x_delay` label using kicker. + await delayed_task.kicker().with_labels(x_delay=4).kiq() ``` ## Priorities diff --git a/examples/delayed_task.py b/examples/delayed_task.py index c91f056..70111ed 100644 --- a/examples/delayed_task.py +++ b/examples/delayed_task.py @@ -25,7 +25,7 @@ async def add_one(value: int) -> int: async def main() -> None: await broker.startup() # Send the task to the broker. - task = await add_one.kicker().with_labels(delay=2).kiq(1) + task = await add_one.kicker().with_labels(x_delay=2).kiq(1) print("Task sent with 2 seconds delay.") # Wait for the result. result = await task.wait_result(timeout=3) diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index 0cc9601..70377a2 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -370,7 +370,7 @@ async def kick(self, message: BrokerMessage) -> None: delivery_mode=DeliveryMode.PERSISTENT, priority=priority, ) - delay = parse_val(float, message.labels.get("delay")) + x_delay = parse_val(float, message.labels.get("x_delay")) if len(self._task_queues) == 1: routing_key_name = ( @@ -392,20 +392,20 @@ async def kick(self, message: BrokerMessage) -> None: f"Check routing keys and queue names in broker queues.", ) - if delay is None: + if x_delay is None: exchange = await self.write_channel.get_exchange( self._exchange.name, ensure=False, ) await exchange.publish(rmq_message, routing_key=routing_key_name) elif self._delayed_message_exchange_plugin: - rmq_message.headers["x-delay"] = int(delay * 1000) + rmq_message.headers["x-delay"] = int(x_delay * 1000) exchange = await self.write_channel.get_exchange( self._delayed_message_exchange.name, ) await exchange.publish(rmq_message, routing_key=routing_key_name) elif self._delay_queue: - rmq_message.expiration = timedelta(seconds=delay) + rmq_message.expiration = timedelta(seconds=x_delay) await self.write_channel.default_exchange.publish( rmq_message, routing_key=self._delay_queue.routing_key or self._delay_queue.name, diff --git a/tests/test_delay.py b/tests/test_delay.py index 2c8b62f..c010fd0 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -20,7 +20,7 @@ async def test_when_delayed_message_queue_exists__then_send_with_delay_must_work task_id="1", task_name="name", message=b"message", - labels={"delay": "2"}, + labels={"x_delay": "2"}, ) await broker.kick(broker_msg) diff --git a/tests/test_delay_with_plugin.py b/tests/test_delay_with_plugin.py index 9206450..9611ea0 100644 --- a/tests/test_delay_with_plugin.py +++ b/tests/test_delay_with_plugin.py @@ -19,7 +19,7 @@ async def test_when_delayed_message_plugin_enabled__then_send_with_delay_must_wo task_id="1", task_name="name", message=b"message", - labels={"delay": "2"}, + labels={"x_delay": "2"}, ) # when & then