|
7 | 7 | from unittest.mock import MagicMock, patch |
8 | 8 |
|
9 | 9 | import pytest |
| 10 | +from hypothesis import given, strategies as st |
10 | 11 |
|
11 | | -from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2 |
| 12 | +from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2, nanopb_pb2 |
12 | 13 | from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 |
13 | 14 | from ..node import Node |
14 | 15 | from ..serial_interface import SerialInterface |
15 | 16 | from ..mesh_interface import MeshInterface |
| 17 | +from ..util import to_node_num |
16 | 18 |
|
17 | 19 | # from ..config_pb2 import Config |
18 | 20 | # from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2, |
19 | 21 | # CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4, |
20 | 22 | # CannedMessagePluginMessagePart5) |
21 | 23 | # from ..util import Timeout |
22 | 24 |
|
| 25 | +# Extract nanopb max_size constraints from the User protobuf descriptor |
| 26 | +_USER_NANOPB = { |
| 27 | + field.name: field.GetOptions().Extensions[nanopb_pb2.nanopb] |
| 28 | + for field in mesh_pb2.User.DESCRIPTOR.fields |
| 29 | +} |
23 | 30 |
|
24 | 31 | @pytest.mark.unit |
25 | 32 | def test_node(capsys): |
@@ -341,53 +348,245 @@ def test_setURL_valid_URL_but_no_settings(capsys): |
341 | 348 |
|
342 | 349 |
|
343 | 350 | @pytest.mark.unit |
344 | | -def test_contact_url_roundtrip(): |
345 | | - """Verify that contact URL generation and parsing is fully reversible""" |
346 | | - def encode_url(contact): |
347 | | - data = contact.SerializeToString() |
348 | | - s = base64.urlsafe_b64encode(data).decode("ascii") |
349 | | - s = s.replace("=", "").replace("+", "-").replace("/", "_") |
350 | | - return f"https://meshtastic.org/v/#{s}" |
351 | | - |
352 | | - def decode_url(url): |
353 | | - b64 = url.split("/#")[-1] |
354 | | - missing_padding = len(b64) % 4 |
355 | | - if missing_padding: |
356 | | - b64 += "=" * (4 - missing_padding) |
357 | | - decoded = base64.urlsafe_b64decode(b64) |
358 | | - contact = admin_pb2.SharedContact() |
359 | | - contact.ParseFromString(decoded) |
360 | | - return contact |
361 | | - |
362 | | - original = admin_pb2.SharedContact() |
363 | | - original.node_num = 2198819370 |
364 | | - original.user.id = "!830f522a" |
365 | | - original.user.long_name = "Roadrunner Ridge" |
366 | | - original.user.short_name = "RKSN" |
367 | | - original.user.macaddr = b'\x00\x00\x00\x00\x00\x00' |
368 | | - original.user.hw_model = mesh_pb2.HardwareModel.Value("RAK4631") |
369 | | - original.user.role = mesh_pb2.User.DESCRIPTOR.fields_by_name['role'].enum_type.values_by_name["ROUTER"].number |
370 | | - original.user.public_key = bytes.fromhex("471a3f170fdeae0408851a816eb1dab0b632de053e032f21ae1bb83959c83d07") |
371 | | - original.user.is_licensed = True |
372 | | - original.user.is_unmessagable = False |
373 | | - original.should_ignore = True |
374 | | - original.manually_verified = True |
375 | | - |
376 | | - url = encode_url(original) |
377 | | - parsed = decode_url(url) |
378 | | - |
379 | | - assert parsed.node_num == original.node_num |
380 | | - assert parsed.user.id == original.user.id |
381 | | - assert parsed.user.long_name == original.user.long_name |
382 | | - assert parsed.user.short_name == original.user.short_name |
383 | | - assert parsed.user.macaddr == original.user.macaddr |
384 | | - assert parsed.user.hw_model == original.user.hw_model |
385 | | - assert parsed.user.role == original.user.role |
386 | | - assert parsed.user.public_key == original.user.public_key |
387 | | - assert parsed.user.is_licensed == original.user.is_licensed |
388 | | - assert parsed.user.is_unmessagable == original.user.is_unmessagable |
389 | | - assert parsed.should_ignore == original.should_ignore |
390 | | - assert parsed.manually_verified == original.manually_verified |
| 351 | +@pytest.mark.parametrize("node_id,node_data,should_ignore,manually_verified", [ |
| 352 | + pytest.param( |
| 353 | + "!830f522a", |
| 354 | + { |
| 355 | + "num": 2198819370, |
| 356 | + "user": { |
| 357 | + "id": "!830f522a", |
| 358 | + "longName": "Roadrunner Ridge", |
| 359 | + "shortName": "RKSN", |
| 360 | + "macaddr": "AAAAAAAAAAA=", |
| 361 | + "hwModel": "RAK4631", |
| 362 | + "role": "ROUTER", |
| 363 | + "publicKey": "Rx8XD96uBAiFGoFusdqwti3eBT4DLyGuG7g5Wcg9Bw==", |
| 364 | + "isLicensed": True, |
| 365 | + "isUnmessagable": False, |
| 366 | + }, |
| 367 | + }, |
| 368 | + True, |
| 369 | + True, |
| 370 | + id="all_fields_all_flags", |
| 371 | + ), |
| 372 | + pytest.param( |
| 373 | + "!12345678", |
| 374 | + { |
| 375 | + "num": 305419896, |
| 376 | + "user": { |
| 377 | + "id": "!12345678", |
| 378 | + "longName": "Test Node", |
| 379 | + "shortName": "TN", |
| 380 | + "macaddr": "QkVTVEVWRVI=", |
| 381 | + "hwModel": "TBEAM", |
| 382 | + }, |
| 383 | + }, |
| 384 | + False, |
| 385 | + False, |
| 386 | + id="minimal_fields_no_flags", |
| 387 | + ), |
| 388 | + pytest.param( |
| 389 | + 305419896, |
| 390 | + { |
| 391 | + "num": 305419896, |
| 392 | + "user": { |
| 393 | + "id": "!12345678", |
| 394 | + "longName": "Another Node", |
| 395 | + "shortName": "AN", |
| 396 | + "macaddr": "QkVTVEVWRVI=", |
| 397 | + "hwModel": "HELTEC_V3", |
| 398 | + "role": "CLIENT", |
| 399 | + "publicKey": "AAAAAAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8=", |
| 400 | + "isLicensed": False, |
| 401 | + }, |
| 402 | + }, |
| 403 | + True, |
| 404 | + False, |
| 405 | + id="int_node_id_should_ignore_only", |
| 406 | + ), |
| 407 | + pytest.param( |
| 408 | + "!deadbeef", |
| 409 | + { |
| 410 | + "num": 3735928559, |
| 411 | + "user": { |
| 412 | + "id": "!deadbeef", |
| 413 | + "longName": "Minimal Contact", |
| 414 | + "shortName": "MC", |
| 415 | + "macaddr": "BQYHCAkKCw==", |
| 416 | + "hwModel": "UNSET", |
| 417 | + "role": "CLIENT_MUTE", |
| 418 | + }, |
| 419 | + }, |
| 420 | + False, |
| 421 | + True, |
| 422 | + id="unset_hw_model_verified_only", |
| 423 | + ), |
| 424 | + pytest.param( |
| 425 | + "!1a2b3c4d", |
| 426 | + { |
| 427 | + "num": 439041101, |
| 428 | + "user": { |
| 429 | + "id": "!1a2b3c4d", |
| 430 | + "longName": "Licensed Node", |
| 431 | + "shortName": "LN", |
| 432 | + "macaddr": "DA0ODxAREg==", |
| 433 | + "hwModel": "NANO_G1", |
| 434 | + "isLicensed": True, |
| 435 | + "isUnmessagable": True, |
| 436 | + }, |
| 437 | + }, |
| 438 | + False, |
| 439 | + False, |
| 440 | + id="licensed_unmessagable_no_flags", |
| 441 | + ), |
| 442 | +]) |
| 443 | +def test_contact_url_roundtrip(node_id, node_data, should_ignore, manually_verified): |
| 444 | + """Verify that contact URL generation via getContactURL() and parsing via addContactURL() is fully reversible""" |
| 445 | + iface = MagicMock(autospec=MeshInterface) |
| 446 | + node_num = to_node_num(node_id) |
| 447 | + iface.nodesByNum = {node_num: node_data} |
| 448 | + iface.localNode = None |
| 449 | + |
| 450 | + anode = Node(iface, node_num, noProto=True) |
| 451 | + |
| 452 | + sent_admin = [] |
| 453 | + def capture_send(p, *args, **kwargs): |
| 454 | + sent_admin.append(p) |
| 455 | + |
| 456 | + with patch.object(anode, "_sendAdmin", side_effect=capture_send): |
| 457 | + url = anode.getContactURL(node_id, should_ignore=should_ignore, manually_verified=manually_verified) |
| 458 | + assert url.startswith("https://meshtastic.org/v/#") |
| 459 | + |
| 460 | + anode.addContactURL(url) |
| 461 | + |
| 462 | + assert len(sent_admin) == 1 |
| 463 | + contact = sent_admin[0].add_contact |
| 464 | + u = node_data["user"] |
| 465 | + |
| 466 | + assert contact.node_num == node_num |
| 467 | + assert contact.user.id == u["id"] |
| 468 | + assert contact.user.long_name == u["longName"] |
| 469 | + assert contact.user.short_name == u["shortName"] |
| 470 | + assert contact.user.macaddr == base64.b64decode(u["macaddr"]) |
| 471 | + |
| 472 | + if u.get("hwModel") and u["hwModel"] != "UNSET": |
| 473 | + assert contact.user.hw_model == mesh_pb2.HardwareModel.Value(u["hwModel"]) |
| 474 | + if u.get("role"): |
| 475 | + assert contact.user.role == config_pb2.Config.DeviceConfig.Role.Value(u["role"]) |
| 476 | + if u.get("publicKey"): |
| 477 | + assert contact.user.public_key == base64.b64decode(u["publicKey"]) |
| 478 | + if u.get("isLicensed"): |
| 479 | + assert contact.user.is_licensed is True |
| 480 | + if u.get("isUnmessagable") is not None: |
| 481 | + assert contact.user.is_unmessagable == u["isUnmessagable"] |
| 482 | + |
| 483 | + assert contact.should_ignore == should_ignore |
| 484 | + assert contact.manually_verified == manually_verified |
| 485 | + |
| 486 | + |
| 487 | +@st.composite |
| 488 | +def contact_url_roundtrip_params(draw): |
| 489 | + """Hypothesis strategy: generate a full node config and roundtrip flags""" |
| 490 | + should_ignore = draw(st.booleans()) |
| 491 | + manually_verified = draw(st.booleans()) |
| 492 | + |
| 493 | + node_num = draw(st.integers(min_value=6, max_value=2**32 - 2)) |
| 494 | + node_id = f"!{node_num:08x}" |
| 495 | + |
| 496 | + hw_model = draw(st.sampled_from(list(mesh_pb2.HardwareModel.keys()))) |
| 497 | + role = draw(st.one_of( |
| 498 | + st.none(), |
| 499 | + st.sampled_from(list(config_pb2.Config.DeviceConfig.Role.keys())), |
| 500 | + )) |
| 501 | + |
| 502 | + long_name = draw(st.text( |
| 503 | + min_size=1, max_size=_USER_NANOPB['long_name'].max_size |
| 504 | + )) |
| 505 | + short_name = draw(st.text( |
| 506 | + min_size=1, max_size=_USER_NANOPB['short_name'].max_size |
| 507 | + )) |
| 508 | + |
| 509 | + macaddr_bytes = draw(st.binary( |
| 510 | + min_size=_USER_NANOPB['macaddr'].max_size, |
| 511 | + max_size=_USER_NANOPB['macaddr'].max_size, |
| 512 | + )) |
| 513 | + macaddr_b64 = base64.b64encode(macaddr_bytes).decode("ascii") |
| 514 | + |
| 515 | + has_public_key = draw(st.booleans()) |
| 516 | + public_key_b64 = None |
| 517 | + if has_public_key: |
| 518 | + pk_bytes = draw(st.binary( |
| 519 | + min_size=_USER_NANOPB['public_key'].max_size, |
| 520 | + max_size=_USER_NANOPB['public_key'].max_size, |
| 521 | + )) |
| 522 | + public_key_b64 = base64.b64encode(pk_bytes).decode("ascii") |
| 523 | + |
| 524 | + is_licensed = draw(st.booleans()) |
| 525 | + is_unmessagable = draw(st.booleans()) |
| 526 | + |
| 527 | + node_data = { |
| 528 | + "num": node_num, |
| 529 | + "user": { |
| 530 | + "id": node_id, |
| 531 | + "longName": long_name, |
| 532 | + "shortName": short_name, |
| 533 | + "macaddr": macaddr_b64, |
| 534 | + "hwModel": hw_model, |
| 535 | + "isLicensed": is_licensed, |
| 536 | + "isUnmessagable": is_unmessagable, |
| 537 | + }, |
| 538 | + } |
| 539 | + if role is not None: |
| 540 | + node_data["user"]["role"] = role |
| 541 | + if public_key_b64 is not None: |
| 542 | + node_data["user"]["publicKey"] = public_key_b64 |
| 543 | + |
| 544 | + return node_num, node_data, should_ignore, manually_verified |
| 545 | + |
| 546 | + |
| 547 | +@pytest.mark.unit |
| 548 | +@given(contact_url_roundtrip_params()) |
| 549 | +def test_contact_url_roundtrip_hypothesis(params): |
| 550 | + """Property: roundtrip preserves data across random field configurations""" |
| 551 | + node_num, node_data, should_ignore, manually_verified = params |
| 552 | + |
| 553 | + iface = MagicMock(autospec=MeshInterface) |
| 554 | + iface.nodesByNum = {node_num: node_data} |
| 555 | + iface.localNode = None |
| 556 | + |
| 557 | + anode = Node(iface, node_num, noProto=True) |
| 558 | + |
| 559 | + sent_admin = [] |
| 560 | + def capture_send(p, *args, **kwargs): |
| 561 | + sent_admin.append(p) |
| 562 | + |
| 563 | + with patch.object(anode, "_sendAdmin", side_effect=capture_send): |
| 564 | + url = anode.getContactURL( |
| 565 | + node_num, |
| 566 | + should_ignore=should_ignore, |
| 567 | + manually_verified=manually_verified, |
| 568 | + ) |
| 569 | + anode.addContactURL(url) |
| 570 | + |
| 571 | + assert len(sent_admin) == 1 |
| 572 | + contact = sent_admin[0].add_contact |
| 573 | + u = node_data["user"] |
| 574 | + |
| 575 | + assert contact.node_num == node_num |
| 576 | + assert contact.user.id == u["id"] |
| 577 | + assert contact.user.long_name == u["longName"] |
| 578 | + assert contact.user.short_name == u["shortName"] |
| 579 | + assert contact.user.macaddr == base64.b64decode(u["macaddr"]) |
| 580 | + assert contact.user.hw_model == mesh_pb2.HardwareModel.Value(u["hwModel"]) |
| 581 | + |
| 582 | + if "role" in u: |
| 583 | + assert contact.user.role == config_pb2.Config.DeviceConfig.Role.Value(u["role"]) |
| 584 | + if "publicKey" in u: |
| 585 | + assert contact.user.public_key == base64.b64decode(u["publicKey"]) |
| 586 | + assert contact.user.is_licensed == u["isLicensed"] |
| 587 | + assert contact.user.is_unmessagable == u["isUnmessagable"] |
| 588 | + assert contact.should_ignore == should_ignore |
| 589 | + assert contact.manually_verified == manually_verified |
391 | 590 |
|
392 | 591 |
|
393 | 592 | # TODO |
|
0 commit comments