-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbroker.py
More file actions
64 lines (54 loc) · 1.99 KB
/
broker.py
File metadata and controls
64 lines (54 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import asyncio
from aio_pika.robust_connection import connect
from aio_pika.robust_connection import connect_robust
from core.settings import RabbitmqData
class Broker:
    """Lazily-connecting AMQP helper that caches one connection and one channel.

    The connection is opened on the first call to :meth:`connection` and the
    shared channel on the first call to :meth:`channel`. The instance is also
    an async context manager: entering yields the shared channel, exiting
    closes the channel and the connection.

    Args:
        ssl: Forwarded to ``connection_type`` as ``ssl``.
        host: Broker host; defaults to ``RabbitmqData.async_tasks.host``.
        login: Login name; defaults to ``RabbitmqData.async_tasks.login``.
        password: Password; defaults to ``RabbitmqData.async_tasks.password``.
        virtualhost: Virtual host; defaults to
            ``RabbitmqData.async_tasks.virtualhost``.
        port: Broker port.
        prefetch_count: When not ``None``, applied via ``set_qos`` to every
            channel opened by :meth:`get_channel`.
        connection_type: Awaitable factory used to open the connection
            (``connect`` by default; ``connect_robust`` is also imported by
            this module). It is called with the keyword arguments ``ssl``,
            ``host``, ``login``, ``password``, ``virtualhost``, ``port`` and
            ``loop``.
    """

    def __init__(
        self,
        ssl: bool = False,
        host: str = RabbitmqData.async_tasks.host,
        login: str = RabbitmqData.async_tasks.login,
        password: str = RabbitmqData.async_tasks.password,
        virtualhost: str = RabbitmqData.async_tasks.virtualhost,
        port: int = 5672,
        prefetch_count=None,
        connection_type=connect,
    ):
        self._ssl = ssl
        self._host = host
        self._login = login
        self._channel = None
        self._connection = None
        self._password = password
        self._virtualhost = virtualhost
        self._port = port
        self.prefetch_count = prefetch_count
        self.connection_type = connection_type
        # One lock per cached resource, shared by every caller of this
        # instance. They are created lazily inside the coroutines so that a
        # Broker may be constructed before any event loop is running.
        # Two distinct locks are required: channel() awaits connection() while
        # holding its own lock, and asyncio.Lock is not reentrant.
        self._connection_lock = None
        self._channel_lock = None

    async def connection(self):
        """Return the cached connection, opening it on first use.

        Concurrent callers are serialized on a per-instance lock so that only
        one connection is ever opened.
        """
        # No await between the check and the assignment, so the lazy creation
        # of the lock itself cannot be interleaved with another task.
        if self._connection_lock is None:
            self._connection_lock = asyncio.Lock()
        async with self._connection_lock:
            if self._connection is None:
                self._connection = await self.connection_type(
                    ssl=self._ssl,
                    host=self._host,
                    login=self._login,
                    password=self._password,
                    virtualhost=self._virtualhost,
                    port=self._port,
                    loop=asyncio.get_running_loop(),
                )
            return self._connection

    async def get_channel(self):
        """Open and return a NEW channel on the shared connection.

        The channel is not cached; use :meth:`channel` for the shared one.
        If ``prefetch_count`` is set, it is applied through ``set_qos``.
        """
        channel = await (await self.connection()).channel()
        if self.prefetch_count is not None:
            await channel.set_qos(prefetch_count=self.prefetch_count)
        return channel

    async def channel(self):
        """Return the cached shared channel, opening it on first use.

        Concurrent callers are serialized on a per-instance lock so that only
        one shared channel is ever opened.
        """
        if self._channel_lock is None:
            self._channel_lock = asyncio.Lock()
        async with self._channel_lock:
            if self._channel is None:
                self._channel = await self.get_channel()
            return self._channel

    async def __aenter__(self):
        """Enter the context and return the shared channel."""
        return await self.channel()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the shared channel and the connection, then forget both.

        The cached references are cleared before closing so that a later use
        of this Broker opens fresh resources instead of handing out closed
        ones. The connection is closed even if closing the channel raises.
        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        channel, self._channel = self._channel, None
        connection, self._connection = self._connection, None
        try:
            if channel is not None:
                await channel.close()
        finally:
            if connection is not None:
                await connection.close()