-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmessage_xml.py
More file actions
executable file
·246 lines (222 loc) · 9.1 KB
/
message_xml.py
File metadata and controls
executable file
·246 lines (222 loc) · 9.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import time
import os
from lxml import etree
import logging
import requests
# 设置发送给微信服务器的头部
head = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/80.0.3987.163 Safari/537.36 "
}
class Message():
def __init__(self, userInfo):
"""
初始化Message
:param userInfo:传入用户信息,用于定向发送信息
"""
self.sendMessage = []
self.headArticle(userInfo)
# self.Articles = etree.Element("Articles")
# self.item = etree.SubElement(self.Articles, "item")
# self.append_Description("Title", userInfo.find("Content").text)
def append_Description(self, key: str, value):
"""
在xml的回复信息中追加语句
:param key: 回复信息类型
:param value: 值
:return: None
"""
# 在sendMessage中追加一个字典
# 使用dict
# 1。防止出现关键字重复被覆盖
# 2。dict的key,value对于createXML来说重用性更好
self.sendMessage.append({key:value})
def create_xml(self, xmlValue, headElement=None):
"""
创建xml树,传入的标签需要为dict格式,
可以用list来传入多个dict,可以相互
嵌套,使得它的格式支持更多。
:param xmlValue:
:param headElement:
:return:
"""
# 判断是否有头部element,没有则创建根头部
if headElement is None:
headElement = etree.Element("xml")
# 用来判断当前是否为主create_xml
flag = True
else:
flag = False
# 如果传入的为dict类型
if isinstance(xmlValue, dict):
# 提取dict的key与value,将其添加至element之中
for key, data in xmlValue.items():
# 创建一个关于key的element
element = etree.SubElement(headElement, str(key))
# 判断其中是否包含套娃dict或list
if isinstance(data, dict):
# data为dict,将element传入create_xml,此时key为根节点
self.create_xml(data, element)
elif isinstance(data, list):
# data为list,分别获取,将element传入create_xml,此时key为根节点
for value in data:
self.create_xml(value, element)
else:
if isinstance(data, int):
# 如果data为int,则不需要使用CDATA类型
element.text = str(data)
else:
# 使用CDATA,可以让文本兼容性更好
element.text = etree.CDATA(str(data))
# data为list,分别获取,将element传入create_xml,此时headElement为根节点
if isinstance(xmlValue, list):
for value in xmlValue:
self.create_xml(value, headElement)
# 判断这个create_xml是否为主函数,如果是,返回str,不是则直接结束
if flag:
# 将etree打印出来转化为str,使用pretty_print使输出格式可读性高
# 可以方便调试
xmlText = etree.tostring(headElement, encoding="utf-8", pretty_print=True).decode("utf-8")
return xmlText
def headArticle(self, userInfo):
"""
微信头部返回内容的建立
:param userInfo: 用户的信息
:return: None
"""
# 根据微信的需求,完成对应通用的首部格式的制作
headDic = {
"ToUserName": userInfo.find("FromUserName").text,
"FromUserName": userInfo.find("ToUserName").text,
"CreateTime": int(time.time())
}
# 将其添加至sendMessage
self.sendMessage.append(headDic)
def MsgType(self,type: str):
"""
制定发送的微信信息的格式
:param type: 需要发送给用户的内容格式
如:text为文本,image为图片
:return: None
"""
# 将其添加至sendMessage
self.sendMessage.append({"MsgType":type})
def generateMessage(self):
"""
将sendMessage转化为字符串,如果出错这返回空
:return: str
"""
# 尝试创建xml,并获得返回的str
try:
return self.create_xml(self.sendMessage)
except:
# 失败返回空字符串
return ""
def send_WXpic(self, pic_content):
"""
将图片picByte发送给微信服务器
:param pic_content:bstr类型,需要发送到微信服务器的图片
:return:微信的media_id
"""
# 设置微信发送的参数
param = {
"access_token": self.get_accessToken(),
"type": "image"
}
# 将图片临时存入缓存
# 本地测试可以修改为自己的目录
# "/tmp/WXtempIMG.jpg"为服务器所使用的临时目录
try:
mediatype = pic_content.url.split('.')[-1]
with open(f"/tmp/WXtempIMG.{mediatype}", "wb") as f:
f.write(pic_content.content)
logging.debug(f"upload pix to WX start:{time.time()}")
logging.info(f"upload pix mediatype:{mediatype}, content-size:{len(pic_content.content)}")
response = requests.post("http://file.api.weixin.qq.com/cgi-bin/media/upload",params=param,files={"picpath":open(f"/tmp/WXtempIMG.{mediatype}", "rb")}, verify=False, timeout=1.5)
logging.debug(f"upload pix to WX done:{time.time()}")
except Exception as e:
# 访问异常的错误编号和详细信息
logging.warning("POST to WXServer failed")
logging.info(f"POST to WXServer {e.args}")
logging.info(f"POST to WXServer {str(e)}")
logging.info(f"POST to WXServer {repr(e)}")
return ""
# 将获取到的response解析为json
resJson = response.json()
# 尝试获取其中的media_id
try:
return resJson["media_id"]
except:
# 失败尝试获取错误信息
try:
logging.debug(resJson["errmsg"])
return ""
except:
# 获取错误信息失败表示POST失败
logging.debug("POST getPicId Faild")
return ""
def get_accessToken(self):
"""
获取微信的accessToken
:return: access_token:str
"""
# 判断之前获得的微信accessToken是否存在
if os.path.isfile("WXaccessToken.txt"):
# 如果存在,判断他的创建时间
last_tokenTime = os.path.getmtime("WXaccessToken.txt")
# 现在时间
now_time = time.time()
# 通过比对,如果时间超过2h表示该迷失已经过期
# 需要重新获取
if now_time-last_tokenTime>7200:
# 重新获取
if not self.send_client_credential():
return ""
else:
# 如果没有之前的accessToken,则首先获得accessToken
if not self.send_client_credential():
return ""
# 打开文件获得accessToken
with open("WXaccessToken.txt","r") as f:
access_token = f.read()
#返回accessToken
return access_token
def send_client_credential(self):
"""
想微信服务器发送请求,获得access_token
:return: True/False
"""
# 尝试打开文件,确认accessSecret密匙存在
if os.path.isfile("WXaccessSecret.txt"):
with open("WXaccessSecret.txt", "r") as f:
secert = f.read()
# 发送微信服务器的头部
param = {
"grant_type": "client_credential",
"appid": "wxef446a2455cef80c",
"secret": secert
}
# 获取微信服务器返回的JSON
responseAPI = requests.get("https://api.weixin.qq.com/cgi-bin/token", params=param)
# 对json进行解析
tokenJson = responseAPI.json()
# 尝试获取json的数据
try:
# 获取access_token
access_token = tokenJson["access_token"]
# 将其写入"WXaccessToken.txt"
with open("WXaccessToken.txt", "w") as f:
f.write(access_token)
f.close()
return True
except:
# 如果失败
try:
# 尝试获取errmsg
errmsg = tokenJson["errmsg"]
logging.debug(errmsg)
return False
except:
# GET请求失败
logging.debug("GET access_token FROM WX faild")
return False