Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ broker = AioPikaBroker(
)
```

After that you have to specify delay label. You can do it with `task` decorator, or by using kicker.
After that you have to specify x_delay label. You can do it with `task` decorator, or by using kicker.

In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:

```python
broker = AioPikaBroker(...)

@broker.task(delay=3)
@broker.task(x_delay=3)
async def delayed_task() -> int:
return 1

Expand All @@ -57,11 +57,11 @@ async def main():
await delayed_task.kiq()

# This message is going to be received after the delay in 4 seconds.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
# Since we overridden the `x_delay` label using kicker.
await delayed_task.kicker().with_labels(x_delay=4).kiq()

# This message is going to be send immediately. Since we deleted the label.
await delayed_task.kicker().with_labels(delay=None).kiq()
await delayed_task.kicker().with_labels(x_delay=None).kiq()

# Of course the delay is managed by rabbitmq, so you don't
# have to wait delay period before message is going to be sent.
Expand All @@ -80,7 +80,7 @@ broker = AioPikaBroker(
delayed_message_exchange_plugin=True,
)

@broker.task(delay=3)
@broker.task(x_delay=3)
async def delayed_task() -> int:
return 1

Expand All @@ -91,8 +91,8 @@ async def main():
await delayed_task.kiq()

# This message is going to be received after the delay in 4 seconds.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
# Since we overridden the `x_delay` label using kicker.
await delayed_task.kicker().with_labels(x_delay=4).kiq()
```

## Priorities
Expand Down
2 changes: 1 addition & 1 deletion examples/delayed_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def add_one(value: int) -> int:
async def main() -> None:
await broker.startup()
# Send the task to the broker.
task = await add_one.kicker().with_labels(delay=2).kiq(1)
task = await add_one.kicker().with_labels(x_delay=2).kiq(1)
print("Task sent with 2 seconds delay.")
# Wait for the result.
result = await task.wait_result(timeout=3)
Expand Down
8 changes: 4 additions & 4 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ async def kick(self, message: BrokerMessage) -> None:
delivery_mode=DeliveryMode.PERSISTENT,
priority=priority,
)
delay = parse_val(float, message.labels.get("delay"))
x_delay = parse_val(float, message.labels.get("x_delay"))

if len(self._task_queues) == 1:
routing_key_name = (
Expand All @@ -392,20 +392,20 @@ async def kick(self, message: BrokerMessage) -> None:
f"Check routing keys and queue names in broker queues.",
)

if delay is None:
if x_delay is None:
exchange = await self.write_channel.get_exchange(
self._exchange.name,
ensure=False,
)
await exchange.publish(rmq_message, routing_key=routing_key_name)
elif self._delayed_message_exchange_plugin:
rmq_message.headers["x-delay"] = int(delay * 1000)
rmq_message.headers["x-delay"] = int(x_delay * 1000)
exchange = await self.write_channel.get_exchange(
self._delayed_message_exchange.name,
)
await exchange.publish(rmq_message, routing_key=routing_key_name)
elif self._delay_queue:
rmq_message.expiration = timedelta(seconds=delay)
rmq_message.expiration = timedelta(seconds=x_delay)
await self.write_channel.default_exchange.publish(
rmq_message,
routing_key=self._delay_queue.routing_key or self._delay_queue.name,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def test_when_delayed_message_queue_exists__then_send_with_delay_must_work
task_id="1",
task_name="name",
message=b"message",
labels={"delay": "2"},
labels={"x_delay": "2"},
)
await broker.kick(broker_msg)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_delay_with_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def test_when_delayed_message_plugin_enabled__then_send_with_delay_must_wo
task_id="1",
task_name="name",
message=b"message",
labels={"delay": "2"},
labels={"x_delay": "2"},
)

# when & then
Expand Down
Loading