From 4c8646954760a555233953179c276bd2aa4257f1 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Fri, 26 Jun 2026 16:32:29 -0700 Subject: [PATCH] fix(verify): TPM qualifying_data check uses the implemented key thumbprint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TPM verifier checked qualifying_data == SHA-256(pubkey || session_id), the old nonce formula, while the TPM provider commits the §3.3 nonce's first 32 bytes (the RFC 7638 JWK thumbprint of the key). It was also passed cnf.jwk.x (base64url) into bytes.fromhex(), so the check always errored. Now the dispatcher re-derives the thumbprint from cnf.jwk.x and the verifier compares it (constant-time) to the quote's qualifying_data, matching the generic CRYPTO-001 key binding and the SEV-SNP/TDX paths. - verify_tpm_measurement: replace tee_public_key_hex/session_id with expected_qualifying_data: bytes - verify.py dispatcher computes expected_qd via _jwk_thumbprint_sha256(cnf.jwk.x) - claim-hw-attestation experiment updated to the new signature - tests: rewrite qualifying_data unit tests + add end-to-end verify_trace_claim tests for match and key-substitution rejection - spec: TPM quote commits nonce[:32] (the thumbprint) Verified: ruff + mypy clean, 756 unit tests pass (3 skipped). --- docs/spec/attestation.md | 2 +- experiments/claim-hw-attestation/run.py | 4 +- src/cmcp_verify/tpm.py | 42 +++++++++------------ src/cmcp_verify/verify.py | 8 +++- tests/unit/test_tpm_verify.py | 50 ++++++++++++++++++------- 5 files changed, 63 insertions(+), 43 deletions(-) diff --git a/docs/spec/attestation.md b/docs/spec/attestation.md index b097000..61ed1db 100644 --- a/docs/spec/attestation.md +++ b/docs/spec/attestation.md @@ -55,7 +55,7 @@ measurement = SHA-256(PCR0 || PCR1 || PCR2 || PCR3 || PCR4 || PCR5 || PCR6 || PC Each PCR value is the raw 32-byte SHA-256 digest read from the TPM. Concatenation is in bank index order (0 through 7), no separators. The result is a 32-byte SHA-256 digest encoded as lowercase hex. The PCR bank used is SHA-256. If the platform only offers a SHA-1 bank, the runtime logs a warning and uses SHA-1 PCR values zero-extended to 32 bytes before hashing; this is noted in `attestation_report.measurement_note: "sha1-bank-fallback"`. -Quote generation: the gateway calls `TPM2_Quote` with `qualifying_data` set to the §3.3 nonce (`JWK_thumbprint(tee_public_key) || random_salt`). The quote and its signature are stored in `attestation_report.raw_evidence` (base64-encoded) for verifier use. +Quote generation: the gateway calls `TPM2_Quote` with `qualifying_data` set to the first 32 bytes of the §3.3 nonce — the `JWK_thumbprint(tee_public_key)` — because TPM `qualifying_data` carries a single digest. A verifier re-derives the thumbprint from `cnf.jwk.x` and checks it against the quote's `qualifying_data`. The quote and its signature are stored in `attestation_report.raw_evidence` (base64-encoded) for verifier use. #### SEV-SNP (High Assurance) diff --git a/experiments/claim-hw-attestation/run.py b/experiments/claim-hw-attestation/run.py index 010584b..248bd2d 100644 --- a/experiments/claim-hw-attestation/run.py +++ b/experiments/claim-hw-attestation/run.py @@ -160,13 +160,13 @@ def _verify_raw_evidence(report: AttestationReport, signing_key: SigningKey, ses report_data_hex=report.report_data, ) if name == "tpm": + from cmcp_runtime.tee.base import jwk_thumbprint from cmcp_verify.tpm import verify_tpm_measurement return verify_tpm_measurement( measurement=report.measurement, raw_evidence=report.raw_evidence, - tee_public_key_hex=signing_key.public_key_hex, - session_id=session_id, + expected_qualifying_data=jwk_thumbprint(signing_key.public_key_bytes), ) return None diff --git a/src/cmcp_verify/tpm.py b/src/cmcp_verify/tpm.py index 6d5d77a..15ca282 100644 --- a/src/cmcp_verify/tpm.py +++ b/src/cmcp_verify/tpm.py @@ -2,7 +2,7 @@ from __future__ import annotations -import hashlib +import hmac import struct from dataclasses import dataclass, field from typing import Any @@ -24,8 +24,7 @@ class TPMVerificationResult: def verify_tpm_measurement( measurement: str, raw_evidence: bytes | None, - tee_public_key_hex: str | None = None, - session_id: str | None = None, + expected_qualifying_data: bytes | None = None, ) -> TPMVerificationResult: """ Verify TPM attestation from a TRACE Claim. @@ -34,7 +33,11 @@ def verify_tpm_measurement( - measurement field format: must start with "sha256:" followed by 64 hex chars What requires raw_evidence (TPM2B_ATTEST): - - qualifying_data = SHA-256(tee_public_key || session_id) matches quote + - the quote's qualifying_data equals expected_qualifying_data. The gateway + commits the attestation nonce's first 32 bytes -- the RFC 7638 JWK Thumbprint + of the TEE public key (docs/spec/attestation.md §3.3) -- as the TPM2_Quote + qualifying_data. The caller re-derives that thumbprint from cnf.jwk and passes + it here, so a key substituted after attestation is detected. - PCR digest in quote matches measurement field EK cert chain validation: always marked as unverified_fields (requires @@ -63,13 +66,11 @@ def verify_tpm_measurement( if raw_evidence is not None: parse_ok, parse_details = _parse_tpm2b_attest( raw_evidence, - measurement=measurement, - tee_public_key_hex=tee_public_key_hex, - session_id=session_id, + expected_qualifying_data=expected_qualifying_data, ) if parse_ok: verified_fields.append("pcr_format") - if tee_public_key_hex and session_id: + if expected_qualifying_data is not None: qd_verified = parse_details.get("qualifying_data_verified", False) if qd_verified: verified_fields.append("qualifying_data") @@ -80,7 +81,7 @@ def verify_tpm_measurement( ) else: unverified_fields.append("qualifying_data") - details["qualifying_data_error"] = "tee_public_key_hex or session_id not provided" + details["qualifying_data_error"] = "expected_qualifying_data not provided" else: unverified_fields.append("pcr_format") unverified_fields.append("qualifying_data") @@ -131,12 +132,10 @@ def _valid_measurement(measurement: str) -> bool: def _parse_tpm2b_attest( data: bytes, *, - measurement: str, - tee_public_key_hex: str | None, - session_id: str | None, + expected_qualifying_data: bytes | None, ) -> tuple[bool, dict[str, Any]]: """ - Parse a TPM2B_ATTEST blob and verify qualifying_data if keys are provided. + Parse a TPM2B_ATTEST blob and verify qualifying_data if an expected value is given. TPM2B_ATTEST layout: [0:2] size (uint16 big-endian) - size of the following TPMS_ATTEST @@ -190,17 +189,12 @@ def _parse_tpm2b_attest( result: dict[str, Any] = {"qualifying_data_verified": False} - if tee_public_key_hex and session_id: - try: - expected_qd = hashlib.sha256( - bytes.fromhex(tee_public_key_hex) + session_id.encode() - ).digest() - if qualifying_data == expected_qd: - result["qualifying_data_verified"] = True - else: - result["qualifying_data_error"] = "qualifying_data hash mismatch" - except ValueError as exc: - result["qualifying_data_error"] = f"cannot decode tee_public_key_hex: {exc}" + if expected_qualifying_data is not None: + # hmac.compare_digest for constant-time comparison of the committed nonce + if hmac.compare_digest(qualifying_data, expected_qualifying_data): + result["qualifying_data_verified"] = True + else: + result["qualifying_data_error"] = "qualifying_data does not match expected key thumbprint" return True, result diff --git a/src/cmcp_verify/verify.py b/src/cmcp_verify/verify.py index 4fb1df6..1b422c7 100644 --- a/src/cmcp_verify/verify.py +++ b/src/cmcp_verify/verify.py @@ -654,11 +654,15 @@ def verify_trace_claim( raw_ev = _runtime.get("raw_evidence") raw_bytes = base64.b64decode(raw_ev) if raw_ev else None + # The TPM quote commits the attestation nonce's first 32 bytes -- the RFC 7638 + # JWK Thumbprint of the TEE key -- as qualifying_data (§3.3). Re-derive it from + # cnf.jwk.x so a substituted key is detected. + _tpm_jwk_x = claim_json.get("trace", {}).get("cnf", {}).get("jwk", {}).get("x") + _expected_qd = _jwk_thumbprint_sha256(_tpm_jwk_x) if _tpm_jwk_x else None tpm_result = verify_tpm_measurement( measurement=_runtime.get("measurement", ""), raw_evidence=raw_bytes, - tee_public_key_hex=claim_json.get("trace", {}).get("cnf", {}).get("jwk", {}).get("x"), - session_id=claim_json.get("gateway", {}).get("session_id"), + expected_qualifying_data=_expected_qd, ) if tpm_result.verified: verified.append("hardware_attestation") diff --git a/tests/unit/test_tpm_verify.py b/tests/unit/test_tpm_verify.py index c1a3f02..df31417 100644 --- a/tests/unit/test_tpm_verify.py +++ b/tests/unit/test_tpm_verify.py @@ -3,7 +3,6 @@ from __future__ import annotations import base64 -import hashlib import struct from datetime import UTC, datetime @@ -147,43 +146,38 @@ def test_raw_evidence_empty_fails_gracefully() -> None: def test_qualifying_data_verified_when_correct() -> None: - pub_key_hex = "aa" * 32 - session = "test-session-123" - expected_qd = hashlib.sha256(bytes.fromhex(pub_key_hex) + session.encode()).digest() + """qualifying_data matching the expected key thumbprint verifies.""" + expected_qd = b"\x5a" * 32 # stands in for JWK_thumbprint(tee_public_key) blob = _make_tpm2b_attest(qualifying_data=expected_qd) result = verify_tpm_measurement( measurement=VALID_MEASUREMENT, raw_evidence=blob, - tee_public_key_hex=pub_key_hex, - session_id=session, + expected_qualifying_data=expected_qd, ) assert "qualifying_data" in result.verified_fields assert "qualifying_data" not in result.unverified_fields def test_qualifying_data_mismatch_is_unverified() -> None: - pub_key_hex = "aa" * 32 - session = "test-session-123" + """A quote committing a different value than the expected thumbprint is rejected.""" blob = _make_tpm2b_attest(qualifying_data=b"\xff" * 32) result = verify_tpm_measurement( measurement=VALID_MEASUREMENT, raw_evidence=blob, - tee_public_key_hex=pub_key_hex, - session_id=session, + expected_qualifying_data=b"\x5a" * 32, ) assert "qualifying_data" in result.unverified_fields assert "qualifying_data" not in result.verified_fields -def test_no_keys_qualifying_data_unverified() -> None: +def test_no_expected_qualifying_data_unverified() -> None: blob = _make_tpm2b_attest() result = verify_tpm_measurement( measurement=VALID_MEASUREMENT, raw_evidence=blob, - tee_public_key_hex=None, - session_id=None, + expected_qualifying_data=None, ) assert "qualifying_data" in result.unverified_fields @@ -195,6 +189,7 @@ def _make_tpm2_claim( measurement: str = VALID_MEASUREMENT, firmware_version: str = "2.0-production", raw_evidence_b64: str | None = None, + key: SigningKey | None = None, ) -> dict: """Build a signed claim with tpm2 platform. @@ -202,7 +197,7 @@ def _make_tpm2_claim( after signing, since AttestationReportInfo does not carry those fields and verify_trace_claim reads them from the raw dict. """ - key = SigningKey() + key = key or SigningKey() chain = AuditChain("tpm-session") # Use a valid sha256 measurement for claim generation so Pydantic accepts it @@ -323,3 +318,30 @@ def test_tpm2_with_valid_raw_evidence_parses() -> None: result = verify_trace_claim(claim_dict, _approved()) # pcr_format should be verified since magic is correct assert "pcr_format" in result.verified_fields + + +def test_tpm2_qualifying_data_verifies_with_key_thumbprint() -> None: + """End to end: a quote committing the key's JWK thumbprint verifies through the claim path. + + Regression test for the verifier bug where qualifying_data was checked against + SHA-256(pubkey||session_id) instead of the implemented JWK thumbprint (§3.3). + """ + from cmcp_runtime.tee.base import jwk_thumbprint + + key = SigningKey() + blob = _make_tpm2b_attest(qualifying_data=jwk_thumbprint(key.public_key_bytes)) + claim_dict = _make_tpm2_claim(raw_evidence_b64=base64.b64encode(blob).decode(), key=key) + result = verify_trace_claim(claim_dict, _approved()) + assert "qualifying_data" in result.verified_fields + assert "qualifying_data" not in result.unverified_fields + + +def test_tpm2_qualifying_data_rejected_on_key_substitution() -> None: + """A quote bound to a different key fails the qualifying_data check.""" + from cmcp_runtime.tee.base import jwk_thumbprint + + other_key = SigningKey() + blob = _make_tpm2b_attest(qualifying_data=jwk_thumbprint(other_key.public_key_bytes)) + claim_dict = _make_tpm2_claim(raw_evidence_b64=base64.b64encode(blob).decode(), key=SigningKey()) + result = verify_trace_claim(claim_dict, _approved()) + assert "qualifying_data" in result.unverified_fields