-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
132 lines (117 loc) · 4.06 KB
/
worker.py
File metadata and controls
132 lines (117 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# coding=utf8
################################################################################
#
# Copyright (c) 2016 Baidu.com, Inc. All Rights Reserved
#
################################################################################
"""
Message Worker For QA Platform.
Authors: maxun(maxun@baidu.com)
Date: 2016/12/07
"""
from common import jsonutil
from common import message
from multiprocessing import Pool
from common import configutil
import logging
from config import logconf
import sys
# Python 2 idiom: presumably reload(sys) is needed so that
# sys.setdefaultencoding is callable here -- TODO confirm on the target
# interpreter. The call below makes utf-8 the process-wide default encoding.
reload(sys)
sys.setdefaultencoding('utf-8')
# Settings from the WORKER section of common.conf. This module reads the
# 'poolsize', 'exchange' and 'type' keys from it.
CONF = configutil.get_items_in_section('common.conf', 'WORKER')
# POOLSIZE = int(CONF['poolsize'])
# pool = Pool(POOLSIZE)
# Logger used by every function in this module.
logger = logging.getLogger('appLogger')
def call_func_in_progress(command_message_json):
    """
    Run the callable described by a command message.

    ``call_func`` invokes this either inline or through the process pool.
    The JSON string is deserialized with ``jsonutil.json2object`` and the
    target is resolved by reflection:

    * ``class_name`` set: ``module_name.class_name`` is instantiated with no
      arguments and its ``func_name`` method is called;
    * ``class_name`` empty or absent: the module-level function
      ``module_name.func_name`` is called.

    In both cases the callable receives ``parameters`` as its single
    positional argument and its return value is discarded.

    :param command_message_json: command message as a JSON string; a falsy
        value (empty string / None) is ignored.
    :return: None
    """
    # Function-scope import keeps this block self-contained.
    import importlib

    logger.debug(command_message_json)
    if not command_message_json:
        return

    command_message = jsonutil.json2object(command_message_json)  # deserialize
    module_name = command_message.module_name
    # class_name is optional: treat a missing attribute like an empty value.
    class_name = getattr(command_message, 'class_name', None) or ''
    func_name = command_message.func_name
    parameters = command_message.parameters

    # import_module returns the leaf module for dotted names. The previous
    # __import__(..., fromlist=True) passed a bool where a sequence of names
    # is expected.
    module = importlib.import_module(module_name)

    if class_name:
        owner = getattr(module, class_name)()
    else:
        owner = module
    target = getattr(owner, func_name)
    logger.debug('Dispatching to %r', target)
    target(parameters)
def is_multiprocessing():
    """
    Tell whether commands should be dispatched through the process pool.

    Multiprocessing is enabled only when ``CONF['poolsize']`` is a non-empty
    string made of digits whose integer value is greater than zero.

    :return: True when multiprocessing is enabled, otherwise False
    """
    poolsize = CONF['poolsize']
    return bool(poolsize) and poolsize.isdigit() and int(poolsize) > 0
def call_func(command_message_json):
    """
    Dispatch a command message for execution.

    When multiprocessing is enabled the command is submitted to the
    module-level ``pool`` asynchronously (the async result is not kept);
    otherwise it is executed synchronously in the current process.

    :param command_message_json: command message as a JSON string
    :return: None
    """
    logger.debug(command_message_json)
    if not is_multiprocessing():
        call_func_in_progress(command_message_json)
        return
    pool.apply_async(call_func_in_progress, (command_message_json,))
def callback(ch, method, properties, body):
    """
    Consumer callback: execute the command carried by one queue message.

    The message body is handed to ``call_func``; on success the message is
    acknowledged on ``ch``. Any exception is logged with its traceback and
    swallowed so that it does not propagate out of the callback.

    :param ch: channel the message was delivered on (used for ``basic_ack``)
    :param method: delivery metadata; ``delivery_tag`` identifies the message
    :param properties: message properties (unused)
    :param body: message payload, expected to be a command message JSON string
    :return: None
    """
    try:
        # Lazy logging arguments: formatting happens only if the record is
        # emitted, and an odd body value cannot break the format call itself.
        logger.debug(" [x] Received %r", body)
        call_func(body)
        logger.debug(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # NOTE(review): the failed message is neither acked nor rejected here.
        # With prefetch_count=1 this may stop further deliveries -- confirm
        # whether a basic_nack/basic_reject (and which requeue policy) is wanted.
        # str(e) is used instead of the deprecated e.message attribute.
        logger.exception('Handle message: [%s] failed! Exception is: [%s]', body, e)
def receive_message():
    """
    Consume messages from the configured queue, blocking in the consume loop.

    The exchange name comes from ``CONF['exchange']``; the queue name and the
    routing key are both ``CONF['type']``. The message-queue connection is
    initialised first if ``message.MessageQueue.channel`` is not set. Each
    delivered message is passed to ``callback``.

    :return: None
    :raises RuntimeError: if initialising the message-queue connection fails
    """
    mq = message.MessageQueue
    if not mq.channel:
        try:
            mq.init()
        except Exception:
            # Record the underlying cause (with traceback) before raising the
            # generic error, otherwise the reason for the failure is lost.
            logger.exception('Init mq connection failed!')
            raise RuntimeError('Init mq connection failed!')

    # Read after init() so the freshly created channel is the one used.
    channel = mq.channel
    exchange_name = CONF['exchange']
    queue_name = CONF['type']

    channel.exchange_declare(exchange=exchange_name, type='direct')
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name, routing_key=queue_name)
    # Request at most one unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue=queue_name)
    channel.start_consuming()
if __name__ == '__main__':
    # The pool is created only when run as a script, and only when the
    # configured poolsize enables multiprocessing. ``pool`` must remain a
    # module-level name because call_func() looks it up as a global.
    if is_multiprocessing():
        POOLSIZE = int(CONF['poolsize'])
        pool = Pool(POOLSIZE)
    # Start consuming messages from the queue.
    receive_message()
# call_func({'parameters':1,'module_name':'test','class_name':'','func_name':''})