-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy paththread_worker.py
More file actions
42 lines (32 loc) · 1.45 KB
/
thread_worker.py
File metadata and controls
42 lines (32 loc) · 1.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading, pika, logging, json
from back_processing import back_processing
class ThreadWorker(threading.Thread):
    """Thread that consumes one AMQP queue and passes each message to ``back_processing``.

    The connection is opened inside ``run`` (i.e. on the worker thread itself),
    so constructing a worker performs no network I/O.
    """

    def __init__(self, amqp_url, queue, thread_num):
        """Store the settings used later by ``run``.

        Args:
            amqp_url: AMQP URL handed to ``pika.URLParameters``.
            queue: Name of the queue to declare and consume from.
            thread_num: Identifier of this worker; only printed and logged.
        """
        print(thread_num, "run")
        super().__init__()
        self.amqp_url = amqp_url
        self.thread_num = thread_num
        self.queue = queue
        # Set by run() once the connection has been established.
        self.channel = None

    def run(self):
        """Connect, declare the durable queue and consume until consuming stops.

        The connection is closed when consuming ends, whether it returns
        normally or raises, so the socket is not leaked.
        """
        connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
        try:
            self.channel = connection.channel()
            self.channel.queue_declare(queue=self.queue, durable=True)
            # NOTE(review): callback-first arguments plus ``no_ack`` look like
            # the pika < 1.0 call form -- confirm against the pinned version.
            self.channel.basic_consume(
                self.processing_callback, queue=self.queue, no_ack=True)
            logging.info('fontto-processing started!')
            self.channel.start_consuming()
        finally:
            # The connection may already be closed (e.g. broker went away).
            if connection.is_open:
                connection.close()

    @staticmethod
    def _decode_message(body):
        """Decode a UTF-8 message body into a Python object.

        Strict JSON is tried first so that apostrophes inside string values
        survive. Only when that fails is the original single-quote to
        double-quote substitution applied, which keeps accepting bodies
        written with single-quoted keys and values.

        Raises:
            ValueError: If the body is not valid UTF-8 or cannot be parsed
                either way (``UnicodeDecodeError`` and ``JSONDecodeError``
                are both ``ValueError`` subclasses).
        """
        text = body.decode('utf8')
        try:
            return json.loads(text)
        except ValueError:
            return json.loads(text.replace("'", '"'))

    def processing_callback(self, ch, method, properties, body):
        """Handle one delivered message.

        The decoded message must provide the keys ``userId``, ``count``,
        ``unicodes`` and ``env``; they are forwarded in that order to
        ``back_processing``. Afterwards the message is pretty-printed.

        A body that cannot be decoded or lacks one of the keys is logged and
        dropped instead of raising, so one bad message does not terminate the
        consumer thread. Errors raised by ``back_processing`` itself are not
        intercepted.

        Args:
            ch: Channel the message arrived on (unused).
            method: Delivery metadata (unused).
            properties: Message properties (unused).
            body: Raw message payload; must support ``.decode('utf8')``.
        """
        logging.info("%s", self.thread_num)
        logging.info("received %r", body)
        try:
            received_message = self._decode_message(body)
            user_id = received_message['userId']
            count = received_message['count']
            unicodes = received_message['unicodes']
            env = received_message['env']
        except (ValueError, KeyError, TypeError):
            # TypeError covers payloads that decode to a non-mapping value.
            logging.exception("dropping malformed message %r", body)
            return

        back_processing(user_id, count, unicodes, env)
        print(json.dumps(received_message, indent=4))