feat(xtest): add extended crypto mechanism tests and key fixtures#409
feat(xtest): add extended crypto mechanism tests and key fixtures#409dmihalcik-virtru wants to merge 7 commits intoopentdf:mainfrom
Conversation
Bump pyright from 1.1.380 to 1.1.408 and alphabetize dev dependency ordering in pyproject.toml. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: David Mihalcik <dmihalcik@virtru.com>
Add key fixtures for EC secp384r1/521r1 and RSA 4096. Add ABAC tests for multi-mechanism encryption with extended key types. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: David Mihalcik <dmihalcik@virtru.com>
Summary of ChangesHello @dmihalcik-virtru, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces new tests for extended cryptographic mechanisms and adds corresponding key fixtures. The changes are well-structured, but there are opportunities to improve code maintainability by reducing duplication in both the new fixtures and tests. I've also identified a bug in one of the new tests where the filtering logic is incorrect, and some docstrings that need correction to accurately reflect the test's purpose. My review includes suggestions to address these points.
| def key_e256( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km2: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp256r1 managed key on km2. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e256-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km2, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp256r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_e384( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km2: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp384r1 managed key on km2 | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e384-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km2, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp384r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_e521( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km2: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create EC secp521r1 managed key on km2. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"e521-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km2, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="ec:secp521r1", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_r2048( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create RSA 2048 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"r2048-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="rsa:2048", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def key_r4096( | ||
| otdfctl: OpentdfCommandLineTool, | ||
| kas_entry_km1: abac.KasEntry, | ||
| root_key: str, | ||
| ) -> abac.KasKey: | ||
| """Get or create RSA 4096 managed key on km1. | ||
|
|
||
| Key ID includes a hash of the root key to ensure that if the root key changes, | ||
| a new key will be created instead of reusing an incompatible one. | ||
| """ | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| if "key_management" not in pfs.features: | ||
| pytest.skip("Key management feature is not enabled") | ||
|
|
||
| key_id = f"r4096-{_key_id_suffix(root_key)}" | ||
| existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1) | ||
| key = next((k for k in existing_keys if k.key.key_id == key_id), None) | ||
| if key is None: | ||
| key = otdfctl.kas_registry_create_key( | ||
| kas_entry_km1, | ||
| key_id=key_id, | ||
| mode="local", | ||
| algorithm="rsa:4096", | ||
| wrapping_key=root_key, | ||
| wrapping_key_id="root", | ||
| ) | ||
| return key |
There was a problem hiding this comment.
The new key fixtures (key_e256, key_e384, key_e521, key_r2048, key_r4096) contain a lot of duplicated code for getting or creating a key. To improve maintainability and reduce redundancy, this logic can be extracted into a single helper function. The fixtures can then call this helper with the specific parameters for each key type.
def _get_or_create_key(
otdfctl: OpentdfCommandLineTool,
kas_entry: abac.KasEntry,
root_key: str,
key_prefix: str,
algorithm: str,
) -> abac.KasKey:
"""Helper to get or create a managed key.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")
key_id = f"{key_prefix}-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry,
key_id=key_id,
mode="local",
algorithm=algorithm,
wrapping_key=root_key,
wrapping_key_id="root",
)
return key
@pytest.fixture(scope="module")
def key_e256(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp256r1 managed key on km2.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
return _get_or_create_key(
otdfctl, kas_entry_km2, root_key, "e256", "ec:secp256r1"
)
@pytest.fixture(scope="module")
def key_e384(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp384r1 managed key on km2.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
return _get_or_create_key(
otdfctl, kas_entry_km2, root_key, "e384", "ec:secp384r1"
)
@pytest.fixture(scope="module")
def key_e521(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp521r1 managed key on km2.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
return _get_or_create_key(
otdfctl, kas_entry_km2, root_key, "e521", "ec:secp521r1"
)
@pytest.fixture(scope="module")
def key_r2048(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 2048 managed key on km1.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
return _get_or_create_key(otdfctl, kas_entry_km1, root_key, "r2048", "rsa:2048")
@pytest.fixture(scope="module")
def key_r4096(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 4096 managed key on km1.
Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
return _get_or_create_key(otdfctl, kas_entry_km1, root_key, "r4096", "rsa:4096")| if not in_focus & {encrypt_sdk, decrypt_sdk}: | ||
| pytest.skip("Not in focus") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "key_management") | ||
| tdfs.skip_if_unsupported(encrypt_sdk, "autoconfigure") | ||
| pfs = tdfs.PlatformFeatureSet() | ||
| tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) | ||
| tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
| skip_dspx1153(encrypt_sdk, decrypt_sdk) |
There was a problem hiding this comment.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
|



Summary
attribute_allof_with_extended_mechanismsand related fixturestest_key_mapping_extended_mechanismsand related ABAC tests for multi-mechanism encryptionParent PRs
chore/xtest-dev-deps)Test plan
cd xtest && uv run ruff check . && uv run pyrightuv run pytest test_abac.py --sdks go -v --no-audit-logs🤖 Generated with Claude Code
Part of stacked PR series decomposing
chore/the-claudiest-day-tmux