-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsave.py
More file actions
132 lines (117 loc) · 5.96 KB
/
save.py
File metadata and controls
132 lines (117 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import json
from json.decoder import JSONDecodeError
import os
import re
import requests
import datetime
import time
import traceback
import pytz
from typing import List
from passport import User
# HTTP headers sent with the health-report requests: a desktop Chrome
# User-Agent string.
header = {
    'user-agent': (
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/90.0.4430.212 Safari/537.36'
    ),
}
class HealthReport:
    """Fetch and submit the health-report form on healthreport.zju.edu.cn.

    The wrapped ``user`` object must provide ``login()`` (returning the
    cookies for the index request), a ``session`` with ``get``/``post``
    methods, and a ``user_id`` used in the result messages.
    """

    base_url = "https://healthreport.zju.edu.cn/ncov/wap/default"
    index_url = base_url + "/index"
    save_url = base_url + "/save"

    # Fixed location payload written into the ``geo_api_info`` form field.
    _GEO_API_INFO = "{\"type\":\"complete\",\"info\":\"SUCCESS\",\"status\":1,\"cEa\":\"jsonp_630595_\",\"position\":{\"Q\":30.263776,\"R\":120.122946,\"lng\":120.122946,\"lat\":30.263776},\"message\":\"Get ipLocation success.Get address success.\",\"location_type\":\"ip\",\"accuracy\":null,\"isConverted\":true,\"addressComponent\":{\"citycode\":\"0571\",\"adcode\":\"330110\",\"businessAreas\":[],\"neighborhoodType\":\"\",\"neighborhood\":\"\",\"building\":\"\",\"buildingType\":\"\",\"street\":\"浙大路\",\"streetNumber\":\"38号\",\"country\":\"中国\",\"province\":\"浙江省\",\"city\":\"杭州市\",\"district\":\"西湖区\",\"township\":\"灵隐街道\"},\"formattedAddress\":\"浙江省杭州市西湖区灵隐街道浙江大学玉泉校区\",\"roads\":[],\"crosses\":[],\"pois\":[]}"

    # Answers filled in when the page carries no previous report.
    _FIRST_REPORT_DEFAULTS = {
        'sfyxjzxgym': '1',
        'sfbyjzrq': '5',
        'jzxgymqk': '1',
        'sffrqjwdg': '0',
        'sfqtyyqjwdg': '0',
        'sfymqjczrj': '0',
    }

    # Fields copied from the fresh form defaults into a reused old report.
    _PER_REPORT_FIELDS = ('created', 'uid', 'id', 'date')

    # A string value that starts with a bracketed list, e.g. "[]" or "[1,2]".
    # Raw string: "\[" in a plain literal is an invalid escape sequence.
    _LIST_LITERAL = re.compile(r"\[.*\]")

    def __init__(self, user):
        self.user = user

    @staticmethod
    def _extract(pattern: str, page: str, what: str) -> str:
        """Return group 1 of the first match of ``pattern`` in ``page``.

        Raises:
            IndexError: if the pattern does not occur; the message names
                ``what`` so the failure is understandable in the pushed log.
        """
        found = re.search(pattern, page)
        if found is None:
            raise IndexError(f"could not find {what} in the report page")
        return found.group(1)

    @staticmethod
    def _parse_loose_pairs(raw: str) -> dict:
        """Parse ``key: 'value', ...`` text into a dict of strings.

        The text is split on commas; spaces and single quotes are removed
        from each piece, which is then split on ``:``. Pieces that do not
        yield exactly one key and one value are dropped.
        """
        pairs = (
            tuple(piece.replace(' ', '').replace('\'', '').split(':'))
            for piece in raw.split(',')
        )
        return dict(pair for pair in pairs if len(pair) == 2)

    @classmethod
    def _build_payload(cls, new_info: dict, old_info) -> dict:
        """Combine the form defaults with the previous report, if any.

        Without a previous report (``old_info`` empty -- ``{}``, ``[]`` or
        ``None``) the defaults in ``new_info`` are completed with
        ``_FIRST_REPORT_DEFAULTS`` and returned. Otherwise ``old_info`` is
        returned after taking over the ``_PER_REPORT_FIELDS`` values from
        ``new_info``. In both cases every key whose value in ``new_info``
        is a string starting with a bracketed list is replaced by
        ``key + '[]'`` set to ``"0"``. The returned dict is mutated in place.
        """
        if not old_info:
            payload = new_info
            payload.update(cls._FIRST_REPORT_DEFAULTS)
        else:
            payload = old_info
            for field in cls._PER_REPORT_FIELDS:
                payload[field] = new_info[field]
        # Iterate over a snapshot: ``payload`` may be ``new_info`` itself.
        for key, value in list(new_info.items()):
            if isinstance(value, str) and cls._LIST_LITERAL.match(value):
                payload[key + '[]'] = "0"
                payload.pop(key, None)
        return payload

    def get_info(self) -> dict:
        """Log in, download the index page and build the form data to submit.

        The page's ``var def = {...}`` object, the two literal objects passed
        to ``$.extend`` around ``def``, and the ``oldInfo`` object are
        scraped out of the page source with regular expressions.

        Returns:
            The dict to post to ``save_url``.

        Raises:
            IndexError: if one of the expected fragments is missing.
            json.JSONDecodeError: if ``oldInfo`` is not valid JSON.
        """
        cookies = self.user.login()
        res = self.user.session.get(self.index_url, cookies=cookies, headers=header)
        # Collapse newlines so that ``.`` in the patterns can span lines.
        page = res.text.replace('\n', ' ')
        raw_new_info = self._extract(
            r'var\s+def\s+=\s+({.*});\s+var\s+vm', page, 'the form defaults')
        raw_ext_1 = self._extract(
            r'info:\s+\$\.extend\(\{(.+)\}\s*,\s*def', page, 'the first $.extend argument')
        raw_ext_2 = self._extract(
            r'def\s*,\s*\{(.*)\}\)\s*,\s*oldInfo', page, 'the last $.extend argument')
        raw_old_info = self._extract(
            r'oldInfo: (.*),.+tipMsg', page, 'oldInfo')

        try:
            new_info = json.loads(raw_new_info)
        except JSONDecodeError:
            # Not strict JSON: fall back to the loose key/value parser.
            new_info = self._parse_loose_pairs(raw_new_info)
        new_info['geo_api_info'] = self._GEO_API_INFO
        new_info.update(self._parse_loose_pairs(raw_ext_1))
        new_info.update(self._parse_loose_pairs(raw_ext_2))

        old_info = json.loads(raw_old_info)
        return self._build_payload(new_info, old_info)

    def save_info(self) -> str:
        """Submit the report and describe the outcome.

        Returns:
            A message starting with the user id. Any exception raised while
            fetching or posting is caught and returned as part of the
            message together with its traceback, so this method never raises
            for those failures.
        """
        try:
            info = self.get_info()
            res = self.user.session.post(self.save_url, data=info, headers=header)
            res_json = json.loads(res.text)
            if res_json['e'] == 0:
                return f"{self.user.user_id} submitted successfully"
            return f"{self.user.user_id} submit failed\nreason: {res_json['m']}"
        except Exception as e:
            # Deliberate catch-all: the caller only wants a printable result.
            return (
                f"{self.user.user_id} submit failed\nreason: running exception:"
                f"{e}\n{traceback.format_exc()}"
            )
def get_user_from_env() -> List[User]:
    """Build the users described by the ``USER_JSON`` environment variable.

    The variable holds a JSON list of objects with ``name`` and ``pass``
    keys. An unset or empty variable yields an empty list.
    """
    raw = os.environ.get('USER_JSON')
    if not raw:
        return []
    accounts = []
    for entry in json.loads(raw):
        accounts.append(User(entry['name'], entry['pass']))
    return accounts
def push(content):
    """Send ``content`` through every configured notification channel.

    Channels are switched on by environment variables:

    * ``SCT_SEND_KEY`` -- post to ``sctapi.ftqq.com``.
    * ``EMAIL`` together with ``EMAIL_TOKEN`` -- post to
      ``email.berfen.com``.

    A variable that is unset or empty leaves its channel off, so a blank
    ``EMAIL``/``EMAIL_TOKEN`` no longer triggers a request with empty
    credentials. Each service's response text is printed.

    Args:
        content: Body text of the notification.
    """
    def title():
        # The date in the title is the current date in Asia/Shanghai.
        today = datetime.datetime.now(pytz.timezone('Asia/Shanghai')).date()
        return str(today) + "打卡结果"

    def sct(send_key, content_body):
        data = {"title": title(), "desp": content_body}
        r = requests.post(f"https://sctapi.ftqq.com/{send_key}.send", data=data)
        return r.text

    def email(token, target, content_body):
        data = {"token": token,
                "title": title(),
                "text": content_body,
                "to": target}
        r = requests.post("https://email.berfen.com/api.v2/", data=data)
        return r.text

    sct_send_key = os.environ.get('SCT_SEND_KEY')
    if sct_send_key:
        print(sct(sct_send_key, content))
        print("已使用Server酱·Turbo版进行推送")

    email_target = os.environ.get('EMAIL')
    email_token = os.environ.get('EMAIL_TOKEN')
    # Truthiness, like the key check above: empty strings mean "not set".
    if email_target and email_token:
        print(email(email_token, email_target, content))
        print("已使用邮箱推送")
if __name__ == '__main__':
    # Submit the report for each configured user in turn, echoing every
    # result, then push all results as one newline-separated message.
    outcomes = []
    for user in get_user_from_env():
        outcome = HealthReport(user).save_info()
        print(outcome)
        outcomes.append(outcome + '\n')
        time.sleep(10)  # wait ten seconds after each account
    push(''.join(outcomes))