-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplunk_rest.py
More file actions
85 lines (65 loc) · 2.24 KB
/
splunk_rest.py
File metadata and controls
85 lines (65 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import requests, json, logging
from xml.dom import minidom
from requests.auth import HTTPBasicAuth
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def auth_search():
base_url = 'https://localhost:8089'
username = 'admin'
password = 'changeme'
login_url = f'{base_url}/servicesNS/-/-/auth/login'
rest_url = f'{base_url}/services/search/jobs/export'
search_data = {'search': 'search index=_internal earliest=-1h@h |stats count by source',
'output_mode': 'csv'}
# login to get a session key
try:
r = requests.get(login_url,
data={'username':username,'password':password}, verify=False)
except InsecureRequestWarning:
pass
except e:
print(exception)
# print('r = {}'.format(r.text))
session_key = minidom.parseString(r.text).getElementsByTagName('sessionKey')[0].firstChild.nodeValue
# print('session key = {}'.format(session_key))
header = { 'Authorization': 'Splunk {}'.format(session_key)}
# print('header == {}'.format(header))
# post the search data with session key as the Authroization header
try:
r = requests.post(rest_url,
data=search_data,
headers = header,
verify = False)
except e:
print(exception)
print('Result: \n{}'.format(r.text))
def search():
base_url = 'https://localhost:8089'
# construct basic auth info
username = 'admin'
password = 'changeme'
auth_data=(username, password)
rest_url = f'{base_url}/services/search/jobs/export'
# construct search payload
search_base = 'search '
spl = 'index=_internal earliest=-1h@h |stats count by source'
mode = 'csv'
search_data = {'search': f'{search_base}{spl}',
'output_mode': mode}
print(search_data)
# post_data = {**auth_data, **search_data}
# print('post_data == {}'.format(post_data))
# try to search in one shot
try:
r = requests.post(rest_url, auth=auth_data,
data=search_data,
verify=False)
except InsecureRequestWarning:
pass
except e:
print(e)
print('Result: \n{}'.format(r.text))
def main():
search()
if __name__ == "__main__":
main()