forked from biemster/FindMy
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrequest_reports.py
More file actions
executable file
·240 lines (204 loc) · 10.3 KB
/
request_reports.py
File metadata and controls
executable file
·240 lines (204 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
import argparse
import base64
import datetime
import glob
import hashlib
import json
import os
import sqlite3
import struct
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cores.pypush_gsa_icloud import icloud_login_mobileme, generate_anisette_headers
def sha256(data):
    """Return the raw SHA-256 digest of ``data``.

    Args:
        data: Bytes-like object to hash.

    Returns:
        The 32-byte digest as ``bytes`` (not hex-encoded).
    """
    # The named constructor is the form hashlib recommends over the generic
    # ``hashlib.new(name)`` lookup; the resulting digest is identical.
    return hashlib.sha256(data).digest()
def decrypt(enc_data, algorithm_dkey, mode):
    """Decrypt ``enc_data`` with the given cipher algorithm and mode.

    Args:
        enc_data: Ciphertext bytes.
        algorithm_dkey: Cipher algorithm object that already carries the key
            (the caller in this file passes ``algorithms.AES(key)``).
        mode: Cipher mode object (the caller in this file passes
            ``modes.GCM(iv, auth_tag)``).

    Returns:
        The plaintext: the output of ``update`` followed by ``finalize``.
    """
    cipher = Cipher(algorithm_dkey, mode, default_backend())
    context = cipher.decryptor()
    plaintext = context.update(enc_data)
    # finalize() must run after update(); its output is appended.
    plaintext += context.finalize()
    return plaintext
def decode_tag(data):
    """Decode a decrypted report payload into its location fields.

    Layout read from ``data``:
      * bytes 0-3: big-endian signed 32-bit integer, divided by 1e7 -> ``lat``
      * bytes 4-7: big-endian signed 32-bit integer, divided by 1e7 -> ``lon``
      * byte 8: ``conf``
      * byte 9: ``status``

    Args:
        data: Bytes of the decrypted payload (at least 8 bytes are required
            for the two coordinates).

    Returns:
        Dict with keys ``'lat'``, ``'lon'``, ``'conf'`` and ``'status'``.

    Raises:
        struct.error: If fewer than 8 bytes are available for the coordinates.
    """
    latitude = struct.unpack(">i", data[0:4])[0] / 10000000.0
    longitude = struct.unpack(">i", data[4:8])[0] / 10000000.0
    # ``byteorder`` only became optional in Python 3.11; passing it explicitly
    # keeps this working on older interpreters. Slicing (rather than indexing)
    # keeps the previous behaviour of yielding 0 when the byte is absent.
    confidence = int.from_bytes(data[8:9], 'big')
    status = int.from_bytes(data[9:10], 'big')
    return {'lat': latitude, 'lon': longitude, 'conf': confidence, 'status': status}
def getAuth(regenerate=False, second_factor='sms'):
    """Return the ``(dsid, searchPartyToken)`` pair, logging in if necessary.

    The pair is cached as JSON in ``keys/auth.json`` next to this script. The
    cached copy is used when it exists and ``regenerate`` is false; otherwise
    ``icloud_login_mobileme`` is called and its result is written to the cache.

    Args:
        regenerate: When true, ignore any cached file and log in again.
        second_factor: Passed through to ``icloud_login_mobileme`` as its
            ``second_factor`` argument.

    Returns:
        Tuple ``(dsid, searchPartyToken)``.
    """
    CONFIG_PATH = os.path.dirname(os.path.realpath(__file__)) + "/keys/auth.json"
    if os.path.exists(CONFIG_PATH) and not regenerate:
        with open(CONFIG_PATH, "r") as f:
            j = json.load(f)
    else:
        # Make sure the cache directory exists *before* logging in, so a
        # missing ``keys/`` folder cannot make the write below fail after the
        # login (and its second-factor prompt) has already been completed.
        os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
        mobileme = icloud_login_mobileme(second_factor=second_factor)
        tokens = mobileme['delegates']['com.apple.mobileme']['service-data']['tokens']
        j = {'dsid': mobileme['dsid'],
             'searchPartyToken': tokens['searchPartyToken']}
        with open(CONFIG_PATH, "w") as f:
            json.dump(j, f)
    return (j['dsid'], j['searchPartyToken'])
# Offset added to the 4-byte report timestamp to obtain a Unix timestamp;
# 978307200 is the Unix time of 2001-01-01T00:00:00Z.
_REPORT_EPOCH_OFFSET = 978307200


def _load_key_files(prefix):
    """Read ``keys/<prefix>*.keys`` next to this script into lookup dicts.

    Returns ``(privkeys, names)``, both keyed by the hashed advertisement key:
    ``privkeys`` maps to the base64 private key, ``names`` to the file name
    with the prefix and the ``.keys`` suffix removed.
    """
    privkeys = {}
    names = {}
    pattern = os.path.dirname(os.path.realpath(__file__)) + '/keys/' + prefix + '*.keys'
    for keyfile in glob.glob(pattern):
        # read key files generated with generate_keys.py
        hashed_adv = priv = ''
        name = os.path.basename(keyfile)[len(prefix):-5]
        with open(keyfile) as f:
            for line in f:
                # partition() tolerates a label line without a value, where
                # split(': ')[1] would raise IndexError.
                label, _, value = line.rstrip('\n').partition(': ')
                if label == 'Private key':
                    priv = value
                elif label == 'Hashed adv key':
                    hashed_adv = value
        if priv and hashed_adv:
            privkeys[hashed_adv] = priv
            names[hashed_adv] = name
        else:
            print(f"Couldn't find key pair in {keyfile}")
    return privkeys, names


def _extract_reports(response):
    """Flatten a v2 fetch response into ``{'id', 'payload', 'statusCode'}`` dicts."""
    reports = []
    if 'acsnLocations' in response and 'locationPayload' in response['acsnLocations']:
        for location_data in response['acsnLocations']['locationPayload']:
            key_id = location_data['id']
            for location_info in location_data.get('locationInfo', []):
                # Convert new format to old format for compatibility:
                # location_info might be a dict with a 'location' field,
                # or just a string payload.
                if isinstance(location_info, dict):
                    payload = location_info.get('location', location_info)
                else:
                    payload = location_info
                reports.append({
                    'id': key_id,
                    'payload': payload,
                    'statusCode': 200
                })
    return reports


def _decrypt_payload(payload, priv_b64):
    """Decrypt one raw report payload with a base64 private key and decode it."""
    priv = int.from_bytes(base64.b64decode(priv_b64), byteorder='big')
    # the following is all copied from https://github.com/hatomist/openhaystack-python, thanks @hatomist!
    # Bytes 5..61 hold the ephemeral public key (encoded point) used for ECDH.
    eph_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP224R1(), payload[5:62])
    shared_key = ec.derive_private_key(priv, ec.SECP224R1(), default_backend()).exchange(ec.ECDH(), eph_key)
    symmetric_key = sha256(shared_key + b'\x00\x00\x00\x01' + payload[5:62])
    # First 16 digest bytes are the AES key, the remaining 16 the GCM IV.
    decryption_key = symmetric_key[:16]
    iv = symmetric_key[16:]
    enc_data = payload[62:72]
    auth_tag = payload[72:]
    decrypted = decrypt(enc_data, algorithms.AES(decryption_key), modes.GCM(iv, auth_tag))
    return decode_tag(decrypted)


