diff --git a/xtest/fixtures/keys.py b/xtest/fixtures/keys.py index 13514086..24e2a937 100644 --- a/xtest/fixtures/keys.py +++ b/xtest/fixtures/keys.py @@ -126,6 +126,214 @@ def managed_key_km2_ec( return key +@pytest.fixture(scope="module") +def key_e256( + otdfctl: OpentdfCommandLineTool, + kas_entry_km2: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create EC secp256r1 managed key on km2. + + Key ID includes a hash of the root key to ensure that if the root key changes, + a new key will be created instead of reusing an incompatible one. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip("Key management feature is not enabled") + + key_id = f"e256-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) + key = next((k for k in existing_keys if k.key.key_id == key_id), None) + if key is None: + key = otdfctl.kas_registry_create_key( + kas_entry_km2, + key_id=key_id, + mode="local", + algorithm="ec:secp256r1", + wrapping_key=root_key, + wrapping_key_id="root", + ) + return key + + +@pytest.fixture(scope="module") +def key_e384( + otdfctl: OpentdfCommandLineTool, + kas_entry_km2: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create EC secp384r1 managed key on km2 + + Key ID includes a hash of the root key to ensure that if the root key changes, + a new key will be created instead of reusing an incompatible one. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip("Key management feature is not enabled") + + key_id = f"e384-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) + key = next((k for k in existing_keys if k.key.key_id == key_id), None) + if key is None: + key = otdfctl.kas_registry_create_key( + kas_entry_km2, + key_id=key_id, + mode="local", + algorithm="ec:secp384r1", + wrapping_key=root_key, + wrapping_key_id="root", + ) + return key + + +@pytest.fixture(scope="module") +def key_e521( + otdfctl: OpentdfCommandLineTool, + kas_entry_km2: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create EC secp521r1 managed key on km2. + + Key ID includes a hash of the root key to ensure that if the root key changes, + a new key will be created instead of reusing an incompatible one. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip("Key management feature is not enabled") + + key_id = f"e521-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) + key = next((k for k in existing_keys if k.key.key_id == key_id), None) + if key is None: + key = otdfctl.kas_registry_create_key( + kas_entry_km2, + key_id=key_id, + mode="local", + algorithm="ec:secp521r1", + wrapping_key=root_key, + wrapping_key_id="root", + ) + return key + + +@pytest.fixture(scope="module") +def key_r2048( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create RSA 2048 managed key on km1. + + Key ID includes a hash of the root key to ensure that if the root key changes, + a new key will be created instead of reusing an incompatible one. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip("Key management feature is not enabled") + + key_id = f"r2048-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) + key = next((k for k in existing_keys if k.key.key_id == key_id), None) + if key is None: + key = otdfctl.kas_registry_create_key( + kas_entry_km1, + key_id=key_id, + mode="local", + algorithm="rsa:2048", + wrapping_key=root_key, + wrapping_key_id="root", + ) + return key + + +@pytest.fixture(scope="module") +def key_r4096( + otdfctl: OpentdfCommandLineTool, + kas_entry_km1: abac.KasEntry, + root_key: str, +) -> abac.KasKey: + """Get or create RSA 4096 managed key on km1. + + Key ID includes a hash of the root key to ensure that if the root key changes, + a new key will be created instead of reusing an incompatible one. + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip("Key management feature is not enabled") + + key_id = f"r4096-{_key_id_suffix(root_key)}" + existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) + key = next((k for k in existing_keys if k.key.key_id == key_id), None) + if key is None: + key = otdfctl.kas_registry_create_key( + kas_entry_km1, + key_id=key_id, + mode="local", + algorithm="rsa:4096", + wrapping_key=root_key, + wrapping_key_id="root", + ) + return key + + +@pytest.fixture(scope="module") +def attribute_allof_with_extended_mechanisms( + otdfctl: OpentdfCommandLineTool, + key_e256: abac.KasKey, + key_e384: abac.KasKey, + key_e521: abac.KasKey, + key_r2048: abac.KasKey, + key_r4096: abac.KasKey, + otdf_client_scs: abac.SubjectConditionSet, + temporary_namespace: abac.Namespace, +) -> tuple[abac.Attribute, list[str]]: + """Create an ALL_OF attribute and assign extended mechanism keys to it. + + - Uses ec:secp256r1, ec:secp384r1, ec:secp521r1, and rsa:2048, rsa:4096 keys + - Reuses existing managed keys + - Assigns all keys to attribute values (value-level assignment) + - Maps all attribute values to the client SCS + """ + pfs = tdfs.PlatformFeatureSet() + if "key_management" not in pfs.features: + pytest.skip( + "Key management feature is not enabled; skipping key assignment fixture" + ) + + # Create attribute with three values under ALL_OF + attr = otdfctl.attribute_create( + temporary_namespace, + "mechanism-select", + abac.AttributeRule.ALL_OF, + ["ec-secp256r1", "ec-secp384r1", "ec-secp521r1", "rsa-2048", "rsa-4096"], + ) + assert attr.values and len(attr.values) == 5 + v_e256, v_e384, v_e521, v_r2048, v_r4096 = attr.values + assert v_e256.value == "ec-secp256r1" + assert v_e384.value == "ec-secp384r1" + assert v_e521.value == "ec-secp521r1" + assert v_r2048.value == "rsa-2048" + assert v_r4096.value == "rsa-4096" + + # Ensure client has access to all values and assign keys + keys = (key_e256, key_e384, key_e521, key_r2048, key_r4096) + for key, value in zip(keys, attr.values): + sm = otdfctl.scs_map(otdf_client_scs, value) + assert sm.attribute_value.value == value.value + otdfctl.key_assign_value(key, value) + + return ( + attr, + [ + key_e256.key.key_id, + key_e384.key.key_id, + key_e521.key.key_id, + key_r2048.key.key_id, + key_r4096.key.key_id, + ], + ) + + @pytest.fixture(scope="module") def attribute_allof_with_two_managed_keys( otdfctl: OpentdfCommandLineTool, diff --git a/xtest/test_abac.py b/xtest/test_abac.py index 39e4ce0f..28aaf786 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -117,6 +117,209 @@ def test_key_mapping_multiple_mechanisms( assert filecmp.cmp(pt_file, rt_file) +def test_key_mapping_extended_mechanisms( + attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], + encrypt_sdk: tdfs.SDK, + decrypt_sdk: tdfs.SDK, + tmp_dir: Path, + pt_file: Path, + kas_url_km1: str, + kas_url_km2: str, + in_focus: set[tdfs.SDK], +): + """Test encryption and decryption with extended cryptographic mechanisms. + + This test verifies support for ec:secp384r1, ec:secp521r1, and rsa:4096 + key types by encrypting with all three mechanisms and successfully decrypting. + """ + if not in_focus & {encrypt_sdk, decrypt_sdk}: + pytest.skip("Not in focus") + tdfs.skip_if_unsupported(encrypt_sdk, "key_management") + tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") + pfs = tdfs.PlatformFeatureSet() + tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) + tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) + skip_dspx1153(encrypt_sdk, decrypt_sdk) + + attr, key_ids = attribute_allof_with_extended_mechanisms + + sample_name = f"extended-mechanisms-{encrypt_sdk}" + if sample_name in cipherTexts: + ct_file = cipherTexts[sample_name] + else: + ct_file = tmp_dir / f"{sample_name}.tdf" + cipherTexts[sample_name] = ct_file + encrypt_sdk.encrypt( + pt_file, + ct_file, + mime_type="text/plain", + container="ztdf", + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + + manifest = tdfs.manifest(ct_file) + assert len(manifest.encryptionInformation.keyAccess) == 5 + + # Verify that all three key IDs are present in the manifest + manifest_kids = {kao.kid for kao in manifest.encryptionInformation.keyAccess} + expected_kids = set(key_ids) + assert manifest_kids == expected_kids, ( + f"Expected key IDs {expected_kids} but got {manifest_kids}" + ) + + # Verify KAS URLs are from km1 or km2 + manifest_urls = {kao.url for kao in manifest.encryptionInformation.keyAccess} + assert manifest_urls <= {kas_url_km1, kas_url_km2}, ( + f"Expected KAS URLs to be from km1 or km2, but got {manifest_urls}" + ) + + # Verify EC wrapping support if needed + if any( + kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess + ): + tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") + + # Decrypt and verify + rt_file = tmp_dir / f"extended-mechanisms-{encrypt_sdk}-{decrypt_sdk}.untdf" + decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") + assert filecmp.cmp(pt_file, rt_file) + + +def test_key_mapping_extended_ec_mechanisms( + attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], + encrypt_sdk: tdfs.SDK, + decrypt_sdk: tdfs.SDK, + tmp_dir: Path, + pt_file: Path, + kas_url_km2: str, + in_focus: set[tdfs.SDK], +): + """Test encryption and decryption with extended EC cryptographic mechanisms. + + This test verifies support for ec:secp384r1 and ec:secp521r1 + key types by encrypting with them and successfully decrypting. + """ + if not in_focus & {encrypt_sdk, decrypt_sdk}: + pytest.skip("Not in focus") + tdfs.skip_if_unsupported(encrypt_sdk, "key_management") + tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") + pfs = tdfs.PlatformFeatureSet() + tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) + tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) + skip_dspx1153(encrypt_sdk, decrypt_sdk) + + attr, key_ids = attribute_allof_with_extended_mechanisms + + ec_kids = [kid for kid in key_ids if kid.startswith("e3") or kid.startswith("e5")] + ec_vals = [v for v in attr.value_fqns if "ec-secp3" in v or "ec-secp5" in v] + assert len(ec_kids) == len(ec_vals), "Mismatch in EC key IDs and attribute values" + + sample_name = f"extended-mechanisms-ec-{encrypt_sdk}" + if sample_name in cipherTexts: + ct_file = cipherTexts[sample_name] + else: + ct_file = tmp_dir / f"{sample_name}.tdf" + cipherTexts[sample_name] = ct_file + encrypt_sdk.encrypt( + pt_file, + ct_file, + mime_type="text/plain", + container="ztdf", + attr_values=ec_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + + manifest = tdfs.manifest(ct_file) + assert len(manifest.encryptionInformation.keyAccess) == len(ec_kids) + + # Verify that all three key IDs are present in the manifest + manifest_kids = {kao.kid for kao in manifest.encryptionInformation.keyAccess} + expected_kids = set(ec_kids) + assert manifest_kids == expected_kids, ( + f"Expected key IDs {expected_kids} but got {manifest_kids}" + ) + + # Verify KAS URLs are from km2 + manifest_urls = {kao.url for kao in manifest.encryptionInformation.keyAccess} + assert manifest_urls <= {kas_url_km2}, ( + f"Expected KAS URLs to be from km2, but got {manifest_urls}" + ) + + # Decrypt and verify + rt_file = tmp_dir / f"extended-mechanisms-ec-{encrypt_sdk}-{decrypt_sdk}.untdf" + decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") + assert filecmp.cmp(pt_file, rt_file) + + +def test_key_mapping_extended_rsa_mechanisms( + attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], + encrypt_sdk: tdfs.SDK, + decrypt_sdk: tdfs.SDK, + tmp_dir: Path, + pt_file: Path, + kas_url_km1: str, + in_focus: set[tdfs.SDK], +): + """Test encryption and decryption with extended RSA cryptographic mechanisms. + + This test verifies support for rsa:2048 and rsa:4096 + key types by encrypting with both mechanisms and successfully decrypting. + """ + if not in_focus & {encrypt_sdk, decrypt_sdk}: + pytest.skip("Not in focus") + tdfs.skip_if_unsupported(encrypt_sdk, "key_management") + tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") + pfs = tdfs.PlatformFeatureSet() + tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) + tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) + skip_dspx1153(encrypt_sdk, decrypt_sdk) + + attr, key_ids = attribute_allof_with_extended_mechanisms + + rsa_kids = [kid for kid in key_ids if kid.startswith("r")] + rsa_vals = [v for v in attr.value_fqns if "rsa-" in v] + assert len(rsa_kids) == len(rsa_vals), ( + "Mismatch in RSA key IDs and attribute values" + ) + + sample_name = f"extended-mechanisms-rsa-{encrypt_sdk}" + if sample_name in cipherTexts: + ct_file = cipherTexts[sample_name] + else: + ct_file = tmp_dir / f"{sample_name}.tdf" + cipherTexts[sample_name] = ct_file + encrypt_sdk.encrypt( + pt_file, + ct_file, + mime_type="text/plain", + container="ztdf", + attr_values=rsa_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + + manifest = tdfs.manifest(ct_file) + assert len(manifest.encryptionInformation.keyAccess) == len(rsa_kids) + + # Verify that all three key IDs are present in the manifest + manifest_kids = {kao.kid for kao in manifest.encryptionInformation.keyAccess} + expected_kids = set(rsa_kids) + assert manifest_kids == expected_kids, ( + f"Expected key IDs {expected_kids} but got {manifest_kids}" + ) + + # Verify KAS URLs are from km1 + manifest_urls = {kao.url for kao in manifest.encryptionInformation.keyAccess} + assert manifest_urls <= {kas_url_km1}, ( + f"Expected KAS URLs to be from km1, but got {manifest_urls}" + ) + + # Decrypt and verify + rt_file = tmp_dir / f"extended-mechanisms-rsa-{encrypt_sdk}-{decrypt_sdk}.untdf" + decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") + assert filecmp.cmp(pt_file, rt_file) + + def test_autoconfigure_one_attribute_standard( attribute_single_kas_grant: Attribute, encrypt_sdk: tdfs.SDK,