-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathurl.py
More file actions
366 lines (316 loc) · 12.6 KB
/
url.py
File metadata and controls
366 lines (316 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Get for example HTML or JSON from an URL."""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026041403'
import base64
import json
import re
import ssl
import urllib
import urllib.parse
import urllib.request
from . import txt
def fetch(
    url,
    insecure=False,
    no_proxy=False,
    timeout=8,
    header=None,
    data=None,
    encoding='urlencode',
    digest_auth_user=None,
    digest_auth_password=None,
    extended=False,
    to_text=True,
):
    """
    Fetch any URL with optional POST, authentication, and SSL/TLS handling.

    This function supports:
    - GET and POST requests (a non-empty `data` turns the request into a POST).
    - Arbitrary request headers via `header` (e.g. `Authorization: Basic ...`).
    - HTTP Digest Authentication via the `digest_auth_*` parameters.
    - SSL/TLS certificate validation (with the `insecure` parameter to disable it).
    - Handling of response headers (with `extended=True`).

    Flow:

        Start
        |
        |--> Data? --Yes--> Encode+POST
        |            No --> GET
        |
        |--> Set Headers (caller's headers, then `Connection` and `User-Agent`)
        |
        |--> SSL Context Setup (insecure?)
        |
        |--> Build ONE private opener:
        |       HTTPSHandler(context)
        |       + ProxyHandler({})        if no_proxy
        |       + HTTPDigestAuthHandler   if digest user and password are set
        |
        |--> opener.open(request, timeout)
        |       |--> HTTPError?   --> return False + msg
        |       |--> URLError?    --> return False + msg
        |       |--> TypeError?   --> return False + msg
        |       |--> Exception?   --> return False + msg
        |
        |--> If Success:
        |       |--> Charset? (default UTF-8)
        |       |--> extended? --Yes--> build full dict
        |                        No --> simple response
        |
        |--> Close the response, return True + result
        End

    The opener is local to this call; the process-wide default opener of
    `urllib.request` is not modified. TLS settings, proxy settings, digest
    authentication and the timeout therefore apply in every combination.

    ### Parameters
    - **url** (`str`):
      The URL to fetch.
    - **insecure** (`bool`, optional):
      If True, disables SSL certificate validation. Defaults to False.
    - **no_proxy** (`bool`, optional):
      If True, disables the use of proxies. Defaults to False.
    - **timeout** (`int`, optional):
      Timeout in seconds for the request. Defaults to 8 seconds.
    - **header** (`dict`, optional):
      Headers to include in the request.
    - **data** (`dict`, optional):
      Data to send in the request body (used for POST requests).
    - **encoding** (`str`, optional):
      The encoding type for the request body: `'urlencode'` or
      `'serialized-json'`. Defaults to `'urlencode'`.
    - **digest_auth_user** (`str`, optional):
      The username for HTTP Digest Authentication.
    - **digest_auth_password** (`str`, optional):
      The password for HTTP Digest Authentication.
    - **extended** (`bool`, optional):
      If True, includes the response header and status code in the result. Defaults to False.
    - **to_text** (`bool`, optional):
      If True, converts the response to text. Defaults to True.

    ### Returns
    - **tuple**:
      - **success** (`bool`): True if the request was successful, False otherwise.
      - **result** (`dict`, `str` or `bytes`):
        - If successful, the response body (text, or raw bytes with `to_text=False`).
        - If `extended=True`, a dict with the keys `response`, `status_code`
          and `response_header`.
        - An error message string if the request failed. Values of `token=` and
          `password=` parameters in the URL are masked in that message.

    ### Example
    >>> result = fetch(
    ...     'https://api.example.com',
    ...     timeout=10,
    ...     header={'Authorization': 'Bearer token'},
    ... )
    >>> result = fetch('https://api.example.com', data={'key': 'value'}, extended=True)
    """
    if header is None:
        header = {}
    if data is None:
        data = {}

    # Mask secrets once, up front, so that *every* error message below (the
    # connect phase as well as the read phase) reports the sanitized URL.
    url_safe = re.sub(r'(token|password)=([^&]+)', r'\1=********', url)

    try:
        if data:
            if encoding == 'urlencode':
                data = urllib.parse.urlencode(data)
            elif encoding == 'serialized-json':
                data = json.dumps(data)
            data = txt.to_bytes(data)
            request = urllib.request.Request(url, data=data)
        else:
            request = urllib.request.Request(url)

        for key, value in header.items():
            request.add_header(key, value)
        # added after the caller's headers, so these two always win
        request.add_header('Connection', 'close')
        request.add_header('User-Agent', 'Linuxfabrik Monitoring Plugins')

        ctx = ssl.create_default_context()
        if insecure:
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

        # One private opener carries all per-call settings. build_opener()
        # swaps its default handlers for the instances supplied here and
        # keeps the remaining defaults (redirects, error processing, ...).
        handlers = [urllib.request.HTTPSHandler(context=ctx)]
        if no_proxy:
            # an empty mapping overrides any proxy configured in the environment
            handlers.append(urllib.request.ProxyHandler({}))
        if digest_auth_user and digest_auth_password:
            passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            passmgr.add_password(None, url, digest_auth_user, digest_auth_password)
            handlers.append(urllib.request.HTTPDigestAuthHandler(passmgr))
        opener = urllib.request.build_opener(*handlers)

        # URL comes from the admin-controlled Icinga check config, never from
        # untrusted input; bandit B310 flags any open on dynamic URLs
        response = opener.open(request, timeout=timeout)  # nosec B310
    except Exception as e:
        # HTTPError is a subclass of URLError, so it has to be checked first.
        if isinstance(e, urllib.request.HTTPError):
            return False, f'HTTP error "{e.code} {e.reason}" while fetching {url_safe}'
        if isinstance(e, urllib.request.URLError):
            return False, f'URL error "{e.reason}" for {url_safe}'
        if isinstance(e, TypeError):
            return False, f'Type error "{e}", data="{data}"'
        return False, f'{e} while fetching {url_safe}'

    try:
        charset = response.headers.get_content_charset() or 'UTF-8'
        body = response.read()
        if to_text:
            body = txt.to_text(body, encoding=charset)
        if not extended:
            return True, body
        return True, {
            'response': body,
            'status_code': response.getcode(),
            'response_header': response.info(),
        }
    except Exception as e:
        return False, f'{e} while fetching {url_safe}'
    finally:
        # release the socket / file handle regardless of the outcome
        response.close()
def fetch_json(
    url,
    insecure=False,
    no_proxy=False,
    timeout=8,
    header=None,
    data=None,
    encoding='urlencode',
    digest_auth_user=None,
    digest_auth_password=None,
    extended=False,
):
    """
    Retrieve a URL through `fetch()` and decode the response body as JSON.

    All arguments are handed to `fetch()` unchanged; see there for details.

    ### Parameters
    - **url** (`str`): The URL to fetch the JSON from.
    - **insecure** (`bool`, optional):
      If True, disables SSL certificate validation. Defaults to False.
    - **no_proxy** (`bool`, optional):
      If True, disables the use of proxies. Defaults to False.
    - **timeout** (`int`, optional):
      Timeout in seconds for the request. Defaults to 8 seconds.
    - **header** (`dict`, optional):
      Headers to include in the request.
    - **data** (`dict`, optional):
      Data to send in the request body (used for POST requests).
    - **encoding** (`str`, optional):
      The encoding type for the request body. Defaults to `'urlencode'`.
    - **digest_auth_user** (`str`, optional):
      The username for HTTP Digest Authentication.
    - **digest_auth_password** (`str`, optional):
      The password for HTTP Digest Authentication.
    - **extended** (`bool`, optional):
      If True, the dict returned by `fetch()` is passed on, extended by the
      key `response_json` holding the decoded body. Defaults to False.

    ### Returns
    - **tuple**:
      - **success** (`bool`): True if the JSON was fetched and decoded, False otherwise.
      - **result** (`dict` or `str`):
        - The decoded JSON value (or the extended dict) if successful.
        - An error message string if the request or the JSON decoding failed.

    ### Example
    >>> fetch_json('https://192.0.2.74/api/v2/?resource=cpu')
    (True, {'cpu': {'usage': '45%', 'temperature': '50C'}})
    """
    ok, payload = fetch(
        url,
        insecure=insecure,
        no_proxy=no_proxy,
        timeout=timeout,
        header=header,
        data=data,
        encoding=encoding,
        digest_auth_user=digest_auth_user,
        digest_auth_password=digest_auth_password,
        extended=extended,
    )
    if not ok:
        # payload is the error message produced by fetch()
        return False, payload

    try:
        if not extended:
            return True, json.loads(payload)
        # extended mode: keep the raw body and add the decoded form next to it
        payload['response_json'] = json.loads(payload['response'])
    except Exception as err:
        return False, f'{err}. No JSON object could be decoded.'
    return True, payload
def get_latest_version_from_github(user, repo, key='tag_name'):
    """
    Look up the latest release of a GitHub repository.

    Queries `https://api.github.com/repos/<user>/<repo>/releases/latest` via
    `fetch_json()` and returns a single field of the answer.

    ### Parameters
    - **user** (`str`): The GitHub username or organization name.
    - **repo** (`str`): The GitHub repository name.
    - **key** (`str`, optional): The field to read from the JSON response (default is `'tag_name'`).

    ### Returns
    - **tuple**:
      - **success** (`bool`): False if the request or the JSON decoding failed, True otherwise.
      - **result** (`str` or `bool`):
        - The error message of `fetch_json()` on failure.
        - The value stored under `key` (e.g. the latest release tag).
        - `False` if the answer is empty, is not a dict, or lacks `key`.

    ### Example
    >>> get_latest_version_from_github('Linuxfabrik', 'monitoring-plugins')
    (True, 'v1.2.3')
    """
    ok, release = fetch_json(
        f'https://api.github.com/repos/{user}/{repo}/releases/latest'
    )
    if not ok:
        return ok, release
    if isinstance(release, dict) and release:
        return True, release.get(key, False)
    # nothing usable came back from the API
    return True, False
def split_basic_auth(url):
    """Extract userinfo from `url` and return a `(url, headers)` tuple.

    The returned URL has any `user[:password]@` prefix stripped from
    its netloc so the credentials never reach the request line or
    any proxy log. If a username is present, `headers` contains the
    matching `Authorization: Basic ...` entry; otherwise the URL is
    returned unchanged together with an empty dict.

    Only the userinfo is removed. The remainder of the netloc is kept
    exactly as given, so bracketed IPv6 literals (`[::1]:8443`) and
    explicit ports survive unchanged. Percent-encoded characters in
    the username and password are decoded before the header is built.

    Pass the returned `url` and `headers` to `lib.url.fetch()` /
    `lib.url.fetch_json()` so plugins can accept HTTP basic auth via
    the URL (e.g. `https://user:secret@host/path`) instead of
    exposing separate `--username` / `--password` arguments.

    >>> split_basic_auth('https://example.com/path')
    ('https://example.com/path', {})
    >>> u, h = split_basic_auth('https://alice:secret@example.com/path')
    >>> u
    'https://example.com/path'
    >>> h
    {'Authorization': 'Basic YWxpY2U6c2VjcmV0'}
    >>> split_basic_auth('https://alice:secret@[::1]:8443/path')[0]
    'https://[::1]:8443/path'
    """
    parsed = urllib.parse.urlparse(url)
    if not parsed.username:
        return url, {}

    user = urllib.parse.unquote(parsed.username)
    password = urllib.parse.unquote(parsed.password or '')
    # base64 output is pure ASCII, so decoding it can never fail
    token = base64.b64encode(f'{user}:{password}'.encode('utf-8')).decode('ascii')

    # Cut at the LAST '@' (the same split urlparse uses to find the userinfo)
    # instead of rebuilding the netloc from `hostname` and `port`: `hostname`
    # drops the brackets of IPv6 literals, which would yield an invalid URL,
    # and `port` raises ValueError on a malformed port.
    netloc = parsed.netloc.rpartition('@')[2]
    stripped = urllib.parse.urlunparse(parsed._replace(netloc=netloc))
    return stripped, {'Authorization': f'Basic {token}'}
def strip_tags(html):
    """
    Remove HTML tags from a string and return the remaining text.

    A tag is any `<...>` sequence with at least one character between the
    angle brackets and no further `<` inside. A falsy input (e.g. `None` or
    an empty string) yields an empty string.

    ### Parameters
    - **html** (`str`): The string containing HTML tags to be stripped.

    ### Returns
    - **str**: The input string with all HTML tags removed.

    ### Example
    >>> strip_tags('<div>Hello, <b>world</b>!</div>')
    'Hello, world!'
    """
    text = html if html else ''
    tag_pattern = re.compile(r'<[^<]+?>')
    return tag_pattern.sub('', text)