diff --git a/README.md b/README.md index bfe04a20..3bcaa109 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ pip install pyoverkiz import asyncio import time -from pyoverkiz.const import SUPPORTED_SERVERS +from pyoverkiz.auth.credentials import UsernamePasswordCredentials from pyoverkiz.client import OverkizClient from pyoverkiz.models import Action from pyoverkiz.enums import Server, OverkizCommand @@ -48,7 +48,8 @@ PASSWORD = "" async def main() -> None: async with OverkizClient( - USERNAME, PASSWORD, server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE] + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials(USERNAME, PASSWORD), ) as client: try: await client.login() @@ -90,14 +91,11 @@ asyncio.run(main()) ```python import asyncio import time -import aiohttp +from pyoverkiz.auth.credentials import LocalTokenCredentials from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS, OverkizServer -from pyoverkiz.enums import Server +from pyoverkiz.utils import create_local_server_config -USERNAME = "" -PASSWORD = "" LOCAL_GATEWAY = "gateway-xxxx-xxxx-xxxx.local" # or use the IP address of your gateway VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostname @@ -105,23 +103,10 @@ VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostnam async def main() -> None: token = "" # generate your token via the Somfy app and include it here - # Local Connection - session = aiohttp.ClientSession( - connector=aiohttp.TCPConnector(verify_ssl=VERIFY_SSL) - ) - async with OverkizClient( - username="", - password="", - token=token, - session=session, + server=create_local_server_config(host=LOCAL_GATEWAY), + credentials=LocalTokenCredentials(token), verify_ssl=VERIFY_SSL, - server=OverkizServer( - name="Somfy TaHoma (local)", - endpoint=f"https://{LOCAL_GATEWAY}:8443/enduser-mobile-web/1/enduserAPI/", - manufacturer="Somfy", - configuration_url=None, - ), ) as client: await client.login() diff --git a/pyoverkiz/auth/__init__.py b/pyoverkiz/auth/__init__.py new file mode 100644 index 00000000..535a2614 --- /dev/null +++ b/pyoverkiz/auth/__init__.py @@ -0,0 +1,24 @@ +"""Authentication module for pyoverkiz.""" + +from __future__ import annotations + +from pyoverkiz.auth.base import AuthContext, AuthStrategy +from pyoverkiz.auth.credentials import ( + Credentials, + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.factory import build_auth_strategy + +__all__ = [ + "AuthContext", + "AuthStrategy", + "Credentials", + "LocalTokenCredentials", + "RexelOAuthCodeCredentials", + "TokenCredentials", + "UsernamePasswordCredentials", + "build_auth_strategy", +] diff --git a/pyoverkiz/auth/base.py b/pyoverkiz/auth/base.py new file mode 100644 index 00000000..f4db7059 --- /dev/null +++ b/pyoverkiz/auth/base.py @@ -0,0 +1,42 @@ +"""Base classes for authentication strategies.""" + +from __future__ import annotations + +import datetime +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Protocol + + +@dataclass(slots=True) +class AuthContext: + """Authentication context holding tokens and expiration.""" + + access_token: str | None = None + refresh_token: str | None = None + expires_at: datetime.datetime | None = None + + def is_expired(self, *, skew_seconds: int = 5) -> bool: + """Check if the access token is expired, considering a skew time.""" + if not self.expires_at: + return False + + return datetime.datetime.now( + datetime.UTC + ) >= self.expires_at - datetime.timedelta(seconds=skew_seconds) + + +class AuthStrategy(Protocol): + """Protocol for authentication strategies.""" + + async def login(self) -> None: + """Perform login to obtain tokens.""" + + async def refresh_if_needed(self) -> bool: + """Refresh tokens if they are expired. Return True if refreshed.""" + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Generate authentication headers for requests.""" + + async def close(self) -> None: + """Clean up any resources held by the strategy.""" diff --git a/pyoverkiz/auth/credentials.py b/pyoverkiz/auth/credentials.py new file mode 100644 index 00000000..777f950b --- /dev/null +++ b/pyoverkiz/auth/credentials.py @@ -0,0 +1,37 @@ +"""Credentials for authentication strategies.""" + +from __future__ import annotations + +from dataclasses import dataclass + + +class Credentials: + """Marker base class for auth credentials.""" + + +@dataclass(slots=True) +class UsernamePasswordCredentials(Credentials): + """Credentials using username and password.""" + + username: str + password: str + + +@dataclass(slots=True) +class TokenCredentials(Credentials): + """Credentials using an (API) token.""" + + token: str + + +@dataclass(slots=True) +class LocalTokenCredentials(TokenCredentials): + """Credentials using a local API token.""" + + +@dataclass(slots=True) +class RexelOAuthCodeCredentials(Credentials): + """Credentials using Rexel OAuth2 authorization code.""" + + code: str + redirect_uri: str diff --git a/pyoverkiz/auth/factory.py b/pyoverkiz/auth/factory.py new file mode 100644 index 00000000..c0901ae6 --- /dev/null +++ b/pyoverkiz/auth/factory.py @@ -0,0 +1,127 @@ +"""Factory to build authentication strategies based on server and credentials.""" + +from __future__ import annotations + +import ssl + +from aiohttp import ClientSession + +from pyoverkiz.auth.credentials import ( + Credentials, + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.strategies import ( + AuthStrategy, + BearerTokenAuthStrategy, + CozytouchAuthStrategy, + LocalTokenAuthStrategy, + NexityAuthStrategy, + RexelAuthStrategy, + SessionLoginStrategy, + SomfyAuthStrategy, +) +from pyoverkiz.enums import APIType, Server +from pyoverkiz.models import ServerConfig + + +def build_auth_strategy( + *, + server_config: ServerConfig, + credentials: Credentials, + session: ClientSession, + ssl_context: ssl.SSLContext | bool, +) -> AuthStrategy: + """Build the correct auth strategy for the given server and credentials.""" + server: Server | None = server_config.server + + if server == Server.SOMFY_EUROPE: + return SomfyAuthStrategy( + _ensure_username_password(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + if server in { + Server.ATLANTIC_COZYTOUCH, + Server.THERMOR_COZYTOUCH, + Server.SAUTER_COZYTOUCH, + }: + return CozytouchAuthStrategy( + _ensure_username_password(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + if server == Server.NEXITY: + return NexityAuthStrategy( + _ensure_username_password(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + if server == Server.REXEL: + return RexelAuthStrategy( + _ensure_rexel(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + if server_config.type == APIType.LOCAL: + if isinstance(credentials, LocalTokenCredentials): + return LocalTokenAuthStrategy( + credentials, session, server_config, ssl_context, server_config.type + ) + return BearerTokenAuthStrategy( + _ensure_token(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + if isinstance(credentials, TokenCredentials) and not isinstance( + credentials, LocalTokenCredentials + ): + return BearerTokenAuthStrategy( + credentials, session, server_config, ssl_context, server_config.type + ) + + return SessionLoginStrategy( + _ensure_username_password(credentials), + session, + server_config, + ssl_context, + server_config.type, + ) + + +def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials: + """Validate that credentials are username/password based.""" + if not isinstance(credentials, UsernamePasswordCredentials): + raise TypeError("UsernamePasswordCredentials are required for this server.") + return credentials + + +def _ensure_token(credentials: Credentials) -> TokenCredentials: + """Validate that credentials carry a bearer token.""" + if not isinstance(credentials, TokenCredentials): + raise TypeError("TokenCredentials are required for this server.") + return credentials + + +def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials: + """Validate that credentials are of Rexel OAuth code type.""" + if not isinstance(credentials, RexelOAuthCodeCredentials): + raise TypeError("RexelOAuthCodeCredentials are required for this server.") + return credentials diff --git a/pyoverkiz/auth/strategies.py b/pyoverkiz/auth/strategies.py new file mode 100644 index 00000000..761ba49c --- /dev/null +++ b/pyoverkiz/auth/strategies.py @@ -0,0 +1,471 @@ +"""Authentication strategies for Overkiz API.""" + +from __future__ import annotations + +import asyncio +import base64 +import binascii +import datetime +import json +import ssl +from collections.abc import Mapping +from typing import Any, cast + +import boto3 +from aiohttp import ClientSession, FormData +from botocore.client import BaseClient +from botocore.config import Config +from warrant_lite import WarrantLite + +from pyoverkiz.auth.base import AuthContext, AuthStrategy +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.const import ( + COZYTOUCH_ATLANTIC_API, + COZYTOUCH_CLIENT_ID, + NEXITY_API, + NEXITY_COGNITO_CLIENT_ID, + NEXITY_COGNITO_REGION, + NEXITY_COGNITO_USER_POOL, + REXEL_OAUTH_CLIENT_ID, + REXEL_OAUTH_SCOPE, + REXEL_OAUTH_TOKEN_URL, + REXEL_REQUIRED_CONSENT, + SOMFY_API, + SOMFY_CLIENT_ID, + SOMFY_CLIENT_SECRET, +) +from pyoverkiz.enums import APIType +from pyoverkiz.exceptions import ( + BadCredentialsException, + CozyTouchBadCredentialsException, + CozyTouchServiceException, + InvalidTokenException, + NexityBadCredentialsException, + NexityServiceException, + SomfyBadCredentialsException, + SomfyServiceException, +) +from pyoverkiz.models import ServerConfig + + +class BaseAuthStrategy(AuthStrategy): + """Base class for authentication strategies.""" + + def __init__( + self, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Store shared auth context for Overkiz API interactions.""" + self.session = session + self.server = server + self._ssl = ssl_context + self.api_type = api_type + + async def login(self) -> None: + """Perform authentication; default is a no-op for subclasses to override.""" + return None + + async def refresh_if_needed(self) -> bool: + """Refresh authentication tokens if needed; default returns False.""" + return False + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" + return {} + + async def close(self) -> None: + """Close any resources held by the strategy; default is no-op.""" + return None + + +class SessionLoginStrategy(BaseAuthStrategy): + """Authentication strategy using session-based login.""" + + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize SessionLoginStrategy with given parameters.""" + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + async def login(self) -> None: + """Perform login using username and password.""" + payload = { + "userId": self.credentials.username, + "userPassword": self.credentials.password, + } + await self._post_login(payload) + + async def _post_login(self, data: Mapping[str, Any]) -> None: + """Post login data to the server and handle response.""" + async with self.session.post( + f"{self.server.endpoint}login", + data=data, + ssl=self._ssl, + ) as response: + if response.status not in (200, 204): + raise BadCredentialsException( + f"Login failed for {self.server.name}: {response.status}" + ) + + # A 204 No Content response cannot have a body, so skip JSON parsing. + if response.status == 204: + return + + result = await response.json() + if not result.get("success"): + raise BadCredentialsException("Login failed: bad credentials") + + +class SomfyAuthStrategy(BaseAuthStrategy): + """Authentication strategy using Somfy OAuth2.""" + + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize SomfyAuthStrategy with given parameters.""" + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + self.context = AuthContext() + + async def login(self) -> None: + """Perform login using Somfy OAuth2.""" + await self._request_access_token( + grant_type="password", + extra_fields={ + "username": self.credentials.username, + "password": self.credentials.password, + }, + ) + + async def refresh_if_needed(self) -> bool: + """Refresh Somfy OAuth2 tokens if needed.""" + if not self.context.is_expired() or not self.context.refresh_token: + return False + + await self._request_access_token( + grant_type="refresh_token", + extra_fields={"refresh_token": cast(str, self.context.refresh_token)}, + ) + return True + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" + if self.context.access_token: + return {"Authorization": f"Bearer {self.context.access_token}"} + + return {} + + async def _request_access_token( + self, *, grant_type: str, extra_fields: Mapping[str, str] + ) -> None: + form = FormData( + { + "grant_type": grant_type, + "client_id": SOMFY_CLIENT_ID, + "client_secret": SOMFY_CLIENT_SECRET, + **extra_fields, + } + ) + + async with self.session.post( + f"{SOMFY_API}/oauth/oauth/v2/token/jwt", + data=form, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) as response: + token = await response.json() + + if token.get("message") == "error.invalid.grant": + raise SomfyBadCredentialsException(token["message"]) + + access_token = token.get("access_token") + if not access_token: + raise SomfyServiceException("No Somfy access token provided.") + + self.context.access_token = cast(str, access_token) + self.context.refresh_token = token.get("refresh_token") + expires_in = token.get("expires_in") + if expires_in: + self.context.expires_at = datetime.datetime.now( + datetime.UTC + ) + datetime.timedelta(seconds=cast(int, expires_in) - 5) + + +class CozytouchAuthStrategy(SessionLoginStrategy): + """Authentication strategy using Cozytouch session-based login.""" + + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize CozytouchAuthStrategy with given parameters.""" + super().__init__(credentials, session, server, ssl_context, api_type) + + async def login(self) -> None: + """Perform login using Cozytouch username and password.""" + form = FormData( + { + "grant_type": "password", + "username": f"GA-PRIVATEPERSON/{self.credentials.username}", + "password": self.credentials.password, + } + ) + async with self.session.post( + f"{COZYTOUCH_ATLANTIC_API}/token", + data=form, + headers={ + "Authorization": f"Basic {COZYTOUCH_CLIENT_ID}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) as response: + token = await response.json() + + if token.get("error") == "invalid_grant": + raise CozyTouchBadCredentialsException(token["error_description"]) + + if "token_type" not in token: + raise CozyTouchServiceException("No CozyTouch token provided.") + + async with self.session.get( + f"{COZYTOUCH_ATLANTIC_API}/magellan/accounts/jwt", + headers={"Authorization": f"Bearer {token['access_token']}"}, + ) as response: + jwt = await response.text() + + if not jwt: + raise CozyTouchServiceException("No JWT token provided.") + + jwt = jwt.strip('"') + + await self._post_login({"jwt": jwt}) + + +class NexityAuthStrategy(SessionLoginStrategy): + """Authentication strategy using Nexity session-based login.""" + + def __init__( + self, + credentials: UsernamePasswordCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize NexityAuthStrategy with given parameters.""" + super().__init__(credentials, session, server, ssl_context, api_type) + + async def login(self) -> None: + """Perform login using Nexity username and password.""" + loop = asyncio.get_event_loop() + + def _client() -> BaseClient: + return boto3.client( + "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) + ) + + client = await loop.run_in_executor(None, _client) + aws = WarrantLite( + username=self.credentials.username, + password=self.credentials.password, + pool_id=NEXITY_COGNITO_USER_POOL, + client_id=NEXITY_COGNITO_CLIENT_ID, + client=client, + ) + + try: + tokens = await loop.run_in_executor(None, aws.authenticate_user) + except Exception as error: + raise NexityBadCredentialsException() from error + + id_token = tokens["AuthenticationResult"]["IdToken"] + + async with self.session.get( + f"{NEXITY_API}/deploy/api/v1/domotic/token", + headers={"Authorization": id_token}, + ) as response: + token = await response.json() + + if "token" not in token: + raise NexityServiceException("No Nexity SSO token provided.") + + user_id = self.credentials.username.replace("@", "_-_") + await self._post_login({"ssoToken": token["token"], "userId": user_id}) + + +class LocalTokenAuthStrategy(BaseAuthStrategy): + """Authentication strategy using a local API token.""" + + def __init__( + self, + credentials: LocalTokenCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize LocalTokenAuthStrategy with given parameters.""" + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + async def login(self) -> None: + """Validate that a token is provided for local API access.""" + if not self.credentials.token: + raise InvalidTokenException("Local API requires a token.") + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" + return {"Authorization": f"Bearer {self.credentials.token}"} + + +class RexelAuthStrategy(BaseAuthStrategy): + """Authentication strategy using Rexel OAuth2.""" + + def __init__( + self, + credentials: RexelOAuthCodeCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize RexelAuthStrategy with given parameters.""" + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + self.context = AuthContext() + + async def login(self) -> None: + """Perform login using Rexel OAuth2 authorization code.""" + await self._exchange_token( + { + "grant_type": "authorization_code", + "client_id": REXEL_OAUTH_CLIENT_ID, + "scope": REXEL_OAUTH_SCOPE, + "code": self.credentials.code, + "redirect_uri": self.credentials.redirect_uri, + } + ) + + async def refresh_if_needed(self) -> bool: + """Refresh Rexel OAuth2 tokens if needed.""" + if not self.context.is_expired() or not self.context.refresh_token: + return False + + await self._exchange_token( + { + "grant_type": "refresh_token", + "client_id": REXEL_OAUTH_CLIENT_ID, + "scope": REXEL_OAUTH_SCOPE, + "refresh_token": cast(str, self.context.refresh_token), + } + ) + return True + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" + if self.context.access_token: + return {"Authorization": f"Bearer {self.context.access_token}"} + return {} + + async def _exchange_token(self, payload: Mapping[str, str]) -> None: + """Exchange authorization code or refresh token for access token.""" + form = FormData(payload) + async with self.session.post( + REXEL_OAUTH_TOKEN_URL, + data=form, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) as response: + token = await response.json() + + # Handle OAuth error responses explicitly before accessing the access token. + error = token.get("error") + if error: + description = token.get("error_description") or token.get("message") + if description: + raise InvalidTokenException( + f"Error retrieving Rexel access token: {description}" + ) + raise InvalidTokenException( + f"Error retrieving Rexel access token: {error}" + ) + + access_token = token.get("access_token") + if not access_token: + raise InvalidTokenException("No Rexel access token provided.") + + self._ensure_consent(access_token) + self.context.access_token = cast(str, access_token) + self.context.refresh_token = token.get("refresh_token") + expires_in = token.get("expires_in") + if expires_in: + self.context.expires_at = datetime.datetime.now( + datetime.UTC + ) + datetime.timedelta(seconds=cast(int, expires_in) - 5) + + @staticmethod + def _ensure_consent(access_token: str) -> None: + """Ensure that the Rexel token has the required consent.""" + payload = _decode_jwt_payload(access_token) + consent = payload.get("consent") + if consent != REXEL_REQUIRED_CONSENT: + raise InvalidTokenException( + "Consent is missing or revoked for Rexel token." + ) + + +class BearerTokenAuthStrategy(BaseAuthStrategy): + """Authentication strategy using a static bearer token.""" + + def __init__( + self, + credentials: TokenCredentials, + session: ClientSession, + server: ServerConfig, + ssl_context: ssl.SSLContext | bool, + api_type: APIType, + ) -> None: + """Initialize BearerTokenAuthStrategy with given parameters.""" + super().__init__(session, server, ssl_context, api_type) + self.credentials = credentials + + def auth_headers(self, path: str | None = None) -> Mapping[str, str]: + """Return authentication headers for a request path.""" + if self.credentials.token: + return {"Authorization": f"Bearer {self.credentials.token}"} + return {} + + +def _decode_jwt_payload(token: str) -> dict[str, Any]: + """Decode the payload of a JWT token.""" + parts = token.split(".") + if len(parts) < 2: + raise InvalidTokenException("Malformed JWT received.") + + payload_segment = parts[1] + padding = "=" * (-len(payload_segment) % 4) + try: + decoded = base64.urlsafe_b64decode(payload_segment + padding) + return cast(dict[str, Any], json.loads(decoded)) + except (binascii.Error, json.JSONDecodeError) as error: + raise InvalidTokenException("Malformed JWT received.") from error diff --git a/pyoverkiz/client.py b/pyoverkiz/client.py index 927f35f5..c81b20ea 100644 --- a/pyoverkiz/client.py +++ b/pyoverkiz/client.py @@ -2,8 +2,6 @@ from __future__ import annotations -import asyncio -import datetime import os import ssl import urllib.parse @@ -13,38 +11,20 @@ from typing import Any, cast import backoff -import boto3 import humps from aiohttp import ( ClientConnectorError, ClientResponse, ClientSession, - FormData, ServerDisconnectedError, ) -from botocore.client import BaseClient -from botocore.config import Config -from warrant_lite import WarrantLite - -from pyoverkiz.const import ( - COZYTOUCH_ATLANTIC_API, - COZYTOUCH_CLIENT_ID, - LOCAL_API_PATH, - NEXITY_API, - NEXITY_COGNITO_CLIENT_ID, - NEXITY_COGNITO_REGION, - NEXITY_COGNITO_USER_POOL, - SOMFY_API, - SOMFY_CLIENT_ID, - SOMFY_CLIENT_SECRET, - SUPPORTED_SERVERS, -) + +from pyoverkiz.auth import AuthStrategy, Credentials, build_auth_strategy +from pyoverkiz.const import SUPPORTED_SERVERS from pyoverkiz.enums import APIType, CommandMode, Server from pyoverkiz.exceptions import ( AccessDeniedToGatewayException, BadCredentialsException, - CozyTouchBadCredentialsException, - CozyTouchServiceException, ExecutionQueueFullException, InvalidCommandException, InvalidEventListenerIdException, @@ -52,8 +32,6 @@ MaintenanceException, MissingAPIKeyException, MissingAuthorizationTokenException, - NexityBadCredentialsException, - NexityServiceException, NoRegisteredEventListenerException, NoSuchResourceException, NotAuthenticatedException, @@ -61,8 +39,6 @@ OverkizException, ServiceUnavailableException, SessionAndBearerInSameRequestException, - SomfyBadCredentialsException, - SomfyServiceException, TooManyAttemptsBannedException, TooManyConcurrentRequestsException, TooManyExecutionsException, @@ -78,11 +54,10 @@ Execution, Gateway, HistoryExecution, - LocalToken, Option, OptionParameter, - OverkizServer, Place, + ServerConfig, Setup, State, ) @@ -138,10 +113,16 @@ def _create_local_ssl_context() -> ssl.SSLContext: This method is not async-friendly and should be called from a thread because it will load certificates from disk and do other blocking I/O. """ - return ssl.create_default_context( + context = ssl.create_default_context( cafile=os.path.dirname(os.path.realpath(__file__)) + "/overkiz-root-ca-2048.crt" ) + # Disable strict validation introduced in Python 3.13, which doesn't work with + # Overkiz self-signed gateway certificates. Applied once to the shared context. + context.verify_flags &= ~ssl.VERIFY_X509_STRICT + + return context + # The default SSLContext objects are created at import time # since they do blocking I/O to load certificates from disk, @@ -152,66 +133,55 @@ def _create_local_ssl_context() -> ssl.SSLContext: class OverkizClient: """Interface class for the Overkiz API.""" - username: str - password: str - server: OverkizServer + server_config: ServerConfig setup: Setup | None devices: list[Device] gateways: list[Gateway] event_listener_id: str | None session: ClientSession - api_type: APIType - - _refresh_token: str | None = None - _expires_in: datetime.datetime | None = None - _access_token: str | None = None _ssl: ssl.SSLContext | bool = True + _auth: AuthStrategy def __init__( self, - username: str, - password: str, - server: OverkizServer, + *, + server: ServerConfig | Server | str, + credentials: Credentials, verify_ssl: bool = True, - token: str | None = None, session: ClientSession | None = None, ) -> None: """Constructor. - :param username: the username - :param password: the password - :param server: OverkizServer + :param server: ServerConfig :param session: optional ClientSession """ - self.username = username - self.password = password - self.server = server - self._access_token = token + self.server_config = self._normalize_server(server) self.setup: Setup | None = None self.devices: list[Device] = [] self.gateways: list[Gateway] = [] self.event_listener_id: str | None = None - self.session = session if session else ClientSession() + self.session = ( + session + if session + else ClientSession(headers={"User-Agent": "python-overkiz-api"}) + ) self._ssl = verify_ssl - if LOCAL_API_PATH in self.server.endpoint: - self.api_type = APIType.LOCAL - - if verify_ssl: - # To avoid security issues while authentication to local API, we add the following authority to - # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt - self._ssl = SSL_CONTEXT_LOCAL_API + if self.server_config.type == APIType.LOCAL and verify_ssl: + # Use the prebuilt SSL context with disabled strict validation for local API. + self._ssl = SSL_CONTEXT_LOCAL_API - # Disable strict validation introduced in Python 3.13, which doesn't - # work with Overkiz self-signed gateway certificates - self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT - else: - self.api_type = APIType.CLOUD + self._auth = build_auth_strategy( + server_config=self.server_config, + credentials=credentials, + session=self.session, + ssl_context=self._ssl, + ) async def __aenter__(self) -> OverkizClient: - """Enter the async context manager and return the client.""" + """Enter async context manager and return the client instance.""" return self async def __aexit__( @@ -223,11 +193,27 @@ async def __aexit__( """Exit the async context manager and close the client session.""" await self.close() + @staticmethod + def _normalize_server(server: ServerConfig | Server | str) -> ServerConfig: + """Resolve user-provided server identifiers into a `ServerConfig`.""" + if isinstance(server, ServerConfig): + return server + + server_key = server.value if isinstance(server, Server) else str(server) + + try: + return SUPPORTED_SERVERS[server_key] + except KeyError as error: + raise OverkizException( + f"Unknown server '{server_key}'. Provide a supported server key or ServerConfig instance." + ) from error + async def close(self) -> None: """Close the session.""" if self.event_listener_id: await self.unregister_event_listener() + await self._auth.close() await self.session.close() async def login( @@ -238,207 +224,21 @@ async def login( Caller must provide one of [userId+userPassword, userId+ssoToken, accessToken, jwt]. """ - # Local authentication - if self.api_type == APIType.LOCAL: + await self._auth.login() + + if self.server_config.type == APIType.LOCAL: if register_event_listener: await self.register_event_listener() else: - # Call a simple endpoint to verify if our token is correct - # Since local API does not have a /login endpoint + # Validate local API token by calling a simple endpoint await self.get_gateways() return True - # Somfy TaHoma authentication using access_token - if self.server == SUPPORTED_SERVERS[Server.SOMFY_EUROPE]: - await self.somfy_tahoma_get_access_token() - - if register_event_listener: - await self.register_event_listener() - - return True - - # CozyTouch authentication using jwt - if self.server in [ - SUPPORTED_SERVERS[Server.ATLANTIC_COZYTOUCH], - SUPPORTED_SERVERS[Server.THERMOR_COZYTOUCH], - SUPPORTED_SERVERS[Server.SAUTER_COZYTOUCH], - ]: - jwt = await self.cozytouch_login() - payload = {"jwt": jwt} - - # Nexity authentication using ssoToken - elif self.server == SUPPORTED_SERVERS[Server.NEXITY]: - sso_token = await self.nexity_login() - user_id = self.username.replace("@", "_-_") # Replace @ for _-_ - payload = {"ssoToken": sso_token, "userId": user_id} - - # Regular authentication using userId+userPassword - else: - payload = {"userId": self.username, "userPassword": self.password} - - response = await self.__post("login", data=payload) - - if response.get("success"): - if register_event_listener: - await self.register_event_listener() - return True - - return False - - async def somfy_tahoma_get_access_token(self) -> str: - """Authenticate via Somfy identity and acquire access_token.""" - # Request access token - async with self.session.post( - SOMFY_API + "/oauth/oauth/v2/token/jwt", - data=FormData( - { - "grant_type": "password", - "username": self.username, - "password": self.password, - "client_id": SOMFY_CLIENT_ID, - "client_secret": SOMFY_CLIENT_SECRET, - } - ), - headers={ - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - - # { "message": "error.invalid.grant", "data": [], "uid": "xxx" } - if "message" in token and token["message"] == "error.invalid.grant": - raise SomfyBadCredentialsException(token["message"]) - - if "access_token" not in token: - raise SomfyServiceException("No Somfy access token provided.") - - self._access_token = cast(str, token["access_token"]) - self._refresh_token = token["refresh_token"] - self._expires_in = datetime.datetime.now() + datetime.timedelta( - seconds=token["expires_in"] - 5 - ) - - return self._access_token - - async def refresh_token(self) -> None: - """Update the access and the refresh token. The refresh token will be valid 14 days.""" - if self.server != SUPPORTED_SERVERS[Server.SOMFY_EUROPE]: - return - - if not self._refresh_token: - raise ValueError("No refresh token provided. Login method must be used.") - - # &grant_type=refresh_token&refresh_token=REFRESH_TOKEN - # Request access token - async with self.session.post( - SOMFY_API + "/oauth/oauth/v2/token/jwt", - data=FormData( - { - "grant_type": "refresh_token", - "refresh_token": self._refresh_token, - "client_id": SOMFY_CLIENT_ID, - "client_secret": SOMFY_CLIENT_SECRET, - } - ), - headers={ - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - # { "message": "error.invalid.grant", "data": [], "uid": "xxx" } - if "message" in token and token["message"] == "error.invalid.grant": - raise SomfyBadCredentialsException(token["message"]) - - if "access_token" not in token: - raise SomfyServiceException("No Somfy access token provided.") - - self._access_token = cast(str, token["access_token"]) - self._refresh_token = token["refresh_token"] - self._expires_in = datetime.datetime.now() + datetime.timedelta( - seconds=token["expires_in"] - 5 - ) - - async def cozytouch_login(self) -> str: - """Authenticate via CozyTouch identity and acquire JWT token.""" - # Request access token - async with self.session.post( - COZYTOUCH_ATLANTIC_API + "/token", - data=FormData( - { - "grant_type": "password", - "username": "GA-PRIVATEPERSON/" + self.username, - "password": self.password, - } - ), - headers={ - "Authorization": f"Basic {COZYTOUCH_CLIENT_ID}", - "Content-Type": "application/x-www-form-urlencoded", - }, - ) as response: - token = await response.json() - - # {'error': 'invalid_grant', - # 'error_description': 'Provided Authorization Grant is invalid.'} - if "error" in token and token["error"] == "invalid_grant": - raise CozyTouchBadCredentialsException(token["error_description"]) - - if "token_type" not in token: - raise CozyTouchServiceException("No CozyTouch token provided.") - - # Request JWT - async with self.session.get( - COZYTOUCH_ATLANTIC_API + "/magellan/accounts/jwt", - headers={"Authorization": f"Bearer {token['access_token']}"}, - ) as response: - jwt = await response.text() - - if not jwt: - raise CozyTouchServiceException("No JWT token provided.") - - jwt = jwt.strip('"') # Remove surrounding quotes - - return jwt - - async def nexity_login(self) -> str: - """Authenticate via Nexity identity and acquire SSO token.""" - loop = asyncio.get_event_loop() + if register_event_listener: + await self.register_event_listener() - def _get_client() -> BaseClient: - return boto3.client( - "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) - ) - - # Request access token - client = await loop.run_in_executor(None, _get_client) - - aws = WarrantLite( - username=self.username, - password=self.password, - pool_id=NEXITY_COGNITO_USER_POOL, - client_id=NEXITY_COGNITO_CLIENT_ID, - client=client, - ) - - try: - tokens = await loop.run_in_executor(None, aws.authenticate_user) - except Exception as error: - raise NexityBadCredentialsException() from error - - id_token = tokens["AuthenticationResult"]["IdToken"] - - async with self.session.get( - NEXITY_API + "/deploy/api/v1/domotic/token", - headers={ - "Authorization": id_token, - }, - ) as response: - token = await response.json() - - if "token" not in token: - raise NexityServiceException("No Nexity SSO token provided.") - - return cast(str, token["token"]) + return True @retry_on_auth_error async def get_setup(self, refresh: bool = False) -> Setup: @@ -682,54 +482,6 @@ async def get_places(self) -> Place: places = Place(**humps.decamelize(response)) return places - @retry_on_auth_error - async def generate_local_token(self, gateway_id: str) -> str: - """Generates a new token. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__get(f"config/{gateway_id}/local/tokens/generate") - - return cast(str, response["token"]) - - @retry_on_auth_error - async def activate_local_token( - self, gateway_id: str, token: str, label: str, scope: str = "devmode" - ) -> str: - """Create a token. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__post( - f"config/{gateway_id}/local/tokens", - {"label": label, "token": token, "scope": scope}, - ) - - return cast(str, response["requestId"]) - - @retry_on_auth_error - async def get_local_tokens( - self, gateway_id: str, scope: str = "devmode" - ) -> list[LocalToken]: - """Get all gateway tokens with the given scope. - - Access scope : Full enduser API access (enduser/*). - """ - response = await self.__get(f"config/{gateway_id}/local/tokens/{scope}") - local_tokens = [LocalToken(**lt) for lt in humps.decamelize(response)] - - return local_tokens - - @retry_on_auth_error - async def delete_local_token(self, gateway_id: str, uuid: str) -> bool: - """Delete a token. - - Access scope : Full enduser API access (enduser/*). - """ - await self.__delete(f"config/{gateway_id}/local/tokens/{uuid}") - - return True - @retry_on_auth_error async def execute_scenario(self, oid: str) -> str: """Execute a scenario.""" @@ -787,14 +539,11 @@ async def get_setup_option_parameter( async def __get(self, path: str) -> Any: """Make a GET request to the OverKiz API.""" - headers = {} - await self._refresh_token_if_expired() - if self._access_token: - headers["Authorization"] = f"Bearer {self._access_token}" + headers = dict(self._auth.auth_headers(path)) async with self.session.get( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", headers=headers, ssl=self._ssl, ) as response: @@ -805,14 +554,11 @@ async def __post( self, path: str, payload: JSON | None = None, data: JSON | None = None ) -> Any: """Make a POST request to the OverKiz API.""" - headers = {} - - if path != "login" and self._access_token: - await self._refresh_token_if_expired() - headers["Authorization"] = f"Bearer {self._access_token}" + await self._refresh_token_if_expired() + headers = dict(self._auth.auth_headers(path)) async with self.session.post( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", data=data, json=payload, headers=headers, @@ -823,15 +569,11 @@ async def __post( async def __delete(self, path: str) -> None: """Make a DELETE request to the OverKiz API.""" - headers = {} - await self._refresh_token_if_expired() - - if self._access_token: - headers["Authorization"] = f"Bearer {self._access_token}" + headers = dict(self._auth.auth_headers(path)) async with self.session.delete( - f"{self.server.endpoint}{path}", + f"{self.server_config.endpoint}{path}", headers=headers, ssl=self._ssl, ) as response: @@ -946,12 +688,7 @@ async def check_response(response: ClientResponse) -> None: async def _refresh_token_if_expired(self) -> None: """Check if token is expired and request a new one.""" - if ( - self._expires_in - and self._refresh_token - and self._expires_in <= datetime.datetime.now() - ): - await self.refresh_token() - - if self.event_listener_id: - await self.register_event_listener() + refreshed = await self._auth.refresh_if_needed() + + if refreshed and self.event_listener_id: + await self.register_event_listener() diff --git a/pyoverkiz/const.py b/pyoverkiz/const.py index c664b5da..b4e515b1 100644 --- a/pyoverkiz/const.py +++ b/pyoverkiz/const.py @@ -3,7 +3,8 @@ from __future__ import annotations from pyoverkiz.enums import Server -from pyoverkiz.models import OverkizServer +from pyoverkiz.enums.server import APIType +from pyoverkiz.models import ServerConfig COZYTOUCH_ATLANTIC_API = "https://apis.groupe-atlantic.com" COZYTOUCH_CLIENT_ID = ( @@ -15,6 +16,18 @@ NEXITY_COGNITO_USER_POOL = "eu-west-1_wj277ucoI" NEXITY_COGNITO_REGION = "eu-west-1" +REXEL_BACKEND_API = ( + "https://app-ec-backend-enduser-prod.azurewebsites.net/api/enduser/overkiz/" +) +REXEL_OAUTH_CLIENT_ID = "2b635ede-c3fb-43bc-8d23-f6d17f80e96d" +REXEL_OAUTH_SCOPE = "https://adb2cservicesfrenduserprod.onmicrosoft.com/94f05108-65f7-477a-a84d-e67e1aed6f79/ExternalProvider" +REXEL_OAUTH_TENANT = ( + "https://consumerlogin.rexelservices.fr/670998c0-f737-4d75-a32f-ba9292755b70" +) +REXEL_OAUTH_POLICY = "B2C_1A_SIGNINONLYHOMEASSISTANT" +REXEL_OAUTH_TOKEN_URL = f"{REXEL_OAUTH_TENANT}/oauth2/v2.0/token?p={REXEL_OAUTH_POLICY}" +REXEL_REQUIRED_CONSENT = "homeassistant" + SOMFY_API = "https://accounts.somfy.com" SOMFY_CLIENT_ID = "0d8e920c-1478-11e7-a377-02dd59bd3041_1ewvaqmclfogo4kcsoo0c8k4kso884owg08sg8c40sk4go4ksg" SOMFY_CLIENT_SECRET = "12k73w1n540g8o4cokg0cw84cog840k84cwggscwg884004kgk" @@ -27,101 +40,117 @@ Server.SOMFY_AMERICA, ] -SUPPORTED_SERVERS: dict[str, OverkizServer] = { - Server.ATLANTIC_COZYTOUCH: OverkizServer( +SUPPORTED_SERVERS: dict[str, ServerConfig] = { + Server.ATLANTIC_COZYTOUCH: ServerConfig( + server=Server.ATLANTIC_COZYTOUCH, name="Atlantic Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Atlantic", - configuration_url=None, + type=APIType.CLOUD, ), - Server.BRANDT: OverkizServer( + Server.BRANDT: ServerConfig( + server=Server.BRANDT, name="Brandt Smart Control", endpoint="https://ha3-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Brandt", - configuration_url=None, + type=APIType.CLOUD, ), - Server.FLEXOM: OverkizServer( + Server.FLEXOM: ServerConfig( + server=Server.FLEXOM, name="Flexom", endpoint="https://ha108-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Bouygues", - configuration_url=None, + type=APIType.CLOUD, ), - Server.HEXAOM_HEXACONNECT: OverkizServer( + Server.HEXAOM_HEXACONNECT: ServerConfig( + server=Server.HEXAOM_HEXACONNECT, name="Hexaom HexaConnect", endpoint="https://ha5-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hexaom", - configuration_url=None, + type=APIType.CLOUD, ), - Server.HI_KUMO_ASIA: OverkizServer( + Server.HI_KUMO_ASIA: ServerConfig( + server=Server.HI_KUMO_ASIA, name="Hitachi Hi Kumo (Asia)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), - Server.HI_KUMO_EUROPE: OverkizServer( + Server.HI_KUMO_EUROPE: ServerConfig( + server=Server.HI_KUMO_EUROPE, name="Hitachi Hi Kumo (Europe)", endpoint="https://ha117-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), - Server.HI_KUMO_OCEANIA: OverkizServer( + Server.HI_KUMO_OCEANIA: ServerConfig( + server=Server.HI_KUMO_OCEANIA, name="Hitachi Hi Kumo (Oceania)", endpoint="https://ha203-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Hitachi", - configuration_url=None, + type=APIType.CLOUD, ), - Server.NEXITY: OverkizServer( + Server.NEXITY: ServerConfig( + server=Server.NEXITY, name="Nexity Eugénie", endpoint="https://ha106-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Nexity", - configuration_url=None, + type=APIType.CLOUD, ), - Server.REXEL: OverkizServer( + Server.REXEL: ServerConfig( + server=Server.REXEL, name="Rexel Energeasy Connect", - endpoint="https://ha112-1.overkiz.com/enduser-mobile-web/enduserAPI/", + endpoint=REXEL_BACKEND_API, manufacturer="Rexel", - configuration_url="https://utilisateur.energeasyconnect.com/user/#/zone/equipements", + type=APIType.CLOUD, ), - Server.SAUTER_COZYTOUCH: OverkizServer( # duplicate of Atlantic Cozytouch + Server.SAUTER_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch + server=Server.SAUTER_COZYTOUCH, name="Sauter Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Sauter", - configuration_url=None, + type=APIType.CLOUD, ), - Server.SIMU_LIVEIN2: OverkizServer( # alias of https://tahomalink.com + Server.SIMU_LIVEIN2: ServerConfig( # alias of https://tahomalink.com + server=Server.SIMU_LIVEIN2, name="SIMU (LiveIn2)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), - Server.SOMFY_EUROPE: OverkizServer( # alias of https://tahomalink.com + Server.SOMFY_EUROPE: ServerConfig( # alias of https://tahomalink.com + server=Server.SOMFY_EUROPE, name="Somfy (Europe)", endpoint="https://ha101-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), - Server.SOMFY_AMERICA: OverkizServer( + Server.SOMFY_AMERICA: ServerConfig( + server=Server.SOMFY_AMERICA, name="Somfy (North America)", endpoint="https://ha401-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), - Server.SOMFY_OCEANIA: OverkizServer( + Server.SOMFY_OCEANIA: ServerConfig( + server=Server.SOMFY_OCEANIA, name="Somfy (Oceania)", endpoint="https://ha201-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Somfy", - configuration_url=None, + type=APIType.CLOUD, ), - Server.THERMOR_COZYTOUCH: OverkizServer( # duplicate of Atlantic Cozytouch + Server.THERMOR_COZYTOUCH: ServerConfig( # duplicate of Atlantic Cozytouch + server=Server.THERMOR_COZYTOUCH, name="Thermor Cozytouch", endpoint="https://ha110-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Thermor", - configuration_url=None, + type=APIType.CLOUD, ), - Server.UBIWIZZ: OverkizServer( + Server.UBIWIZZ: ServerConfig( + server=Server.UBIWIZZ, name="Ubiwizz", endpoint="https://ha129-1.overkiz.com/enduser-mobile-web/enduserAPI/", manufacturer="Decelect", - configuration_url=None, + type=APIType.CLOUD, ), } diff --git a/pyoverkiz/models.py b/pyoverkiz/models.py index 5e826076..5d1bec43 100644 --- a/pyoverkiz/models.py +++ b/pyoverkiz/models.py @@ -24,6 +24,7 @@ ) from pyoverkiz.enums.command import OverkizCommand, OverkizCommandParam from pyoverkiz.enums.protocol import Protocol +from pyoverkiz.enums.server import APIType, Server from pyoverkiz.obfuscate import obfuscate_email, obfuscate_id, obfuscate_string from pyoverkiz.types import DATA_TYPE_TO_PYTHON, StateType @@ -959,25 +960,36 @@ def __init__( @define(kw_only=True) -class OverkizServer: - """Class to describe an Overkiz server.""" +class ServerConfig: + """Connection target details for an Overkiz-compatible server.""" + server: Server | None name: str endpoint: str manufacturer: str - configuration_url: str | None + type: APIType + configuration_url: str | None = None - -@define(kw_only=True) -class LocalToken: - """Descriptor for a local gateway token.""" - - label: str - gateway_id: str = field(repr=obfuscate_id, default=None) - gateway_creation_time: int - uuid: str - scope: str - expiration_time: int | None + def __init__( + self, + *, + server: Server | str | None = None, + name: str, + endpoint: str, + manufacturer: str, + type: str | APIType, + configuration_url: str | None = None, + **_: Any, + ) -> None: + """Initialize ServerConfig and convert enum fields.""" + self.server = ( + server if isinstance(server, Server) or server is None else Server(server) + ) + self.name = name + self.endpoint = endpoint + self.manufacturer = manufacturer + self.type = type if isinstance(type, APIType) else APIType(type) + self.configuration_url = configuration_url @define(kw_only=True) diff --git a/pyoverkiz/utils.py b/pyoverkiz/utils.py index e395d0bf..6c28cdd3 100644 --- a/pyoverkiz/utils.py +++ b/pyoverkiz/utils.py @@ -5,21 +5,45 @@ import re from pyoverkiz.const import LOCAL_API_PATH -from pyoverkiz.models import OverkizServer +from pyoverkiz.enums.server import APIType, Server +from pyoverkiz.models import ServerConfig -def generate_local_server( +def create_local_server_config( + *, host: str, name: str = "Somfy Developer Mode", manufacturer: str = "Somfy", configuration_url: str | None = None, -) -> OverkizServer: - """Generate OverkizServer class for connection with a local API (Somfy Developer mode).""" - return OverkizServer( +) -> ServerConfig: + """Generate server configuration for a local API (Somfy Developer mode).""" + return create_server_config( name=name, endpoint=f"https://{host}{LOCAL_API_PATH}", manufacturer=manufacturer, + server=Server.SOMFY_DEVELOPER_MODE, configuration_url=configuration_url, + type=APIType.LOCAL, + ) + + +def create_server_config( + *, + name: str, + endpoint: str, + manufacturer: str, + server: Server | str | None = None, + type: APIType | str = APIType.CLOUD, + configuration_url: str | None = None, +) -> ServerConfig: + """Generate server configuration with the provided endpoint and metadata.""" + return ServerConfig( + server=server, # type: ignore[arg-type] + name=name, + endpoint=endpoint, + manufacturer=manufacturer, + configuration_url=configuration_url, + type=type, # type: ignore[arg-type] ) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 00000000..8106d29d --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,534 @@ +"""Tests for authentication module.""" + +from __future__ import annotations + +import base64 +import datetime +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest +from aiohttp import ClientSession + +from pyoverkiz.auth.base import AuthContext +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + RexelOAuthCodeCredentials, + TokenCredentials, + UsernamePasswordCredentials, +) +from pyoverkiz.auth.factory import ( + _ensure_rexel, + _ensure_token, + _ensure_username_password, + build_auth_strategy, +) +from pyoverkiz.auth.strategies import ( + BearerTokenAuthStrategy, + CozytouchAuthStrategy, + LocalTokenAuthStrategy, + NexityAuthStrategy, + RexelAuthStrategy, + SessionLoginStrategy, + SomfyAuthStrategy, + _decode_jwt_payload, +) +from pyoverkiz.enums import APIType, Server +from pyoverkiz.exceptions import InvalidTokenException +from pyoverkiz.models import ServerConfig + + +class TestAuthContext: + """Test AuthContext functionality.""" + + def test_not_expired_no_expiration(self): + """Test that context without expiration is not expired.""" + context = AuthContext(access_token="test_token") + assert not context.is_expired() + + def test_not_expired_future_expiration(self): + """Test that context with future expiration is not expired.""" + future = datetime.datetime.now(datetime.UTC) + datetime.timedelta(hours=1) + context = AuthContext(access_token="test_token", expires_at=future) + assert not context.is_expired() + + def test_expired_past_expiration(self): + """Test that context with past expiration is expired.""" + past = datetime.datetime.now(datetime.UTC) - datetime.timedelta(hours=1) + context = AuthContext(access_token="test_token", expires_at=past) + assert context.is_expired() + + def test_expired_with_skew(self): + """Test that context respects skew time.""" + # Expires in 3 seconds, but default skew is 5 + soon = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=3) + context = AuthContext(access_token="test_token", expires_at=soon) + assert context.is_expired() + + def test_not_expired_with_custom_skew(self): + """Test that custom skew time can be provided.""" + soon = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=3) + context = AuthContext(access_token="test_token", expires_at=soon) + assert not context.is_expired(skew_seconds=1) + + +class TestCredentials: + """Test credential dataclasses.""" + + def test_username_password_credentials(self): + """Test UsernamePasswordCredentials creation.""" + creds = UsernamePasswordCredentials("user@example.com", "password123") + assert creds.username == "user@example.com" + assert creds.password == "password123" + + def test_token_credentials(self): + """Test TokenCredentials creation.""" + creds = TokenCredentials("my_token_123") + assert creds.token == "my_token_123" + + def test_local_token_credentials(self): + """Test LocalTokenCredentials creation.""" + creds = LocalTokenCredentials("local_token_456") + assert creds.token == "local_token_456" + assert isinstance(creds, TokenCredentials) + + def test_rexel_oauth_credentials(self): + """Test RexelOAuthCodeCredentials creation.""" + creds = RexelOAuthCodeCredentials("auth_code_xyz", "http://redirect.uri") + assert creds.code == "auth_code_xyz" + assert creds.redirect_uri == "http://redirect.uri" + + +class TestAuthFactory: + """Test authentication factory functions.""" + + def test_ensure_username_password_valid(self): + """Test that valid username/password credentials pass validation.""" + creds = UsernamePasswordCredentials("user", "pass") + result = _ensure_username_password(creds) + assert result is creds + + def test_ensure_username_password_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = TokenCredentials("token") + with pytest.raises(TypeError, match="UsernamePasswordCredentials are required"): + _ensure_username_password(creds) + + def test_ensure_token_valid(self): + """Test that valid token credentials pass validation.""" + creds = TokenCredentials("token") + result = _ensure_token(creds) + assert result is creds + + def test_ensure_token_local_valid(self): + """Test that LocalTokenCredentials also pass token validation.""" + creds = LocalTokenCredentials("local_token") + result = _ensure_token(creds) + assert result is creds + + def test_ensure_token_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = UsernamePasswordCredentials("user", "pass") + with pytest.raises(TypeError, match="TokenCredentials are required"): + _ensure_token(creds) + + def test_ensure_rexel_valid(self): + """Test that valid Rexel credentials pass validation.""" + creds = RexelOAuthCodeCredentials("code", "uri") + result = _ensure_rexel(creds) + assert result is creds + + def test_ensure_rexel_invalid(self): + """Test that invalid credentials raise TypeError.""" + creds = UsernamePasswordCredentials("user", "pass") + with pytest.raises(TypeError, match="RexelOAuthCodeCredentials are required"): + _ensure_rexel(creds) + + @pytest.mark.asyncio + async def test_build_auth_strategy_somfy(self): + """Test building Somfy auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_EUROPE, + name="Somfy", + endpoint="https://api.somfy.com", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, SomfyAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_cozytouch(self): + """Test building Cozytouch auth strategy.""" + server_config = ServerConfig( + server=Server.ATLANTIC_COZYTOUCH, + name="Cozytouch", + endpoint="https://api.cozytouch.com", + manufacturer="Atlantic", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, CozytouchAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_nexity(self): + """Test building Nexity auth strategy.""" + server_config = ServerConfig( + server=Server.NEXITY, + name="Nexity", + endpoint="https://api.nexity.com", + manufacturer="Nexity", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, NexityAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_rexel(self): + """Test building Rexel auth strategy.""" + server_config = ServerConfig( + server=Server.REXEL, + name="Rexel", + endpoint="https://api.rexel.com", + manufacturer="Rexel", + type=APIType.CLOUD, + ) + credentials = RexelOAuthCodeCredentials("code", "http://redirect.uri") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, RexelAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_local_token(self): + """Test building local token auth strategy.""" + server_config = ServerConfig( + server=None, + name="Local", + endpoint="https://gateway.local", + manufacturer="Overkiz", + type=APIType.LOCAL, + ) + credentials = LocalTokenCredentials("local_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, LocalTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_local_bearer(self): + """Test building local bearer token auth strategy.""" + server_config = ServerConfig( + server=None, + name="Local", + endpoint="https://gateway.local", + manufacturer="Overkiz", + type=APIType.LOCAL, + ) + credentials = TokenCredentials("bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, BearerTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_cloud_bearer(self): + """Test building cloud bearer token auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Somfy Oceania", + endpoint="https://api.somfy.com.au", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, BearerTokenAuthStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_session_login(self): + """Test building generic session login auth strategy.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Somfy Oceania", + endpoint="https://api.somfy.com.au", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + assert isinstance(strategy, SessionLoginStrategy) + + @pytest.mark.asyncio + async def test_build_auth_strategy_wrong_credentials_type(self): + """Test that wrong credentials type raises TypeError.""" + server_config = ServerConfig( + server=Server.SOMFY_EUROPE, + name="Somfy", + endpoint="https://api.somfy.com", + manufacturer="Somfy", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("token") # Wrong type for Somfy + session = AsyncMock(spec=ClientSession) + + with pytest.raises(TypeError, match="UsernamePasswordCredentials are required"): + build_auth_strategy( + server_config=server_config, + credentials=credentials, + session=session, + ssl_context=True, + ) + + +class TestSessionLoginStrategy: + """Test SessionLoginStrategy.""" + + @pytest.mark.asyncio + async def test_login_success(self): + """Test successful login with 200 response.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"success": True}) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + await strategy.login() + + session.post.assert_called_once() + + @pytest.mark.asyncio + async def test_login_204_no_content(self): + """Test login with 204 No Content response.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 204 + mock_response.json = AsyncMock() + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + await strategy.login() + + # Should not call json() for 204 response + assert not mock_response.json.called + + @pytest.mark.asyncio + async def test_refresh_if_needed_no_refresh(self): + """Test that refresh_if_needed returns False when no refresh needed.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + result = await strategy.refresh_if_needed() + + assert not result + + def test_auth_headers_no_token(self): + """Test that auth headers return empty dict when no token.""" + server_config = ServerConfig( + server=Server.SOMFY_OCEANIA, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = UsernamePasswordCredentials("user", "pass") + session = AsyncMock(spec=ClientSession) + + strategy = SessionLoginStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + headers = strategy.auth_headers() + + assert headers == {} + + +class TestBearerTokenAuthStrategy: + """Test BearerTokenAuthStrategy.""" + + @pytest.mark.asyncio + async def test_login_no_op(self): + """Test that login is a no-op for bearer tokens.""" + server_config = ServerConfig( + server=None, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("my_bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = BearerTokenAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + result = await strategy.login() + + # Login should be a no-op + assert result is None + + def test_auth_headers_with_token(self): + """Test that auth headers include Bearer token.""" + server_config = ServerConfig( + server=None, + name="Test", + endpoint="https://api.test.com/", + manufacturer="Test", + type=APIType.CLOUD, + ) + credentials = TokenCredentials("my_bearer_token") + session = AsyncMock(spec=ClientSession) + + strategy = BearerTokenAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + headers = strategy.auth_headers() + + assert headers == {"Authorization": "Bearer my_bearer_token"} + + +class TestRexelAuthStrategy: + """Tests for Rexel auth specifics.""" + + @pytest.mark.asyncio + async def test_exchange_token_error_response(self): + """Ensure OAuth error payloads raise InvalidTokenException before parsing access token.""" + server_config = ServerConfig( + server=Server.REXEL, + name="Rexel", + endpoint="https://api.rexel.com", + manufacturer="Rexel", + type=APIType.CLOUD, + ) + credentials = RexelOAuthCodeCredentials("code", "https://redirect") + session = AsyncMock(spec=ClientSession) + + mock_response = MagicMock() + mock_response.status = 400 + mock_response.json = AsyncMock( + return_value={"error": "invalid_grant", "error_description": "bad grant"} + ) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + session.post = MagicMock(return_value=mock_response) + + strategy = RexelAuthStrategy( + credentials, session, server_config, True, APIType.CLOUD + ) + + with pytest.raises(InvalidTokenException, match="bad grant"): + await strategy._exchange_token({"grant_type": "authorization_code"}) + + def test_ensure_consent_missing(self): + """Raising when JWT consent claim is missing or incorrect.""" + payload_segment = ( + base64.urlsafe_b64encode(json.dumps({"consent": "other"}).encode()) + .decode() + .rstrip("=") + ) + token = f"header.{payload_segment}.sig" + + with pytest.raises(InvalidTokenException, match="Consent is missing"): + RexelAuthStrategy._ensure_consent(token) + + def test_decode_jwt_payload_invalid_format(self): + """Malformed tokens raise InvalidTokenException during decoding.""" + with pytest.raises(InvalidTokenException): + _decode_jwt_payload("invalid.token") diff --git a/tests/test_client.py b/tests/test_client.py index 71a42e25..af68ffbf 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -11,11 +11,14 @@ from pytest_asyncio import fixture from pyoverkiz import exceptions +from pyoverkiz.auth.credentials import ( + LocalTokenCredentials, + UsernamePasswordCredentials, +) from pyoverkiz.client import OverkizClient -from pyoverkiz.const import SUPPORTED_SERVERS -from pyoverkiz.enums import APIType, DataType +from pyoverkiz.enums import APIType, DataType, Server from pyoverkiz.models import Option -from pyoverkiz.utils import generate_local_server +from pyoverkiz.utils import create_local_server_config CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -26,26 +29,28 @@ class TestOverkizClient: @fixture async def client(self): """Fixture providing an OverkizClient configured for the cloud server.""" - return OverkizClient("username", "password", SUPPORTED_SERVERS["somfy_europe"]) + return OverkizClient( + server=Server.SOMFY_EUROPE, + credentials=UsernamePasswordCredentials("username", "password"), + ) @fixture async def local_client(self): """Fixture providing an OverkizClient configured for a local (developer) server.""" return OverkizClient( - "username", - "password", - generate_local_server("gateway-1234-5678-1243.local:8443"), + server=create_local_server_config(host="gateway-1234-5678-1243.local:8443"), + credentials=LocalTokenCredentials(token="token"), ) @pytest.mark.asyncio async def test_get_api_type_cloud(self, client: OverkizClient): """Verify that a cloud-configured client reports APIType.CLOUD.""" - assert client.api_type == APIType.CLOUD + assert client.server_config.type == APIType.CLOUD @pytest.mark.asyncio async def test_get_api_type_local(self, local_client: OverkizClient): """Verify that a local-configured client reports APIType.LOCAL.""" - assert local_client.api_type == APIType.LOCAL + assert local_client.server_config.type == APIType.LOCAL @pytest.mark.asyncio async def test_get_devices_basic(self, client: OverkizClient): diff --git a/tests/test_utils.py b/tests/test_utils.py index a951078f..8b33b8f7 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,7 +2,7 @@ import pytest -from pyoverkiz.utils import generate_local_server, is_overkiz_gateway +from pyoverkiz.utils import create_local_server_config, is_overkiz_gateway LOCAL_HOST = "gateway-1234-5678-1243.local:8443" LOCAL_HOST_BY_IP = "192.168.1.105:8443" @@ -11,9 +11,9 @@ class TestUtils: """Tests for utility helpers like local server generation and gateway checks.""" - def test_generate_local_server(self): + def test_create_local_server_config(self): """Create a local server descriptor using the host and default values.""" - local_server = generate_local_server(host=LOCAL_HOST) + local_server = create_local_server_config(host=LOCAL_HOST) assert local_server assert ( @@ -24,9 +24,9 @@ def test_generate_local_server(self): assert local_server.name == "Somfy Developer Mode" assert local_server.configuration_url is None - def test_generate_local_server_by_ip(self): + def test_create_local_server_config_by_ip(self): """Create a local server descriptor using an IP host and custom fields.""" - local_server = generate_local_server( + local_server = create_local_server_config( host=LOCAL_HOST_BY_IP, manufacturer="Test Manufacturer", name="Test Name",