From e4923907953f9fa12f07f85dd5ed154cfaf536dd Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 13:54:25 +0000 Subject: [PATCH 01/18] Refactor authentication handling in OverkizClient and add new credential classes - Introduced UsernamePasswordCredentials and LocalTokenCredentials for better credential management. - Updated OverkizClient to utilize the new credential classes and refactored login logic. - Added authentication strategies for various servers, including Somfy and Rexel. - Created new modules for auth strategies and credentials to improve code organization. - Enhanced README with updated usage examples for the new authentication methods. --- README.md | 21 +- pyoverkiz/auth/__init__.py | 22 ++ pyoverkiz/auth/base.py | 31 +++ pyoverkiz/auth/credentials.py | 28 +++ pyoverkiz/auth/factory.py | 124 +++++++++++ pyoverkiz/auth/strategies.py | 408 ++++++++++++++++++++++++++++++++++ pyoverkiz/client.py | 295 ++++-------------------- pyoverkiz/const.py | 14 +- tests/test_client.py | 12 +- 9 files changed, 680 insertions(+), 275 deletions(-) create mode 100644 pyoverkiz/auth/__init__.py create mode 100644 pyoverkiz/auth/base.py create mode 100644 pyoverkiz/auth/credentials.py create mode 100644 pyoverkiz/auth/factory.py create mode 100644 pyoverkiz/auth/strategies.py diff --git a/README.md b/README.md index bfe04a20..f2726a5b 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ pip install pyoverkiz import asyncio import time +from pyoverkiz.auth.credentials import UsernamePasswordCredentials from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.client import OverkizClient from pyoverkiz.models import Action @@ -48,7 +49,8 @@ PASSWORD = "" async def main() -> None: async with OverkizClient( - USERNAME, PASSWORD, server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE] + server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + credentials=UsernamePasswordCredentials(USERNAME, PASSWORD), ) as client: try: await client.login() @@ -90,11 +92,10 @@ asyncio.run(main()) ```python import asyncio import time -import aiohttp +from pyoverkiz.auth.credentials import LocalTokenCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS, OverkizServer -from pyoverkiz.enums import Server +from pyoverkiz.const import OverkizServer USERNAME = "" PASSWORD = "" @@ -105,23 +106,15 @@ VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostnam async def main() -> None: token = "" # generate your token via the Somfy app and include it here - # Local Connection - session = aiohttp.ClientSession( - connector=aiohttp.TCPConnector(verify_ssl=VERIFY_SSL) - ) - async with OverkizClient( - username="", - password="", - token=token, - session=session, - verify_ssl=VERIFY_SSL, server=OverkizServer( name="Somfy TaHoma (local)", endpoint=f"https://{LOCAL_GATEWAY}:8443/enduser-mobile-web/1/enduserAPI/", manufacturer="Somfy", configuration_url=None, ), + credentials=LocalTokenCredentials(token), + verify_ssl=VERIFY_SSL, ) as client: await client.login() diff --git a/pyoverkiz/auth/__init__.py b/pyoverkiz/auth/__init__.py new file mode 100644 index 00000000..6c3317d7 --- /dev/null +++ b/pyoverkiz/auth/__init__.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from pyoverkiz.auth.base import AuthContext, AuthStrategy +from pyoverkiz.auth.credentials import ( + Credentials, + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.factory import build_auth_strategy + +__all__ = [ + "AuthContext", + "AuthStrategy", + "Credentials", + "LocalTokenCredentials", + "RexelOAuthCodeCredentials", + "TokenCredentials", + "UsernamePasswordCredentials", + "build_auth_strategy", +] diff --git a/pyoverkiz/auth/base.py b/pyoverkiz/auth/base.py new file mode 100644 index 00000000..d06657f6 --- /dev/null +++ b/pyoverkiz/auth/base.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import datetime +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Protocol + + +@dataclass(slots=True) +class AuthContext: + access_token: str | None = None + refresh_token: str | None = None + expires_at: datetime.datetime | None = None + + def is_expired(self, *, skew_seconds: int = 5) -> bool: + if not self.expires_at: + return False + + return datetime.datetime.now() >= self.expires_at - datetime.timedelta( + seconds=skew_seconds + ) + + +class AuthStrategy(Protocol): + async def login(self) -> None: ... + + async def refresh_if_needed(self) -> bool: ... + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: ... + + async def close(self) -> None: ... diff --git a/pyoverkiz/auth/credentials.py b/pyoverkiz/auth/credentials.py new file mode 100644 index 00000000..eb4595be --- /dev/null +++ b/pyoverkiz/auth/credentials.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +class Credentials: + """Marker base class for auth credentials.""" + + +@dataclass(slots=True) +class UsernamePasswordCredentials(Credentials): + username: str + password: str + + +@dataclass(slots=True) +class TokenCredentials(Credentials): + token: str + + +@dataclass(slots=True) +class LocalTokenCredentials(TokenCredentials): ... + + +@dataclass(slots=True) +class RexelOAuthCodeCredentials(Credentials): + code: str + redirect_uri: str diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py new file mode 100644 index 00000000..86277677 --- /dev/null +++ b/pyoverkiz/auth/factory.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import ssl +from typing import Any + +from aiohttp import ClientSession + +from pyoverkiz.auth.credentials import ( + Credentials, + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.strategies import ( + BearerTokenAuthStrategy, + CozytouchAuthStrategy, + LocalTokenAuthStrategy, + NexityAuthStrategy, + RexelAuthStrategy, + SessionLoginStrategy, + SomfyAuthStrategy, +) +from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS +from pyoverkiz.enums import APIType, Server +from pyoverkiz.models import OverkizServer + + +def build_auth_strategy( + server_key: str | Server | None, + server: OverkizServer, + credentials: Credentials, + session: ClientSession, + ssl_context: ssl.SSLContext | bool, +) -> Any: + api_type = APIType.LOCAL if LOCAL_API_PATH in server.endpoint else APIType.CLOUD + + # Normalize server key + try: + key = Server(server_key) if server_key else _match_server_key(server) + except ValueError: + key = None + + if key == Server.SOMFY_EUROPE: + return SomfyAuthStrategy( + _ensure_username_password(credentials), + session, + server, + ssl_context, + api_type, + ) + + if key in { + Server.ATLANTIC_COZYTOUCH, + Server.THERMOR_COZYTOUCH, + Server.SAUTER_COZYTOUCH, + }: + return CozytouchAuthStrategy( + _ensure_username_password(credentials), + session, + server, + ssl_context, + api_type, + ) + + if key == Server.NEXITY: + return NexityAuthStrategy( + _ensure_username_password(credentials), + session, + server, + ssl_context, + api_type, + ) + + if key == Server.REXEL: + return RexelAuthStrategy( + _ensure_rexel(credentials), session, server, ssl_context, api_type + ) + + if api_type == APIType.LOCAL: + if isinstance(credentials, LocalTokenCredentials): + return LocalTokenAuthStrategy( + credentials, session, server, ssl_context, api_type + ) + return BearerTokenAuthStrategy( + _ensure_token(credentials), session, server, ssl_context, api_type + ) + + if isinstance(credentials, TokenCredentials) and not isinstance( + credentials, LocalTokenCredentials + ): + return BearerTokenAuthStrategy( + credentials, session, server, ssl_context, api_type + ) + + return SessionLoginStrategy( + _ensure_username_password(credentials), session, server, ssl_context, api_type + ) + + +def _match_server_key(server: OverkizServer) -> Server: + for key, value in SUPPORTED_SERVERS.items(): + if server is value or server.endpoint == value.endpoint: + return Server(key) + + raise ValueError("Unable to match server to a known Server enum.") + + +def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials: + if not isinstance(credentials, UsernamePasswordCredentials): + raise TypeError("UsernamePasswordCredentials are required for this server.") + return credentials + + +def _ensure_token(credentials: Credentials) -> TokenCredentials: + if not isinstance(credentials, TokenCredentials): + raise TypeError("TokenCredentials are required for this server.") + return credentials + + +def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials: + if not isinstance(credentials, RexelOAuthCodeCredentials): + raise TypeError("RexelOAuthCodeCredentials are required for this server.") + return credentials diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py new file mode 100644 index 00000000..e33d8e66 --- /dev/null +++ b/pyoverkiz/auth/strategies.py @@ -0,0 +1,408 @@ +from __future__ import annotations + +import asyncio +import base64 +import binascii +import datetime +import json +import ssl +from collections.abc import Mapping +from typing import Any, cast + +import boto3 +from aiohttp import ClientSession, FormData +from botocore.config import Config +from warrant_lite import WarrantLite + +from pyoverkiz.auth.base import AuthContext, AuthStrategy +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.const import ( + COZYTOUCH_ATLANTIC_API, + COZYTOUCH_CLIENT_ID, + NEXITY_API, + NEXITY_COGNITO_CLIENT_ID, + NEXITY_COGNITO_REGION, + NEXITY_COGNITO_USER_POOL, + REXEL_OAUTH_CLIENT_ID, + REXEL_OAUTH_SCOPE, + REXEL_OAUTH_TOKEN_URL, + REXEL_REQUIRED_CONSENT, + SOMFY_API, + SOMFY_CLIENT_ID, + SOMFY_CLIENT_SECRET, +) +from pyoverkiz.enums import APIType +from pyoverkiz.exceptions import ( + BadCredentialsException, + CozyTouchBadCredentialsException, + CozyTouchServiceException, + InvalidTokenException, + NexityBadCredentialsException, + NexityServiceException, + SomfyBadCredentialsException, + SomfyServiceException, +) +from pyoverkiz.models import OverkizServer + + +class BaseAuthStrategy(AuthStrategy): + def __init__( + self, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + self.session = session + self.server = server + self._ssl = ssl_context + self.api_type = api_type + + async def login(self) -> None: + return None + + async def refresh_if_needed(self) -> bool: + return False + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + return {} + + async def close(self) -> None: + return None + + +class SessionLoginStrategy(BaseAuthStrategy): + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + async def login(self) -> None: + payload = { + "userId": self.credentials.username, + "userPassword": self.credentials.password, + } + await self._post_login(payload) + + async def _post_login(self, data: Mapping[str, Any]) -> None: + async with self.session.post( + f"{self.server.endpoint}login", + data=data, + ssl=self._ssl, + ) as response: + if response.status not in (200, 204): + raise BadCredentialsException( + f"Login failed for {self.server.name}: {response.status}" + ) + + result = await response.json() + if not result.get("success"): + raise BadCredentialsException("Login failed: bad credentials") + + +class SomfyAuthStrategy(BaseAuthStrategy): + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + self.context = AuthContext() + + async def login(self) -> None: + await self._request_access_token( + grant_type="password", + extra_fields={ + "username": self.credentials.username, + "password": self.credentials.password, + }, + ) + + async def refresh_if_needed(self) -> bool: + if not self.context.is_expired() or not self.context.refresh_token: + return False + + await self._request_access_token( + grant_type="refresh_token", + extra_fields={"refresh_token": cast(str, self.context.refresh_token)}, + ) + return True + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + if self.context.access_token: + return {"Authorization": f"Bearer {self.context.access_token}"} + + return {} + + async def _request_access_token( + self, *, grant_type: str, extra_fields: Mapping[str, str] + ) -> None: + form = FormData( + { + "grant_type": grant_type, + "client_id": SOMFY_CLIENT_ID, + "client_secret": SOMFY_CLIENT_SECRET, + **extra_fields, + } + ) + + async with self.session.post( + f"{SOMFY_API}/oauth/oauth/v2/token/jwt", + data=form, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) as response: + token = await response.json() + + if token.get("message") == "error.invalid.grant": + raise SomfyBadCredentialsException(token["message"]) + + access_token = token.get("access_token") + if not access_token: + raise SomfyServiceException("No Somfy access token provided.") + + self.context.access_token = cast(str, access_token) + self.context.refresh_token = token.get("refresh_token") + expires_in = token.get("expires_in") + if expires_in: + self.context.expires_at = datetime.datetime.now() + datetime.timedelta( + seconds=cast(int, expires_in) - 5 + ) + + +class CozytouchAuthStrategy(SessionLoginStrategy): + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(credentials, session, server, ssl_context, api_type) + + async def login(self) -> None: + form = FormData( + { + "grant_type": "password", + "username": f"GA-PRIVATEPERSON/{self.credentials.username}", + "password": self.credentials.password, + } + ) + async with self.session.post( + f"{COZYTOUCH_ATLANTIC_API}/token", + data=form, + headers={ + "Authorization": f"Basic {COZYTOUCH_CLIENT_ID}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) as response: + token = await response.json() + + if token.get("error") == "invalid_grant": + raise CozyTouchBadCredentialsException(token["error_description"]) + + if "token_type" not in token: + raise CozyTouchServiceException("No CozyTouch token provided.") + + async with self.session.get( + f"{COZYTOUCH_ATLANTIC_API}/magellan/accounts/jwt", + headers={"Authorization": f"Bearer {token['access_token']}"}, + ) as response: + jwt = await response.text() + + if not jwt: + raise CozyTouchServiceException("No JWT token provided.") + + jwt = jwt.strip('"') + + await self._post_login({"jwt": jwt}) + + +class NexityAuthStrategy(SessionLoginStrategy): + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(credentials, session, server, ssl_context, api_type) + + async def login(self) -> None: + loop = asyncio.get_event_loop() + + def _client() -> boto3.session.Session.client: + return boto3.client( + "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) + ) + + client = await loop.run_in_executor(None, _client) + aws = WarrantLite( + username=self.credentials.username, + password=self.credentials.password, + pool_id=NEXITY_COGNITO_USER_POOL, + client_id=NEXITY_COGNITO_CLIENT_ID, + client=client, + ) + + try: + tokens = await loop.run_in_executor(None, aws.authenticate_user) + except Exception as error: + raise NexityBadCredentialsException() from error + + id_token = tokens["AuthenticationResult"]["IdToken"] + + async with self.session.get( + f"{NEXITY_API}/deploy/api/v1/domotic/token", + headers={"Authorization": id_token}, + ) as response: + token = await response.json() + + if "token" not in token: + raise NexityServiceException("No Nexity SSO token provided.") + + user_id = self.credentials.username.replace("@", "_-_") + await self._post_login({"ssoToken": token["token"], "userId": user_id}) + + +class LocalTokenAuthStrategy(BaseAuthStrategy): + def __init__( + self, + credentials: LocalTokenCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + async def login(self) -> None: + if not self.credentials.token: + raise InvalidTokenException("Local API requires a token.") + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + return {"Authorization": f"Bearer {self.credentials.token}"} + + +class RexelAuthStrategy(BaseAuthStrategy): + def __init__( + self, + credentials: RexelOAuthCodeCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + self.context = AuthContext() + + async def login(self) -> None: + await self._exchange_token( + { + "grant_type": "authorization_code", + "client_id": REXEL_OAUTH_CLIENT_ID, + "scope": REXEL_OAUTH_SCOPE, + "code": self.credentials.code, + "redirect_uri": self.credentials.redirect_uri, + } + ) + + async def refresh_if_needed(self) -> bool: + if not self.context.is_expired() or not self.context.refresh_token: + return False + + await self._exchange_token( + { + "grant_type": "refresh_token", + "client_id": REXEL_OAUTH_CLIENT_ID, + "scope": REXEL_OAUTH_SCOPE, + "refresh_token": cast(str, self.context.refresh_token), + } + ) + return True + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + if self.context.access_token: + return {"Authorization": f"Bearer {self.context.access_token}"} + return {} + + async def _exchange_token(self, payload: Mapping[str, str]) -> None: + form = FormData(payload) + async with self.session.post( + REXEL_OAUTH_TOKEN_URL, + data=form, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) as response: + token = await response.json() + + access_token = token.get("access_token") + if not access_token: + raise InvalidTokenException("No Rexel access token provided.") + + self._ensure_consent(access_token) + self.context.access_token = cast(str, access_token) + self.context.refresh_token = token.get("refresh_token") + expires_in = token.get("expires_in") + if expires_in: + self.context.expires_at = datetime.datetime.now() + datetime.timedelta( + seconds=cast(int, expires_in) - 5 + ) + + @staticmethod + def _ensure_consent(access_token: str) -> None: + payload = _decode_jwt_payload(access_token) + consent = payload.get("consent") + if consent != REXEL_REQUIRED_CONSENT: + raise InvalidTokenException( + "Consent is missing or revoked for Rexel token." + ) + + +class BearerTokenAuthStrategy(BaseAuthStrategy): + def __init__( + self, + credentials: TokenCredentials, + session: ClientSession, + server: OverkizServer, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + if self.credentials.token: + return {"Authorization": f"Bearer {self.credentials.token}"} + return {} + + +def _decode_jwt_payload(token: str) -> dict[str, Any]: + parts = token.split(".") + if len(parts) < 2: + raise InvalidTokenException("Malformed JWT received.") + + payload_segment = parts[1] + padding = "=" * (-len(payload_segment) % 4) + try: + decoded = base64.urlsafe_b64decode(payload_segment + padding) + return cast(dict[str, Any], json.loads(decoded)) + except (binascii.Error, json.JSONDecodeError) as error: + raise InvalidTokenException("Malformed JWT received.") from error diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 927f35f5..c9033566 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -2,8 +2,6 @@ from __future__ import annotations -import asyncio -import datetime import os import ssl import urllib.parse @@ -13,38 +11,20 @@ from typing import Any, cast import backoff -import boto3 import humps from aiohttp import ( ClientConnectorError, ClientResponse, ClientSession, - FormData, ServerDisconnectedError, ) -from botocore.client import BaseClient -from botocore.config import Config -from warrant_lite import WarrantLite - -from pyoverkiz.const import ( - COZYTOUCH_ATLANTIC_API, - COZYTOUCH_CLIENT_ID, - LOCAL_API_PATH, - NEXITY_API, - NEXITY_COGNITO_CLIENT_ID, - NEXITY_COGNITO_REGION, - NEXITY_COGNITO_USER_POOL, - SOMFY_API, - SOMFY_CLIENT_ID, - SOMFY_CLIENT_SECRET, - SUPPORTED_SERVERS, -) -from pyoverkiz.enums import APIType, CommandMode, Server + +from pyoverkiz.auth import Credentials, build_auth_strategy +from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS +from pyoverkiz.enums import APIType, Server from pyoverkiz.exceptions import ( AccessDeniedToGatewayException, BadCredentialsException, - CozyTouchBadCredentialsException, - CozyTouchServiceException, ExecutionQueueFullException, InvalidCommandException, InvalidEventListenerIdException, @@ -52,8 +32,6 @@ MaintenanceException, MissingAPIKeyException, MissingAuthorizationTokenException, - NexityBadCredentialsException, - NexityServiceException, NoRegisteredEventListenerException, NoSuchResourceException, NotAuthenticatedException, @@ -61,8 +39,6 @@ OverkizException, ServiceUnavailableException, SessionAndBearerInSameRequestException, - SomfyBadCredentialsException, - SomfyServiceException, TooManyAttemptsBannedException, TooManyConcurrentRequestsException, TooManyExecutionsException, @@ -73,6 +49,7 @@ from pyoverkiz.models import ( Action, ActionGroup, + CommandMode, Device, Event, Execution, @@ -152,8 +129,6 @@ def _create_local_ssl_context() -> ssl.SSLContext: class OverkizClient: """Interface class for the Overkiz API.""" - username: str - password: str server: OverkizServer setup: Setup | None devices: list[Device] @@ -161,32 +136,22 @@ class OverkizClient: event_listener_id: str | None session: ClientSession api_type: APIType - - _refresh_token: str | None = None - _expires_in: datetime.datetime | None = None - _access_token: str | None = None _ssl: ssl.SSLContext | bool = True def __init__( self, - username: str, - password: str, server: OverkizServer, + credentials: Credentials, verify_ssl: bool = True, - token: str | None = None, session: ClientSession | None = None, + server_key: Server | str | None = None, ) -> None: """Constructor. - :param username: the username - :param password: the password :param server: OverkizServer :param session: optional ClientSession """ - self.username = username - self.password = password self.server = server - self._access_token = token self.setup: Setup | None = None self.devices: list[Device] = [] @@ -210,6 +175,11 @@ def __init__( else: self.api_type = APIType.CLOUD + inferred_server_key = server_key or self._resolve_server_key() + self._auth = build_auth_strategy( + inferred_server_key, self.server, credentials, self.session, self._ssl + ) + async def __aenter__(self) -> OverkizClient: """Enter the async context manager and return the client.""" return self @@ -223,11 +193,24 @@ async def __aexit__( """Exit the async context manager and close the client session.""" await self.close() + def _resolve_server_key(self) -> Server: + for key, value in SUPPORTED_SERVERS.items(): + if self.server is value or self.server.endpoint == value.endpoint: + return Server(key) + + if self.api_type == APIType.LOCAL: + return Server.SOMFY_DEVELOPER_MODE + + raise OverkizException( + "Unknown server configuration; provide server_key explicitly." + ) + async def close(self) -> None: """Close the session.""" if self.event_listener_id: await self.unregister_event_listener() + await self._auth.close() await self.session.close() async def login( @@ -238,207 +221,20 @@ async def login( Caller must provide one of [userId+userPassword, userId+ssoToken, accessToken, jwt]. """ - # Local authentication + await self._auth.login() + if self.api_type == APIType.LOCAL: if register_event_listener: await self.register_event_listener() else: - # Call a simple endpoint to verify if our token is correct - # Since local API does not have a /login endpoint await self.get_gateways() return True - # Somfy TaHoma authentication using access_token - if self.server == SUPPORTED_SERVERS[Server.SOMFY_EUROPE]: - await self.somfy_tahoma_get_access_token() - - if register_event_listener: - await self.register_event_listener() - - return True - - # CozyTouch authentication using jwt - if self.server in [ - SUPPORTED_SERVERS[Server.ATLANTIC_COZYTOUCH], - SUPPORTED_SERVERS[Server.THERMOR_COZYTOUCH], - SUPPORTED_SERVERS[Server.SAUTER_COZYTOUCH], - ]: - jwt = await self.cozytouch_login() - payload = {"jwt": jwt} - - # Nexity authentication using ssoToken - elif self.server == SUPPORTED_SERVERS[Server.NEXITY]: - sso_token = await self.nexity_login() - user_id = self.username.replace("@", "_-_") # Replace @ for _-_ - payload = {"ssoToken": sso_token, "userId": user_id} - - # Regular authentication using userId+userPassword - else: - payload = {"userId": self.username, "userPassword": self.password} - - response = await self.__post("login", data=payload) - - if response.get("success"): - if register_event_listener: - await self.register_event_listener() - return True - - return False - - async def somfy_tahoma_get_access_token(self) -> str: - """Authenticate via Somfy identity and acquire access_token.""" - # Request access token - async with self.session.post( - SOMFY_API + "/oauth/oauth/v2/token/jwt", - data=FormData( - { - "grant_type": "password", - "username": self.username, - "password": self.password, - "client_id": SOMFY_CLIENT_ID, - "client_secret": SOMFY_CLIENT_SECRET, - } - ), - headers={ - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - - # { "message": "error.invalid.grant", "data": [], "uid": "xxx" } - if "message" in token and token["message"] == "error.invalid.grant": - raise SomfyBadCredentialsException(token["message"]) - - if "access_token" not in token: - raise SomfyServiceException("No Somfy access token provided.") - - self._access_token = cast(str, token["access_token"]) - self._refresh_token = token["refresh_token"] - self._expires_in = datetime.datetime.now() + datetime.timedelta( - seconds=token["expires_in"] - 5 - ) - - return self._access_token - - async def refresh_token(self) -> None: - """Update the access and the refresh token. The refresh token will be valid 14 days.""" - if self.server != SUPPORTED_SERVERS[Server.SOMFY_EUROPE]: - return - - if not self._refresh_token: - raise ValueError("No refresh token provided. Login method must be used.") + if register_event_listener: + await self.register_event_listener() - # &grant_type=refresh_token&refresh_token=REFRESH_TOKEN - # Request access token - async with self.session.post( - SOMFY_API + "/oauth/oauth/v2/token/jwt", - data=FormData( - { - "grant_type": "refresh_token", - "refresh_token": self._refresh_token, - "client_id": SOMFY_CLIENT_ID, - "client_secret": SOMFY_CLIENT_SECRET, - } - ), - headers={ - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - # { "message": "error.invalid.grant", "data": [], "uid": "xxx" } - if "message" in token and token["message"] == "error.invalid.grant": - raise SomfyBadCredentialsException(token["message"]) - - if "access_token" not in token: - raise SomfyServiceException("No Somfy access token provided.") - - self._access_token = cast(str, token["access_token"]) - self._refresh_token = token["refresh_token"] - self._expires_in = datetime.datetime.now() + datetime.timedelta( - seconds=token["expires_in"] - 5 - ) - - async def cozytouch_login(self) -> str: - """Authenticate via CozyTouch identity and acquire JWT token.""" - # Request access token - async with self.session.post( - COZYTOUCH_ATLANTIC_API + "/token", - data=FormData( - { - "grant_type": "password", - "username": "GA-PRIVATEPERSON/" + self.username, - "password": self.password, - } - ), - headers={ - "Authorization": f"Basic {COZYTOUCH_CLIENT_ID}", - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - - # {'error': 'invalid_grant', - # 'error_description': 'Provided Authorization Grant is invalid.'} - if "error" in token and token["error"] == "invalid_grant": - raise CozyTouchBadCredentialsException(token["error_description"]) - - if "token_type" not in token: - raise CozyTouchServiceException("No CozyTouch token provided.") - - # Request JWT - async with self.session.get( - COZYTOUCH_ATLANTIC_API + "/magellan/accounts/jwt", - headers={"Authorization": f"Bearer {token['access_token']}"}, - ) as response: - jwt = await response.text() - - if not jwt: - raise CozyTouchServiceException("No JWT token provided.") - - jwt = jwt.strip('"') # Remove surrounding quotes - - return jwt - - async def nexity_login(self) -> str: - """Authenticate via Nexity identity and acquire SSO token.""" - loop = asyncio.get_event_loop() - - def _get_client() -> BaseClient: - return boto3.client( - "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) - ) - - # Request access token - client = await loop.run_in_executor(None, _get_client) - - aws = WarrantLite( - username=self.username, - password=self.password, - pool_id=NEXITY_COGNITO_USER_POOL, - client_id=NEXITY_COGNITO_CLIENT_ID, - client=client, - ) - - try: - tokens = await loop.run_in_executor(None, aws.authenticate_user) - except Exception as error: - raise NexityBadCredentialsException() from error - - id_token = tokens["AuthenticationResult"]["IdToken"] - - async with self.session.get( - NEXITY_API + "/deploy/api/v1/domotic/token", - headers={ - "Authorization": id_token, - }, - ) as response: - token = await response.json() - - if "token" not in token: - raise NexityServiceException("No Nexity SSO token provided.") - - return cast(str, token["token"]) + return True @retry_on_auth_error async def get_setup(self, refresh: bool = False) -> Setup: @@ -787,11 +583,8 @@ async def get_setup_option_parameter( async def __get(self, path: str) -> Any: """Make a GET request to the OverKiz API.""" - headers = {} - await self._refresh_token_if_expired() - if self._access_token: - headers["Authorization"] = f"Bearer {self._access_token}" + headers = dict(self._auth.auth_headers(path)) async with self.session.get( f"{self.server.endpoint}{path}", @@ -805,11 +598,8 @@ async def __post( self, path: str, payload: JSON | None = None, data: JSON | None = None ) -> Any: """Make a POST request to the OverKiz API.""" - headers = {} - - if path != "login" and self._access_token: - await self._refresh_token_if_expired() - headers["Authorization"] = f"Bearer {self._access_token}" + await self._refresh_token_if_expired() + headers = dict(self._auth.auth_headers(path)) async with self.session.post( f"{self.server.endpoint}{path}", @@ -823,12 +613,8 @@ async def __post( async def __delete(self, path: str) -> None: """Make a DELETE request to the OverKiz API.""" - headers = {} - await self._refresh_token_if_expired() - - if self._access_token: - headers["Authorization"] = f"Bearer {self._access_token}" + headers = dict(self._auth.auth_headers(path)) async with self.session.delete( f"{self.server.endpoint}{path}", @@ -946,12 +732,7 @@ async def check_response(response: ClientResponse) -> None: async def _refresh_token_if_expired(self) -> None: """Check if token is expired and request a new one.""" - if ( - self._expires_in - and self._refresh_token - and self._expires_in <= datetime.datetime.now() - ): - await self.refresh_token() - - if self.event_listener_id: - await self.register_event_listener() + refreshed = await self._auth.refresh_if_needed() + + if refreshed and self.event_listener_id: + await self.register_event_listener() diff --git a/pyoverkiz/const.py b/pyoverkiz/const.py index c664b5da..2d03e177 100644 --- a/pyoverkiz/const.py +++ b/pyoverkiz/const.py @@ -15,6 +15,18 @@ NEXITY_COGNITO_USER_POOL = "eu-west-1_wj277ucoI" NEXITY_COGNITO_REGION = "eu-west-1" +REXEL_BACKEND_API = ( + "https://app-ec-backend-enduser-prod.azurewebsites.net/api/enduser/overkiz/" +) +REXEL_OAUTH_CLIENT_ID = "2b635ede-c3fb-43bc-8d23-f6d17f80e96d" +REXEL_OAUTH_SCOPE = "https://adb2cservicesfrenduserprod.onmicrosoft.com/94f05108-65f7-477a-a84d-e67e1aed6f79/ExternalProvider" +REXEL_OAUTH_TENANT = ( + "https://consumerlogin.rexelservices.fr/670998c0-f737-4d75-a32f-ba9292755b70" +) +REXEL_OAUTH_POLICY = "B2C_1A_SIGNINONLYHOMEASSISTANT" +REXEL_OAUTH_TOKEN_URL = f"{REXEL_OAUTH_TENANT}/oauth2/v2.0/token?p={REXEL_OAUTH_POLICY}" +REXEL_REQUIRED_CONSENT = "homeassistant" + SOMFY_API = "https://accounts.somfy.com" SOMFY_CLIENT_ID = "0d8e920c-1478-11e7-a377-02dd59bd3041_1ewvaqmclfogo4kcsoo0c8k4kso884owg08sg8c40sk4go4ksg" SOMFY_CLIENT_SECRET = "12k73w1n540g8o4cokg0cw84cog840k84cwggscwg884004kgk" @@ -78,7 +90,7 @@ ), Server.REXEL: OverkizServer( name="Rexel Energeasy Connect", - endpoint="https://ha112-1.overkiz.com/enduser-mobile-web/enduserAPI/", + endpoint=REXEL_BACKEND_API, manufacturer="Rexel", configuration_url="https://utilisateur.energeasyconnect.com/user/#/zone/equipements", ), diff --git a/tests/test_client.py b/tests/test_client.py index 71a42e25..4d772665 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -11,6 +11,10 @@ from pytest_asyncio import fixture from pyoverkiz import exceptions +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + UsernamePasswordCredentials, +) from pyoverkiz.client import OverkizClient from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, DataType @@ -26,15 +30,17 @@ class TestOverkizClient: @fixture async def client(self): """Fixture providing an OverkizClient configured for the cloud server.""" - return OverkizClient("username", "password", SUPPORTED_SERVERS["somfy_europe"]) + return OverkizClient( + SUPPORTED_SERVERS["somfy_europe"], + UsernamePasswordCredentials("username", "password"), + ) @fixture async def local_client(self): """Fixture providing an OverkizClient configured for a local (developer) server.""" return OverkizClient( - "username", - "password", generate_local_server("gateway-1234-5678-1243.local:8443"), + LocalTokenCredentials("token"), ) @pytest.mark.asyncio From 3250aab467a0f668085c32eb7c71f257ddf5354d Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 14:19:03 +0000 Subject: [PATCH 02/18] Enable keyword only for OverkizClient --- pyoverkiz/client.py | 1 + tests/test_client.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index c9033566..373af539 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -140,6 +140,7 @@ class OverkizClient: def __init__( self, + *, server: OverkizServer, credentials: Credentials, verify_ssl: bool = True, diff --git a/tests/test_client.py b/tests/test_client.py index 4d772665..3e4e4e2a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -31,16 +31,16 @@ class TestOverkizClient: async def client(self): """Fixture providing an OverkizClient configured for the cloud server.""" return OverkizClient( - SUPPORTED_SERVERS["somfy_europe"], - UsernamePasswordCredentials("username", "password"), + server=SUPPORTED_SERVERS["somfy_europe"], + credentials=UsernamePasswordCredentials("username", "password"), ) @fixture async def local_client(self): """Fixture providing an OverkizClient configured for a local (developer) server.""" return OverkizClient( - generate_local_server("gateway-1234-5678-1243.local:8443"), - LocalTokenCredentials("token"), + server=generate_local_server("gateway-1234-5678-1243.local:8443"), + credentials=LocalTokenCredentials("token"), ) @pytest.mark.asyncio From 1e814139b20068b864cea2acc62c534fc915d7e6 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 14:32:08 +0000 Subject: [PATCH 03/18] Refactor authentication handling to replace OverkizServer with ServerConfig across the codebase --- pyoverkiz/auth/factory.py | 11 ++++++++--- pyoverkiz/auth/strategies.py | 23 ++++++++++++++--------- pyoverkiz/client.py | 31 +++++++++++++++++++++++-------- pyoverkiz/const.py | 36 ++++++++++++++++++------------------ pyoverkiz/models.py | 4 ++-- pyoverkiz/utils.py | 8 ++++---- tests/test_client.py | 5 ++--- 7 files changed, 71 insertions(+), 47 deletions(-) diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py index 86277677..fc8d3d5d 100644 --- a/pyoverkiz/auth/factory.py +++ b/pyoverkiz/auth/factory.py @@ -23,16 +23,17 @@ ) from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS from pyoverkiz.enums import APIType, Server -from pyoverkiz.models import OverkizServer +from pyoverkiz.models import ServerConfig def build_auth_strategy( server_key: str | Server | None, - server: OverkizServer, + server: ServerConfig, credentials: Credentials, session: ClientSession, ssl_context: ssl.SSLContext | bool, ) -> Any: + """Build the correct auth strategy for the given server and credentials.""" api_type = APIType.LOCAL if LOCAL_API_PATH in server.endpoint else APIType.CLOUD # Normalize server key @@ -98,7 +99,8 @@ def build_auth_strategy( ) -def _match_server_key(server: OverkizServer) -> Server: +def _match_server_key(server: ServerConfig) -> Server: + """Find the `Server` enum corresponding to a `ServerConfig` entry.""" for key, value in SUPPORTED_SERVERS.items(): if server is value or server.endpoint == value.endpoint: return Server(key) @@ -107,18 +109,21 @@ def _match_server_key(server: OverkizServer) -> Server: def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials: + """Validate that credentials are username/password based.""" if not isinstance(credentials, UsernamePasswordCredentials): raise TypeError("UsernamePasswordCredentials are required for this server.") return credentials def _ensure_token(credentials: Credentials) -> TokenCredentials: + """Validate that credentials carry a bearer token.""" if not isinstance(credentials, TokenCredentials): raise TypeError("TokenCredentials are required for this server.") return credentials def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials: + """Validate that credentials are of Rexel OAuth code type.""" if not isinstance(credentials, RexelOAuthCodeCredentials): raise TypeError("RexelOAuthCodeCredentials are required for this server.") return credentials diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py index e33d8e66..17de9202 100644 --- a/pyoverkiz/auth/strategies.py +++ b/pyoverkiz/auth/strategies.py @@ -47,32 +47,37 @@ SomfyBadCredentialsException, SomfyServiceException, ) -from pyoverkiz.models import OverkizServer +from pyoverkiz.models import ServerConfig class BaseAuthStrategy(AuthStrategy): def __init__( self, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Store shared auth context for Overkiz API interactions.""" self.session = session self.server = server self._ssl = ssl_context self.api_type = api_type async def login(self) -> None: + """Perform authentication; default is a no-op for subclasses to override.""" return None async def refresh_if_needed(self) -> bool: + """Refresh authentication tokens if needed; default returns False.""" return False def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" return {} async def close(self) -> None: + """Close any resources held by the strategy; default is no-op.""" return None @@ -81,7 +86,7 @@ def __init__( self, credentials: UsernamePasswordCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -116,7 +121,7 @@ def __init__( self, credentials: UsernamePasswordCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -189,7 +194,7 @@ def __init__( self, credentials: UsernamePasswordCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -238,7 +243,7 @@ def __init__( self, credentials: UsernamePasswordCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -286,7 +291,7 @@ def __init__( self, credentials: LocalTokenCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -306,7 +311,7 @@ def __init__( self, credentials: RexelOAuthCodeCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: @@ -381,7 +386,7 @@ def __init__( self, credentials: TokenCredentials, session: ClientSession, - server: OverkizServer, + server: ServerConfig, ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 373af539..87fb3acb 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -21,7 +21,7 @@ from pyoverkiz.auth import Credentials, build_auth_strategy from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS -from pyoverkiz.enums import APIType, Server +from pyoverkiz.enums import APIType, CommandMode, Server from pyoverkiz.exceptions import ( AccessDeniedToGatewayException, BadCredentialsException, @@ -49,7 +49,6 @@ from pyoverkiz.models import ( Action, ActionGroup, - CommandMode, Device, Event, Execution, @@ -58,8 +57,8 @@ LocalToken, Option, OptionParameter, - OverkizServer, Place, + ServerConfig, Setup, State, ) @@ -129,7 +128,7 @@ def _create_local_ssl_context() -> ssl.SSLContext: class OverkizClient: """Interface class for the Overkiz API.""" - server: OverkizServer + server: ServerConfig setup: Setup | None devices: list[Device] gateways: list[Gateway] @@ -141,7 +140,7 @@ class OverkizClient: def __init__( self, *, - server: OverkizServer, + server: ServerConfig | Server | str, credentials: Credentials, verify_ssl: bool = True, session: ClientSession | None = None, @@ -149,10 +148,10 @@ def __init__( ) -> None: """Constructor. - :param server: OverkizServer + :param server: ServerConfig :param session: optional ClientSession """ - self.server = server + self.server = self._normalize_server(server) self.setup: Setup | None = None self.devices: list[Device] = [] @@ -182,7 +181,7 @@ def __init__( ) async def __aenter__(self) -> OverkizClient: - """Enter the async context manager and return the client.""" + """Enter async context manager and return the client instance.""" return self async def __aexit__( @@ -194,7 +193,23 @@ async def __aexit__( """Exit the async context manager and close the client session.""" await self.close() + @staticmethod + def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: + """Resolve user-provided server identifiers into a `ServerConfig`.""" + if isinstance(server, ServerConfig): + return server + + server_key = server.value if isinstance(server, Server) else str(server) + + try: + return SUPPORTED_SERVERS[server_key] + except KeyError as error: + raise OverkizException( + f"Unknown server '{server_key}'. Provide a supported server key or ServerConfig instance." + ) from error + def _resolve_server_key(self) -> Server: + """Infer a `Server` enum for the current server configuration.""" for key, value in SUPPORTED_SERVERS.items(): if self.server is value or self.server.endpoint == value.endpoint: return Server(key) diff --git a/pyoverkiz/const.py b/pyoverkiz/const.py index 2d03e177..8f93d301 100644 --- a/pyoverkiz/const.py +++ b/pyoverkiz/const.py @@ -3,7 +3,7 @@ from __future__ import annotations from pyoverkiz.enums import Server -from pyoverkiz.models import OverkizServer +from pyoverkiz.models import ServerConfig COZYTOUCH_ATLANTIC_API = "https://apis.groupe-atlantic.com" COZYTOUCH_CLIENT_ID = ( @@ -39,98 +39,98 @@ Server.SOMFY_AMERICA, ] -SUPPORTED_SERVERS: dict[str, OverkizServer] = { - Server.ATLANTIC_COZYTOUCH: OverkizServer( +SUPPORTED_SERVERS: dict[str, ServerConfig] = { + Server.ATLANTIC_COZYTOUCH: ServerConfig( name="Atlantic Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Atlantic", configuration_url=None, ), - Server.BRANDT: OverkizServer( + Server.BRANDT: ServerConfig( name="Brandt Smart Control", endpoint="https://ha3-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Brandt", configuration_url=None, ), - Server.FLEXOM: OverkizServer( + Server.FLEXOM: ServerConfig( name="Flexom", endpoint="https://ha108-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Bouygues", configuration_url=None, ), - Server.HEXAOM_HEXACONNECT: OverkizServer( + Server.HEXAOM_HEXACONNECT: ServerConfig( name="Hexaom HexaConnect", endpoint="https://ha5-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hexaom", configuration_url=None, ), - Server.HI_KUMO_ASIA: OverkizServer( + Server.HI_KUMO_ASIA: ServerConfig( name="Hitachi Hi Kumo (Asia)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", configuration_url=None, ), - Server.HI_KUMO_EUROPE: OverkizServer( + Server.HI_KUMO_EUROPE: ServerConfig( name="Hitachi Hi Kumo (Europe)", endpoint="https://ha117-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", configuration_url=None, ), - Server.HI_KUMO_OCEANIA: OverkizServer( + Server.HI_KUMO_OCEANIA: ServerConfig( name="Hitachi Hi Kumo (Oceania)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", configuration_url=None, ), - Server.NEXITY: OverkizServer( + Server.NEXITY: ServerConfig( name="Nexity Eugénie", endpoint="https://ha106-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Nexity", configuration_url=None, ), - Server.REXEL: OverkizServer( + Server.REXEL: ServerConfig( name="Rexel Energeasy Connect", endpoint=REXEL_BACKEND_API, manufacturer="Rexel", configuration_url="https://utilisateur.energeasyconnect.com/user/#/zone/equipements", ), - Server.SAUTER_COZYTOUCH: OverkizServer( # duplicate of Atlantic Cozytouch + Server.SAUTER_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch name="Sauter Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Sauter", configuration_url=None, ), - Server.SIMU_LIVEIN2: OverkizServer( # alias of https://tahomalink.com + Server.SIMU_LIVEIN2: ServerConfig( # alias of https://tahomalink.com name="SIMU (LiveIn2)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", configuration_url=None, ), - Server.SOMFY_EUROPE: OverkizServer( # alias of https://tahomalink.com + Server.SOMFY_EUROPE: ServerConfig( # alias of https://tahomalink.com name="Somfy (Europe)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", configuration_url=None, ), - Server.SOMFY_AMERICA: OverkizServer( + Server.SOMFY_AMERICA: ServerConfig( name="Somfy (North America)", endpoint="https://ha401-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", configuration_url=None, ), - Server.SOMFY_OCEANIA: OverkizServer( + Server.SOMFY_OCEANIA: ServerConfig( name="Somfy (Oceania)", endpoint="https://ha201-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", configuration_url=None, ), - Server.THERMOR_COZYTOUCH: OverkizServer( # duplicate of Atlantic Cozytouch + Server.THERMOR_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch name="Thermor Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Thermor", configuration_url=None, ), - Server.UBIWIZZ: OverkizServer( + Server.UBIWIZZ: ServerConfig( name="Ubiwizz", endpoint="https://ha129-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Decelect", diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 5e826076..e5a72284 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -959,8 +959,8 @@ def __init__( @define(kw_only=True) -class OverkizServer: - """Class to describe an Overkiz server.""" +class ServerConfig: + """Connection target details for an Overkiz-compatible server.""" name: str endpoint: str diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index e395d0bf..64129fbf 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -5,7 +5,7 @@ import re from pyoverkiz.const import LOCAL_API_PATH -from pyoverkiz.models import OverkizServer +from pyoverkiz.models import ServerConfig def generate_local_server( @@ -13,9 +13,9 @@ def generate_local_server( name: str = "Somfy Developer Mode", manufacturer: str = "Somfy", configuration_url: str | None = None, -) -> OverkizServer: - """Generate OverkizServer class for connection with a local API (Somfy Developer mode).""" - return OverkizServer( +) -> ServerConfig: + """Generate server configuration for a local API (Somfy Developer mode).""" + return ServerConfig( name=name, endpoint=f"https://{host}{LOCAL_API_PATH}", manufacturer=manufacturer, diff --git a/tests/test_client.py b/tests/test_client.py index 3e4e4e2a..0882a2e6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -16,8 +16,7 @@ UsernamePasswordCredentials, ) from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS -from pyoverkiz.enums import APIType, DataType +from pyoverkiz.enums import APIType, DataType, Server from pyoverkiz.models import Option from pyoverkiz.utils import generate_local_server @@ -31,7 +30,7 @@ class TestOverkizClient: async def client(self): """Fixture providing an OverkizClient configured for the cloud server.""" return OverkizClient( - server=SUPPORTED_SERVERS["somfy_europe"], + server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials("username", "password"), ) From a3ee82bc5e985bb7662dc5c3ea2657334e33b454 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 16:37:30 +0000 Subject: [PATCH 04/18] Refactor README examples to use Server enum and generate_local_server utility --- README.md | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index f2726a5b..c83da6be 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ PASSWORD = "" async def main() -> None: async with OverkizClient( - server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE], + server=Server.SOMFY_EUROPE, credentials=UsernamePasswordCredentials(USERNAME, PASSWORD), ) as client: try: @@ -95,10 +95,8 @@ import time from pyoverkiz.auth.credentials import LocalTokenCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.const import OverkizServer +from pyoverkiz.utils import generate_local_server -USERNAME = "" -PASSWORD = "" LOCAL_GATEWAY = "gateway-xxxx-xxxx-xxxx.local" # or use the IP address of your gateway VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostname @@ -107,12 +105,7 @@ async def main() -> None: token = "" # generate your token via the Somfy app and include it here async with OverkizClient( - server=OverkizServer( - name="Somfy TaHoma (local)", - endpoint=f"https://{LOCAL_GATEWAY}:8443/enduser-mobile-web/1/enduserAPI/", - manufacturer="Somfy", - configuration_url=None, - ), + server=generate_local_server(host=LOCAL_GATEWAY), credentials=LocalTokenCredentials(token), verify_ssl=VERIFY_SSL, ) as client: From 9f28a26d64dbfcdba4076e9aaa188e0ebaa91133 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 16:49:57 +0000 Subject: [PATCH 05/18] Add docstrings to authentication strategies and improve API request method comments --- pyoverkiz/auth/__init__.py | 2 ++ pyoverkiz/auth/base.py | 19 +++++++++++---- pyoverkiz/auth/credentials.py | 11 ++++++++- pyoverkiz/auth/factory.py | 2 ++ pyoverkiz/auth/strategies.py | 44 ++++++++++++++++++++++++++++++++++- pyoverkiz/client.py | 2 +- 6 files changed, 73 insertions(+), 7 deletions(-) diff --git a/pyoverkiz/auth/__init__.py b/pyoverkiz/auth/__init__.py index 6c3317d7..535a2614 100644 --- a/pyoverkiz/auth/__init__.py +++ b/pyoverkiz/auth/__init__.py @@ -1,3 +1,5 @@ +"""Authentication module for pyoverkiz.""" + from __future__ import annotations from pyoverkiz.auth.base import AuthContext, AuthStrategy diff --git a/pyoverkiz/auth/base.py b/pyoverkiz/auth/base.py index d06657f6..6a01e912 100644 --- a/pyoverkiz/auth/base.py +++ b/pyoverkiz/auth/base.py @@ -1,3 +1,5 @@ +"""Base classes for authentication strategies.""" + from __future__ import annotations import datetime @@ -8,11 +10,14 @@ @dataclass(slots=True) class AuthContext: + """Authentication context holding tokens and expiration.""" + access_token: str | None = None refresh_token: str | None = None expires_at: datetime.datetime | None = None def is_expired(self, *, skew_seconds: int = 5) -> bool: + """Check if the access token is expired, considering a skew time.""" if not self.expires_at: return False @@ -22,10 +27,16 @@ def is_expired(self, *, skew_seconds: int = 5) -> bool: class AuthStrategy(Protocol): - async def login(self) -> None: ... + """Protocol for authentication strategies.""" + + async def login(self) -> None: + """Perform login to obtain tokens.""" - async def refresh_if_needed(self) -> bool: ... + async def refresh_if_needed(self) -> bool: + """Refresh tokens if they are expired. Return True if refreshed.""" - def auth_headers(self, path: str | None = None) -> Mapping[str, str]: ... + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Generate authentication headers for requests.""" - async def close(self) -> None: ... + async def close(self) -> None: + """Clean up any resources held by the strategy.""" diff --git a/pyoverkiz/auth/credentials.py b/pyoverkiz/auth/credentials.py index eb4595be..99e2ff9d 100644 --- a/pyoverkiz/auth/credentials.py +++ b/pyoverkiz/auth/credentials.py @@ -1,3 +1,5 @@ +"""Credentials for authentication strategies.""" + from __future__ import annotations from dataclasses import dataclass @@ -9,20 +11,27 @@ class Credentials: @dataclass(slots=True) class UsernamePasswordCredentials(Credentials): + """Credentials using username and password.""" + username: str password: str @dataclass(slots=True) class TokenCredentials(Credentials): + """Credentials using an (API) token.""" + token: str @dataclass(slots=True) -class LocalTokenCredentials(TokenCredentials): ... +class LocalTokenCredentials(TokenCredentials): + """Credentials using a local API token.""" @dataclass(slots=True) class RexelOAuthCodeCredentials(Credentials): + """ "Credentials using Rexel OAuth2 authorization code.""" + code: str redirect_uri: str diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py index fc8d3d5d..cc66d57f 100644 --- a/pyoverkiz/auth/factory.py +++ b/pyoverkiz/auth/factory.py @@ -1,3 +1,5 @@ +"""Factory to build authentication strategies based on server and credentials.""" + from __future__ import annotations import ssl diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py index 17de9202..878fde73 100644 --- a/pyoverkiz/auth/strategies.py +++ b/pyoverkiz/auth/strategies.py @@ -1,3 +1,5 @@ +"""Authentication strategies for Overkiz API.""" + from __future__ import annotations import asyncio @@ -11,6 +13,7 @@ import boto3 from aiohttp import ClientSession, FormData +from botocore.client import BaseClient from botocore.config import Config from warrant_lite import WarrantLite @@ -51,6 +54,8 @@ class BaseAuthStrategy(AuthStrategy): + """Base class for authentication strategies.""" + def __init__( self, session: ClientSession, @@ -82,6 +87,8 @@ async def close(self) -> None: class SessionLoginStrategy(BaseAuthStrategy): + """Authentication strategy using session-based login.""" + def __init__( self, credentials: UsernamePasswordCredentials, @@ -90,10 +97,12 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize SessionLoginStrategy with given parameters.""" super().__init__(session, server, ssl_context, api_type) self.credentials = credentials async def login(self) -> None: + """Perform login using username and password.""" payload = { "userId": self.credentials.username, "userPassword": self.credentials.password, @@ -101,6 +110,7 @@ async def login(self) -> None: await self._post_login(payload) async def _post_login(self, data: Mapping[str, Any]) -> None: + """Post login data to the server and handle response.""" async with self.session.post( f"{self.server.endpoint}login", data=data, @@ -117,6 +127,8 @@ async def _post_login(self, data: Mapping[str, Any]) -> None: class SomfyAuthStrategy(BaseAuthStrategy): + """Authentication strategy using Somfy OAuth2.""" + def __init__( self, credentials: UsernamePasswordCredentials, @@ -125,11 +137,13 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize SomfyAuthStrategy with given parameters.""" super().__init__(session, server, ssl_context, api_type) self.credentials = credentials self.context = AuthContext() async def login(self) -> None: + """Perform login using Somfy OAuth2.""" await self._request_access_token( grant_type="password", extra_fields={ @@ -139,6 +153,7 @@ async def login(self) -> None: ) async def refresh_if_needed(self) -> bool: + """Refresh Somfy OAuth2 tokens if needed.""" if not self.context.is_expired() or not self.context.refresh_token: return False @@ -149,6 +164,7 @@ async def refresh_if_needed(self) -> bool: return True def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" if self.context.access_token: return {"Authorization": f"Bearer {self.context.access_token}"} @@ -190,6 +206,8 @@ async def _request_access_token( class CozytouchAuthStrategy(SessionLoginStrategy): + """Authentication strategy using Cozytouch session-based login.""" + def __init__( self, credentials: UsernamePasswordCredentials, @@ -198,9 +216,11 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize CozytouchAuthStrategy with given parameters.""" super().__init__(credentials, session, server, ssl_context, api_type) async def login(self) -> None: + """Perform login using Cozytouch username and password.""" form = FormData( { "grant_type": "password", @@ -239,6 +259,8 @@ async def login(self) -> None: class NexityAuthStrategy(SessionLoginStrategy): + """Authentication strategy using Nexity session-based login.""" + def __init__( self, credentials: UsernamePasswordCredentials, @@ -247,12 +269,14 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize NexityAuthStrategy with given parameters.""" super().__init__(credentials, session, server, ssl_context, api_type) async def login(self) -> None: + """Perform login using Nexity username and password.""" loop = asyncio.get_event_loop() - def _client() -> boto3.session.Session.client: + def _client() -> BaseClient: return boto3.client( "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) ) @@ -287,6 +311,8 @@ def _client() -> boto3.session.Session.client: class LocalTokenAuthStrategy(BaseAuthStrategy): + """Authentication strategy using a local API token.""" + def __init__( self, credentials: LocalTokenCredentials, @@ -295,18 +321,23 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize LocalTokenAuthStrategy with given parameters.""" super().__init__(session, server, ssl_context, api_type) self.credentials = credentials async def login(self) -> None: + """Validate that a token is provided for local API access.""" if not self.credentials.token: raise InvalidTokenException("Local API requires a token.") def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" return {"Authorization": f"Bearer {self.credentials.token}"} class RexelAuthStrategy(BaseAuthStrategy): + """Authentication strategy using Rexel OAuth2.""" + def __init__( self, credentials: RexelOAuthCodeCredentials, @@ -315,11 +346,13 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize RexelAuthStrategy with given parameters.""" super().__init__(session, server, ssl_context, api_type) self.credentials = credentials self.context = AuthContext() async def login(self) -> None: + """Perform login using Rexel OAuth2 authorization code.""" await self._exchange_token( { "grant_type": "authorization_code", @@ -331,6 +364,7 @@ async def login(self) -> None: ) async def refresh_if_needed(self) -> bool: + """ "Refresh Rexel OAuth2 tokens if needed.""" if not self.context.is_expired() or not self.context.refresh_token: return False @@ -345,11 +379,13 @@ async def refresh_if_needed(self) -> bool: return True def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" if self.context.access_token: return {"Authorization": f"Bearer {self.context.access_token}"} return {} async def _exchange_token(self, payload: Mapping[str, str]) -> None: + """Exchange authorization code or refresh token for access token.""" form = FormData(payload) async with self.session.post( REXEL_OAUTH_TOKEN_URL, @@ -373,6 +409,7 @@ async def _exchange_token(self, payload: Mapping[str, str]) -> None: @staticmethod def _ensure_consent(access_token: str) -> None: + """Ensure that the Rexel token has the required consent.""" payload = _decode_jwt_payload(access_token) consent = payload.get("consent") if consent != REXEL_REQUIRED_CONSENT: @@ -382,6 +419,8 @@ def _ensure_consent(access_token: str) -> None: class BearerTokenAuthStrategy(BaseAuthStrategy): + """Authentication strategy using a static bearer token.""" + def __init__( self, credentials: TokenCredentials, @@ -390,16 +429,19 @@ def __init__( ssl_context: ssl.SSLContext | bool, api_type: APIType, ) -> None: + """Initialize BearerTokenAuthStrategy with given parameters.""" super().__init__(session, server, ssl_context, api_type) self.credentials = credentials def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" if self.credentials.token: return {"Authorization": f"Bearer {self.credentials.token}"} return {} def _decode_jwt_payload(token: str) -> dict[str, Any]: + """Decode the payload of a JWT token.""" parts = token.split(".") if len(parts) < 2: raise InvalidTokenException("Malformed JWT received.") diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 87fb3acb..dd21c92b 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -215,7 +215,7 @@ def _resolve_server_key(self) -> Server: return Server(key) if self.api_type == APIType.LOCAL: - return Server.SOMFY_DEVELOPER_MODE + return Server(Server.SOMFY_DEVELOPER_MODE) raise OverkizException( "Unknown server configuration; provide server_key explicitly." From d7bdda0c1b28284141cf00a3df1adc33a9da2c49 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 17:08:14 +0000 Subject: [PATCH 06/18] Refactor server configuration handling to use create_local_server_config and update related references --- README.md | 4 ++-- pyoverkiz/auth/credentials.py | 2 +- pyoverkiz/auth/factory.py | 44 +++++++++++++++++++++------------- pyoverkiz/auth/strategies.py | 2 +- pyoverkiz/client.py | 45 ++++++++++++++++++----------------- pyoverkiz/const.py | 33 ++++++++++++------------- pyoverkiz/models.py | 4 +++- pyoverkiz/utils.py | 26 ++++++++++++++++++-- tests/test_client.py | 4 ++-- tests/test_utils.py | 10 ++++---- 10 files changed, 105 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index c83da6be..6daa1815 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ import time from pyoverkiz.auth.credentials import LocalTokenCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.utils import generate_local_server +from pyoverkiz.utils import create_local_server_config LOCAL_GATEWAY = "gateway-xxxx-xxxx-xxxx.local" # or use the IP address of your gateway VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostname @@ -105,7 +105,7 @@ async def main() -> None: token = "" # generate your token via the Somfy app and include it here async with OverkizClient( - server=generate_local_server(host=LOCAL_GATEWAY), + server=create_local_server_config(host=LOCAL_GATEWAY), credentials=LocalTokenCredentials(token), verify_ssl=VERIFY_SSL, ) as client: diff --git a/pyoverkiz/auth/credentials.py b/pyoverkiz/auth/credentials.py index 99e2ff9d..777f950b 100644 --- a/pyoverkiz/auth/credentials.py +++ b/pyoverkiz/auth/credentials.py @@ -31,7 +31,7 @@ class LocalTokenCredentials(TokenCredentials): @dataclass(slots=True) class RexelOAuthCodeCredentials(Credentials): - """ "Credentials using Rexel OAuth2 authorization code.""" + """Credentials using Rexel OAuth2 authorization code.""" code: str redirect_uri: str diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py index cc66d57f..2554431b 100644 --- a/pyoverkiz/auth/factory.py +++ b/pyoverkiz/auth/factory.py @@ -23,24 +23,22 @@ SessionLoginStrategy, SomfyAuthStrategy, ) -from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS +from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, Server from pyoverkiz.models import ServerConfig def build_auth_strategy( server_key: str | Server | None, - server: ServerConfig, + server_config: ServerConfig, credentials: Credentials, session: ClientSession, ssl_context: ssl.SSLContext | bool, ) -> Any: """Build the correct auth strategy for the given server and credentials.""" - api_type = APIType.LOCAL if LOCAL_API_PATH in server.endpoint else APIType.CLOUD - # Normalize server key try: - key = Server(server_key) if server_key else _match_server_key(server) + key = Server(server_key) if server_key else _match_server_key(server_config) except ValueError: key = None @@ -48,9 +46,9 @@ def build_auth_strategy( return SomfyAuthStrategy( _ensure_username_password(credentials), session, - server, + server_config, ssl_context, - api_type, + server_config.type, ) if key in { @@ -61,43 +59,55 @@ def build_auth_strategy( return CozytouchAuthStrategy( _ensure_username_password(credentials), session, - server, + server_config, ssl_context, - api_type, + server_config.type, ) if key == Server.NEXITY: return NexityAuthStrategy( _ensure_username_password(credentials), session, - server, + server_config, ssl_context, - api_type, + server_config.type, ) if key == Server.REXEL: return RexelAuthStrategy( - _ensure_rexel(credentials), session, server, ssl_context, api_type + _ensure_rexel(credentials), + session, + server_config, + ssl_context, + server_config.type, ) - if api_type == APIType.LOCAL: + if server_config.type == APIType.LOCAL: if isinstance(credentials, LocalTokenCredentials): return LocalTokenAuthStrategy( - credentials, session, server, ssl_context, api_type + credentials, session, server_config, ssl_context, server_config.type ) return BearerTokenAuthStrategy( - _ensure_token(credentials), session, server, ssl_context, api_type + _ensure_token(credentials), + session, + server_config, + ssl_context, + server_config.type, ) if isinstance(credentials, TokenCredentials) and not isinstance( credentials, LocalTokenCredentials ): return BearerTokenAuthStrategy( - credentials, session, server, ssl_context, api_type + credentials, session, server_config, ssl_context, server_config.type ) return SessionLoginStrategy( - _ensure_username_password(credentials), session, server, ssl_context, api_type + _ensure_username_password(credentials), + session, + server_config, + ssl_context, + server_config.type, ) diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py index 878fde73..2c363ee4 100644 --- a/pyoverkiz/auth/strategies.py +++ b/pyoverkiz/auth/strategies.py @@ -364,7 +364,7 @@ async def login(self) -> None: ) async def refresh_if_needed(self) -> bool: - """ "Refresh Rexel OAuth2 tokens if needed.""" + """Refresh Rexel OAuth2 tokens if needed.""" if not self.context.is_expired() or not self.context.refresh_token: return False diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index dd21c92b..cd001b7e 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -20,7 +20,7 @@ ) from pyoverkiz.auth import Credentials, build_auth_strategy -from pyoverkiz.const import LOCAL_API_PATH, SUPPORTED_SERVERS +from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server from pyoverkiz.exceptions import ( AccessDeniedToGatewayException, @@ -128,13 +128,12 @@ def _create_local_ssl_context() -> ssl.SSLContext: class OverkizClient: """Interface class for the Overkiz API.""" - server: ServerConfig + server_config: ServerConfig setup: Setup | None devices: list[Device] gateways: list[Gateway] event_listener_id: str | None session: ClientSession - api_type: APIType _ssl: ssl.SSLContext | bool = True def __init__( @@ -151,7 +150,7 @@ def __init__( :param server: ServerConfig :param session: optional ClientSession """ - self.server = self._normalize_server(server) + self.server_config = self._normalize_server(server) self.setup: Setup | None = None self.devices: list[Device] = [] @@ -161,23 +160,22 @@ def __init__( self.session = session if session else ClientSession() self._ssl = verify_ssl - if LOCAL_API_PATH in self.server.endpoint: - self.api_type = APIType.LOCAL + if self.server_config.type == APIType.LOCAL and verify_ssl: + # To avoid security issues while authentication to local API, we add the following authority to + # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt + self._ssl = SSL_CONTEXT_LOCAL_API - if verify_ssl: - # To avoid security issues while authentication to local API, we add the following authority to - # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt - self._ssl = SSL_CONTEXT_LOCAL_API - - # Disable strict validation introduced in Python 3.13, which doesn't - # work with Overkiz self-signed gateway certificates - self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT - else: - self.api_type = APIType.CLOUD + # Disable strict validation introduced in Python 3.13, which doesn't + # work with Overkiz self-signed gateway certificates + self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT inferred_server_key = server_key or self._resolve_server_key() self._auth = build_auth_strategy( - inferred_server_key, self.server, credentials, self.session, self._ssl + inferred_server_key, + self.server_config, + credentials, + self.session, + self._ssl, ) async def __aenter__(self) -> OverkizClient: @@ -211,10 +209,13 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: def _resolve_server_key(self) -> Server: """Infer a `Server` enum for the current server configuration.""" for key, value in SUPPORTED_SERVERS.items(): - if self.server is value or self.server.endpoint == value.endpoint: + if ( + self.server_config is value + or self.server_config.endpoint == value.endpoint + ): return Server(key) - if self.api_type == APIType.LOCAL: + if self.server_config.type == APIType.LOCAL: return Server(Server.SOMFY_DEVELOPER_MODE) raise OverkizException( @@ -603,7 +604,7 @@ async def __get(self, path: str) -> Any: headers = dict(self._auth.auth_headers(path)) async with self.session.get( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", headers=headers, ssl=self._ssl, ) as response: @@ -618,7 +619,7 @@ async def __post( headers = dict(self._auth.auth_headers(path)) async with self.session.post( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", data=data, json=payload, headers=headers, @@ -633,7 +634,7 @@ async def __delete(self, path: str) -> None: headers = dict(self._auth.auth_headers(path)) async with self.session.delete( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", headers=headers, ssl=self._ssl, ) as response: diff --git a/pyoverkiz/const.py b/pyoverkiz/const.py index 8f93d301..905a8634 100644 --- a/pyoverkiz/const.py +++ b/pyoverkiz/const.py @@ -3,6 +3,7 @@ from __future__ import annotations from pyoverkiz.enums import Server +from pyoverkiz.enums.server import APIType from pyoverkiz.models import ServerConfig COZYTOUCH_ATLANTIC_API = "https://apis.groupe-atlantic.com" @@ -44,96 +45,96 @@ name="Atlantic Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Atlantic", - configuration_url=None, + type=APIType.CLOUD, ), Server.BRANDT: ServerConfig( name="Brandt Smart Control", endpoint="https://ha3-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Brandt", - configuration_url=None, + type=APIType.CLOUD, ), Server.FLEXOM: ServerConfig( name="Flexom", endpoint="https://ha108-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Bouygues", - configuration_url=None, + type=APIType.CLOUD, ), Server.HEXAOM_HEXACONNECT: ServerConfig( name="Hexaom HexaConnect", endpoint="https://ha5-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hexaom", - configuration_url=None, + type=APIType.CLOUD, ), Server.HI_KUMO_ASIA: ServerConfig( name="Hitachi Hi Kumo (Asia)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), Server.HI_KUMO_EUROPE: ServerConfig( name="Hitachi Hi Kumo (Europe)", endpoint="https://ha117-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), Server.HI_KUMO_OCEANIA: ServerConfig( name="Hitachi Hi Kumo (Oceania)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), Server.NEXITY: ServerConfig( name="Nexity Eugénie", endpoint="https://ha106-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Nexity", - configuration_url=None, + type=APIType.CLOUD, ), Server.REXEL: ServerConfig( name="Rexel Energeasy Connect", endpoint=REXEL_BACKEND_API, manufacturer="Rexel", - configuration_url="https://utilisateur.energeasyconnect.com/user/#/zone/equipements", + type=APIType.CLOUD, ), Server.SAUTER_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch name="Sauter Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Sauter", - configuration_url=None, + type=APIType.CLOUD, ), Server.SIMU_LIVEIN2: ServerConfig( # alias of https://tahomalink.com name="SIMU (LiveIn2)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), Server.SOMFY_EUROPE: ServerConfig( # alias of https://tahomalink.com name="Somfy (Europe)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), Server.SOMFY_AMERICA: ServerConfig( name="Somfy (North America)", endpoint="https://ha401-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), Server.SOMFY_OCEANIA: ServerConfig( name="Somfy (Oceania)", endpoint="https://ha201-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), Server.THERMOR_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch name="Thermor Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Thermor", - configuration_url=None, + type=APIType.CLOUD, ), Server.UBIWIZZ: ServerConfig( name="Ubiwizz", endpoint="https://ha129-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Decelect", - configuration_url=None, + type=APIType.CLOUD, ), } diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index e5a72284..d182c974 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -24,6 +24,7 @@ ) from pyoverkiz.enums.command import OverkizCommand, OverkizCommandParam from pyoverkiz.enums.protocol import Protocol +from pyoverkiz.enums.server import APIType from pyoverkiz.obfuscate import obfuscate_email, obfuscate_id, obfuscate_string from pyoverkiz.types import DATA_TYPE_TO_PYTHON, StateType @@ -965,7 +966,8 @@ class ServerConfig: name: str endpoint: str manufacturer: str - configuration_url: str | None + type: APIType | str + configuration_url: str | None = None @define(kw_only=True) diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index 64129fbf..0aabe69f 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -5,21 +5,43 @@ import re from pyoverkiz.const import LOCAL_API_PATH +from pyoverkiz.enums.server import APIType from pyoverkiz.models import ServerConfig -def generate_local_server( +def create_local_server_config( + *, host: str, name: str = "Somfy Developer Mode", manufacturer: str = "Somfy", + type: APIType = APIType.LOCAL, configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration for a local API (Somfy Developer mode).""" - return ServerConfig( + return create_server_config( name=name, endpoint=f"https://{host}{LOCAL_API_PATH}", manufacturer=manufacturer, configuration_url=configuration_url, + type=type, + ) + + +def create_server_config( + *, + name: str, + endpoint: str, + manufacturer: str, + type: APIType = APIType.CLOUD, + configuration_url: str | None = None, +) -> ServerConfig: + """Generate server configuration with the provided endpoint and metadata.""" + return ServerConfig( + name=name, + endpoint=endpoint, + manufacturer=manufacturer, + configuration_url=configuration_url, + type=type, ) diff --git a/tests/test_client.py b/tests/test_client.py index 0882a2e6..a35f783f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -18,7 +18,7 @@ from pyoverkiz.client import OverkizClient from pyoverkiz.enums import APIType, DataType, Server from pyoverkiz.models import Option -from pyoverkiz.utils import generate_local_server +from pyoverkiz.utils import create_local_server_config CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -38,7 +38,7 @@ async def client(self): async def local_client(self): """Fixture providing an OverkizClient configured for a local (developer) server.""" return OverkizClient( - server=generate_local_server("gateway-1234-5678-1243.local:8443"), + server=create_local_server_config("gateway-1234-5678-1243.local:8443"), credentials=LocalTokenCredentials("token"), ) diff --git a/tests/test_utils.py b/tests/test_utils.py index a951078f..8b33b8f7 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,7 +2,7 @@ import pytest -from pyoverkiz.utils import generate_local_server, is_overkiz_gateway +from pyoverkiz.utils import create_local_server_config, is_overkiz_gateway LOCAL_HOST = "gateway-1234-5678-1243.local:8443" LOCAL_HOST_BY_IP = "192.168.1.105:8443" @@ -11,9 +11,9 @@ class TestUtils: """Tests for utility helpers like local server generation and gateway checks.""" - def test_generate_local_server(self): + def test_create_local_server_config(self): """Create a local server descriptor using the host and default values.""" - local_server = generate_local_server(host=LOCAL_HOST) + local_server = create_local_server_config(host=LOCAL_HOST) assert local_server assert ( @@ -24,9 +24,9 @@ def test_generate_local_server(self): assert local_server.name == "Somfy Developer Mode" assert local_server.configuration_url is None - def test_generate_local_server_by_ip(self): + def test_create_local_server_config_by_ip(self): """Create a local server descriptor using an IP host and custom fields.""" - local_server = generate_local_server( + local_server = create_local_server_config( host=LOCAL_HOST_BY_IP, manufacturer="Test Manufacturer", name="Test Name", From d00fe89c481cc5b573eb02b7e7080168b372a82d Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 17:13:11 +0000 Subject: [PATCH 07/18] Remove deprecated local token features --- pyoverkiz/client.py | 49 --------------------------------------------- pyoverkiz/models.py | 12 ----------- 2 files changed, 61 deletions(-) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index cd001b7e..017fe351 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -54,7 +54,6 @@ Execution, Gateway, HistoryExecution, - LocalToken, Option, OptionParameter, Place, @@ -495,54 +494,6 @@ async def get_places(self) -> Place: places = Place(**humps.decamelize(response)) return places - @retry_on_auth_error - async def generate_local_token(self, gateway_id: str) -> str: - """Generates a new token. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__get(f"config/{gateway_id}/local/tokens/generate") - - return cast(str, response["token"]) - - @retry_on_auth_error - async def activate_local_token( - self, gateway_id: str, token: str, label: str, scope: str = "devmode" - ) -> str: - """Create a token. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__post( - f"config/{gateway_id}/local/tokens", - {"label": label, "token": token, "scope": scope}, - ) - - return cast(str, response["requestId"]) - - @retry_on_auth_error - async def get_local_tokens( - self, gateway_id: str, scope: str = "devmode" - ) -> list[LocalToken]: - """Get all gateway tokens with the given scope. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__get(f"config/{gateway_id}/local/tokens/{scope}") - local_tokens = [LocalToken(**lt) for lt in humps.decamelize(response)] - - return local_tokens - - @retry_on_auth_error - async def delete_local_token(self, gateway_id: str, uuid: str) -> bool: - """Delete a token. - - Access scope : Full enduser API access (enduser/*). - """ - await self.__delete(f"config/{gateway_id}/local/tokens/{uuid}") - - return True - @retry_on_auth_error async def execute_scenario(self, oid: str) -> str: """Execute a scenario.""" diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index d182c974..53869352 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -970,18 +970,6 @@ class ServerConfig: configuration_url: str | None = None -@define(kw_only=True) -class LocalToken: - """Descriptor for a local gateway token.""" - - label: str - gateway_id: str = field(repr=obfuscate_id, default=None) - gateway_creation_time: int - uuid: str - scope: str - expiration_time: int | None - - @define(kw_only=True) class OptionParameter: """Key/value pair representing option parameter.""" From 36d48790fcd26198df61cab2166b0e7dc6a314fa Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 17:16:06 +0000 Subject: [PATCH 08/18] Set user agent --- pyoverkiz/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 017fe351..6157d313 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -156,7 +156,11 @@ def __init__( self.gateways: list[Gateway] = [] self.event_listener_id: str | None = None - self.session = session if session else ClientSession() + self.session = ( + session + if session + else ClientSession(headers={"User-Agent": "python-overkiz-api"}) + ) self._ssl = verify_ssl if self.server_config.type == APIType.LOCAL and verify_ssl: From 9e8f33b1549b62a50fdcedd200936e806e8e245f Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 18:21:06 +0000 Subject: [PATCH 09/18] Refactor authentication strategy to use server configuration diraectly and update related server handling --- pyoverkiz/auth/factory.py | 26 ++++++-------------------- pyoverkiz/client.py | 7 ++++--- pyoverkiz/const.py | 16 ++++++++++++++++ pyoverkiz/models.py | 3 ++- pyoverkiz/utils.py | 6 +++++- 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py index 2554431b..8b28f273 100644 --- a/pyoverkiz/auth/factory.py +++ b/pyoverkiz/auth/factory.py @@ -23,26 +23,21 @@ SessionLoginStrategy, SomfyAuthStrategy, ) -from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, Server from pyoverkiz.models import ServerConfig def build_auth_strategy( - server_key: str | Server | None, + *, server_config: ServerConfig, credentials: Credentials, session: ClientSession, ssl_context: ssl.SSLContext | bool, ) -> Any: """Build the correct auth strategy for the given server and credentials.""" - # Normalize server key - try: - key = Server(server_key) if server_key else _match_server_key(server_config) - except ValueError: - key = None + server: Server | None = server_config.server - if key == Server.SOMFY_EUROPE: + if server == Server.SOMFY_EUROPE: return SomfyAuthStrategy( _ensure_username_password(credentials), session, @@ -51,7 +46,7 @@ def build_auth_strategy( server_config.type, ) - if key in { + if server in { Server.ATLANTIC_COZYTOUCH, Server.THERMOR_COZYTOUCH, Server.SAUTER_COZYTOUCH, @@ -64,7 +59,7 @@ def build_auth_strategy( server_config.type, ) - if key == Server.NEXITY: + if server == Server.NEXITY: return NexityAuthStrategy( _ensure_username_password(credentials), session, @@ -73,7 +68,7 @@ def build_auth_strategy( server_config.type, ) - if key == Server.REXEL: + if server == Server.REXEL: return RexelAuthStrategy( _ensure_rexel(credentials), session, @@ -111,15 +106,6 @@ def build_auth_strategy( ) -def _match_server_key(server: ServerConfig) -> Server: - """Find the `Server` enum corresponding to a `ServerConfig` entry.""" - for key, value in SUPPORTED_SERVERS.items(): - if server is value or server.endpoint == value.endpoint: - return Server(key) - - raise ValueError("Unable to match server to a known Server enum.") - - def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials: """Validate that credentials are username/password based.""" if not isinstance(credentials, UsernamePasswordCredentials): diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 6157d313..5ca6f97a 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -172,9 +172,7 @@ def __init__( # work with Overkiz self-signed gateway certificates self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT - inferred_server_key = server_key or self._resolve_server_key() self._auth = build_auth_strategy( - inferred_server_key, self.server_config, credentials, self.session, @@ -211,6 +209,9 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: def _resolve_server_key(self) -> Server: """Infer a `Server` enum for the current server configuration.""" + if self.server_config.server: + return self.server_config.server + for key, value in SUPPORTED_SERVERS.items(): if ( self.server_config is value @@ -243,7 +244,7 @@ async def login( """ await self._auth.login() - if self.api_type == APIType.LOCAL: + if self.server_config.type == APIType.LOCAL: if register_event_listener: await self.register_event_listener() else: diff --git a/pyoverkiz/const.py b/pyoverkiz/const.py index 905a8634..b4e515b1 100644 --- a/pyoverkiz/const.py +++ b/pyoverkiz/const.py @@ -42,96 +42,112 @@ SUPPORTED_SERVERS: dict[str, ServerConfig] = { Server.ATLANTIC_COZYTOUCH: ServerConfig( + server=Server.ATLANTIC_COZYTOUCH, name="Atlantic Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Atlantic", type=APIType.CLOUD, ), Server.BRANDT: ServerConfig( + server=Server.BRANDT, name="Brandt Smart Control", endpoint="https://ha3-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Brandt", type=APIType.CLOUD, ), Server.FLEXOM: ServerConfig( + server=Server.FLEXOM, name="Flexom", endpoint="https://ha108-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Bouygues", type=APIType.CLOUD, ), Server.HEXAOM_HEXACONNECT: ServerConfig( + server=Server.HEXAOM_HEXACONNECT, name="Hexaom HexaConnect", endpoint="https://ha5-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hexaom", type=APIType.CLOUD, ), Server.HI_KUMO_ASIA: ServerConfig( + server=Server.HI_KUMO_ASIA, name="Hitachi Hi Kumo (Asia)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", type=APIType.CLOUD, ), Server.HI_KUMO_EUROPE: ServerConfig( + server=Server.HI_KUMO_EUROPE, name="Hitachi Hi Kumo (Europe)", endpoint="https://ha117-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", type=APIType.CLOUD, ), Server.HI_KUMO_OCEANIA: ServerConfig( + server=Server.HI_KUMO_OCEANIA, name="Hitachi Hi Kumo (Oceania)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", type=APIType.CLOUD, ), Server.NEXITY: ServerConfig( + server=Server.NEXITY, name="Nexity Eugénie", endpoint="https://ha106-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Nexity", type=APIType.CLOUD, ), Server.REXEL: ServerConfig( + server=Server.REXEL, name="Rexel Energeasy Connect", endpoint=REXEL_BACKEND_API, manufacturer="Rexel", type=APIType.CLOUD, ), Server.SAUTER_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch + server=Server.SAUTER_COZYTOUCH, name="Sauter Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Sauter", type=APIType.CLOUD, ), Server.SIMU_LIVEIN2: ServerConfig( # alias of https://tahomalink.com + server=Server.SIMU_LIVEIN2, name="SIMU (LiveIn2)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", type=APIType.CLOUD, ), Server.SOMFY_EUROPE: ServerConfig( # alias of https://tahomalink.com + server=Server.SOMFY_EUROPE, name="Somfy (Europe)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", type=APIType.CLOUD, ), Server.SOMFY_AMERICA: ServerConfig( + server=Server.SOMFY_AMERICA, name="Somfy (North America)", endpoint="https://ha401-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", type=APIType.CLOUD, ), Server.SOMFY_OCEANIA: ServerConfig( + server=Server.SOMFY_OCEANIA, name="Somfy (Oceania)", endpoint="https://ha201-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", type=APIType.CLOUD, ), Server.THERMOR_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch + server=Server.THERMOR_COZYTOUCH, name="Thermor Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Thermor", type=APIType.CLOUD, ), Server.UBIWIZZ: ServerConfig( + server=Server.UBIWIZZ, name="Ubiwizz", endpoint="https://ha129-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Decelect", diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 53869352..3d76d1e9 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -24,7 +24,7 @@ ) from pyoverkiz.enums.command import OverkizCommand, OverkizCommandParam from pyoverkiz.enums.protocol import Protocol -from pyoverkiz.enums.server import APIType +from pyoverkiz.enums.server import APIType, Server from pyoverkiz.obfuscate import obfuscate_email, obfuscate_id, obfuscate_string from pyoverkiz.types import DATA_TYPE_TO_PYTHON, StateType @@ -963,6 +963,7 @@ def __init__( class ServerConfig: """Connection target details for an Overkiz-compatible server.""" + server: Server | None = None name: str endpoint: str manufacturer: str diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index 0aabe69f..d810f16c 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -5,7 +5,7 @@ import re from pyoverkiz.const import LOCAL_API_PATH -from pyoverkiz.enums.server import APIType +from pyoverkiz.enums.server import APIType, Server from pyoverkiz.models import ServerConfig @@ -15,6 +15,7 @@ def create_local_server_config( name: str = "Somfy Developer Mode", manufacturer: str = "Somfy", type: APIType = APIType.LOCAL, + server: Server | None = Server.SOMFY_DEVELOPER_MODE, configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration for a local API (Somfy Developer mode).""" @@ -22,6 +23,7 @@ def create_local_server_config( name=name, endpoint=f"https://{host}{LOCAL_API_PATH}", manufacturer=manufacturer, + server=server, configuration_url=configuration_url, type=type, ) @@ -32,11 +34,13 @@ def create_server_config( name: str, endpoint: str, manufacturer: str, + server: Server | None = None, type: APIType = APIType.CLOUD, configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration with the provided endpoint and metadata.""" return ServerConfig( + server=server, name=name, endpoint=endpoint, manufacturer=manufacturer, From a2c8a42d059dfd357aba25de9a30e35ba4c7e655 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 18:24:03 +0000 Subject: [PATCH 10/18] Fix --- pyoverkiz/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 5ca6f97a..e58bf783 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -173,10 +173,10 @@ def __init__( self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT self._auth = build_auth_strategy( - self.server_config, - credentials, - self.session, - self._ssl, + server_config=self.server_config, + credentials=credentials, + session=self.session, + ssl_context=self._ssl, ) async def __aenter__(self) -> OverkizClient: From 33f563ced21bbec0a6c7fc3987e4b2ba4aac2b04 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 18:27:07 +0000 Subject: [PATCH 11/18] Refactor authentication strategy return type to AuthStrategy in build_auth_strategy function --- pyoverkiz/auth/factory.py | 4 ++-- pyoverkiz/client.py | 23 ++--------------------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py index 8b28f273..c0901ae6 100644 --- a/pyoverkiz/auth/factory.py +++ b/pyoverkiz/auth/factory.py @@ -3,7 +3,6 @@ from __future__ import annotations import ssl -from typing import Any from aiohttp import ClientSession @@ -15,6 +14,7 @@ UsernamePasswordCredentials, ) from pyoverkiz.auth.strategies import ( + AuthStrategy, BearerTokenAuthStrategy, CozytouchAuthStrategy, LocalTokenAuthStrategy, @@ -33,7 +33,7 @@ def build_auth_strategy( credentials: Credentials, session: ClientSession, ssl_context: ssl.SSLContext | bool, -) -> Any: +) -> AuthStrategy: """Build the correct auth strategy for the given server and credentials.""" server: Server | None = server_config.server diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index e58bf783..82a8a1a2 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -19,7 +19,7 @@ ServerDisconnectedError, ) -from pyoverkiz.auth import Credentials, build_auth_strategy +from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server from pyoverkiz.exceptions import ( @@ -134,6 +134,7 @@ class OverkizClient: event_listener_id: str | None session: ClientSession _ssl: ssl.SSLContext | bool = True + _auth: AuthStrategy def __init__( self, @@ -142,7 +143,6 @@ def __init__( credentials: Credentials, verify_ssl: bool = True, session: ClientSession | None = None, - server_key: Server | str | None = None, ) -> None: """Constructor. @@ -207,25 +207,6 @@ def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: f"Unknown server '{server_key}'. Provide a supported server key or ServerConfig instance." ) from error - def _resolve_server_key(self) -> Server: - """Infer a `Server` enum for the current server configuration.""" - if self.server_config.server: - return self.server_config.server - - for key, value in SUPPORTED_SERVERS.items(): - if ( - self.server_config is value - or self.server_config.endpoint == value.endpoint - ): - return Server(key) - - if self.server_config.type == APIType.LOCAL: - return Server(Server.SOMFY_DEVELOPER_MODE) - - raise OverkizException( - "Unknown server configuration; provide server_key explicitly." - ) - async def close(self) -> None: """Close the session.""" if self.event_listener_id: From 6ec3377781db8b551e90bee48347bd7b904a38d9 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 19:35:32 +0000 Subject: [PATCH 12/18] Add constructor to ServerConfig for better initialization of fields --- pyoverkiz/models.py | 25 +++++++++++++++++++++++-- pyoverkiz/utils.py | 16 ++++++++-------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 3d76d1e9..5d1bec43 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -963,13 +963,34 @@ def __init__( class ServerConfig: """Connection target details for an Overkiz-compatible server.""" - server: Server | None = None + server: Server | None name: str endpoint: str manufacturer: str - type: APIType | str + type: APIType configuration_url: str | None = None + def __init__( + self, + *, + server: Server | str | None = None, + name: str, + endpoint: str, + manufacturer: str, + type: str | APIType, + configuration_url: str | None = None, + **_: Any, + ) -> None: + """Initialize ServerConfig and convert enum fields.""" + self.server = ( + server if isinstance(server, Server) or server is None else Server(server) + ) + self.name = name + self.endpoint = endpoint + self.manufacturer = manufacturer + self.type = type if isinstance(type, APIType) else APIType(type) + self.configuration_url = configuration_url + @define(kw_only=True) class OptionParameter: diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index d810f16c..23d6d371 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -14,8 +14,6 @@ def create_local_server_config( host: str, name: str = "Somfy Developer Mode", manufacturer: str = "Somfy", - type: APIType = APIType.LOCAL, - server: Server | None = Server.SOMFY_DEVELOPER_MODE, configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration for a local API (Somfy Developer mode).""" @@ -23,9 +21,9 @@ def create_local_server_config( name=name, endpoint=f"https://{host}{LOCAL_API_PATH}", manufacturer=manufacturer, - server=server, + server=Server.SOMFY_DEVELOPER_MODE, configuration_url=configuration_url, - type=type, + type=APIType.LOCAL, ) @@ -34,18 +32,20 @@ def create_server_config( name: str, endpoint: str, manufacturer: str, - server: Server | None = None, - type: APIType = APIType.CLOUD, + server: Server | str | None = None, + type: APIType | str = APIType.CLOUD, configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration with the provided endpoint and metadata.""" return ServerConfig( - server=server, + server=server + if isinstance(server, Server) or server is None + else Server(server), name=name, endpoint=endpoint, manufacturer=manufacturer, configuration_url=configuration_url, - type=type, + type=type if isinstance(type, APIType) else APIType(type), ) From d5e418a472b77afb9e177840e68109957a7d8cc9 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 19:46:26 +0000 Subject: [PATCH 13/18] Add comment --- pyoverkiz/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 82a8a1a2..261c0a87 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -229,6 +229,7 @@ async def login( if register_event_listener: await self.register_event_listener() else: + # Validate local API token by calling a simple endpoint await self.get_gateways() return True From f0c8e3a981285a02efbf28a4e891cc0750f4b647 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Mon, 29 Dec 2025 19:50:51 +0000 Subject: [PATCH 14/18] Fix tests --- tests/test_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index a35f783f..af68ffbf 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -38,19 +38,19 @@ async def client(self): async def local_client(self): """Fixture providing an OverkizClient configured for a local (developer) server.""" return OverkizClient( - server=create_local_server_config("gateway-1234-5678-1243.local:8443"), - credentials=LocalTokenCredentials("token"), + server=create_local_server_config(host="gateway-1234-5678-1243.local:8443"), + credentials=LocalTokenCredentials(token="token"), ) @pytest.mark.asyncio async def test_get_api_type_cloud(self, client: OverkizClient): """Verify that a cloud-configured client reports APIType.CLOUD.""" - assert client.api_type == APIType.CLOUD + assert client.server_config.type == APIType.CLOUD @pytest.mark.asyncio async def test_get_api_type_local(self, local_client: OverkizClient): """Verify that a local-configured client reports APIType.LOCAL.""" - assert local_client.api_type == APIType.LOCAL + assert local_client.server_config.type == APIType.LOCAL @pytest.mark.asyncio async def test_get_devices_basic(self, client: OverkizClient): From 38823d2083e0964f32586fee1622dabfee6d8294 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Thu, 1 Jan 2026 18:22:31 +0000 Subject: [PATCH 15/18] Remove unused import of SUPPORTED_SERVERS from README.md --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 6daa1815..3bcaa109 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,6 @@ import asyncio import time from pyoverkiz.auth.credentials import UsernamePasswordCredentials -from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.client import OverkizClient from pyoverkiz.models import Action from pyoverkiz.enums import Server, OverkizCommand From 3c9cfdfd9143a8e93949dd72d69b341cefa7ad5b Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 1 Jan 2026 22:34:20 +0100 Subject: [PATCH 16/18] Address PR review feedback: Fix timezone handling, improve error handling, and add auth tests (#1875) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [x] Fix datetime.now() calls to use UTC timezone (pyoverkiz/auth/strategies.py lines 203, 406; pyoverkiz/auth/base.py line 24) - [x] Handle 204 No Content responses properly in strategies.py line 123 - [x] Add error handling for OAuth token exchange responses in strategies.py line 396 - [x] Remove duplicate enum conversion logic in utils.py create_server_config function - [x] Fix SSL_CONTEXT_LOCAL_API mutation issue by creating a copy per client instance - [x] Add test coverage for authentication module (strategies.py, factory.py, credentials.py) - [x] Revert SSL context creation to avoid blocking I/O at runtime - [x] Add TODO fix comment for mypy type ignore workaround --- ✨ Let Copilot coding agent [set things up for you](https://github.com/iMicknl/python-overkiz-api/issues/new?title=✨+Set+up+Copilot+instructions&body=Configure%20instructions%20for%20this%20repository%20as%20documented%20in%20%5BBest%20practices%20for%20Copilot%20coding%20agent%20in%20your%20repository%5D%28https://gh.io/copilot-coding-agent-tips%29%2E%0A%0A%3COnboard%20this%20repo%3E&assignees=copilot) — coding agent works faster and does higher quality work when set up for your repo. --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: iMicknl <1424596+iMicknl@users.noreply.github.com> --- pyoverkiz/auth/base.py | 6 +- pyoverkiz/auth/strategies.py | 28 +- pyoverkiz/client.py | 7 +- pyoverkiz/utils.py | 9 +- tests/test_auth.py | 480 +++++++++++++++++++++++++++++++++++ 5 files changed, 516 insertions(+), 14 deletions(-) create mode 100644 tests/test_auth.py diff --git a/pyoverkiz/auth/base.py b/pyoverkiz/auth/base.py index 6a01e912..f4db7059 100644 --- a/pyoverkiz/auth/base.py +++ b/pyoverkiz/auth/base.py @@ -21,9 +21,9 @@ def is_expired(self, *, skew_seconds: int = 5) -> bool: if not self.expires_at: return False - return datetime.datetime.now() >= self.expires_at - datetime.timedelta( - seconds=skew_seconds - ) + return datetime.datetime.now( + datetime.UTC + ) >= self.expires_at - datetime.timedelta(seconds=skew_seconds) class AuthStrategy(Protocol): diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py index 2c363ee4..761ba49c 100644 --- a/pyoverkiz/auth/strategies.py +++ b/pyoverkiz/auth/strategies.py @@ -121,6 +121,10 @@ async def _post_login(self, data: Mapping[str, Any]) -> None: f"Login failed for {self.server.name}: {response.status}" ) + # A 204 No Content response cannot have a body, so skip JSON parsing. + if response.status == 204: + return + result = await response.json() if not result.get("success"): raise BadCredentialsException("Login failed: bad credentials") @@ -200,9 +204,9 @@ async def _request_access_token( self.context.refresh_token = token.get("refresh_token") expires_in = token.get("expires_in") if expires_in: - self.context.expires_at = datetime.datetime.now() + datetime.timedelta( - seconds=cast(int, expires_in) - 5 - ) + self.context.expires_at = datetime.datetime.now( + datetime.UTC + ) + datetime.timedelta(seconds=cast(int, expires_in) - 5) class CozytouchAuthStrategy(SessionLoginStrategy): @@ -394,6 +398,18 @@ async def _exchange_token(self, payload: Mapping[str, str]) -> None: ) as response: token = await response.json() + # Handle OAuth error responses explicitly before accessing the access token. + error = token.get("error") + if error: + description = token.get("error_description") or token.get("message") + if description: + raise InvalidTokenException( + f"Error retrieving Rexel access token: {description}" + ) + raise InvalidTokenException( + f"Error retrieving Rexel access token: {error}" + ) + access_token = token.get("access_token") if not access_token: raise InvalidTokenException("No Rexel access token provided.") @@ -403,9 +419,9 @@ async def _exchange_token(self, payload: Mapping[str, str]) -> None: self.context.refresh_token = token.get("refresh_token") expires_in = token.get("expires_in") if expires_in: - self.context.expires_at = datetime.datetime.now() + datetime.timedelta( - seconds=cast(int, expires_in) - 5 - ) + self.context.expires_at = datetime.datetime.now( + datetime.UTC + ) + datetime.timedelta(seconds=cast(int, expires_in) - 5) @staticmethod def _ensure_consent(access_token: str) -> None: diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 261c0a87..b4d8e776 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -166,7 +166,12 @@ def __init__( if self.server_config.type == APIType.LOCAL and verify_ssl: # To avoid security issues while authentication to local API, we add the following authority to # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt - self._ssl = SSL_CONTEXT_LOCAL_API + # Create a copy of the SSL context to avoid mutating the shared global context + self._ssl = ssl.SSLContext(SSL_CONTEXT_LOCAL_API.protocol) + self._ssl.load_verify_locations( + cafile=os.path.dirname(os.path.realpath(__file__)) + + "/overkiz-root-ca-2048.crt" + ) # Disable strict validation introduced in Python 3.13, which doesn't # work with Overkiz self-signed gateway certificates diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index 23d6d371..f0666ecb 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -37,15 +37,16 @@ def create_server_config( configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration with the provided endpoint and metadata.""" + # TODO fix: ServerConfig.__init__ handles the enum conversion, but mypy doesn't recognize + # this due to attrs @define decorator generating __init__ with stricter signatures, + # so we need type: ignore comments. return ServerConfig( - server=server - if isinstance(server, Server) or server is None - else Server(server), + server=server, # type: ignore[arg-type] name=name, endpoint=endpoint, manufacturer=manufacturer, configuration_url=configuration_url, - type=type if isinstance(type, APIType) else APIType(type), + type=type, # type: ignore[arg-type] ) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 00000000..16587298 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,480 @@ +"""Tests for authentication module.""" + +from __future__ import annotations + +import datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest +from aiohttp import ClientSession + +from pyoverkiz.auth.base import AuthContext +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.factory import ( + _ensure_rexel, + _ensure_token, + _ensure_username_password, + build_auth_strategy, +) +from pyoverkiz.auth.strategies import ( + BearerTokenAuthStrategy, + CozytouchAuthStrategy, + LocalTokenAuthStrategy, + NexityAuthStrategy, + RexelAuthStrategy, + SessionLoginStrategy, + SomfyAuthStrategy, +) +from pyoverkiz.enums import APIType, Server +from pyoverkiz.models import ServerConfig + + +class TestAuthContext: + """Test AuthContext functionality.""" + + def test_not_expired_no_expiration(self): + """Test that context without expiration is not expired.""" + context = AuthContext(access_token="test_token") + assert not context.is_expired() + + def test_not_expired_future_expiration(self): + """Test that context with future expiration is not expired.""" + future = datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1) + context = AuthContext(access_token="test_token", expires_at=future) + assert not context.is_expired() + + def test_expired_past_expiration(self): + """Test that context with past expiration is expired.""" + past = datetime.datetime.now(datetime.UTC) - datetime.timedelta(hours=1) + context = AuthContext(access_token="test_token", expires_at=past) + assert context.is_expired() + + def test_expired_with_skew(self): + """Test that context respects skew time.""" + # Expires in 3 seconds, but default skew is 5 + soon = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=3) + context = AuthContext(access_token="test_token", expires_at=soon) + assert context.is_expired() + + def test_not_expired_with_custom_skew(self): + """Test that custom skew time can be provided.""" + soon = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=3) + context = AuthContext(access_token="test_token", expires_at=soon) + assert not context.is_expired(skew_seconds=1) + + +class TestCredentials: + """Test credential dataclasses.""" + + def test_username_password_credentials(self): + """Test UsernamePasswordCredentials creation.""" + creds = UsernamePasswordCredentials("user@example.com", "password123") + assert creds.username == "user@example.com" + assert creds.password == "password123" + + def test_token_credentials(self): + """Test TokenCredentials creation.""" + creds = TokenCredentials("my_token_123") + assert creds.token == "my_token_123" + + def test_local_token_credentials(self): + """Test LocalTokenCredentials creation.""" + creds = LocalTokenCredentials("local_token_456") + assert creds.token == "local_token_456" + assert isinstance(creds, TokenCredentials) + + def test_rexel_oauth_credentials(self): + """Test RexelOAuthCodeCredentials creation.""" + creds = RexelOAuthCodeCredentials("auth_code_xyz", "http://redirect.uri") + assert creds.code == "auth_code_xyz" + assert creds.redirect_uri == "http://redirect.uri" + + +class TestAuthFactory: + """Test authentication factory functions.""" + + def test_ensure_username_password_valid(self): + """Test that valid username/password credentials pass validation.""" + creds = UsernamePasswordCredentials("user", "pass") + result = _ensure_username_password(creds) + assert result is creds + + def test_ensure_username_password_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = TokenCredentials("token") + with pytest.raises(TypeError, match="UsernamePasswordCredentials are required"): + _ensure_username_password(creds) + + def test_ensure_token_valid(self): + """Test that valid token credentials pass validation.""" + creds = TokenCredentials("token") + result = _ensure_token(creds) + assert result is creds + + def test_ensure_token_local_valid(self): + """Test that LocalTokenCredentials also pass token validation.""" + creds = LocalTokenCredentials("local_token") + result = _ensure_token(creds) + assert result is creds + + def test_ensure_token_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = UsernamePasswordCredentials("user", "pass") + with pytest.raises(TypeError, match="TokenCredentials are required"): + _ensure_token(creds) + + def test_ensure_rexel_valid(self): + """Test that valid Rexel credentials pass validation.""" + creds = RexelOAuthCodeCredentials("code", "uri") + result = _ensure_rexel(creds) + assert result is creds + + def test_ensure_rexel_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = UsernamePasswordCredentials("user", "pass") + with pytest.raises(TypeError, match="RexelOAuthCodeCredentials are required"): + _ensure_rexel(creds) + + @pytest.mark.asyncio + async def test_build_auth_strategy_somfy(self): + """Test building Somfy auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_EUROPE, + name="Somfy", + endpoint="https://api.somfy.com", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, SomfyAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_cozytouch(self): + """Test building Cozytouch auth strategy.""" + server_config = ServerConfig( + server=Server.ATLANTIC_COZYTOUCH, + name="Cozytouch", + endpoint="https://api.cozytouch.com", + manufacturer="Atlantic", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, CozytouchAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_nexity(self): + """Test building Nexity auth strategy.""" + server_config = ServerConfig( + server=Server.NEXITY, + name="Nexity", + endpoint="https://api.nexity.com", + manufacturer="Nexity", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, NexityAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_rexel(self): + """Test building Rexel auth strategy.""" + server_config = ServerConfig( + server=Server.REXEL, + name="Rexel", + endpoint="https://api.rexel.com", + manufacturer="Rexel", + type=APIType.CLOUD, + ) + credentials = RexelOAuthCodeCredentials("code", "http://redirect.uri") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, RexelAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_local_token(self): + """Test building local token auth strategy.""" + server_config = ServerConfig( + server=None, + name="Local", + endpoint="https://gateway.local", + manufacturer="Overkiz", + type=APIType.LOCAL, + ) + credentials = LocalTokenCredentials("local_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, LocalTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_local_bearer(self): + """Test building local bearer token auth strategy.""" + server_config = ServerConfig( + server=None, + name="Local", + endpoint="https://gateway.local", + manufacturer="Overkiz", + type=APIType.LOCAL, + ) + credentials = TokenCredentials("bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, BearerTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_cloud_bearer(self): + """Test building cloud bearer token auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Somfy Oceania", + endpoint="https://api.somfy.com.au", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, BearerTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_session_login(self): + """Test building generic session login auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Somfy Oceania", + endpoint="https://api.somfy.com.au", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, SessionLoginStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_wrong_credentials_type(self): + """Test that wrong credentials type raises TypeError.""" + server_config = ServerConfig( + server=Server.SOMFY_EUROPE, + name="Somfy", + endpoint="https://api.somfy.com", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("token") # Wrong type for Somfy + session = AsyncMock(spec=ClientSession) + + with pytest.raises(TypeError, match="UsernamePasswordCredentials are required"): + build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + +class TestSessionLoginStrategy: + """Test SessionLoginStrategy.""" + + @pytest.mark.asyncio + async def test_login_success(self): + """Test successful login with 200 response.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"success": True}) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + await strategy.login() + + session.post.assert_called_once() + + @pytest.mark.asyncio + async def test_login_204_no_content(self): + """Test login with 204 No Content response.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 204 + mock_response.json = AsyncMock() + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + await strategy.login() + + # Should not call json() for 204 response + assert not mock_response.json.called + + @pytest.mark.asyncio + async def test_refresh_if_needed_no_refresh(self): + """Test that refresh_if_needed returns False when no refresh needed.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + result = await strategy.refresh_if_needed() + + assert not result + + def test_auth_headers_no_token(self): + """Test that auth headers return empty dict when no token.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + headers = strategy.auth_headers() + + assert headers == {} + + +class TestBearerTokenAuthStrategy: + """Test BearerTokenAuthStrategy.""" + + @pytest.mark.asyncio + async def test_login_no_op(self): + """Test that login is a no-op for bearer tokens.""" + server_config = ServerConfig( + server=None, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("my_bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = BearerTokenAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + result = await strategy.login() + + # Login should be a no-op + assert result is None + + def test_auth_headers_with_token(self): + """Test that auth headers include Bearer token.""" + server_config = ServerConfig( + server=None, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("my_bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = BearerTokenAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + headers = strategy.auth_headers() + + assert headers == {"Authorization": "Bearer my_bearer_token"} From b4624719b3d3ff7ead96218de0a8419fc93a2e4e Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 3 Jan 2026 19:58:36 +0000 Subject: [PATCH 17/18] Enhance SSL context handling for local API and improve Rexel auth strategy tests --- pyoverkiz/client.py | 22 ++++++++---------- pyoverkiz/utils.py | 7 ++---- tests/test_auth.py | 54 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 18 deletions(-) diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index b4d8e776..c81b20ea 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -113,10 +113,16 @@ def _create_local_ssl_context() -> ssl.SSLContext: This method is not async-friendly and should be called from a thread because it will load certificates from disk and do other blocking I/O. """ - return ssl.create_default_context( + context = ssl.create_default_context( cafile=os.path.dirname(os.path.realpath(__file__)) + "/overkiz-root-ca-2048.crt" ) + # Disable strict validation introduced in Python 3.13, which doesn't work with + # Overkiz self-signed gateway certificates. Applied once to the shared context. + context.verify_flags &= ~ssl.VERIFY_X509_STRICT + + return context + # The default SSLContext objects are created at import time # since they do blocking I/O to load certificates from disk, @@ -164,18 +170,8 @@ def __init__( self._ssl = verify_ssl if self.server_config.type == APIType.LOCAL and verify_ssl: - # To avoid security issues while authentication to local API, we add the following authority to - # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt - # Create a copy of the SSL context to avoid mutating the shared global context - self._ssl = ssl.SSLContext(SSL_CONTEXT_LOCAL_API.protocol) - self._ssl.load_verify_locations( - cafile=os.path.dirname(os.path.realpath(__file__)) - + "/overkiz-root-ca-2048.crt" - ) - - # Disable strict validation introduced in Python 3.13, which doesn't - # work with Overkiz self-signed gateway certificates - self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT + # Use the prebuilt SSL context with disabled strict validation for local API. + self._ssl = SSL_CONTEXT_LOCAL_API self._auth = build_auth_strategy( server_config=self.server_config, diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index f0666ecb..0bb22dad 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -37,16 +37,13 @@ def create_server_config( configuration_url: str | None = None, ) -> ServerConfig: """Generate server configuration with the provided endpoint and metadata.""" - # TODO fix: ServerConfig.__init__ handles the enum conversion, but mypy doesn't recognize - # this due to attrs @define decorator generating __init__ with stricter signatures, - # so we need type: ignore comments. return ServerConfig( - server=server, # type: ignore[arg-type] + server=server, name=name, endpoint=endpoint, manufacturer=manufacturer, configuration_url=configuration_url, - type=type, # type: ignore[arg-type] + type=type, ) diff --git a/tests/test_auth.py b/tests/test_auth.py index 16587298..8106d29d 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -2,7 +2,9 @@ from __future__ import annotations +import base64 import datetime +import json from unittest.mock import AsyncMock, MagicMock import pytest @@ -29,8 +31,10 @@ RexelAuthStrategy, SessionLoginStrategy, SomfyAuthStrategy, + _decode_jwt_payload, ) from pyoverkiz.enums import APIType, Server +from pyoverkiz.exceptions import InvalidTokenException from pyoverkiz.models import ServerConfig @@ -478,3 +482,53 @@ def test_auth_headers_with_token(self): headers = strategy.auth_headers() assert headers == {"Authorization": "Bearer my_bearer_token"} + + +class TestRexelAuthStrategy: + """Tests for Rexel auth specifics.""" + + @pytest.mark.asyncio + async def test_exchange_token_error_response(self): + """Ensure OAuth error payloads raise InvalidTokenException before parsing access token.""" + server_config = ServerConfig( + server=Server.REXEL, + name="Rexel", + endpoint="https://api.rexel.com", + manufacturer="Rexel", + type=APIType.CLOUD, + ) + credentials = RexelOAuthCodeCredentials("code", "https://redirect") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 400 + mock_response.json = AsyncMock( + return_value={"error": "invalid_grant", "error_description": "bad grant"} + ) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = RexelAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + + with pytest.raises(InvalidTokenException, match="bad grant"): + await strategy._exchange_token({"grant_type": "authorization_code"}) + + def test_ensure_consent_missing(self): + """Raising when JWT consent claim is missing or incorrect.""" + payload_segment = ( + base64.urlsafe_b64encode(json.dumps({"consent": "other"}).encode()) + .decode() + .rstrip("=") + ) + token = f"header.{payload_segment}.sig" + + with pytest.raises(InvalidTokenException, match="Consent is missing"): + RexelAuthStrategy._ensure_consent(token) + + def test_decode_jwt_payload_invalid_format(self): + """Malformed tokens raise InvalidTokenException during decoding.""" + with pytest.raises(InvalidTokenException): + _decode_jwt_payload("invalid.token") From db78f52cc1fea25be4af27c9955cfe7107507d57 Mon Sep 17 00:00:00 2001 From: Mick Vleeshouwer Date: Sat, 3 Jan 2026 20:02:30 +0000 Subject: [PATCH 18/18] Add mypy ignore --- pyoverkiz/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index 0bb22dad..6c28cdd3 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -38,12 +38,12 @@ def create_server_config( ) -> ServerConfig: """Generate server configuration with the provided endpoint and metadata.""" return ServerConfig( - server=server, + server=server, # type: ignore[arg-type] name=name, endpoint=endpoint, manufacturer=manufacturer, configuration_url=configuration_url, - type=type, + type=type, # type: ignore[arg-type] )