Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.DEFAULT:
@echo "No such command (or you pass two or many targets to ). List of possible commands: make help"

.DEFAULT_GOAL := help

##@ Local development

.PHONY: help
help: ## Show this help
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target> <arg=value>\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m %s\033[0m\n\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

.PHONY: clear_rabbit
clear_rabbit: ## Clear RabbitMQ data volume and restart container
@docker stop taskiq_aio_pika_rabbitmq && docker rm taskiq_aio_pika_rabbitmq && docker volume rm taskiq-aio-pika_rabbitmq_data && docker compose up -d
65 changes: 33 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ This library provides you with aio-pika broker for taskiq.
Features:
- Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
- Supports message priorities.
- Supports multiple queues and custom routing.

Usage example:

```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()
broker = AioPikaBroker(...)

@broker.task
async def test() -> None:
Expand All @@ -32,7 +33,7 @@ To send delayed message, you have to specify delay label. You can do it with `ta
In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:

```python
broker = AioPikaBroker()
broker = AioPikaBroker(...)

@broker.task(delay=3)
async def delayed_task() -> int:
Expand Down Expand Up @@ -86,13 +87,12 @@ async def main():
## Priorities

You can define priorities for messages using `priority` label. Messages with higher priorities are delivered faster.
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the priority queue.

Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
downsides you get by using prioritized queues.

```python
broker = AioPikaBroker(max_priority=10)
broker = AioPikaBroker(...)

# We can define default priority for tasks.
@broker.task(priority=2)
Expand All @@ -111,42 +111,43 @@ async def main():
await prio_task.kicker().with_labels(priority=None).kiq()
```

## Configuration
## Custom Queue and Exchange arguments

AioPikaBroker parameters:

* `url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
* `result_backend` - custom result backend.
* `task_id_generator` - custom task_id genertaor.
* `exchange_name` - name of exchange that used to send messages.
* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
* `queue_name` - queue that used to get incoming messages.
* `routing_key` - that used to bind that queue to the exchange.
* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
* `max_priority` - maximum priority for messages.
* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
* `dead_letter_queue_name` - custom dead letter queue name.
This queue is used to receive negatively acknowledged messages from the main queue.
* `qos` - number of messages that worker can prefetch.
* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.

## Custom Queue Arguments

You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.
You can pass custom arguments to the underlying RabbitMQ queues and exchange declaration by using the `Queue`/`Exchange` classes from `taskiq_aio_pika`. If you used `faststream` before you are probably familiar with this concept.

These arguments will be merged with the default arguments used by the broker
(such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:

```python
from taskiq_aio_pika import AioPikaBroker, Queue, QueueType, Exchange
from aio_pika.abc import ExchangeType

broker = AioPikaBroker(
declare_queues_kwargs={
"arguments": {
"x-message-ttl": 60000, # Set message TTL to 60 seconds
"x-queue-type": "quorum", # Use quorum queue type
}
}
exchange=Exchange(
name="custom_exchange",
type=ExchangeType.TOPIC,
declare=True,
durable=True,
auto_delete=False,
)
task_queues=[
Queue(
name="custom_queue",
type=QueueType.CLASSIC,
declare=True,
durable=True,
max_priority=10,
routing_key="custom_queue",
)
]
)
```

This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.


## Multiqueue support

You can define multiple queues for your tasks. Each queue can have its own routing key and other settings. And your workers can listen to multiple queues (or specific queue) as well.

You can check [multiqueue usage example](./examples/topic_with_two_queues.py) in examples folder for more details.
12 changes: 11 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ services:
rabbitmq:
container_name: taskiq_aio_pika_rabbitmq
image: heidiks/rabbitmq-delayed-message-exchange:latest
# image: rabbitmq:3.13.7-management # rabbit with management UI for debugging
environment:
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
Expand All @@ -14,4 +15,13 @@ services:
ports:
- "5672:5672"
- "15672:15672"
- "61613:61613"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
redis:
container_name: taskiq_aio_pika_redis
image: redis:latest
ports:
- "6379:6379"

volumes:
rabbitmq_data:
40 changes: 40 additions & 0 deletions examples/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Basic example of using Taskiq with AioPika broker.

How to run:
1. Run worker: taskiq worker examples.basic:broker -w 1
2. Run broker: uv run examples/basic.py
"""

import asyncio

from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
"amqp://guest:guest@localhost:5672/",
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
return value + 1


async def main() -> None:
await broker.startup()
# Send the task to the broker.
task = await add_one.kiq(1)
# Wait for the result.
result = await task.wait_result(timeout=2)
print(f"Task execution took: {result.execution_time} seconds.")
if not result.is_err:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
41 changes: 41 additions & 0 deletions examples/delayed_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Example of delayed task execution using Taskiq with AioPika broker.

How to run:
1. Run worker: taskiq worker examples.delayed_task:broker -w 1
2. Run broker: uv run examples/delayed_task.py
"""

import asyncio

from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
"amqp://guest:guest@localhost:5672/",
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
return value + 1


async def main() -> None:
await broker.startup()
# Send the task to the broker.
task = await add_one.kicker().with_labels(delay=2).kiq(1)
print("Task sent with 2 seconds delay.")
# Wait for the result.
result = await task.wait_result(timeout=3)
print(f"Task execution took: {result.execution_time} seconds.")
if not result.is_err:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
96 changes: 96 additions & 0 deletions examples/topic_with_two_queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Example with two queues for different workers and one topic exchange.

It can be useful when you want to have two worker

How to run:
1. Run worker for queue_1: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_1 -w 1
2. Run worker for queue_2: taskiq worker examples.topic_with_two_queues:get_broker_for_queue_2 -w 1
3. Run broker to send a task: uv run examples/topic_with_two_queues.py --queue 1
4. Optionally run broker to send a task to other queue: uv run examples/topic_with_two_queues.py --queue 2
"""

import argparse
import asyncio
import uuid

from aio_pika.abc import ExchangeType
from taskiq_redis import RedisAsyncResultBackend

from taskiq_aio_pika import AioPikaBroker, Exchange, Queue, QueueType

broker = AioPikaBroker(
"amqp://guest:guest@localhost:5672/",
exchange=Exchange(
name="topic_exchange",
type=ExchangeType.TOPIC,
),
delay_queue=Queue(
name="taskiq.delay",
routing_key="queue1",
), # send delayed messages to queue1
).with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))


@broker.task
async def add_one(value: int) -> int:
return value + 1


queue_1 = Queue(
name="queue1",
type=QueueType.CLASSIC,
durable=False,
)
queue_2 = Queue(
name="queue2",
type=QueueType.CLASSIC,
durable=False,
)


def get_broker_for_queue_1() -> AioPikaBroker:
print("This broker will listen to queue1")
return broker.with_queue(queue_1)


def get_broker_for_queue_2() -> AioPikaBroker:
print("This broker will listen to queue2")
return broker.with_queue(queue_2)


async def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
"--queue",
choices=["1", "2"],
required=True,
help="Queue to send the task to.",
)
args = parser.parse_args()

queue_name = queue_1.name if args.queue == "1" else queue_2.name

broker.with_queues(
queue_1,
queue_2,
) # declare both queues to know about them during publishing
await broker.startup()

task = (
await add_one.kicker()
.with_labels(queue_name=queue_name) # or it can be routing_key from queue_1
.with_task_id(uuid.uuid4().hex)
.kiq(2)
)
result = await task.wait_result(timeout=2)
print(f"Task execution took: {result.execution_time} seconds.")
if not result.is_err:
print(f"Returned value: {result.return_value}")
else:
print("Error found while executing task.")
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
13 changes: 12 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ repository = "https://github.com/taskiq-python/taskiq-aio-pika"
keywords = ["taskiq", "tasks", "distributed", "async", "aio-pika"]
requires-python = ">=3.10,<4"
dependencies = [
"taskiq>=0.11.20,<1",
"taskiq>=0.12.0,<1",
"aio-pika>=9.0.0",
"aiostream>=0.7.1",
]

[dependency-groups]
Expand All @@ -48,6 +49,10 @@ dev = [
"coverage>=7.11.3",
"pytest-xdist[psutil]>=3.8.0",
"anyio>=4.11.0",
{include-group = "examples"},
]
examples = [
"taskiq-redis>=1.1.2",
]

[tool.mypy]
Expand Down Expand Up @@ -130,6 +135,12 @@ line-length = 88
"SLF001", # Private member accessed
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
"D101", # Missing docstring in public class
"D102", # Missing docstring in public method
"E501", # Line too long
]
"examples/*" = [
"D", # missing docstrings
"T201", # print found
]

[tool.ruff.lint.pydocstyle]
Expand Down
9 changes: 8 additions & 1 deletion taskiq_aio_pika/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
from importlib.metadata import version

from taskiq_aio_pika.broker import AioPikaBroker
from taskiq_aio_pika.exchange import Exchange
from taskiq_aio_pika.queue import Queue, QueueType

__version__ = version("taskiq-aio-pika")

__all__ = ["AioPikaBroker"]
__all__ = [
"AioPikaBroker",
"Exchange",
"Queue",
"QueueType",
]
Loading