-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreceive_logs.py
More file actions
44 lines (31 loc) · 1.12 KB
/
receive_logs.py
File metadata and controls
44 lines (31 loc) · 1.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import sys
from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
    """Consume messages from the ``topic_logs`` exchange and print them.

    Binding keys are taken from the command line (``sys.argv[1:]``). Each key
    binds the durable ``task_queue`` queue to the ``topic_logs`` topic
    exchange; every message delivered to the queue is then printed as
    ``routing_key:body``.

    Raises:
        SystemExit: With status 1 when no binding key is given. The check
            runs before any broker connection is opened, so a usage error
            neither needs a reachable broker nor leaves a connection behind.
    """
    # Validate the command line first: fail fast without touching the broker.
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    connection = await connect("amqp://guest:guest@localhost/")

    # The connection is an async context manager; leaving the block closes it
    # on every exit path (normal return, exception, or task cancellation).
    async with connection:
        channel = await connection.channel()
        # Ask the broker for at most one unacknowledged message at a time.
        await channel.set_qos(prefetch_count=1)

        topic_logs_exchange = await channel.declare_exchange(
            "topic_logs",
            ExchangeType.TOPIC,
        )

        # NOTE(review): this is a named, durable queue, so separate runs of
        # this script presumably share it and bindings from earlier runs may
        # persist on the broker - confirm that is intended.
        queue = await channel.declare_queue(
            "task_queue",
            durable=True,
        )

        for binding_key in binding_keys:
            await queue.bind(topic_logs_exchange, routing_key=binding_key)

        print(" [*] Waiting for messages. To exit press CTRL+C")

        async with queue.iterator() as iterator:
            message: AbstractIncomingMessage
            async for message in iterator:
                # process() scopes the handling of a single message; the
                # acknowledgement is handled by the context manager.
                async with message.process():
                    print(f" [x] {message.routing_key!r}:{message.body!r}")
if __name__ == "__main__":
    # Script entry point: run the consumer coroutine until it finishes.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The program tells the user to press CTRL+C to exit, so treat the
        # interrupt as a normal shutdown instead of printing a traceback.
        pass