diff --git a/README.md b/README.md index f8b4efb..3b1b24d 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,25 @@ async with NanoKVMClient( await client.authenticate("username", "password") ``` -#### Option 2: Use custom CA certificate (recommended) +#### Option 2: Certificate pinning (recommended for self-signed) + +NanoKVM devices generate self-signed certificates for `localhost` with no CA to verify against. Certificate pinning verifies the server's certificate fingerprint directly instead of relying on CA-based trust. + +```python +from nanokvm.utils import async_fetch_remote_fingerprint + +# First, fetch the fingerprint (trust-on-first-use) +fingerprint = await async_fetch_remote_fingerprint("https://kvm.local/api/") + +# Then connect with the pinned fingerprint +async with NanoKVMClient( + "https://kvm.local/api/", + ssl_fingerprint=fingerprint, +) as client: + await client.authenticate("username", "password") +``` + +#### Option 3: Use custom CA certificate ```python async with NanoKVMClient( diff --git a/nanokvm/client.py b/nanokvm/client.py index 18058d5..f362f0d 100644 --- a/nanokvm/client.py +++ b/nanokvm/client.py @@ -16,8 +16,8 @@ BodyPartReader, ClientResponse, ClientSession, + Fingerprint, MultipartReader, - TCPConnector, hdrs, ) from PIL import Image @@ -123,8 +123,10 @@ def __init__( *, token: str | None = None, request_timeout: int = 10, + session: ClientSession | None = None, verify_ssl: bool = True, ssl_ca_cert: str | None = None, + ssl_fingerprint: str | None = None, use_password_obfuscation: bool | None = None, ) -> None: """ @@ -132,31 +134,40 @@ def __init__( Args: url: Base URL of the NanoKVM API (e.g., "https://kvm.local/api/") + session: aiohttp ClientSession to use for requests. token: Optional pre-existing authentication token request_timeout: Request timeout in seconds (default: 10) verify_ssl: Enable SSL certificate verification (default: True). Set to False to disable verification for self-signed certificates. ssl_ca_cert: Path to custom CA certificate bundle file for SSL verification. Useful for self-signed certificates or private CAs. + ssl_fingerprint: SHA-256 fingerprint of the server's TLS certificate + as a hex string. When set, the client will verify the server's + certificate fingerprint instead of performing CA-based verification. + Use `async_fetch_remote_fingerprint()` to retrieve this value. use_password_obfuscation: Control password obfuscation mode (default: None). None = auto-detect (try obfuscated first, fall back to plain text). True = always use obfuscated passwords (older NanoKVM versions). False = always use plain text passwords (newer HTTPS-enabled versions). """ self.url = yarl.URL(url) - self._session: ClientSession | None = None + self._session: ClientSession | None = session + self._external_session_provided = session is not None self._token = token self._request_timeout = request_timeout self._ws: aiohttp.ClientWebSocketResponse | None = None self._verify_ssl = verify_ssl self._ssl_ca_cert = ssl_ca_cert + self._ssl_fingerprint = ssl_fingerprint self._use_password_obfuscation = use_password_obfuscation + self._ssl_config: ssl.SSLContext | Fingerprint | bool | None = None - def _create_ssl_context(self) -> ssl.SSLContext | bool: + def _create_ssl_context(self) -> ssl.SSLContext | Fingerprint | bool: """ Create and configure SSL context based on initialization parameters. Returns: + Fingerprint: Certificate fingerprint pinning (when ssl_fingerprint set) ssl.SSLContext: Configured SSL context for custom certificates True: Use default SSL verification (aiohttp default) False: Disable SSL verification @@ -166,6 +177,10 @@ def _create_ssl_context(self) -> ssl.SSLContext | bool: ssl.SSLError: If the CA certificate is invalid. """ + if self._ssl_fingerprint: + _LOGGER.debug("Using certificate fingerprint pinning") + return Fingerprint(bytes.fromhex(self._ssl_fingerprint.replace(":", ""))) + if not self._verify_ssl: _LOGGER.warning( "SSL verification is disabled. This is insecure and should only be " @@ -188,16 +203,10 @@ def token(self) -> str | None: async def __aenter__(self) -> NanoKVMClient: """Async context manager entry.""" + if self._session is None and not self._external_session_provided: + self._session = ClientSession() - ssl_config = await asyncio.to_thread(self._create_ssl_context) - connector = TCPConnector(ssl=ssl_config) - self._session = ClientSession(connector=connector) - - _LOGGER.debug( - "Created client session with SSL verification: %s", - "disabled" if ssl_config is False else "enabled", - ) - + self._ssl_config = await asyncio.to_thread(self._create_ssl_context) return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: @@ -206,8 +215,9 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self._ws is not None and not self._ws.closed: await self._ws.close() self._ws = None + # Close HTTP session - if self._session is not None: + if self._session is not None and not self._external_session_provided: await self._session.close() self._session = None @@ -221,16 +231,15 @@ async def _request( **kwargs: Any, ) -> AsyncIterator[ClientResponse]: """Make an API request.""" - assert self._session is not None, ( - "Client session not initialized. " - "Use as context manager: 'async with NanoKVMClient(url) as client:'" - ) cookies = {} if authenticate: if not self._token: raise NanoKVMNotAuthenticatedError("Client is not authenticated") cookies["nano-kvm-token"] = self._token + assert self._session is not None + assert self._ssl_config is not None + async with self._session.request( method, self.url / path.lstrip("/"), @@ -240,6 +249,7 @@ async def _request( cookies=cookies, timeout=aiohttp.ClientTimeout(total=self._request_timeout), raise_for_status=True, + ssl=self._ssl_config, **kwargs, ) as response: yield response @@ -278,7 +288,7 @@ async def _api_request_json( async with self._request( method, path, - json=(data.dict() if data is not None else None), + json=(data.model_dump() if data is not None else None), **kwargs, ) as response: try: @@ -795,11 +805,6 @@ async def set_mouse_jiggler_state( async def _get_ws(self) -> aiohttp.ClientWebSocketResponse: """Get or create WebSocket connection for mouse events.""" if self._ws is None or self._ws.closed: - assert self._session is not None, ( - "Client session not initialized. " - "Use as context manager: 'async with NanoKVMClient(url) as client:'" - ) - if not self._token: raise NanoKVMNotAuthenticatedError("Client is not authenticated") @@ -807,9 +812,13 @@ async def _get_ws(self) -> aiohttp.ClientWebSocketResponse: scheme = "ws" if self.url.scheme == "http" else "wss" ws_url = self.url.with_scheme(scheme) / "ws" + assert self._session is not None + assert self._ssl_config is not None + self._ws = await self._session.ws_connect( str(ws_url), headers={"Cookie": f"nano-kvm-token={self._token}"}, + ssl=self._ssl_config, ) return self._ws diff --git a/nanokvm/utils.py b/nanokvm/utils.py index b879630..a2e0e29 100644 --- a/nanokvm/utils.py +++ b/nanokvm/utils.py @@ -1,6 +1,8 @@ +import asyncio import base64 import hashlib import os +import ssl import urllib.parse from cryptography.hazmat.primitives import padding @@ -52,3 +54,34 @@ def obfuscate_password(password: str) -> str: ) return urllib.parse.quote(base64.b64encode(password_enc).decode("utf-8"), safe="") + + +async def async_fetch_remote_fingerprint( + url: str, *, timeout: float | None = 10.0 +) -> str: + """Retrieve the SHA-256 fingerprint of the remote server's TLS certificate. + + Connects to the server with verification disabled to grab the raw certificate, + then returns its SHA-256 hash as an uppercase hex string. + + This is useful for establishing an initial trust-on-first-use pin with + `NanoKVMClient(url, ssl_fingerprint=...)`. + """ + parsed_url = urllib.parse.urlparse(url) + hostname = parsed_url.hostname + port = parsed_url.port or 443 + + ssl_ctx = await asyncio.to_thread(ssl.create_default_context) + ssl_ctx.check_hostname = False + ssl_ctx.verify_mode = ssl.CERT_NONE + + async with asyncio.timeout(timeout): + _, writer = await asyncio.open_connection(hostname, port, ssl=ssl_ctx) + + try: + ssl_obj = writer.get_extra_info("ssl_object") + der_cert = ssl_obj.getpeercert(binary_form=True) + return hashlib.sha256(der_cert).hexdigest().upper() + finally: + writer.close() + await writer.wait_closed() diff --git a/pyproject.toml b/pyproject.toml index 47cbc55..24e5c86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ exclude = ["tests", "tests.*"] testing = [ "tomli", "coverage[toml]", + "cryptography", "pytest", "pytest-xdist", "pytest-asyncio", diff --git a/tests/test_certificate_pinning.py b/tests/test_certificate_pinning.py new file mode 100644 index 0000000..d9a070e --- /dev/null +++ b/tests/test_certificate_pinning.py @@ -0,0 +1,205 @@ +"""Integration test for TLS certificate pinning with a real HTTPS server.""" + +from collections.abc import AsyncGenerator +import datetime +import ipaddress +import json +import pathlib +import ssl + +import aiohttp +from aiohttp import web +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +import pytest + +from nanokvm.client import NanoKVMClient +from nanokvm.utils import async_fetch_remote_fingerprint + + +def generate_nanokvm_cert() -> tuple[bytes, bytes]: + """Generate a self-signed certificate matching NanoKVM's cert.go parameters. + + RSA 2048, CN=localhost, SAN: localhost + 127.0.0.1 + ::1, valid 10 years, + KeyUsage: keyEncipherment | digitalSignature, ExtKeyUsage: serverAuth. + """ + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COMMON_NAME, "localhost"), + ] + ) + + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.now(datetime.timezone.utc)) + .not_valid_after( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(days=365 * 10) + ) + .add_extension( + x509.SubjectAlternativeName( + [ + x509.DNSName("localhost"), + x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")), + x509.IPAddress(ipaddress.IPv6Address("::1")), + ] + ), + critical=False, + ) + .add_extension( + x509.KeyUsage( + digital_signature=True, + key_encipherment=True, + content_commitment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]), + critical=False, + ) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .sign(key, hashes.SHA256()) + ) + + cert_pem = cert.public_bytes(serialization.Encoding.PEM) + key_pem = key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + return cert_pem, key_pem + + +async def _handle_login(request: web.Request) -> web.Response: + body = await request.json() + + if body["username"] == "admin" and body["password"] == "test": + return web.Response( + text=json.dumps( + { + "code": 0, + "msg": "success", + "data": {"token": "fake-token-123"}, + } + ), + content_type="application/json", + ) + + return web.Response( + text=json.dumps( + { + "code": -2, + "msg": "invalid username or password", + "data": None, + } + ), + content_type="application/json", + ) + + +@pytest.fixture +async def nanokvm_https_server(tmp_path: pathlib.Path) -> AsyncGenerator[str, None]: + """Spin up a minimal HTTPS server mimicking a NanoKVM device.""" + cert_pem, key_pem = generate_nanokvm_cert() + + cert_file = tmp_path / "server.crt" + key_file = tmp_path / "server.key" + cert_file.write_bytes(cert_pem) + key_file.write_bytes(key_pem) + + ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_ctx.load_cert_chain(str(cert_file), str(key_file)) + + app = web.Application() + app.router.add_post("/api/auth/login", _handle_login) + + runner = web.AppRunner(app) + await runner.setup() + + try: + site = web.TCPSite(runner, "127.0.0.1", 0, ssl_context=ssl_ctx) + await site.start() + + host, port = runner.addresses[0] + yield f"https://{host}:{port}/api/" + finally: + await runner.cleanup() + + +async def test_certificate_pinning(nanokvm_https_server: str) -> None: + """Test the full certificate pinning flow against a real HTTPS server. + + 1. Connecting with default SSL verification fails (self-signed cert). + 2. Fetch the server's certificate fingerprint. + 3. Connecting with the pinned fingerprint succeeds. + """ + url = nanokvm_https_server + + # Step 1: default SSL verification rejects the self-signed certificate + async with NanoKVMClient(url, use_password_obfuscation=False) as client: + with pytest.raises(aiohttp.ClientConnectorCertificateError): + await client.authenticate("admin", "test") + + # Step 2: fetch the remote certificate fingerprint + fingerprint = await async_fetch_remote_fingerprint(url) + assert len(fingerprint) == 64 # SHA-256 hex string + + # Step 3: pinned fingerprint allows the connection to succeed + async with NanoKVMClient( + url, + ssl_fingerprint=fingerprint, + use_password_obfuscation=False, + ) as client: + await client.authenticate("admin", "test") + assert client.token == "fake-token-123" + + +async def test_certificate_pinning_colon_separated(nanokvm_https_server: str) -> None: + """Test that colon-separated fingerprints (e.g. from openssl) are accepted.""" + url = nanokvm_https_server + fingerprint = await async_fetch_remote_fingerprint(url) + + # Convert "AABB..." to "AA:BB:..." + colon_fingerprint = ":".join( + fingerprint[i : i + 2] for i in range(0, len(fingerprint), 2) + ) + + async with NanoKVMClient( + url, + ssl_fingerprint=colon_fingerprint, + use_password_obfuscation=False, + ) as client: + await client.authenticate("admin", "test") + assert client.token == "fake-token-123" + + +async def test_certificate_pinning_wrong_hash(nanokvm_https_server: str) -> None: + """Test that a wrong pinned hash is rejected.""" + url = nanokvm_https_server + + async with NanoKVMClient( + url, + ssl_fingerprint="AB" * 32, + use_password_obfuscation=False, + ) as client: + with pytest.raises(aiohttp.ServerFingerprintMismatch): + await client.authenticate("admin", "test") diff --git a/tests/test_client.py b/tests/test_client.py index f8d2070..05bf9c9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,3 +1,4 @@ +from aiohttp import ClientSession from aioresponses import aioresponses import pytest @@ -79,3 +80,28 @@ async def test_client_context_manager() -> None: # After exiting context, session should be closed assert client._session is None + + +async def test_client_context_manager_external_session() -> None: + """Test that client properly deals with an external session.""" + async with ClientSession() as session: + client3 = NanoKVMClient("http://localhost:8888/api/", session=session) + + # All clients connect with the same external session + async with ( + NanoKVMClient("http://localhost:8888/api/", session=session) as client1, + NanoKVMClient("http://localhost:8888/api/", session=session) as client2, + client3, + ): + # Verify session is created + assert client1._session is session + assert client2._session is session + assert client3._session is session + + # Reusing a client with an external session should not close the session + async with client3: + assert client3._session is session + + assert not session.closed + + assert session.closed