From 4576e0635c3553f4854a3cebd31e89c72d7184a8 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:04:46 -0500 Subject: [PATCH 01/15] Add support for `pinned_ca_cert_hash` --- nanokvm/client.py | 53 +++++++++++++++++++++++++++-------------------- nanokvm/utils.py | 30 +++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 18058d5..a64402c 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -16,8 +16,8 @@ BodyPartReader, ClientResponse, ClientSession, + Fingerprint, MultipartReader, - TCPConnector, hdrs, ) from PIL import Image @@ -120,11 +120,13 @@ class NanoKVMClient: def __init__( self, url: str, + session: ClientSession | None = None, *, token: str | None = None, request_timeout: int = 10, verify_ssl: bool = True, ssl_ca_cert: str | None = None, + pinned_ca_cert_hash: str | None = None, use_password_obfuscation: bool | None = None, ) -> None: """ @@ -132,31 +134,40 @@ def __init__( Args: url: Base URL of the NanoKVM API (e.g., "https://kvm.local/api/") + session: aiohttp ClientSession to use for requests. token: Optional pre-existing authentication token request_timeout: Request timeout in seconds (default: 10) verify_ssl: Enable SSL certificate verification (default: True). Set to False to disable verification for self-signed certificates. ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. Useful for self-signed certificates or private CAs. + pinned_ca_cert_hash: SHA-256 fingerprint of the server's TLS certificate + as a hex string. When set, the client will verify the server's + certificate fingerprint instead of performing CA-based verification. + Use `async_fetch_remote_fingerprint()` to retrieve this value. use_password_obfuscation: Control password obfuscation mode (default: None). None = auto-detect (try obfuscated first, fall back to plain text). True = always use obfuscated passwords (older NanoKVM versions). False = always use plain text passwords (newer HTTPS-enabled versions). """ self.url = yarl.URL(url) - self._session: ClientSession | None = None + self._session: ClientSession | None = session + self._auto_close_session = session is None self._token = token self._request_timeout = request_timeout self._ws: aiohttp.ClientWebSocketResponse | None = None self._verify_ssl = verify_ssl self._ssl_ca_cert = ssl_ca_cert + self._pinned_ca_cert_hash = pinned_ca_cert_hash self._use_password_obfuscation = use_password_obfuscation + self._ssl_config: ssl.SSLContext | Fingerprint | bool | None = None - def _create_ssl_context(self) -> ssl.SSLContext | bool: + def _create_ssl_context(self) -> ssl.SSLContext | Fingerprint | bool: """ Create and configure SSL context based on initialization parameters. Returns: + Fingerprint: Certificate fingerprint pinning (when pinned_ca_cert_hash set) ssl.SSLContext: Configured SSL context for custom certificates True: Use default SSL verification (aiohttp default) False: Disable SSL verification @@ -166,6 +177,10 @@ def _create_ssl_context(self) -> ssl.SSLContext | bool: ssl.SSLError: If the CA certificate is invalid. """ + if self._pinned_ca_cert_hash: + _LOGGER.debug("Using certificate fingerprint pinning") + return Fingerprint(bytes.fromhex(self._pinned_ca_cert_hash)) + if not self._verify_ssl: _LOGGER.warning( "SSL verification is disabled. This is insecure and should only be " @@ -188,16 +203,10 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" + if self._session is None: + self._session = ClientSession() - ssl_config = await asyncio.to_thread(self._create_ssl_context) - connector = TCPConnector(ssl=ssl_config) - self._session = ClientSession(connector=connector) - - _LOGGER.debug( - "Created client session with SSL verification: %s", - "disabled" if ssl_config is False else "enabled", - ) - + self._ssl_config = await asyncio.to_thread(self._create_ssl_context) return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: @@ -206,8 +215,9 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self._ws is not None and not self._ws.closed: await self._ws.close() self._ws = None + # Close HTTP session - if self._session is not None: + if self._session is not None and self._auto_close_session: await self._session.close() self._session = None @@ -221,16 +231,15 @@ async def _request( **kwargs: Any, ) -> AsyncIterator[ClientResponse]: """Make an API request.""" - assert self._session is not None, ( - "Client session not initialized. " - "Use as context manager: 'async with NanoKVMClient(url) as client:'" - ) cookies = {} if authenticate: if not self._token: raise NanoKVMNotAuthenticatedError("Client is not authenticated") cookies["nano-kvm-token"] = self._token + assert self._session is not None + assert self._ssl_config is not None + async with self._session.request( method, self.url / path.lstrip("/"), @@ -240,6 +249,7 @@ async def _request( cookies=cookies, timeout=aiohttp.ClientTimeout(total=self._request_timeout), raise_for_status=True, + ssl=self._ssl_config, **kwargs, ) as response: yield response @@ -795,11 +805,6 @@ async def set_mouse_jiggler_state( async def _get_ws(self) -> aiohttp.ClientWebSocketResponse: """Get or create WebSocket connection for mouse events.""" if self._ws is None or self._ws.closed: - assert self._session is not None, ( - "Client session not initialized. " - "Use as context manager: 'async with NanoKVMClient(url) as client:'" - ) - if not self._token: raise NanoKVMNotAuthenticatedError("Client is not authenticated") @@ -807,9 +812,13 @@ async def _get_ws(self) -> aiohttp.ClientWebSocketResponse: scheme = "ws" if self.url.scheme == "http" else "wss" ws_url = self.url.with_scheme(scheme) / "ws" + assert self._session is not None + assert self._ssl_config is not None + self._ws = await self._session.ws_connect( str(ws_url), headers={"Cookie": f"nano-kvm-token={self._token}"}, + ssl=self._ssl_config, ) return self._ws diff --git a/nanokvm/utils.py b/nanokvm/utils.py index b879630..543dae1 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -1,6 +1,8 @@ +import asyncio import base64 import hashlib import os +import ssl import urllib.parse from cryptography.hazmat.primitives import padding @@ -44,6 +46,34 @@ def openssl_encrypt_aes256cbc_md5(plaintext: bytes, password: bytes) -> bytes: return b"Salted__" + salt + ciphertext +async def async_fetch_remote_fingerprint(url: str) -> str: + """Retrieve the SHA-256 fingerprint of the remote server's TLS certificate. + + Connects to the server with verification disabled to grab the raw certificate, + then returns its SHA-256 hash as an uppercase hex string. + + This is useful for establishing an initial trust-on-first-use pin with + `NanoKVMClient(url, pinned_ca_cert_hash=...)`. + """ + parsed_url = urllib.parse.urlparse(url) + hostname = parsed_url.hostname + port = parsed_url.port or 443 + + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + + reader, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) + + try: + ssl_obj = writer.get_extra_info("ssl_object") + der_cert = ssl_obj.getpeercert(binary_form=True) + return hashlib.sha256(der_cert).hexdigest().upper() + finally: + writer.close() + await writer.wait_closed() + + def obfuscate_password(password: str) -> str: """Obfuscate a password.""" password_enc = openssl_encrypt_aes256cbc_md5( From 016eb6c031e2abb80d39a3c8cfa1ace4ff63ce9a Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:09:18 -0500 Subject: [PATCH 02/15] Fix pydantic deprecation warning --- nanokvm/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index a64402c..030ebd1 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -288,7 +288,7 @@ async def _api_request_json( async with self._request( method, path, - json=(data.dict() if data is not None else None), + json=(data.model_dump() if data is not None else None), **kwargs, ) as response: try: From 8adb04c5601b6a0c6676c107a1bdc9520daf3ed5 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:10:19 -0500 Subject: [PATCH 03/15] Make `session` a kwarg --- nanokvm/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 030ebd1..5dea2ad 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -120,10 +120,10 @@ class NanoKVMClient: def __init__( self, url: str, - session: ClientSession | None = None, *, token: str | None = None, request_timeout: int = 10, + session: ClientSession | None = None, verify_ssl: bool = True, ssl_ca_cert: str | None = None, pinned_ca_cert_hash: str | None = None, From aea685f3eeaaf44d9528993ea83d80bdb2d4fef3 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:27:54 -0500 Subject: [PATCH 04/15] Test certificate pinning --- tests/test_certificate_pinning.py | 185 ++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 tests/test_certificate_pinning.py diff --git a/tests/test_certificate_pinning.py b/tests/test_certificate_pinning.py new file mode 100644 index 0000000..d990b98 --- /dev/null +++ b/tests/test_certificate_pinning.py @@ -0,0 +1,185 @@ +"""Integration test for TLS certificate pinning with a real HTTPS server.""" + +from collections.abc import AsyncGenerator +import datetime +import ipaddress +import json +import pathlib +import ssl + +import aiohttp +from aiohttp import web +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +import pytest + +from nanokvm.client import NanoKVMClient +from nanokvm.utils import async_fetch_remote_fingerprint + + +def generate_nanokvm_cert() -> tuple[bytes, bytes]: + """Generate a self-signed certificate matching NanoKVM's cert.go parameters. + + RSA 2048, CN=localhost, SAN: localhost + 127.0.0.1 + ::1, valid 10 years, + KeyUsage: keyEncipherment | digitalSignature, ExtKeyUsage: serverAuth. + """ + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, "localhost"), + ] + ) + + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.now(datetime.timezone.utc)) + .not_valid_after( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(days=365 * 10) + ) + .add_extension( + x509.SubjectAlternativeName( + [ + x509.DNSName("localhost"), + x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")), + x509.IPAddress(ipaddress.IPv6Address("::1")), + ] + ), + critical=False, + ) + .add_extension( + x509.KeyUsage( + digital_signature=True, + key_encipherment=True, + content_commitment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]), + critical=False, + ) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(key, hashes.SHA256()) + ) + + cert_pem = cert.public_bytes(serialization.Encoding.PEM) + key_pem = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + return cert_pem, key_pem + + +async def _handle_login(request: web.Request) -> web.Response: + body = await request.json() + + if body["username"] == "admin" and body["password"] == "test": + return web.Response( + text=json.dumps( + { + "code": 0, + "msg": "success", + "data": {"token": "fake-token-123"}, + } + ), + content_type="application/json", + ) + + return web.Response( + text=json.dumps( + { + "code": -2, + "msg": "invalid username or password", + "data": None, + } + ), + content_type="application/json", + ) + + +@pytest.fixture +async def nanokvm_https_server(tmp_path: pathlib.Path) -> AsyncGenerator[str, None]: + """Spin up a minimal HTTPS server mimicking a NanoKVM device.""" + cert_pem, key_pem = generate_nanokvm_cert() + + cert_file = tmp_path / "server.crt" + key_file = tmp_path / "server.key" + cert_file.write_bytes(cert_pem) + key_file.write_bytes(key_pem) + + ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_ctx.load_cert_chain(str(cert_file), str(key_file)) + + app = web.Application() + app.router.add_post("/api/auth/login", _handle_login) + + runner = web.AppRunner(app) + await runner.setup() + + try: + site = web.TCPSite(runner, "127.0.0.1", 0, ssl_context=ssl_ctx) + await site.start() + + yield site.name + "api/" + finally: + await runner.cleanup() + + +async def test_certificate_pinning(nanokvm_https_server: str) -> None: + """Test the full certificate pinning flow against a real HTTPS server. + + 1. Connecting with default SSL verification fails (self-signed cert). + 2. Fetch the server's certificate fingerprint. + 3. Connecting with the pinned fingerprint succeeds. + """ + url = nanokvm_https_server + + # Step 1: default SSL verification rejects the self-signed certificate + async with NanoKVMClient(url, use_password_obfuscation=False) as client: + with pytest.raises(aiohttp.ClientConnectorCertificateError): + await client.authenticate("admin", "test") + + # Step 2: fetch the remote certificate fingerprint + fingerprint = await async_fetch_remote_fingerprint(url) + assert len(fingerprint) == 64 # SHA-256 hex string + + # Step 3: pinned fingerprint allows the connection to succeed + async with NanoKVMClient( + url, + pinned_ca_cert_hash=fingerprint, + use_password_obfuscation=False, + ) as client: + await client.authenticate("admin", "test") + assert client.token == "fake-token-123" + + +async def test_certificate_pinning_wrong_hash(nanokvm_https_server: str) -> None: + """Test that a wrong pinned hash is rejected.""" + url = nanokvm_https_server + + async with NanoKVMClient( + url, + pinned_ca_cert_hash="AB" * 32, + use_password_obfuscation=False, + ) as client: + with pytest.raises(aiohttp.ServerFingerprintMismatch): + await client.authenticate("admin", "test") From 190f9af938336960df1555123ab268a807b61a97 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:28:02 -0500 Subject: [PATCH 05/15] Test new session behavior --- nanokvm/client.py | 6 ++++-- tests/test_client.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 5dea2ad..066dee7 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -217,8 +217,10 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self._ws = None # Close HTTP session - if self._session is not None and self._auto_close_session: - await self._session.close() + if self._session is not None: + if self._auto_close_session: + await self._session.close() + self._session = None @contextlib.asynccontextmanager diff --git a/tests/test_client.py b/tests/test_client.py index f8d2070..6b1921e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,3 +1,4 @@ +from aiohttp import ClientSession from aioresponses import aioresponses import pytest @@ -79,3 +80,23 @@ async def test_client_context_manager() -> None: # After exiting context, session should be closed assert client._session is None + + +async def test_client_context_manager_external_session() -> None: + """Test that client properly deals with an external session.""" + async with ClientSession() as session: + # Both clients connect with the same external session + async with ( + NanoKVMClient("http://localhost:8888/api/", session=session) as client1, + NanoKVMClient("http://localhost:8888/api/", session=session) as client2, + ): + # Verify session is created + assert client1._session is session + assert client2._session is session + + # Both exit but the session is still open + assert not session.closed + assert client1._session is None + assert client2._session is None + + assert session.closed From 3dafad6e03a3eb85b4445a5cf66384ef4f5b5079 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:29:12 -0500 Subject: [PATCH 06/15] Update README --- README.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f8b4efb..58b8a97 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,25 @@ async with NanoKVMClient( await client.authenticate("username", "password") ``` -#### Option 2: Use custom CA certificate (recommended) +#### Option 2: Certificate pinning (recommended for self-signed) + +NanoKVM devices generate self-signed certificates for `localhost` with no CA to verify against. Certificate pinning verifies the server's certificate fingerprint directly instead of relying on CA-based trust. + +```python +from nanokvm.utils import async_fetch_remote_fingerprint + +# First, fetch the fingerprint (trust-on-first-use) +fingerprint = await async_fetch_remote_fingerprint("https://kvm.local/api/") + +# Then connect with the pinned fingerprint +async with NanoKVMClient( + "https://kvm.local/api/", + pinned_ca_cert_hash=fingerprint, +) as client: + await client.authenticate("username", "password") +``` + +#### Option 3: Use custom CA certificate ```python async with NanoKVMClient( From 3ba8effcaa62d24f94d0b30f85910654911c7a5f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:29:23 -0500 Subject: [PATCH 07/15] Make `cryptography` a test-time dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 47cbc55..24e5c86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ exclude = ["tests", "tests.*"] testing = [ "tomli", "coverage[toml]", + "cryptography", "pytest", "pytest-xdist", "pytest-asyncio", From c333bb43fc18324fc2f6cd9593462cfec2c28f57 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:32:00 -0500 Subject: [PATCH 08/15] Rename `pinned_ca_cert_hash` to `ssl_fingerprint` --- README.md | 2 +- nanokvm/client.py | 12 ++++++------ nanokvm/utils.py | 2 +- tests/test_certificate_pinning.py | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 58b8a97..3b1b24d 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ fingerprint = await async_fetch_remote_fingerprint("https://kvm.local/api/") # Then connect with the pinned fingerprint async with NanoKVMClient( "https://kvm.local/api/", - pinned_ca_cert_hash=fingerprint, + ssl_fingerprint=fingerprint, ) as client: await client.authenticate("username", "password") ``` diff --git a/nanokvm/client.py b/nanokvm/client.py index 066dee7..19fc05c 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -126,7 +126,7 @@ def __init__( session: ClientSession | None = None, verify_ssl: bool = True, ssl_ca_cert: str | None = None, - pinned_ca_cert_hash: str | None = None, + ssl_fingerprint: str | None = None, use_password_obfuscation: bool | None = None, ) -> None: """ @@ -141,7 +141,7 @@ def __init__( Set to False to disable verification for self-signed certificates. ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. Useful for self-signed certificates or private CAs. - pinned_ca_cert_hash: SHA-256 fingerprint of the server's TLS certificate + ssl_fingerprint: SHA-256 fingerprint of the server's TLS certificate as a hex string. When set, the client will verify the server's certificate fingerprint instead of performing CA-based verification. Use `async_fetch_remote_fingerprint()` to retrieve this value. @@ -158,7 +158,7 @@ def __init__( self._ws: aiohttp.ClientWebSocketResponse | None = None self._verify_ssl = verify_ssl self._ssl_ca_cert = ssl_ca_cert - self._pinned_ca_cert_hash = pinned_ca_cert_hash + self._ssl_fingerprint = ssl_fingerprint self._use_password_obfuscation = use_password_obfuscation self._ssl_config: ssl.SSLContext | Fingerprint | bool | None = None @@ -167,7 +167,7 @@ def _create_ssl_context(self) -> ssl.SSLContext | Fingerprint | bool: Create and configure SSL context based on initialization parameters. Returns: - Fingerprint: Certificate fingerprint pinning (when pinned_ca_cert_hash set) + Fingerprint: Certificate fingerprint pinning (when ssl_fingerprint set) ssl.SSLContext: Configured SSL context for custom certificates True: Use default SSL verification (aiohttp default) False: Disable SSL verification @@ -177,9 +177,9 @@ def _create_ssl_context(self) -> ssl.SSLContext | Fingerprint | bool: ssl.SSLError: If the CA certificate is invalid. """ - if self._pinned_ca_cert_hash: + if self._ssl_fingerprint: _LOGGER.debug("Using certificate fingerprint pinning") - return Fingerprint(bytes.fromhex(self._pinned_ca_cert_hash)) + return Fingerprint(bytes.fromhex(self._ssl_fingerprint)) if not self._verify_ssl: _LOGGER.warning( diff --git a/nanokvm/utils.py b/nanokvm/utils.py index 543dae1..4047121 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -53,7 +53,7 @@ async def async_fetch_remote_fingerprint(url: str) -> str: then returns its SHA-256 hash as an uppercase hex string. This is useful for establishing an initial trust-on-first-use pin with - `NanoKVMClient(url, pinned_ca_cert_hash=...)`. + `NanoKVMClient(url, ssl_fingerprint=...)`. """ parsed_url = urllib.parse.urlparse(url) hostname = parsed_url.hostname diff --git a/tests/test_certificate_pinning.py b/tests/test_certificate_pinning.py index d990b98..2bf2acb 100644 --- a/tests/test_certificate_pinning.py +++ b/tests/test_certificate_pinning.py @@ -165,7 +165,7 @@ async def test_certificate_pinning(nanokvm_https_server: str) -> None: # Step 3: pinned fingerprint allows the connection to succeed async with NanoKVMClient( url, - pinned_ca_cert_hash=fingerprint, + ssl_fingerprint=fingerprint, use_password_obfuscation=False, ) as client: await client.authenticate("admin", "test") @@ -178,7 +178,7 @@ async def test_certificate_pinning_wrong_hash(nanokvm_https_server: str) -> None async with NanoKVMClient( url, - pinned_ca_cert_hash="AB" * 32, + ssl_fingerprint="AB" * 32, use_password_obfuscation=False, ) as client: with pytest.raises(aiohttp.ServerFingerprintMismatch): From 1b073d0c7566cca39de1581afd94710fb5e4891f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:48:57 -0500 Subject: [PATCH 09/15] Fix tests --- tests/test_certificate_pinning.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_certificate_pinning.py b/tests/test_certificate_pinning.py index 2bf2acb..e7b481f 100644 --- a/tests/test_certificate_pinning.py +++ b/tests/test_certificate_pinning.py @@ -139,7 +139,8 @@ async def nanokvm_https_server(tmp_path: pathlib.Path) -> AsyncGenerator[str, No site = web.TCPSite(runner, "127.0.0.1", 0, ssl_context=ssl_ctx) await site.start() - yield site.name + "api/" + host, port = runner.addresses[0] + yield f"https://{host}:{port}/api/" finally: await runner.cleanup() From 51a5b1cffeef2f105256ad02031a365a73a0c695 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:54:27 -0500 Subject: [PATCH 10/15] Move `async_fetch_remote_fingerprint` to the end --- nanokvm/utils.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/nanokvm/utils.py b/nanokvm/utils.py index 4047121..d50e166 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -46,6 +46,16 @@ def openssl_encrypt_aes256cbc_md5(plaintext: bytes, password: bytes) -> bytes: return b"Salted__" + salt + ciphertext +def obfuscate_password(password: str) -> str: + """Obfuscate a password.""" + password_enc = openssl_encrypt_aes256cbc_md5( + plaintext=password.encode("utf-8"), + password=SECRET_KEY, + ) + + return urllib.parse.quote(base64.b64encode(password_enc).decode("utf-8"), safe="") + + async def async_fetch_remote_fingerprint(url: str) -> str: """Retrieve the SHA-256 fingerprint of the remote server's TLS certificate. @@ -72,13 +82,3 @@ async def async_fetch_remote_fingerprint(url: str) -> str: finally: writer.close() await writer.wait_closed() - - -def obfuscate_password(password: str) -> str: - """Obfuscate a password.""" - password_enc = openssl_encrypt_aes256cbc_md5( - plaintext=password.encode("utf-8"), - password=SECRET_KEY, - ) - - return urllib.parse.quote(base64.b64encode(password_enc).decode("utf-8"), safe="") From 65318c5c15976e87be43b7ab70bf231901d52c32 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:54:37 -0500 Subject: [PATCH 11/15] Add a timeout --- nanokvm/utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nanokvm/utils.py b/nanokvm/utils.py index d50e166..162a1a5 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -56,7 +56,9 @@ def obfuscate_password(password: str) -> str: return urllib.parse.quote(base64.b64encode(password_enc).decode("utf-8"), safe="") -async def async_fetch_remote_fingerprint(url: str) -> str: +async def async_fetch_remote_fingerprint( + url: str, *, timeout: float | None = 10.0 +) -> str: """Retrieve the SHA-256 fingerprint of the remote server's TLS certificate. Connects to the server with verification disabled to grab the raw certificate, @@ -73,7 +75,8 @@ async def async_fetch_remote_fingerprint(url: str) -> str: ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE - reader, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) + async with asyncio.timeout(timeout): + reader, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) try: ssl_obj = writer.get_extra_info("ssl_object") From af851541563548feb976fe42762f7b772b8bd31f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 11:07:00 -0500 Subject: [PATCH 12/15] Ignore unused reader --- nanokvm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanokvm/utils.py b/nanokvm/utils.py index 162a1a5..43faa11 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -76,7 +76,7 @@ async def async_fetch_remote_fingerprint( ssl_ctx.verify_mode = ssl.CERT_NONE async with asyncio.timeout(timeout): - reader, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) + _, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) try: ssl_obj = writer.get_extra_info("ssl_object") From 1b18a88ceda02db6ee61dacf96fcde21194a4d3f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 11:07:44 -0500 Subject: [PATCH 13/15] Support colons in the fingerprint --- nanokvm/client.py | 2 +- tests/test_certificate_pinning.py | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 19fc05c..61d085b 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -179,7 +179,7 @@ def _create_ssl_context(self) -> ssl.SSLContext | Fingerprint | bool: if self._ssl_fingerprint: _LOGGER.debug("Using certificate fingerprint pinning") - return Fingerprint(bytes.fromhex(self._ssl_fingerprint)) + return Fingerprint(bytes.fromhex(self._ssl_fingerprint.replace(":", ""))) if not self._verify_ssl: _LOGGER.warning( diff --git a/tests/test_certificate_pinning.py b/tests/test_certificate_pinning.py index e7b481f..d9a070e 100644 --- a/tests/test_certificate_pinning.py +++ b/tests/test_certificate_pinning.py @@ -173,6 +173,25 @@ async def test_certificate_pinning(nanokvm_https_server: str) -> None: assert client.token == "fake-token-123" +async def test_certificate_pinning_colon_separated(nanokvm_https_server: str) -> None: + """Test that colon-separated fingerprints (e.g. from openssl) are accepted.""" + url = nanokvm_https_server + fingerprint = await async_fetch_remote_fingerprint(url) + + # Convert "AABB..." to "AA:BB:..." + colon_fingerprint = ":".join( + fingerprint[i : i + 2] for i in range(0, len(fingerprint), 2) + ) + + async with NanoKVMClient( + url, + ssl_fingerprint=colon_fingerprint, + use_password_obfuscation=False, + ) as client: + await client.authenticate("admin", "test") + assert client.token == "fake-token-123" + + async def test_certificate_pinning_wrong_hash(nanokvm_https_server: str) -> None: """Test that a wrong pinned hash is rejected.""" url = nanokvm_https_server From 7d0e48c0d66706d87597f76ff3bebfc666363b70 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 11:08:39 -0500 Subject: [PATCH 14/15] Clean up logic surrounding session auto-creation --- nanokvm/client.py | 10 ++++------ tests/test_client.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/nanokvm/client.py b/nanokvm/client.py index 61d085b..f362f0d 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -152,7 +152,7 @@ def __init__( """ self.url = yarl.URL(url) self._session: ClientSession | None = session - self._auto_close_session = session is None + self._external_session_provided = session is not None self._token = token self._request_timeout = request_timeout self._ws: aiohttp.ClientWebSocketResponse | None = None @@ -203,7 +203,7 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" - if self._session is None: + if self._session is None and not self._external_session_provided: self._session = ClientSession() self._ssl_config = await asyncio.to_thread(self._create_ssl_context) @@ -217,10 +217,8 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self._ws = None # Close HTTP session - if self._session is not None: - if self._auto_close_session: - await self._session.close() - + if self._session is not None and not self._external_session_provided: + await self._session.close() self._session = None @contextlib.asynccontextmanager diff --git a/tests/test_client.py b/tests/test_client.py index 6b1921e..05bf9c9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -85,18 +85,23 @@ async def test_client_context_manager() -> None: async def test_client_context_manager_external_session() -> None: """Test that client properly deals with an external session.""" async with ClientSession() as session: - # Both clients connect with the same external session + client3 = NanoKVMClient("http://localhost:8888/api/", session=session) + + # All clients connect with the same external session async with ( NanoKVMClient("http://localhost:8888/api/", session=session) as client1, NanoKVMClient("http://localhost:8888/api/", session=session) as client2, + client3, ): # Verify session is created assert client1._session is session assert client2._session is session + assert client3._session is session + + # Reusing a client with an external session should not close the session + async with client3: + assert client3._session is session - # Both exit but the session is still open assert not session.closed - assert client1._session is None - assert client2._session is None assert session.closed From f218f53591505f978be8789376a6093c3e0ee878 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sat, 7 Mar 2026 14:17:28 -0500 Subject: [PATCH 15/15] Run `ssl.create_default_context` in a thread --- nanokvm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanokvm/utils.py b/nanokvm/utils.py index 43faa11..a2e0e29 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -71,7 +71,7 @@ async def async_fetch_remote_fingerprint( hostname = parsed_url.hostname port = parsed_url.port or 443 - ssl_ctx = ssl.create_default_context() + ssl_ctx = await asyncio.to_thread(ssl.create_default_context) ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE