-
Notifications
You must be signed in to change notification settings - Fork 34
Refactor authentication handling in OverkizClient and add new credential classes #1867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v2/main
Are you sure you want to change the base?
Conversation
c82af05 to
1849f13
Compare
…ial classes - Introduced UsernamePasswordCredentials and LocalTokenCredentials for better credential management. - Updated OverkizClient to utilize the new credential classes and refactored login logic. - Added authentication strategies for various servers, including Somfy and Rexel. - Created new modules for auth strategies and credentials to improve code organization. - Enhanced README with updated usage examples for the new authentication methods.
…Config across the codebase
…fig and update related references
…y and update related server handling
…_auth_strategy function
98d9d6a to
38823d2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces a comprehensive refactoring of authentication handling in the pyoverkiz library by extracting authentication logic into a dedicated module with credential classes and authentication strategies. The refactoring improves code organization and extensibility while introducing breaking changes to the OverkizClient API.
- Introduced new authentication module with credential classes (UsernamePasswordCredentials, LocalTokenCredentials, RexelOAuthCodeCredentials, TokenCredentials) and strategy-based authentication
- Refactored OverkizClient constructor to require ServerConfig and Credentials objects as keyword-only arguments
- Added support for Rexel OAuth2 authentication flow with consent validation
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| pyoverkiz/auth/base.py | Defines AuthContext and AuthStrategy protocol for token management |
| pyoverkiz/auth/credentials.py | Introduces credential classes for different authentication methods |
| pyoverkiz/auth/strategies.py | Implements authentication strategies for various servers (Somfy, Cozytouch, Nexity, Rexel, Local) |
| pyoverkiz/auth/factory.py | Factory function to build appropriate auth strategy based on server and credentials |
| pyoverkiz/auth/init.py | Module exports for the authentication package |
| pyoverkiz/client.py | Refactored to use new authentication module; removed inline auth logic |
| pyoverkiz/models.py | Renamed OverkizServer to ServerConfig with additional server and type properties |
| pyoverkiz/const.py | Updated server configurations to use ServerConfig; added Rexel OAuth constants |
| pyoverkiz/utils.py | Renamed generate_local_server to create_local_server_config; added create_server_config helper |
| tests/test_client.py | Updated to use new credential classes and Server enum |
| tests/test_utils.py | Updated test names to match renamed utility functions |
| README.md | Updated examples to demonstrate new authentication approach |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| raise BadCredentialsException( | ||
| f"Login failed for {self.server.name}: {response.status}" | ||
| ) | ||
|
|
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The response might not contain JSON (e.g., for 204 No Content status). Consider checking the response status or content-type before calling response.json(), or handling potential JSON decode errors.
| # A 204 No Content response cannot have a body, so skip JSON parsing. | |
| if response.status == 204: | |
| return |
pyoverkiz/auth/strategies.py
Outdated
| self.context.refresh_token = token.get("refresh_token") | ||
| expires_in = token.get("expires_in") | ||
| if expires_in: | ||
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using datetime.datetime.now() without timezone information can cause issues in systems with different timezone configurations. Consider using datetime.datetime.now(tz=datetime.timezone.utc) for consistency.
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( | |
| self.context.expires_at = datetime.datetime.now( | |
| tz=datetime.timezone.utc | |
| ) + datetime.timedelta( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datetime.datetime.now(datetime.UTC) here and in any place where we get the current date.
pyoverkiz/auth/base.py
Outdated
| if not self.expires_at: | ||
| return False | ||
|
|
||
| return datetime.datetime.now() >= self.expires_at - datetime.timedelta( |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using datetime.datetime.now() without timezone information can cause issues in systems with different timezone configurations. Consider using datetime.datetime.now(tz=datetime.timezone.utc) for consistency with the expires_at timestamps set in the auth strategies.
| return datetime.datetime.now() >= self.expires_at - datetime.timedelta( | |
| return datetime.datetime.now(tz=datetime.timezone.utc) >= self.expires_at - datetime.timedelta( |
pyoverkiz/utils.py
Outdated
| server=server | ||
| if isinstance(server, Server) or server is None | ||
| else Server(server), | ||
| name=name, | ||
| endpoint=endpoint, | ||
| manufacturer=manufacturer, | ||
| configuration_url=configuration_url, | ||
| type=type if isinstance(type, APIType) else APIType(type), |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The enum conversion logic in create_server_config duplicates the conversion already done in ServerConfig.init. Since ServerConfig.init handles the conversion, this function can pass the parameters directly without pre-converting them.
| server=server | |
| if isinstance(server, Server) or server is None | |
| else Server(server), | |
| name=name, | |
| endpoint=endpoint, | |
| manufacturer=manufacturer, | |
| configuration_url=configuration_url, | |
| type=type if isinstance(type, APIType) else APIType(type), | |
| server=server, | |
| name=name, | |
| endpoint=endpoint, | |
| manufacturer=manufacturer, | |
| configuration_url=configuration_url, | |
| type=type, |
| headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||
| ) as response: | ||
| token = await response.json() | ||
|
|
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The response.json() call should handle potential errors from the OAuth endpoint. If the token exchange fails, the response may contain an error field that should be checked before attempting to access the access_token.
| # Handle OAuth error responses explicitly before accessing the access token. | |
| error = token.get("error") | |
| if error: | |
| description = token.get("error_description") or token.get("message") | |
| if description: | |
| raise InvalidTokenException( | |
| f"Error retrieving Rexel access token: {description}" | |
| ) | |
| raise InvalidTokenException( | |
| f"Error retrieving Rexel access token: {error}" | |
| ) |
| """Authentication strategies for Overkiz API.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import base64 | ||
| import binascii | ||
| import datetime | ||
| import json | ||
| import ssl | ||
| from collections.abc import Mapping | ||
| from typing import Any, cast | ||
|
|
||
| import boto3 | ||
| from aiohttp import ClientSession, FormData | ||
| from botocore.client import BaseClient | ||
| from botocore.config import Config | ||
| from warrant_lite import WarrantLite | ||
|
|
||
| from pyoverkiz.auth.base import AuthContext, AuthStrategy | ||
| from pyoverkiz.auth.credentials import ( | ||
| LocalTokenCredentials, | ||
| RexelOAuthCodeCredentials, | ||
| TokenCredentials, | ||
| UsernamePasswordCredentials, | ||
| ) | ||
| from pyoverkiz.const import ( | ||
| COZYTOUCH_ATLANTIC_API, | ||
| COZYTOUCH_CLIENT_ID, | ||
| NEXITY_API, | ||
| NEXITY_COGNITO_CLIENT_ID, | ||
| NEXITY_COGNITO_REGION, | ||
| NEXITY_COGNITO_USER_POOL, | ||
| REXEL_OAUTH_CLIENT_ID, | ||
| REXEL_OAUTH_SCOPE, | ||
| REXEL_OAUTH_TOKEN_URL, | ||
| REXEL_REQUIRED_CONSENT, | ||
| SOMFY_API, | ||
| SOMFY_CLIENT_ID, | ||
| SOMFY_CLIENT_SECRET, | ||
| ) | ||
| from pyoverkiz.enums import APIType | ||
| from pyoverkiz.exceptions import ( | ||
| BadCredentialsException, | ||
| CozyTouchBadCredentialsException, | ||
| CozyTouchServiceException, | ||
| InvalidTokenException, | ||
| NexityBadCredentialsException, | ||
| NexityServiceException, | ||
| SomfyBadCredentialsException, | ||
| SomfyServiceException, | ||
| ) | ||
| from pyoverkiz.models import ServerConfig | ||
|
|
||
|
|
||
| class BaseAuthStrategy(AuthStrategy): | ||
| """Base class for authentication strategies.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Store shared auth context for Overkiz API interactions.""" | ||
| self.session = session | ||
| self.server = server | ||
| self._ssl = ssl_context | ||
| self.api_type = api_type | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform authentication; default is a no-op for subclasses to override.""" | ||
| return None | ||
|
|
||
| async def refresh_if_needed(self) -> bool: | ||
| """Refresh authentication tokens if needed; default returns False.""" | ||
| return False | ||
|
|
||
| def auth_headers(self, path: str | None = None) -> Mapping[str, str]: | ||
| """Return authentication headers for a request path.""" | ||
| return {} | ||
|
|
||
| async def close(self) -> None: | ||
| """Close any resources held by the strategy; default is no-op.""" | ||
| return None | ||
|
|
||
|
|
||
| class SessionLoginStrategy(BaseAuthStrategy): | ||
| """Authentication strategy using session-based login.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: UsernamePasswordCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize SessionLoginStrategy with given parameters.""" | ||
| super().__init__(session, server, ssl_context, api_type) | ||
| self.credentials = credentials | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform login using username and password.""" | ||
| payload = { | ||
| "userId": self.credentials.username, | ||
| "userPassword": self.credentials.password, | ||
| } | ||
| await self._post_login(payload) | ||
|
|
||
| async def _post_login(self, data: Mapping[str, Any]) -> None: | ||
| """Post login data to the server and handle response.""" | ||
| async with self.session.post( | ||
| f"{self.server.endpoint}login", | ||
| data=data, | ||
| ssl=self._ssl, | ||
| ) as response: | ||
| if response.status not in (200, 204): | ||
| raise BadCredentialsException( | ||
| f"Login failed for {self.server.name}: {response.status}" | ||
| ) | ||
|
|
||
| result = await response.json() | ||
| if not result.get("success"): | ||
| raise BadCredentialsException("Login failed: bad credentials") | ||
|
|
||
|
|
||
| class SomfyAuthStrategy(BaseAuthStrategy): | ||
| """Authentication strategy using Somfy OAuth2.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: UsernamePasswordCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize SomfyAuthStrategy with given parameters.""" | ||
| super().__init__(session, server, ssl_context, api_type) | ||
| self.credentials = credentials | ||
| self.context = AuthContext() | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform login using Somfy OAuth2.""" | ||
| await self._request_access_token( | ||
| grant_type="password", | ||
| extra_fields={ | ||
| "username": self.credentials.username, | ||
| "password": self.credentials.password, | ||
| }, | ||
| ) | ||
|
|
||
| async def refresh_if_needed(self) -> bool: | ||
| """Refresh Somfy OAuth2 tokens if needed.""" | ||
| if not self.context.is_expired() or not self.context.refresh_token: | ||
| return False | ||
|
|
||
| await self._request_access_token( | ||
| grant_type="refresh_token", | ||
| extra_fields={"refresh_token": cast(str, self.context.refresh_token)}, | ||
| ) | ||
| return True | ||
|
|
||
| def auth_headers(self, path: str | None = None) -> Mapping[str, str]: | ||
| """Return authentication headers for a request path.""" | ||
| if self.context.access_token: | ||
| return {"Authorization": f"Bearer {self.context.access_token}"} | ||
|
|
||
| return {} | ||
|
|
||
| async def _request_access_token( | ||
| self, *, grant_type: str, extra_fields: Mapping[str, str] | ||
| ) -> None: | ||
| form = FormData( | ||
| { | ||
| "grant_type": grant_type, | ||
| "client_id": SOMFY_CLIENT_ID, | ||
| "client_secret": SOMFY_CLIENT_SECRET, | ||
| **extra_fields, | ||
| } | ||
| ) | ||
|
|
||
| async with self.session.post( | ||
| f"{SOMFY_API}/oauth/oauth/v2/token/jwt", | ||
| data=form, | ||
| headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||
| ) as response: | ||
| token = await response.json() | ||
|
|
||
| if token.get("message") == "error.invalid.grant": | ||
| raise SomfyBadCredentialsException(token["message"]) | ||
|
|
||
| access_token = token.get("access_token") | ||
| if not access_token: | ||
| raise SomfyServiceException("No Somfy access token provided.") | ||
|
|
||
| self.context.access_token = cast(str, access_token) | ||
| self.context.refresh_token = token.get("refresh_token") | ||
| expires_in = token.get("expires_in") | ||
| if expires_in: | ||
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( | ||
| seconds=cast(int, expires_in) - 5 | ||
| ) | ||
|
|
||
|
|
||
| class CozytouchAuthStrategy(SessionLoginStrategy): | ||
| """Authentication strategy using Cozytouch session-based login.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: UsernamePasswordCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize CozytouchAuthStrategy with given parameters.""" | ||
| super().__init__(credentials, session, server, ssl_context, api_type) | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform login using Cozytouch username and password.""" | ||
| form = FormData( | ||
| { | ||
| "grant_type": "password", | ||
| "username": f"GA-PRIVATEPERSON/{self.credentials.username}", | ||
| "password": self.credentials.password, | ||
| } | ||
| ) | ||
| async with self.session.post( | ||
| f"{COZYTOUCH_ATLANTIC_API}/token", | ||
| data=form, | ||
| headers={ | ||
| "Authorization": f"Basic {COZYTOUCH_CLIENT_ID}", | ||
| "Content-Type": "application/x-www-form-urlencoded", | ||
| }, | ||
| ) as response: | ||
| token = await response.json() | ||
|
|
||
| if token.get("error") == "invalid_grant": | ||
| raise CozyTouchBadCredentialsException(token["error_description"]) | ||
|
|
||
| if "token_type" not in token: | ||
| raise CozyTouchServiceException("No CozyTouch token provided.") | ||
|
|
||
| async with self.session.get( | ||
| f"{COZYTOUCH_ATLANTIC_API}/magellan/accounts/jwt", | ||
| headers={"Authorization": f"Bearer {token['access_token']}"}, | ||
| ) as response: | ||
| jwt = await response.text() | ||
|
|
||
| if not jwt: | ||
| raise CozyTouchServiceException("No JWT token provided.") | ||
|
|
||
| jwt = jwt.strip('"') | ||
|
|
||
| await self._post_login({"jwt": jwt}) | ||
|
|
||
|
|
||
| class NexityAuthStrategy(SessionLoginStrategy): | ||
| """Authentication strategy using Nexity session-based login.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: UsernamePasswordCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize NexityAuthStrategy with given parameters.""" | ||
| super().__init__(credentials, session, server, ssl_context, api_type) | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform login using Nexity username and password.""" | ||
| loop = asyncio.get_event_loop() | ||
|
|
||
| def _client() -> BaseClient: | ||
| return boto3.client( | ||
| "cognito-idp", config=Config(region_name=NEXITY_COGNITO_REGION) | ||
| ) | ||
|
|
||
| client = await loop.run_in_executor(None, _client) | ||
| aws = WarrantLite( | ||
| username=self.credentials.username, | ||
| password=self.credentials.password, | ||
| pool_id=NEXITY_COGNITO_USER_POOL, | ||
| client_id=NEXITY_COGNITO_CLIENT_ID, | ||
| client=client, | ||
| ) | ||
|
|
||
| try: | ||
| tokens = await loop.run_in_executor(None, aws.authenticate_user) | ||
| except Exception as error: | ||
| raise NexityBadCredentialsException() from error | ||
|
|
||
| id_token = tokens["AuthenticationResult"]["IdToken"] | ||
|
|
||
| async with self.session.get( | ||
| f"{NEXITY_API}/deploy/api/v1/domotic/token", | ||
| headers={"Authorization": id_token}, | ||
| ) as response: | ||
| token = await response.json() | ||
|
|
||
| if "token" not in token: | ||
| raise NexityServiceException("No Nexity SSO token provided.") | ||
|
|
||
| user_id = self.credentials.username.replace("@", "_-_") | ||
| await self._post_login({"ssoToken": token["token"], "userId": user_id}) | ||
|
|
||
|
|
||
| class LocalTokenAuthStrategy(BaseAuthStrategy): | ||
| """Authentication strategy using a local API token.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: LocalTokenCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize LocalTokenAuthStrategy with given parameters.""" | ||
| super().__init__(session, server, ssl_context, api_type) | ||
| self.credentials = credentials | ||
|
|
||
| async def login(self) -> None: | ||
| """Validate that a token is provided for local API access.""" | ||
| if not self.credentials.token: | ||
| raise InvalidTokenException("Local API requires a token.") | ||
|
|
||
| def auth_headers(self, path: str | None = None) -> Mapping[str, str]: | ||
| """Return authentication headers for a request path.""" | ||
| return {"Authorization": f"Bearer {self.credentials.token}"} | ||
|
|
||
|
|
||
| class RexelAuthStrategy(BaseAuthStrategy): | ||
| """Authentication strategy using Rexel OAuth2.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: RexelOAuthCodeCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize RexelAuthStrategy with given parameters.""" | ||
| super().__init__(session, server, ssl_context, api_type) | ||
| self.credentials = credentials | ||
| self.context = AuthContext() | ||
|
|
||
| async def login(self) -> None: | ||
| """Perform login using Rexel OAuth2 authorization code.""" | ||
| await self._exchange_token( | ||
| { | ||
| "grant_type": "authorization_code", | ||
| "client_id": REXEL_OAUTH_CLIENT_ID, | ||
| "scope": REXEL_OAUTH_SCOPE, | ||
| "code": self.credentials.code, | ||
| "redirect_uri": self.credentials.redirect_uri, | ||
| } | ||
| ) | ||
|
|
||
| async def refresh_if_needed(self) -> bool: | ||
| """Refresh Rexel OAuth2 tokens if needed.""" | ||
| if not self.context.is_expired() or not self.context.refresh_token: | ||
| return False | ||
|
|
||
| await self._exchange_token( | ||
| { | ||
| "grant_type": "refresh_token", | ||
| "client_id": REXEL_OAUTH_CLIENT_ID, | ||
| "scope": REXEL_OAUTH_SCOPE, | ||
| "refresh_token": cast(str, self.context.refresh_token), | ||
| } | ||
| ) | ||
| return True | ||
|
|
||
| def auth_headers(self, path: str | None = None) -> Mapping[str, str]: | ||
| """Return authentication headers for a request path.""" | ||
| if self.context.access_token: | ||
| return {"Authorization": f"Bearer {self.context.access_token}"} | ||
| return {} | ||
|
|
||
| async def _exchange_token(self, payload: Mapping[str, str]) -> None: | ||
| """Exchange authorization code or refresh token for access token.""" | ||
| form = FormData(payload) | ||
| async with self.session.post( | ||
| REXEL_OAUTH_TOKEN_URL, | ||
| data=form, | ||
| headers={"Content-Type": "application/x-www-form-urlencoded"}, | ||
| ) as response: | ||
| token = await response.json() | ||
|
|
||
| access_token = token.get("access_token") | ||
| if not access_token: | ||
| raise InvalidTokenException("No Rexel access token provided.") | ||
|
|
||
| self._ensure_consent(access_token) | ||
| self.context.access_token = cast(str, access_token) | ||
| self.context.refresh_token = token.get("refresh_token") | ||
| expires_in = token.get("expires_in") | ||
| if expires_in: | ||
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( | ||
| seconds=cast(int, expires_in) - 5 | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _ensure_consent(access_token: str) -> None: | ||
| """Ensure that the Rexel token has the required consent.""" | ||
| payload = _decode_jwt_payload(access_token) | ||
| consent = payload.get("consent") | ||
| if consent != REXEL_REQUIRED_CONSENT: | ||
| raise InvalidTokenException( | ||
| "Consent is missing or revoked for Rexel token." | ||
| ) | ||
|
|
||
|
|
||
| class BearerTokenAuthStrategy(BaseAuthStrategy): | ||
| """Authentication strategy using a static bearer token.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: TokenCredentials, | ||
| session: ClientSession, | ||
| server: ServerConfig, | ||
| ssl_context: ssl.SSLContext | bool, | ||
| api_type: APIType, | ||
| ) -> None: | ||
| """Initialize BearerTokenAuthStrategy with given parameters.""" | ||
| super().__init__(session, server, ssl_context, api_type) | ||
| self.credentials = credentials | ||
|
|
||
| def auth_headers(self, path: str | None = None) -> Mapping[str, str]: | ||
| """Return authentication headers for a request path.""" | ||
| if self.credentials.token: | ||
| return {"Authorization": f"Bearer {self.credentials.token}"} | ||
| return {} | ||
|
|
||
|
|
||
| def _decode_jwt_payload(token: str) -> dict[str, Any]: | ||
| """Decode the payload of a JWT token.""" | ||
| parts = token.split(".") | ||
| if len(parts) < 2: | ||
| raise InvalidTokenException("Malformed JWT received.") | ||
|
|
||
| payload_segment = parts[1] | ||
| padding = "=" * (-len(payload_segment) % 4) | ||
| try: | ||
| decoded = base64.urlsafe_b64decode(payload_segment + padding) | ||
| return cast(dict[str, Any], json.loads(decoded)) | ||
| except (binascii.Error, json.JSONDecodeError) as error: | ||
| raise InvalidTokenException("Malformed JWT received.") from error |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new authentication module (auth/strategies.py, auth/factory.py, auth/credentials.py) lacks dedicated test coverage. Given that authentication is a critical component and the module contains complex OAuth flows, token refresh logic, and JWT decoding, comprehensive test coverage should be added.
| if self.server_config.type == APIType.LOCAL and verify_ssl: | ||
| # To avoid security issues while authentication to local API, we add the following authority to | ||
| # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt | ||
| self._ssl = SSL_CONTEXT_LOCAL_API | ||
|
|
||
| if verify_ssl: | ||
| # To avoid security issues while authentication to local API, we add the following authority to | ||
| # our HTTPS client trust store: https://ca.overkiz.com/overkiz-root-ca-2048.crt | ||
| self._ssl = SSL_CONTEXT_LOCAL_API | ||
| # Disable strict validation introduced in Python 3.13, which doesn't | ||
| # work with Overkiz self-signed gateway certificates | ||
| self._ssl.verify_flags &= ~ssl.VERIFY_X509_STRICT |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mutating the shared SSL_CONTEXT_LOCAL_API object can cause issues when multiple OverkizClient instances are created. The verify_flags modification affects the global context. Consider creating a copy of the SSL context for each client instance.
pyoverkiz/auth/strategies.py
Outdated
| self.context.refresh_token = token.get("refresh_token") | ||
| expires_in = token.get("expires_in") | ||
| if expires_in: | ||
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( |
Copilot
AI
Jan 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using datetime.datetime.now() without timezone information can cause issues in systems with different timezone configurations. Consider using datetime.datetime.now(tz=datetime.timezone.utc) for consistency.
| self.context.expires_at = datetime.datetime.now() + datetime.timedelta( | |
| self.context.expires_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta( |
| ) -> None: | ||
| """Initialize ServerConfig and convert enum fields.""" | ||
| self.server = ( | ||
| server if isinstance(server, Server) or server is None else Server(server) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create_server_config() in utils.py:40-42 does the same conversion
|
@copilot can you address the feedback in this PR? |
…ling, and add auth tests (#1875) - [x] Fix datetime.now() calls to use UTC timezone (pyoverkiz/auth/strategies.py lines 203, 406; pyoverkiz/auth/base.py line 24) - [x] Handle 204 No Content responses properly in strategies.py line 123 - [x] Add error handling for OAuth token exchange responses in strategies.py line 396 - [x] Remove duplicate enum conversion logic in utils.py create_server_config function - [x] Fix SSL_CONTEXT_LOCAL_API mutation issue by creating a copy per client instance - [x] Add test coverage for authentication module (strategies.py, factory.py, credentials.py) - [x] Revert SSL context creation to avoid blocking I/O at runtime - [x] Add TODO fix comment for mypy type ignore workaround <!-- START COPILOT CODING AGENT TIPS --> --- ✨ Let Copilot coding agent [set things up for you](https://github.com/iMicknl/python-overkiz-api/issues/new?title=✨+Set+up+Copilot+instructions&body=Configure%20instructions%20for%20this%20repository%20as%20documented%20in%20%5BBest%20practices%20for%20Copilot%20coding%20agent%20in%20your%20repository%5D%28https://gh.io/copilot-coding-agent-tips%29%2E%0A%0A%3COnboard%20this%20repo%3E&assignees=copilot) — coding agent works faster and does higher quality work when set up for your repo. --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: iMicknl <1424596+iMicknl@users.noreply.github.com>
Breaking
OverkizServerclass is renamed toServerConfigand has additionalserverandtype(cloud/local) propertygenerate_local_serveris renamed tocreate_local_server_configclient.api_typeis removed and now available viaServerConfig(e.g.client.server_config.type)ServerConfigviaserverCredentialsclass viacredentials, e.g.UsernamePasswordCredentials(USERNAME, PASSWORD)for most server.