-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathwatcher.py
More file actions
executable file
·143 lines (115 loc) · 5.16 KB
/
watcher.py
File metadata and controls
executable file
·143 lines (115 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/python3
import argparse
import hashlib
import json
import os
import sys
import tempfile
import difflib
import requests
from lxml import html
from jsonpath_ng import parse as parse_jsonpath
from adapters import SendAdapterFactory
from model import WatchResult
def get_xml_nodes(exp, page, ignore):
    """ Parses the page as HTML, drops everything matched by the ignore expressions and returns the nodes matching the XPath expression """
    document = html.fromstring(page)
    # Each ignore expression is evaluated against the tree as left by the
    # previous one, so the expressions are applied strictly in order.
    for ignore_exp in ignore:
        for unwanted in document.xpath(ignore_exp):
            unwanted.drop_tree()
    # Only what is left after pruning is matched against the watch expression.
    return document.xpath(exp)
def filter_document(nodes) -> str:
    """ Returns the text content of the specified nodes, concatenated in order

    Each node must provide a text_content() method returning a string.
    An empty iterable yields an empty string.
    """
    # A single join avoids re-copying the accumulated text for every node.
    return "".join(element.text_content() for element in nodes)
def get_json_nodes(exp, content):
    """ Returns the matches of the JSONPath expression within the JSON content, or an empty list if anything fails """
    try:
        # The content is decoded first, so broken JSON is reported before
        # the expression itself is even looked at.
        document = json.loads(content)
        return parse_jsonpath(exp).find(document)
    except json.JSONDecodeError as decode_error:
        print(f'Warning: Invalid JSON content: {decode_error}')
    except Exception as error:
        print(f"Error processing JSONPath: {error}")
    # Both failure paths end up here: report the problem, match nothing.
    return []
def filter_json(nodes) -> str:
    """ Returns the values of the specified matches, each converted to a string and concatenated in order """
    pieces = []
    for found in nodes:
        # Values can be of any type, so each one is stringified explicitly.
        pieces.append(str(found.value))
    return ''.join(pieces)
def get_tmp_file(url: str) -> str:
    """ Returns the path of the cache file for the URL inside the system's temp directory """
    # The file name is derived from the first six hex digits of the MD5 of
    # the UTF-8 encoded URL, so the same URL always maps to the same file.
    digest = hashlib.md5(url.encode('utf-8')).hexdigest()
    file_name = f'{digest[:6]}_cache.txt'
    return os.path.join(tempfile.gettempdir(), file_name)
def diff_chars(a: str, b: str) -> int:
    """ Returns the number of added plus removed entries in the unified diff of a and b """
    # Strings are handed over as-is, so every character is one diff entry.
    diff_lines = list(difflib.unified_diff(a, b))
    changed = 0
    # The first two entries are the '---' / '+++' file headers, which would
    # otherwise be mistaken for a removal and an addition.
    for line in diff_lines[2:]:
        if line[:1] in ('+', '-'):
            changed += 1
    return changed
def main(args, remaining_args):
    """ Fetches the watched URL, compares it to the cached version and notifies via the adapter if it changed

    args is the namespace of the watcher's own options (url, tolerance, xpath,
    jsonpath, ignore, user_agent, adapter, json); remaining_args are the
    unparsed arguments, which are handed to the send adapter.

    Exits with status 1 if the adapter cannot be created (AttributeError),
    if --json / --jsonpath is combined with a non-default --xpath, or if the
    adapter reports a failed send.
    """
    tmp_location = get_tmp_file(args.url)
    doc1, doc2 = '', ''

    try:
        adapter = SendAdapterFactory.get(args.adapter, remaining_args)
    except AttributeError:
        sys.exit(1)

    # '//body' is the default of --xpath, so only a deviating value counts
    # as the user having asked for XPath mode explicitly.
    if args.json and args.xpath and args.xpath != '//body':
        print('Error: --json and --xpath are mutually exclusive')
        sys.exit(1)
    if args.jsonpath and args.xpath and args.xpath != '//body':
        print('Error: --jsonpath and --xpath are mutually exclusive')
        sys.exit(1)
    # Giving a JSONPath expression implies JSON mode.
    args.json = args.json or args.jsonpath

    # Read old web page version
    try:
        with open(tmp_location, 'r', encoding='utf8', newline='') as f:
            cached_content = f.read()
        if args.json:
            doc1 = filter_json(get_json_nodes(args.jsonpath, cached_content))
        else:
            doc1 = filter_document(get_xml_nodes(args.xpath, cached_content, args.ignore))
    except FileNotFoundError:
        # No cache yet (e.g. first run for this URL): compare against ''.
        pass
    except Exception as e:
        # Still best-effort (the old version is treated as empty), but the
        # problem is reported, and KeyboardInterrupt / SystemExit are no
        # longer swallowed as they were by a bare except.
        print('Could not read cached version %s: %s' % (tmp_location, e))

    if args.user_agent.lower() == 'firefox':
        args.user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0'  # Firefox 84 on Windows 10

    # Read current web page version
    # 301 and 302 redirections are resolved automatically
    # NOTE(review): no timeout is passed, so a stalled server may block this
    # call indefinitely - consider adding one together with error handling.
    r = requests.get(args.url, headers={'user-agent': args.user_agent})
    if 200 <= r.status_code <= 299:
        if args.json:
            doc2 = filter_json(get_json_nodes(args.jsonpath, r.text))
        else:
            doc2 = filter_document(get_xml_nodes(args.xpath, r.text, args.ignore))
    else:
        print('Could not fetch %s.' % args.url)

    # Write new version to file
    # NOTE(review): this also runs after a failed fetch, so the error
    # response replaces the cached page - confirm that this is intended.
    try:
        with open(tmp_location, 'w', encoding='utf-8', newline='') as f:
            f.write(r.text)
    except Exception as e:
        print('Could not open file %s: %s' % (tmp_location, e))

    diff = diff_chars(doc1, doc2)

    # Only notify when more characters changed than the tolerance allows.
    if diff > args.tolerance:
        ok = adapter.send(WatchResult(args.url, diff))
        if not ok:
            sys.exit(1)
if __name__ == '__main__':
    # `help <adapter>` prints the options of the given send adapter instead
    # of running the watcher; an unknown adapter exits with status 1.
    if len(sys.argv) >= 3 and sys.argv[1] == 'help':
        adapter_class = SendAdapterFactory.get_class(sys.argv[2])
        if adapter_class is None:
            sys.exit(1)
        else:
            adapter_class.adapter.get_parser().print_help()
            sys.exit(0)

    parser = argparse.ArgumentParser(prog='Website Watcher')
    parser.add_argument('-u', '--url', required=True, type=str, help='URL to watch.')
    parser.add_argument('-t', '--tolerance', default=0, type=int, help='Number of characters which have to differ between cached- and new content to trigger a notification.')
    parser.add_argument('-x', '--xpath', default='//body', type=str, help="XPath expression designating the elements to watch.")
    parser.add_argument('--jsonpath', type=str, help='JSONPath expression to watch (e.g., "$.data.items[*]"). Mutually exclusive with --xpath.')
    # An empty list (rather than '') matches the list produced by nargs='+'
    # and is iterated the same way: nothing is ignored by default.
    parser.add_argument('-i', '--ignore', default=[], type=str, nargs='+', help="One or multiple XPath expressions designating the elements to ignore.")
    parser.add_argument('-ua', '--user-agent', default='muety/website-watcher', type=str, help='User agent header to include in requests (available shortcuts: "firefox").')
    parser.add_argument('--adapter', default='email', type=str, help='Send method to use. See "adapters" for all available.')
    parser.add_argument('--json', action='store_true', help='Treat endpoint as JSON (mutually exclusive with --xpath).')

    # Options unknown to this parser are left over for the send adapter.
    # Parse once and reuse the result instead of parsing a second time.
    args, remaining_args = parser.parse_known_args()
    main(args, remaining_args)