-
Notifications
You must be signed in to change notification settings - Fork 2
fix: async httpx TLS pinning via wrap_bio interception #60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,16 +96,18 @@ def ground_truth(self) -> Optional[GroundTruth]: | |
| """Returns the last verified enclave state""" | ||
| return self._ground_truth | ||
|
|
||
| def make_secure_http_client(self) -> httpx.Client: | ||
| """ | ||
| Build an httpx.Client that pins the enclave's TLS cert | ||
| """ | ||
| expected_fp = self.verify().public_key | ||
| wrap_socket = self._create_socket_wrapper(expected_fp) | ||
|
|
||
| ctx = ssl.create_default_context() | ||
| ctx.wrap_socket = wrap_socket | ||
| return httpx.Client(verify=ctx, follow_redirects=True) | ||
| @staticmethod | ||
| def _verify_peer_fingerprint(cert_binary: Optional[bytes], expected_fp: str) -> None: | ||
| """Verify that a certificate's public key fingerprint matches the expected value.""" | ||
| if not cert_binary: | ||
| raise ValueError("No certificate found") | ||
| cert = cryptography.x509.load_der_x509_certificate(cert_binary) | ||
| pub_der = cert.public_key().public_bytes( | ||
| Encoding.DER, PublicFormat.SubjectPublicKeyInfo | ||
| ) | ||
| pk_fp = hashlib.sha256(pub_der).hexdigest() | ||
| if pk_fp != expected_fp: | ||
| raise ValueError(f"Certificate fingerprint mismatch: expected {expected_fp}, got {pk_fp}") | ||
|
|
||
| def _create_socket_wrapper(self, expected_fp: str): | ||
| """ | ||
|
|
@@ -114,28 +116,46 @@ def _create_socket_wrapper(self, expected_fp: str): | |
| """ | ||
| def wrap_socket(*args, **kwargs) -> ssl.SSLSocket: | ||
| sock = ssl.create_default_context().wrap_socket(*args, **kwargs) | ||
| cert_binary = sock.getpeercert(binary_form=True) | ||
| if not cert_binary: | ||
| raise ValueError("No certificate found") | ||
| cert = cryptography.x509.load_der_x509_certificate(cert_binary) | ||
| pub_der = cert.public_key().public_bytes( | ||
| Encoding.DER, PublicFormat.SubjectPublicKeyInfo | ||
| SecureClient._verify_peer_fingerprint( | ||
| sock.getpeercert(binary_form=True), expected_fp | ||
| ) | ||
| pk_fp = hashlib.sha256(pub_der).hexdigest() | ||
| if pk_fp != expected_fp: | ||
| raise ValueError(f"Certificate fingerprint mismatch: expected {expected_fp}, got {pk_fp}") | ||
| return sock | ||
| return wrap_socket | ||
|
|
||
| def make_secure_async_http_client(self) -> httpx.AsyncClient: | ||
| def make_secure_http_client(self) -> httpx.Client: | ||
| """ | ||
| Build an httpx.AsyncClient that pins the enclave's TLS cert. | ||
| Build an httpx.Client that pins the enclave's TLS cert | ||
| """ | ||
| expected_fp = self.verify().public_key | ||
| wrap_socket = self._create_socket_wrapper(expected_fp) | ||
|
|
||
| ctx = ssl.create_default_context() | ||
| ctx.wrap_socket = wrap_socket | ||
| return httpx.Client(verify=ctx, follow_redirects=True) | ||
|
|
||
| def make_secure_async_http_client(self) -> httpx.AsyncClient: | ||
| """ | ||
| Build an httpx.AsyncClient that pins the enclave's TLS cert | ||
| """ | ||
| expected_fp = self.verify().public_key | ||
|
|
||
| ctx = ssl.create_default_context() | ||
| original_wrap_bio = ctx.wrap_bio | ||
|
|
||
| def pinned_wrap_bio(*args, **kwargs): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Custom agent: Check System Design and Architectural Patterns The async TLS pinning logic is inlined with deeply nested closures, breaking the separation-of-concerns pattern established by the sync path's Prompt for AI agents |
||
| ssl_object = original_wrap_bio(*args, **kwargs) | ||
| original_do_handshake = ssl_object.do_handshake | ||
|
|
||
| def checked_do_handshake(): | ||
| result = original_do_handshake() | ||
| cert_binary = ssl_object.getpeercert(binary_form=True) | ||
| SecureClient._verify_peer_fingerprint(cert_binary, expected_fp) | ||
| return result | ||
|
|
||
| ssl_object.do_handshake = checked_do_handshake | ||
| return ssl_object | ||
|
|
||
| ctx.wrap_bio = pinned_wrap_bio | ||
| return httpx.AsyncClient(verify=ctx, follow_redirects=True) | ||
|
|
||
| def verify(self) -> GroundTruth: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: The
TLSBoundHTTPSHandler._get_connectionmethod still duplicates the fingerprint verification logic that_verify_peer_fingerprintwas extracted to centralize. Consider refactoringTLSBoundHTTPSHandlerto callSecureClient._verify_peer_fingerprint(cert_binary, self.expected_pubkey)(or promote the helper to a module-level function) so there's a single source of truth for cert pinning verification.Prompt for AI agents