-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
441 lines (370 loc) · 17 KB
/
main.py
File metadata and controls
441 lines (370 loc) · 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
import json
import logging
import signal
import sys
import time
from cryptography.hazmat.primitives.asymmetric import ec
from aliro.authentication_policy import AuthenticationPolicy
from aliro.certificate import Profile0000Certificate
from aliro.flow import AliroFlow
from aliro.protocol import ProtocolError, read_aliro
from repository import Repository
from util.afclf import AnnotationFrameContactlessFrontend, ISODEPTag, RemoteTarget, activate
from util.ecp import ECP
from util.general import hex_or_base64_to_bytes
from util.iso7816 import ISO7816Tag
# By default, this file is located in the same folder as the project
CONFIGURATION_FILE_PATH = "configuration.json"
DEFAULT_STEP_UP_DATA_ELEMENT = "matter1"
def load_configuration(path=CONFIGURATION_FILE_PATH) -> dict:
with open(path) as f:
return json.load(f)
def resolve_reader_certificate(
reader_certificate,
reader_private_key: bytes | None,
) -> tuple[bytes | None, bytes | None]:
if reader_certificate in (None, False):
return None, None
if reader_certificate is True:
if reader_private_key in (None, b""):
raise ValueError("aliro.reader_certificate=true requires aliro.reader_private_key")
issuer_private = ec.derive_private_key(int.from_bytes(reader_private_key, "big"), ec.SECP256R1())
intermediate_private = ec.generate_private_key(ec.SECP256R1())
cert = Profile0000Certificate.generate(
issuer_private_key=issuer_private,
subject_public_key=intermediate_private.public_key(),
).to_bytes()
intermediate_private_bytes = intermediate_private.private_numbers().private_value.to_bytes(32, "big")
logging.info(f"Generated intermediate reader private key bytes: {intermediate_private_bytes.hex()}")
logging.info(f"Generated reader_certificate bytes: {cert.hex()}")
logging.info(
f"Generated reader_certificate from reader_private_key on startup ({len(cert)} bytes); "
"replacing active reader_private_key with generated intermediate key",
)
return cert, intermediate_private_bytes
if isinstance(reader_certificate, str):
try:
cert = hex_or_base64_to_bytes(reader_certificate)
except ValueError as exc:
raise ValueError("aliro.reader_certificate must be hex or base64 when provided as string") from exc
profile = Profile0000Certificate.from_bytes(cert)
if reader_private_key not in (None, b""):
reader_public = ec.derive_private_key(
int.from_bytes(reader_private_key, "big"),
ec.SECP256R1(),
).public_key()
if profile.subject_public_key.public_numbers() != reader_public.public_numbers():
raise ValueError("Configured reader_certificate subject key does not match reader_private_key")
logging.info(f"Loaded reader_certificate from configuration ({len(cert)} bytes)")
return cert, None
raise ValueError("aliro.reader_certificate must be true, false/null, hex string, or base64 string")
def resolve_auth0_command_vendor_extension(value) -> bytes | None:
if value is None:
return None
if isinstance(value, str):
try:
resolved = hex_or_base64_to_bytes(value)
except ValueError as exc:
raise ValueError(
"aliro.auth0_command_vendor_extension must be hex or base64 when provided as string"
) from exc
logging.info(f"Loaded auth0_command_vendor_extension from configuration ({len(resolved)} bytes)")
return resolved
raise ValueError("aliro.auth0_command_vendor_extension must be null, hex string, or base64 string")
def _parse_dotted_protocol_version(value: str, value_path: str) -> bytes:
version_parts = value.split(".")
if len(version_parts) != 2:
raise ValueError(f"{value_path} dotted version must have format '<major>.<minor>' (for example 1.0)")
major_text, minor_text = version_parts
if not major_text.isdigit() or not minor_text.isdigit():
raise ValueError(f"{value_path} dotted version must contain decimal numbers only")
major = int(major_text)
minor = int(minor_text)
if not (0 <= major <= 0xFF and 0 <= minor <= 0xFF):
raise ValueError(f"{value_path} dotted version octets must be between 0 and 255")
return bytes([major, minor])
def _parse_hex_protocol_version(value: str, value_path: str) -> bytes:
hex_candidate = value[2:] if value.lower().startswith("0x") else value
hex_candidate = "".join(hex_candidate.split())
try:
version = bytes.fromhex(hex_candidate)
except ValueError as exc:
raise ValueError(f"{value_path} must be a version code like '1.0', '0100', or '0x0100'") from exc
if len(version) != 2:
raise ValueError(f"{value_path} must resolve to exactly 2 bytes, got {len(version)}")
return version
def _parse_protocol_version_code(value, value_path: str) -> bytes:
if isinstance(value, int):
if value < 0 or value > 0xFFFF:
raise ValueError(f"{value_path} must be between 0 and 65535 when provided as integer")
return value.to_bytes(2, "big")
if not isinstance(value, str):
raise ValueError(f"{value_path} must be a string version code or integer")
normalized = value.strip()
if normalized == "":
raise ValueError(f"{value_path} must not be empty")
if "." in normalized:
return _parse_dotted_protocol_version(normalized, value_path)
return _parse_hex_protocol_version(normalized, value_path)
def resolve_preferred_protocol_versions(value) -> list[bytes]:
if value is None:
logging.info("Aliro protocol version preference is empty (default selection behavior)")
return []
if value == "default":
logging.info("Aliro protocol version preference set to 'default' (default selection behavior)")
return []
raw_values = value if isinstance(value, list) else [value]
preferred_versions = []
for index, raw_value in enumerate(raw_values):
parsed = _parse_protocol_version_code(raw_value, f"aliro.version[{index}]")
if parsed not in preferred_versions:
preferred_versions.append(parsed)
if preferred_versions:
logging.info(f"Configured Aliro protocol version priority: {[version.hex() for version in preferred_versions]}")
else:
logging.info("Configured Aliro protocol version priority is empty (default selection behavior)")
return preferred_versions
def _normalize_step_up_data_element_identifiers(value, value_path: str) -> list[str]:
if value is None:
scopes = [DEFAULT_STEP_UP_DATA_ELEMENT]
elif isinstance(value, str):
scopes = [value]
elif isinstance(value, list):
scopes = value
else:
raise ValueError(f"{value_path} must be a string, array of strings, or null")
normalized_scopes = []
for index, raw_scope in enumerate(scopes):
if not isinstance(raw_scope, str):
raise ValueError(f"{value_path}[{index}] must be a string")
scope = raw_scope.strip()
if scope == "":
raise ValueError(f"{value_path}[{index}] must not be empty")
if len(scope) > 128:
raise ValueError(f"{value_path}[{index}] exceeds 128 characters")
if scope not in normalized_scopes:
normalized_scopes.append(scope)
if not normalized_scopes:
raise ValueError(f"{value_path} must not be empty")
return normalized_scopes
def resolve_step_up_scopes(value) -> dict[str, bool]:
if isinstance(value, dict):
normalized_scopes: dict[str, bool] = {}
for raw_scope, raw_keep_marker in value.items():
scope = _normalize_step_up_data_element_identifiers([raw_scope], "aliro.step_up_scopes keys")[0]
if isinstance(raw_keep_marker, bool):
keep = raw_keep_marker
elif isinstance(raw_keep_marker, str):
marker = raw_keep_marker.strip().lower().replace("-", "").replace("_", "")
if marker in ("keep", "true", "yes", "1"):
keep = True
elif marker in ("nokeep", "false", "no", "0"):
keep = False
else:
raise ValueError(f"aliro.step_up_scopes[{scope}] must be boolean or marker string keep/nokeep")
else:
raise ValueError(f"aliro.step_up_scopes[{scope}] must be boolean or marker string keep/nokeep")
normalized_scopes[scope] = keep
if not normalized_scopes:
raise ValueError("aliro.step_up_scopes must not be empty")
logging.info(f"Configured Step-up scope request map: {normalized_scopes}")
return normalized_scopes
scopes = _normalize_step_up_data_element_identifiers(value, "aliro.step_up_scopes")
scope_map = dict.fromkeys(scopes, True)
logging.info(f"Configured Step-up scope request map: {scope_map}")
return scope_map
def configure_logging(config: dict):
formatter = logging.Formatter("[%(asctime)s] [%(levelname)8s] %(module)-18s:%(lineno)-4d %(message)s")
hdlr = logging.StreamHandler(sys.stdout)
logging.getLogger().setLevel(config.get("level", logging.INFO))
hdlr.setFormatter(formatter)
logging.getLogger().addHandler(hdlr)
return logging.getLogger()
def configure_nfc_device(config: dict):
clf = AnnotationFrameContactlessFrontend(
path=config.get("path", None) or f"tty:{config.get('port')}:{config.get('driver')}",
annotation_enabled=True,
)
return clf
def configure_repository(config: dict, repository=None):
repository = repository or Repository(config["persist"])
reader_private_key_hex = config.get("reader_private_key")
reader_group_identifier_hex = config.get("reader_group_identifier")
reader_group_sub_identifier_hex = config.get("reader_group_sub_identifier")
if reader_private_key_hex:
repository.set_reader_private_key(bytes.fromhex(reader_private_key_hex))
reader_group_identifier = bytes.fromhex(reader_group_identifier_hex) if reader_group_identifier_hex else bytes(16)
if len(reader_group_identifier) != 16:
raise ValueError(
f"aliro.reader_group_identifier must be 16 bytes (32 hex chars), got {len(reader_group_identifier)} bytes"
)
repository.set_reader_group_identifier(reader_group_identifier)
reader_group_sub_identifier = (
bytes.fromhex(reader_group_sub_identifier_hex) if reader_group_sub_identifier_hex else bytes(16)
)
if len(reader_group_sub_identifier) != 16:
raise ValueError(
"aliro.reader_group_sub_identifier must be 16 bytes"
f" (32 hex chars), got {len(reader_group_sub_identifier)} bytes"
)
repository.set_reader_group_sub_identifier(reader_group_sub_identifier)
return repository
def read_aliro_once( # noqa: C901
nfc_device,
repository: Repository,
*,
express: bool,
flow: AliroFlow,
authentication_policy: AuthenticationPolicy,
reader_certificate: bytes | None,
auth0_command_vendor_extension: bytes | None,
step_up_scopes: dict[str, bool],
preferred_versions: list[bytes],
throttle_polling: float,
should_run,
):
start = time.monotonic()
remote_target = nfc_device.sense(
RemoteTarget("106A"),
annotation=ECP.aliro(
identifier=repository.get_reader_group_identifier(),
flag_2=express,
).to_bytes(),
)
if remote_target is None:
# Throttle polling attempts to prevent overheating & RF performance degradation
time.sleep(max(0, throttle_polling - time.monotonic() + start))
return
target = activate(nfc_device, remote_target)
if target is None:
return
if not isinstance(target, ISODEPTag):
logging.info(f"Found non-ISODEP Tag with UID: {target.identifier.hex().upper()}")
nfc_device.close()
nfc_device.open(nfc_device.path)
while nfc_device.sense(RemoteTarget("106A")) is not None:
if not should_run():
return
logging.info("Waiting for target to leave the field...")
time.sleep(0.5)
return
logging.info(f"Got NFC tag {target}")
tag = ISO7816Tag(target)
try:
result_flow, endpoint = read_aliro(
tag,
endpoints=repository.get_all_endpoints(),
preferred_versions=preferred_versions,
flow=flow,
authentication_policy=authentication_policy,
reader_certificate=reader_certificate,
auth0_command_vendor_extension=auth0_command_vendor_extension,
step_up_scopes=step_up_scopes,
reader_group_identifier=repository.get_reader_group_identifier(),
reader_group_sub_identifier=repository.get_reader_group_sub_identifier(),
reader_private_key=repository.get_reader_private_key(),
key_size=16,
)
if endpoint is not None:
repository.upsert_endpoint(endpoint)
logging.info(f"Authenticated endpoint via {result_flow!r}: {endpoint}")
logging.info(f"Transaction took {(time.monotonic() - start) * 1000} ms")
except ProtocolError as e:
logging.info(f'Could not authenticate device due to protocol error "{e}"')
# Let device cool down, wait for ISODEP to drop to consider comms finished
while target.is_present:
if not should_run():
return
logging.info("Waiting for device to leave the field...")
time.sleep(0.5)
logging.info("Device left the field. Continuing in 2 seconds...")
time.sleep(2)
logging.info("Waiting for next device...")
def run_aliro(
nfc_device,
repository: Repository,
*,
express: bool,
flow: AliroFlow,
authentication_policy: AuthenticationPolicy,
reader_certificate: bytes | None,
auth0_command_vendor_extension: bytes | None,
step_up_scopes: dict[str, bool],
preferred_versions: list[bytes],
throttle_polling: float,
should_run,
):
if repository.get_reader_private_key() in (None, b""):
raise Exception("Device is not configured via HAP. NFC inactive")
logging.info("Connecting to the NFC reader...")
nfc_device.device = None
nfc_device.open(nfc_device.path)
if nfc_device.device is None:
raise Exception(f"Could not connect to NFC device {nfc_device} at {nfc_device.path}")
while should_run():
read_aliro_once(
nfc_device,
repository,
express=express,
flow=flow,
authentication_policy=authentication_policy,
reader_certificate=reader_certificate,
auth0_command_vendor_extension=auth0_command_vendor_extension,
step_up_scopes=step_up_scopes,
preferred_versions=preferred_versions,
throttle_polling=throttle_polling,
should_run=should_run,
)
def main():
config = load_configuration()
configure_logging(config["logging"])
nfc_device = configure_nfc_device(config["nfc"])
repository = configure_repository(config["aliro"])
express = bool(config["aliro"].get("express", True))
configured_flow = config["aliro"].get("flow", "fast")
try:
flow = AliroFlow.parse(configured_flow)
except (KeyError, ValueError):
flow = AliroFlow.FAST
logging.warning(f"Digital Key flow {configured_flow} is not supported. Falling back to {flow}")
authentication_policy = AuthenticationPolicy.parse(config["aliro"].get("authentication_policy", "user"))
reader_certificate_config = config["aliro"].get("reader_certificate", None)
reader_certificate, replacement_reader_private_key = resolve_reader_certificate(
reader_certificate_config,
repository.get_reader_private_key(),
)
if replacement_reader_private_key is not None:
repository.set_reader_private_key(replacement_reader_private_key)
auth0_command_vendor_extension = resolve_auth0_command_vendor_extension(
config["aliro"].get("auth0_command_vendor_extension")
)
step_up_scopes = resolve_step_up_scopes(config["aliro"].get("step_up_scopes"))
preferred_versions = resolve_preferred_protocol_versions(config["aliro"].get("version"))
throttle_polling = float(config["nfc"].get("throttle_polling") or 0.15)
running = True
def should_run():
return running
def handle_signal(sig, *_):
nonlocal running
logging.info(f"SIGNAL {signal.Signals(sig).name}")
running = False
for s in (signal.SIGINT, signal.SIGTERM):
signal.signal(s, handle_signal)
try:
run_aliro(
nfc_device,
repository,
express=express,
flow=flow,
authentication_policy=authentication_policy,
reader_certificate=reader_certificate,
auth0_command_vendor_extension=auth0_command_vendor_extension,
step_up_scopes=step_up_scopes,
preferred_versions=preferred_versions,
throttle_polling=throttle_polling,
should_run=should_run,
)
finally:
nfc_device.close()
if __name__ == "__main__":
main()