-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathMalwrAPI.py
More file actions
269 lines (217 loc) · 8.95 KB
/
MalwrAPI.py
File metadata and controls
269 lines (217 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
This is the unofficial Python API for the malwr.com website.
Using this code, you can retrieve recent analyses, domains, tags, and the latest comments.
You can also submit files for analysis.
"""
import hashlib
import re
import requests
import os
from bs4 import BeautifulSoup
class MalwrAPI(object):
    """
    Unofficial Python client for the malwr.com website.

    Scrapes the public HTML pages to list recent analyses, domains, tags
    and comments, and drives the site's forms to log in, run searches and
    submit samples for analysis.
    """

    # Class-level defaults; __init__ sets the real per-instance values.
    session = None
    logged = False
    verbose = False
    url = "https://malwr.com"
    headers = {
        'User-Agent': "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:41.0) " +
                      "Gecko/20100101 Firefox/41.0"
    }

    # Operator dispatch for the submission math captcha.  Replaces the old
    # eval() of server-controlled page text, which was a code-injection risk.
    _CAPTCHA_OPS = {
        '+': lambda a, b: a + b,
        '-': lambda a, b: a - b,
        '*': lambda a, b: a * b,
    }

    def __init__(self, verbose=False, username=None, password=None, apikey=None):
        """Create a client.

        :param verbose: when True, display_message() prints progress lines.
        :param username: malwr.com account name (required for search/reports).
        :param password: malwr.com account password.
        :param apikey: accepted for backward compatibility; not used by the
            scraping endpoints in this class.
        """
        self.verbose = verbose
        self.session = requests.session()
        self.username = username
        self.password = password
        self.apikey = apikey

    @staticmethod
    def _solve_math_captcha(text):
        """Solve the single-digit 'a <op> b =' captcha embedded in *text*.

        Returns the integer answer, or None when no captcha is present.
        Uses an operator whitelist instead of eval() so attacker-controlled
        page content can never execute code.
        """
        match = re.search(r'(\d) ([-+*]) (\d) =', text)
        if match is None:
            return None
        left, op, right = match.groups()
        return MalwrAPI._CAPTCHA_OPS[op](int(left), int(right))

    def login(self):
        """Log in on malwr.com; return True on success, False otherwise."""
        if not (self.username and self.password):
            # Previously fell through and returned None, which defeated the
            # `res is False` guards in search()/getReport(); be explicit.
            return False
        soup = self.request_to_soup(self.url + '/account/login')
        csrf_input = soup.find(attrs=dict(name='csrfmiddlewaretoken'))
        payload = {
            'csrfmiddlewaretoken': csrf_input['value'],
            'username': u'{0}'.format(self.username),
            'password': u'{0}'.format(self.password)
        }
        login_request = self.session.post(self.url + "/account/login/",
                                          data=payload, headers=self.headers)
        # NOTE(review): many Django sites answer 200 even for bad
        # credentials -- TODO confirm a success marker in the response body.
        self.logged = login_request.status_code == 200
        return self.logged

    def request_to_soup(self, url=None):
        """GET *url* (default: site root) and return the parsed soup."""
        req = self.session.get(url or self.url, headers=self.headers)
        return BeautifulSoup(req.content, "html.parser")

    def display_message(self, s):
        """Print *s* when verbose mode is enabled."""
        if self.verbose:
            print('[verbose] %s' % s)

    def _index_panel(self, position):
        """Return the *position*-th 'span6' panel of the index page."""
        return self.request_to_soup().findAll('div', {'class': 'span6'})[position]

    def get_latest_comments(self):
        """Return the latest comments as [{'comment', 'comment_url'}, ...]."""
        res = []
        for comment in self._index_panel(3).findAll('tr'):
            infos = comment.findAll('td')
            res.append({
                'comment': infos[0].string,
                'comment_url': infos[1].find('a')['href']
            })
        return res

    def get_recent_domains(self):
        """Get recent domains on index page.

        Returns a list of objects with keys domain_name and url_analysis."""
        res = []
        for domain in self._index_panel(1).findAll('tr'):
            infos = domain.findAll('td')
            res.append({
                'domain_name': infos[0].find('span').string,
                'url_analysis': infos[1].find('a')['href']
            })
        return res

    def get_public_tags(self):
        """Return the list of public tags shown on the index page."""
        tags = self._index_panel(2).findAll('a', {'class': 'tag-label'})
        return [tag.string for tag in tags]

    def get_recent_analyses(self):
        """Return recent analyses as
        [{'submission_time', 'hash', 'submission_url'}, ...]."""
        res = []
        for submission in self._index_panel(0).findAll('tr'):
            infos = submission.findAll('td')
            res.append({
                'submission_time': infos[0].string,
                'hash': infos[1].find('a').string,
                'submission_url': infos[1].find('a')['href']
            })
        return res

    def submit_folder(self, path, analyze=True, share=True, private=True):
        """Submit every file in directory *path*; return per-file results."""
        # os.path.join works whether or not *path* has a trailing separator
        # (the old `path + item` silently required one).
        return [self.submit_sample(os.path.join(path, item), analyze, share, private)
                for item in os.listdir(path)]

    def submit_sample(self, filepath, analyze=True, share=True, private=True):
        """Upload *filepath* for analysis.

        Returns {'md5', 'file'[, 'analysis_link']} on success, or None when
        the site rejects the submission.
        """
        if self.logged is False:
            self.login()
        s = self.session
        req = s.get(self.url + '/submission/', headers=self.headers)
        soup = BeautifulSoup(req.content, "html.parser")
        data = {
            # Solved without eval(); see _solve_math_captcha.
            'math_captcha_field': self._solve_math_captcha(req.text),
            'math_captcha_question': soup.find('input', {'name': 'math_captcha_question'})['value'],
            'csrfmiddlewaretoken': soup.find('input', {'name': 'csrfmiddlewaretoken'})['value'],
            'share': 'on' if share else 'off',      # share by default
            'analyze': 'on' if analyze else 'off',  # analyze by default
            'private': 'on' if private else 'off'   # private by default
        }
        # Context managers close both handles (the originals leaked).
        with open(filepath, 'rb') as sample:
            req = s.post(self.url + '/submission/', data=data,
                         headers=self.headers, files={'sample': sample})
        with open(filepath, 'rb') as sample:
            md5 = hashlib.md5(sample.read()).hexdigest()
        res = {'md5': md5, 'file': filepath}
        # An /analysis/ link means the file was already submitted before.
        submission_links = re.findall(r'(/analysis/[a-zA-Z0-9]{12,}/)', req.text)
        if submission_links:
            self.display_message('File %s was already submitted, taking last analysis' % filepath)
            res['analysis_link'] = submission_links[0]
            return res
        submission_status = re.findall(r'(/submission/status/[a-zA-Z0-9]{12,}/)', req.text)
        if submission_status:
            res['analysis_link'] = submission_status[0]
            return res
        if 'file like this waiting for processing, submission aborted.' in req.text:
            self.display_message('File already submitted, check on the site')
            return None
        self.display_message('Error with the file %s' % filepath)
        return None

    def search(self, search_word):
        """Search analyses for *search_word* (requires a valid login).

        Returns a list of result dicts, [] for an invalid search term, or
        False when login fails.
        """
        # Do nothing if not logged in
        if not self.logged and self.login() is False:
            return False
        search_url = self.url + '/analysis/search/'
        page = self.request_to_soup(search_url)
        token = page.find(attrs=dict(name='csrfmiddlewaretoken'))['value']
        payload = {
            'csrfmiddlewaretoken': token,
            'search': u'{}'.format(search_word)
        }
        response = self.session.post(search_url, data=payload, headers=self.headers)
        soup = BeautifulSoup(response.content, "html.parser")
        if soup.findAll('div', {'class': 'alert-error'}):
            self.display_message('Invalid search term')
            return []
        res = []
        body = soup.findAll('div', {'class': 'box-content'})[0].findAll('tbody')[0]
        for row in body.findAll('tr'):
            infos = row.findAll('td')
            res.append({
                'submission_time': infos[0].string,
                'hash': infos[1].find('a').string,
                'submission_url': infos[1].find('a')['href'],
                'file_name': infos[2].string
            })
        return res

    def getReport(self, search_url):
        """Fetch the analysis page at site-relative *search_url* and return
        {'IP': [...], 'Domain': [...]} scraped from it, or False when login
        fails.

        Name kept in camelCase for backward compatibility with callers.
        """
        # Do nothing if not logged in
        if not self.logged and self.login() is False:
            return False
        report_url = self.url + search_url
        page = self.request_to_soup(report_url)
        token = page.find(attrs=dict(name='csrfmiddlewaretoken'))['value']
        response = self.session.post(report_url,
                                     data={'csrfmiddlewaretoken': token},
                                     headers=self.headers)
        soup = BeautifulSoup(response.content, "html.parser")
        # The "domains" table alternates domain cell / IP cell.
        cells = soup.find(id="domains").find_all("td")
        output = {
            "Domain": [cell.text for cell in cells[0::2]],
            "IP": [cell.text for cell in cells[1::2]],
        }
        output["IP"] += [cell.text for cell in soup.find(id="hosts").find_all("td")]
        return output