feat(xtest): add support for extended cryptographic mechanisms#401
feat(xtest): add support for extended cryptographic mechanisms#401dmihalcik-virtru wants to merge 4 commits intoopentdf:mainfrom
Conversation
Summary of ChangesHello @dmihalcik-virtru, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the testing framework by integrating support for extended cryptographic mechanisms. It introduces new fixtures for managing Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request adds support for extended cryptographic mechanisms, including new fixtures and tests. The implementation is a good step forward, but I've identified areas for improvement. Specifically, there's significant code duplication in the new fixtures within xtest/fixtures/keys.py that could be refactored for better maintainability. Additionally, the new test in xtest/test_abac.py has a bug in its assertions and a misleading docstring. My review includes suggestions to address these points, which will improve the code's quality and robustness.
xtest/test_abac.py
Outdated
| assert len(manifest.encryptionInformation.keyAccess) == 3 | ||
|
|
||
| # Verify that all three key IDs are present in the manifest |
There was a problem hiding this comment.
There's a bug in the test logic. The attribute_allof_with_extended_mechanisms fixture provides 5 key IDs, so expected_kids will be a set of 5. However, you're asserting that keyAccess has a length of 3. This will cause the subsequent assertion manifest_kids == expected_kids to fail. The assertion should check for 5 key access objects.
| assert len(manifest.encryptionInformation.keyAccess) == 3 | |
| # Verify that all three key IDs are present in the manifest | |
| assert len(manifest.encryptionInformation.keyAccess) == 5 | |
| # Verify that all five key IDs are present in the manifest |
| @pytest.fixture(scope="module") | ||
| def key_e256( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp256r1 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e256-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp256r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_e384( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp384r1 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e384-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp384r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_e521( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km2: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp521r1 managed key on km2. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e521-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km2, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp521r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_r2048( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create RSA 2048 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"r2048-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="rsa:2048", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_r4096( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create RSA 4096 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"r4096-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="rsa:4096", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key |
There was a problem hiding this comment.
The five new key fixtures (key_e256, key_e384, key_e521, key_r2048, key_r4096) contain a lot of duplicated code. To improve maintainability and reduce redundancy, this logic can be extracted into a single helper function. The fixtures can then call this helper with the specific parameters for each key type.
def _get_or_create_managed_key(
otdfctl: OpentdfCommandLineTool,
kas_entry: abac.KasEntry,
root_key: str,
key_id_prefix: str,
algorithm: str,
) -> abac.KasKey:
"""Helper to get or create a managed key."""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")
key_id = f"{key_id_prefix}-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry,
key_id=key_id,
mode="local",
algorithm=algorithm,
wrapping_key=root_key,
wrapping_key_id="root",
)
return key
@pytest.fixture(scope="module")
def key_e256(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp256r1 managed key on km1."""
return _get_or_create_managed_key(
otdfctl, kas_entry_km1, root_key, "e256", "ec:secp256r1"
)
@pytest.fixture(scope="module")
def key_e384(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp384r1 managed key on km1."""
return _get_or_create_managed_key(
otdfctl, kas_entry_km1, root_key, "e384", "ec:secp384r1"
)
@pytest.fixture(scope="module")
def key_e521(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp521r1 managed key on km2."""
return _get_or_create_managed_key(
otdfctl, kas_entry_km2, root_key, "e521", "ec:secp521r1"
)
@pytest.fixture(scope="module")
def key_r2048(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 2048 managed key on km1."""
return _get_or_create_managed_key(
otdfctl, kas_entry_km1, root_key, "r2048", "rsa:2048"
)
@pytest.fixture(scope="module")
def key_r4096(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 4096 managed key on km1."""
return _get_or_create_managed_key(
otdfctl, kas_entry_km1, root_key, "r4096", "rsa:4096"
)| # Create attribute with three values under ALL_OF | ||
| attr = otdfctl.attribute_create( | ||
| temporary_namespace, | ||
| "mechanism-select", | ||
| abac.AttributeRule.ALL_OF, | ||
| ["ec:secp256r1", "ec:secp384r1", "ec:secp521r1", "rsa:2048", "rsa:4096"], | ||
| ) | ||
| assert attr.values and len(attr.values) == 5 | ||
| v_e256, v_e384, v_e521, v_r2048, v_r4096 = attr.values | ||
| assert v_e256.value == "ec:secp256r1" | ||
| assert v_e384.value == "ec:secp384r1" | ||
| assert v_e521.value == "ec:secp521r1" | ||
| assert v_r2048.value == "rsa:2048" | ||
| assert v_r4096.value == "rsa:4096" | ||
|
|
||
| # Ensure client has access to all values | ||
| sm1 = otdfctl.scs_map(otdf_client_scs, v_e256) | ||
| assert sm1.attribute_value.value == v_e256.value | ||
| sm2 = otdfctl.scs_map(otdf_client_scs, v_e384) | ||
| assert sm2.attribute_value.value == v_e384.value | ||
| sm3 = otdfctl.scs_map(otdf_client_scs, v_e521) | ||
| assert sm3.attribute_value.value == v_e521.value | ||
| sm4 = otdfctl.scs_map(otdf_client_scs, v_r2048) | ||
| assert sm4.attribute_value.value == v_r2048.value | ||
| sm5 = otdfctl.scs_map(otdf_client_scs, v_r4096) | ||
| assert sm5.attribute_value.value == v_r4096.value | ||
|
|
||
| # Assign keys to corresponding attribute values | ||
| otdfctl.key_assign_value(key_e256, v_e256) | ||
| otdfctl.key_assign_value(key_e384, v_e384) | ||
| otdfctl.key_assign_value(key_e521, v_e521) | ||
| otdfctl.key_assign_value(key_r2048, v_r2048) | ||
| otdfctl.key_assign_value(key_r4096, v_r4096) | ||
|
|
||
| return ( | ||
| attr, | ||
| [ | ||
| key_e256.key.key_id, | ||
| key_e384.key.key_id, | ||
| key_e521.key.key_id, | ||
| key_r2048.key.key_id, | ||
| key_r4096.key.key_id, | ||
| ], | ||
| ) |
There was a problem hiding this comment.
The implementation of this fixture is repetitive and brittle. It relies on positional unpacking of attr.values, which assumes a specific order that is not guaranteed. This can be refactored to be more robust and data-driven by using a dictionary to map mechanisms to keys and iterating over it. This will make the code cleaner, easier to maintain, and less prone to breaking if the order of attribute values changes.
mechanisms_to_keys = {
"ec:secp256r1": key_e256,
"ec:secp384r1": key_e384,
"ec:secp521r1": key_e521,
"rsa:2048": key_r2048,
"rsa:4096": key_r4096,
}
mechanism_values = list(mechanisms_to_keys.keys())
# Create attribute with values under ALL_OF
attr = otdfctl.attribute_create(
temporary_namespace,
"mechanism-select",
abac.AttributeRule.ALL_OF,
mechanism_values,
)
assert attr.values and len(attr.values) == len(mechanism_values)
# Create a mapping from value string to value object for easier lookup
value_map = {v.value: v for v in attr.values}
for mechanism, key in mechanisms_to_keys.items():
value_obj = value_map[mechanism]
# Ensure client has access to the value
sm = otdfctl.scs_map(otdf_client_scs, value_obj)
assert sm.attribute_value.value == value_obj.value
# Assign key to corresponding attribute value
otdfctl.key_assign_value(key, value_obj)
return (
attr,
[key.key.key_id for key in mechanisms_to_keys.values()],
)Add fixtures and tests for additional key types: - ec:secp384r1 (key_e384) - ec:secp521r1 (key_e521) - rsa:4096 (key_r4096) New fixtures in keys.py: - Three managed key fixtures with session-unique key_id suffixes - attribute_allof_with_extended_mechanisms fixture that creates an ALL_OF attribute with all three mechanisms and maps them to the test client for encryption New test in test_abac.py: - test_key_mapping_extended_mechanisms verifies encryption and decryption with all three extended mechanisms, validating that the manifest contains the correct key IDs and KAS URLs
Add fixtures and tests for additional key types: - ec:secp384r1 (key_e384) - ec:secp521r1 (key_e521) - rsa:4096 (key_r4096) New fixtures in keys.py: - Three managed key fixtures with session-unique key_id suffixes - attribute_allof_with_extended_mechanisms fixture that creates an ALL_OF attribute with all three mechanisms and maps them to the test client for encryption New test in test_abac.py: - test_key_mapping_extended_mechanisms verifies encryption and decryption with all three extended mechanisms, validating that the manifest contains the correct key IDs and KAS URLs
75165b9 to
ba15344
Compare
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
|



Add fixtures and tests for additional key types:
New fixtures in keys.py:
New test in test_abac.py: