-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanage.py
More file actions
104 lines (69 loc) · 2.76 KB
/
manage.py
File metadata and controls
104 lines (69 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
import signal
import sys
from time import sleep
import click
from commons.asyncio import run_in_loop
from commons.redis import get_redis
from worker.consumers import RandomSleepConsumer
from worker.processors import QueueProcessor
from worker.queues import InMemoryQueue, RedisQueue
from worker.serializers import CreateOrderMessageSerializer
# Emit all log records (DEBUG and up) to stdout so process supervisors /
# container log collectors capture them.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)
@click.group()
def cli() -> None:
    # Empty root group; subcommands attach via @cli.command().
    # Intentionally no docstring — click would surface it as help text.
    pass
def setup_signal_handlers(processor: QueueProcessor):
    """Install INT/QUIT/TERM handlers that ask *processor* to shut down."""

    def handle_signal(signum, frame):
        logger.warning('killing processor signum=%s frame=%s', signum, frame)
        processor.kill()

    # Same handler for every shutdown-style signal we care about.
    for sig in (signal.SIGINT, signal.SIGQUIT, signal.SIGTERM):
        signal.signal(sig, handle_signal)
async def consume_memory_tasks(tasks_number: int, concurrency: int) -> None:
    """Preload an in-memory queue with order messages and drain it."""
    queue = InMemoryQueue()
    processor = QueueProcessor(
        queue,
        CreateOrderMessageSerializer(),
        RandomSleepConsumer(),
        concurrency=concurrency,
    )
    setup_signal_handlers(processor)
    # Enqueue everything up front (in-memory enqueue is synchronous),
    # then let the processor work through the backlog.
    for order_number in range(tasks_number):
        queue.enqueue(f'{{"order_number": "{order_number}"}}')
    await processor.process()
async def consume_redis_tasks(concurrency: int) -> None:
    """Consume messages from the Redis-backed 'orders' queue until killed.

    Args:
        concurrency: maximum number of messages processed concurrently.

    The Redis connection is closed in a ``finally`` block so it is released
    even when ``processor.process()`` raises (the original leaked it on error).
    """
    redis = await get_redis()
    try:
        queue = RedisQueue(redis, key='orders')
        serializer = CreateOrderMessageSerializer()
        consumer = RandomSleepConsumer()
        processor = QueueProcessor(queue, serializer, consumer, concurrency=concurrency)
        setup_signal_handlers(processor)
        await processor.process()
    finally:
        redis.close()
        await redis.wait_closed()
async def push_redis_tasks(tasks_number: int) -> None:
    """Enqueue *tasks_number* JSON order messages onto the Redis 'orders' queue.

    Args:
        tasks_number: how many messages to produce.

    The Redis connection is closed in a ``finally`` block so it is released
    even if an enqueue fails mid-loop (the original leaked it on error).
    """
    redis = await get_redis()
    try:
        queue = RedisQueue(redis, key='orders')
        for i in range(tasks_number):
            await queue.enqueue(f'{{"order_number": "{i}"}}')
    finally:
        redis.close()
        await redis.wait_closed()
@cli.command()
@click.option('--concurrency', default=500, help='limit of concurrent in-memory tasks')
@click.option('--tasks', default=1000, help='number of tasks to consume')
def memory_worker(concurrency: int, tasks: int) -> None:
    # Run the in-memory demo: enqueue `tasks` messages and process them.
    run_in_loop(consume_memory_tasks(tasks_number=tasks, concurrency=concurrency))
@cli.command()
@click.option('--number', default=1000, help='number of tasks to produce')
def produce_redis_tasks(number: int) -> None:
    # Producer command: the original help text said "consume" — a copy-paste
    # error from the worker options; this command only pushes messages.
    run_in_loop(push_redis_tasks(number))
@cli.command()
@click.option('--concurrency', default=50, help='limit of concurrent redis tasks')
def redis_worker(concurrency: int) -> None:
    # Consume from the Redis-backed queue until a shutdown signal arrives.
    # Help text fixed: it previously said "in-memory tasks", copy-pasted
    # from memory_worker, which was misleading for this Redis command.
    run_in_loop(consume_redis_tasks(concurrency))
@cli.command()
def keep_sleeping() -> None:
    # Block forever in 10s naps — presumably used to keep a container
    # alive for debugging/inspection; TODO confirm with deployment config.
    while True:
        sleep(10)
# Aggregate the `cli` group into the script's single entry point.
commands = click.CommandCollection(sources=[cli])
if __name__ == '__main__':
    commands()