Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions xtest/fixtures/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,214 @@ def managed_key_km2_ec(
return key


@pytest.fixture(scope="module")
def key_e256(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp256r1 managed key on km2.

Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")

key_id = f"e256-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry_km2,
key_id=key_id,
mode="local",
algorithm="ec:secp256r1",
wrapping_key=root_key,
wrapping_key_id="root",
)
return key


@pytest.fixture(scope="module")
def key_e384(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp384r1 managed key on km2

Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")

key_id = f"e384-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry_km2,
key_id=key_id,
mode="local",
algorithm="ec:secp384r1",
wrapping_key=root_key,
wrapping_key_id="root",
)
return key


@pytest.fixture(scope="module")
def key_e521(
otdfctl: OpentdfCommandLineTool,
kas_entry_km2: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create EC secp521r1 managed key on km2.

Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")

key_id = f"e521-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km2)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry_km2,
key_id=key_id,
mode="local",
algorithm="ec:secp521r1",
wrapping_key=root_key,
wrapping_key_id="root",
)
return key


@pytest.fixture(scope="module")
def key_r2048(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 2048 managed key on km1.

Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")

key_id = f"r2048-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry_km1,
key_id=key_id,
mode="local",
algorithm="rsa:2048",
wrapping_key=root_key,
wrapping_key_id="root",
)
return key


@pytest.fixture(scope="module")
def key_r4096(
otdfctl: OpentdfCommandLineTool,
kas_entry_km1: abac.KasEntry,
root_key: str,
) -> abac.KasKey:
"""Get or create RSA 4096 managed key on km1.

Key ID includes a hash of the root key to ensure that if the root key changes,
a new key will be created instead of reusing an incompatible one.
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip("Key management feature is not enabled")

key_id = f"r4096-{_key_id_suffix(root_key)}"
existing_keys = otdfctl.kas_registry_keys_list(kas_entry_km1)
key = next((k for k in existing_keys if k.key.key_id == key_id), None)
if key is None:
key = otdfctl.kas_registry_create_key(
kas_entry_km1,
key_id=key_id,
mode="local",
algorithm="rsa:4096",
wrapping_key=root_key,
wrapping_key_id="root",
)
return key
Comment on lines +130 to +276
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new key fixtures (key_e256, key_e384, key_e521, key_r2048, key_r4096) contain a lot of duplicated code for getting or creating a key. To improve maintainability and reduce redundancy, this logic can be extracted into a single helper function. The fixtures can then call this helper with the specific parameters for each key type.

def _get_or_create_key(
    otdfctl: OpentdfCommandLineTool,
    kas_entry: abac.KasEntry,
    root_key: str,
    key_prefix: str,
    algorithm: str,
) -> abac.KasKey:
    """Helper to get or create a managed key.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    pfs = tdfs.PlatformFeatureSet()
    if "key_management" not in pfs.features:
        pytest.skip("Key management feature is not enabled")

    key_id = f"{key_prefix}-{_key_id_suffix(root_key)}"
    existing_keys = otdfctl.kas_registry_keys_list(kas_entry)
    key = next((k for k in existing_keys if k.key.key_id == key_id), None)
    if key is None:
        key = otdfctl.kas_registry_create_key(
            kas_entry,
            key_id=key_id,
            mode="local",
            algorithm=algorithm,
            wrapping_key=root_key,
            wrapping_key_id="root",
        )
    return key


@pytest.fixture(scope="module")
def key_e256(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_km2: abac.KasEntry,
    root_key: str,
) -> abac.KasKey:
    """Get or create EC secp256r1 managed key on km2.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    return _get_or_create_key(
        otdfctl, kas_entry_km2, root_key, "e256", "ec:secp256r1"
    )


@pytest.fixture(scope="module")
def key_e384(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_km2: abac.KasEntry,
    root_key: str,
) -> abac.KasKey:
    """Get or create EC secp384r1 managed key on km2.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    return _get_or_create_key(
        otdfctl, kas_entry_km2, root_key, "e384", "ec:secp384r1"
    )


@pytest.fixture(scope="module")
def key_e521(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_km2: abac.KasEntry,
    root_key: str,
) -> abac.KasKey:
    """Get or create EC secp521r1 managed key on km2.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    return _get_or_create_key(
        otdfctl, kas_entry_km2, root_key, "e521", "ec:secp521r1"
    )


@pytest.fixture(scope="module")
def key_r2048(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_km1: abac.KasEntry,
    root_key: str,
) -> abac.KasKey:
    """Get or create RSA 2048 managed key on km1.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    return _get_or_create_key(otdfctl, kas_entry_km1, root_key, "r2048", "rsa:2048")


@pytest.fixture(scope="module")
def key_r4096(
    otdfctl: OpentdfCommandLineTool,
    kas_entry_km1: abac.KasEntry,
    root_key: str,
) -> abac.KasKey:
    """Get or create RSA 4096 managed key on km1.

    Key ID includes a hash of the root key to ensure that if the root key changes,
    a new key will be created instead of reusing an incompatible one.
    """
    return _get_or_create_key(otdfctl, kas_entry_km1, root_key, "r4096", "rsa:4096")



@pytest.fixture(scope="module")
def attribute_allof_with_extended_mechanisms(
otdfctl: OpentdfCommandLineTool,
key_e256: abac.KasKey,
key_e384: abac.KasKey,
key_e521: abac.KasKey,
key_r2048: abac.KasKey,
key_r4096: abac.KasKey,
otdf_client_scs: abac.SubjectConditionSet,
temporary_namespace: abac.Namespace,
) -> tuple[abac.Attribute, list[str]]:
"""Create an ALL_OF attribute and assign extended mechanism keys to it.

- Uses ec:secp256r1, ec:secp384r1, ec:secp521r1, and rsa:2048, rsa:4096 keys
- Reuses existing managed keys
- Assigns all keys to attribute values (value-level assignment)
- Maps all attribute values to the client SCS
"""
pfs = tdfs.PlatformFeatureSet()
if "key_management" not in pfs.features:
pytest.skip(
"Key management feature is not enabled; skipping key assignment fixture"
)

# Create attribute with three values under ALL_OF
attr = otdfctl.attribute_create(
temporary_namespace,
"mechanism-select",
abac.AttributeRule.ALL_OF,
["ec-secp256r1", "ec-secp384r1", "ec-secp521r1", "rsa-2048", "rsa-4096"],
)
assert attr.values and len(attr.values) == 5
v_e256, v_e384, v_e521, v_r2048, v_r4096 = attr.values
assert v_e256.value == "ec-secp256r1"
assert v_e384.value == "ec-secp384r1"
assert v_e521.value == "ec-secp521r1"
assert v_r2048.value == "rsa-2048"
assert v_r4096.value == "rsa-4096"

# Ensure client has access to all values and assign keys
keys = (key_e256, key_e384, key_e521, key_r2048, key_r4096)
for key, value in zip(keys, attr.values):
sm = otdfctl.scs_map(otdf_client_scs, value)
assert sm.attribute_value.value == value.value
otdfctl.key_assign_value(key, value)

return (
attr,
[
key_e256.key.key_id,
key_e384.key.key_id,
key_e521.key.key_id,
key_r2048.key.key_id,
key_r4096.key.key_id,
],
)


@pytest.fixture(scope="module")
def attribute_allof_with_two_managed_keys(
otdfctl: OpentdfCommandLineTool,
Expand Down
Loading
Loading