def main():
    """Fetch location reports for the configured keys, decrypt and store them.

    Parses the command line, collects keys (from ``--key`` or from key files),
    posts one fetch request, decrypts every report for which a private key is
    known, stores decrypted reports in ``keys/reports.db`` and prints a
    summary. Reports without a private key are printed in encrypted form.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hours', help='only show reports not older than these hours', type=int, default=24)
    parser.add_argument('-p', '--prefix', help='only use keyfiles starting with this prefix', default='')
    parser.add_argument('-k', '--key', help='comma-separated base64 hashed advertisement keys to query', default='')
    parser.add_argument('-r', '--regen', help='regenerate search-party-token', action='store_true')
    parser.add_argument('-t', '--trusteddevice', help='use trusted device for 2FA instead of SMS',
                        action='store_true')
    args = parser.parse_args()

    sq3db = sqlite3.connect(os.path.dirname(os.path.realpath(__file__)) + '/keys/reports.db')
    try:
        sq3 = sq3db.cursor()

        if args.key:
            # If --key argument is provided, use those keys directly.
            privkeys = {}
            names = {}
            for idx, hashed_key in enumerate(args.key.strip().split(','), 1):
                hashed_key = hashed_key.strip()
                if hashed_key:
                    # For direct key input, we don't have private keys, so we can't decrypt.
                    # Just use the hashed key for querying.
                    names[hashed_key] = f"key_{idx}"
                    print(f"Using provided key: {hashed_key} (will not be able to decrypt without private key)")
        else:
            # Read key files from disk.
            privkeys, names = _load_key_files(args.prefix)

        unixEpoch = int(datetime.datetime.now().timestamp())
        startdate = unixEpoch - (60 * 60 * args.hours)

        # New v2 API request format (dates are sent in milliseconds).
        request_body = {
            "clientContext": {
                "policy": "foregroundClient",
                "clientBundleIdentifier": "com.apple.findmy"
            },
            "fetch": [{
                "primaryIds": list(names.keys()),
                "keyType": 1,
                "endDate": unixEpoch * 1000,
                "ownedDeviceIds": [],
                "startDateSecondary": startdate * 1000,
                "startDate": startdate * 1000
            }]
        }
        r = requests.post("https://gateway.icloud.com/findmyservice/v2/fetch",
                          auth=getAuth(regenerate=args.regen,
                                       second_factor='trusted_device' if args.trusteddevice else 'sms'),
                          headers=generate_anisette_headers(),
                          json=request_body)

        # Handle response: an empty or unparsable body is treated as "no reports".
        raw_content = r.content.decode()
        print(f"Response status: {r.status_code}, length: {len(raw_content)}")
        if not raw_content:
            print(f"Empty response from Apple v2 API (status {r.status_code})")
            response = {'acsnLocations': {'locationPayload': []}}
        else:
            try:
                response = json.loads(raw_content)
            except json.JSONDecodeError as e:
                print(f"Failed to parse JSON response: {e}")
                print(f"Raw content: {raw_content[:200]}...")
                response = {'acsnLocations': {'locationPayload': []}}

        # Parse new v2 response format.
        res = _extract_reports(response)
        print(f'{r.status_code}: {len(res)} reports received.')

        ordered = []
        found = set()

        # Create the 'reports' table if it does not exist yet.
        create_table_query = '''CREATE TABLE IF NOT EXISTS reports (
id_short TEXT, timestamp INTEGER, payload TEXT,
id TEXT, statusCode INTEGER, lat TEXT, lon TEXT, conf INTEGER, PRIMARY KEY(id_short,timestamp));'''
        sq3.execute(create_table_query)

        for report in res:
            payload = base64.b64decode(report['payload'])
            timestamp = int.from_bytes(payload[0:4], 'big') + _REPORT_EPOCH_OFFSET
            if report['id'] in privkeys:
                # Only reports inside the requested time window are decrypted and stored.
                if timestamp >= startdate:
                    tag = _decrypt_payload(payload, privkeys[report['id']])
                    tag['timestamp'] = timestamp
                    tag['isodatetime'] = datetime.datetime.fromtimestamp(timestamp).isoformat()
                    tag['key'] = names[report['id']]
                    tag['goog'] = 'https://maps.google.com/maps?q=' + str(tag['lat']) + ',' + str(tag['lon'])
                    found.add(tag['key'])
                    ordered.append(tag)

                    # Parameterized statement (SQL injection mitigation).
                    query = "INSERT OR REPLACE INTO reports VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
                    parameters = (names[report['id']], timestamp, report['payload'], report['id'],
                                  report['statusCode'], str(tag['lat']), str(tag['lon']), tag['conf'])
                    sq3.execute(query, parameters)
            else:
                # No private key - just show raw encrypted report.
                # The same fallback label is used for display and for ``found``
                # so an id that is not in ``names`` cannot raise KeyError.
                label = names.get(report['id'], report['id'])
                encrypted_report = {
                    'key': label,
                    'timestamp': timestamp,
                    'isodatetime': datetime.datetime.fromtimestamp(timestamp).isoformat(),
                    'payload': report['payload'],
                    'encrypted': True,
                    'statusCode': report['statusCode']
                }
                found.add(label)
                ordered.append(encrypted_report)
                print(f"Encrypted report: {encrypted_report}")

        print(f'{len(ordered)} reports used.')
        ordered.sort(key=lambda item: item.get('timestamp'))
        for rep in ordered:
            if rep.get('encrypted'):
                print(f"[ENCRYPTED] {rep}")
            else:
                print(rep)
        print(f'found: {list(found)}')
        print(f'missing: {[key for key in names.values() if key not in found]}')

        sq3.close()
        sq3db.commit()
    finally:
        # Always release the database handle, even when a step above raised.
        sq3db.close()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # An exception instance never compares equal to a string, so match on
        # the exception's class name (or its message) instead.
        if type(e).__name__ == "AuthenticationError" or str(e) == "AuthenticationError":
            print("Authentication failed. Please check your Apple ID credentials.")
        else:
            print(e)