Skip to content

Commit 8c17a86

Browse files
authored
[DEV-3805] Move token fetcher to use dataclasses (#201)
Signed-off-by: Max Chesterfield <max.chesterfield@zepben.com>
1 parent e5e1057 commit 8c17a86

3 files changed

Lines changed: 64 additions & 44 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The Python Evolve SDK contains everything necessary to communicate with a [Zepbe
66

77
# Requirements #
88

9-
- Python 3.9 or later
9+
- Python 3.10 or later
1010

1111
# Installation #
1212

changelog.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* None.
1111

1212
### Fixes
13-
* None.
13+
* Moved ZepbenTokenAuth to use python dataclasses instead of `zepben.ewb.dataclassy`, existing code should work as is.
1414

1515
### Notes
1616
* None.

src/zepben/ewb/auth/client/zepben_token_fetcher.py

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
__all__ = ["ZepbenTokenFetcher", "create_token_fetcher", "get_token_fetcher", "create_token_fetcher_managed_identity"]
88

99
import warnings
10+
from dataclasses import dataclass, Field, field, InitVar
1011
from datetime import datetime
11-
from typing import Optional, Union, Callable, Dict
12+
from typing import Optional, Callable
1213

1314
import jwt
1415
import requests
15-
from dataclassy import dataclass
16+
from requests import Response
1617
from urllib3.exceptions import InsecureRequestWarning
1718

1819
from zepben.ewb.auth.common.auth_exception import AuthException
@@ -21,11 +22,15 @@
2122
from zepben.ewb.auth.common.auth_provider_config import AuthProviderConfig, create_auth_provider_config, fetch_provider_details
2223

2324

24-
def _fetch_token_generator(is_entraid: bool, use_identity: bool, identity_url: Optional[str] = None) -> Callable[
25-
[Dict, Dict, str, bool, bool], requests.Response]:
25+
def _fetch_token_generator(
26+
is_entraid: bool,
27+
use_identity: bool,
28+
identity_url: Optional[str] = None
29+
) -> Callable[[dict, dict, str, Optional[bool], Optional[bool]], Response]:
30+
2631
def post(
27-
refresh_request_data: Dict,
28-
token_request_data: Dict,
32+
refresh_request_data: dict,
33+
token_request_data: dict,
2934
token_endpoint: str,
3035
refresh: bool,
3136
verify: bool
@@ -48,7 +53,14 @@ def post(
4853
verify=verify
4954
)
5055

51-
def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool, verify: bool) -> requests.Response:
56+
def _get_token_response(
57+
refresh_request_data: dict,
58+
token_request_data: dict,
59+
token_endpoint: str,
60+
refresh: bool,
61+
verify: bool
62+
) -> requests.Response:
63+
5264
refresh = not is_entraid and refresh # At the moment Azure auth doesn't support refresh tokens. So we always force new tokens.
5365

5466
return post(
@@ -59,53 +71,59 @@ def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, to
5971
verify
6072
)
6173

62-
def _get_token_response_from_identity(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool = False,
63-
verify: bool = False) -> requests.Response:
74+
def _get_token_response_from_identity(
75+
refresh_request_data: dict,
76+
token_request_data: dict,
77+
token_endpoint: str,
78+
refresh: Optional[bool] = False,
79+
verify: Optional[bool] = False
80+
) -> requests.Response:
81+
6482
return requests.get(identity_url, headers={"Metadata": "true"}, verify=verify)
6583

6684
if use_identity:
6785
if not identity_url:
68-
raise ValueError("Misconfiguration dectected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.")
86+
raise ValueError("Misconfiguration detected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.")
6987
return _get_token_response_from_identity
7088
else:
7189
return _get_token_response
7290

7391

74-
@dataclass
75-
class ZepbenTokenFetcher(object):
92+
@dataclass(init=True, repr=True, eq=True)
93+
class ZepbenTokenFetcher:
7694
"""
7795
Fetches access tokens from an authentication provider using the OAuth 2.0 protocol.
78-
"""
7996
80-
auth_method: AuthMethod = AuthMethod.OAUTH
81-
""" Deprecated. Kept for backwards compatibility, but this is now unused. """
97+
:param audience: Audience to use when requesting tokens
98+
:param token_endpoint: The domain of the token issuer.
99+
:param token_request_data: Data to pass in token requests.
100+
:param refresh_request_data: Data to pass in refresh token requests.
101+
:param verify: Passed through to requests.post(). When this is a boolean, it determines whether to verify the HTTPS
102+
certificate of the OAUTH service or not. When this is a string, it is used as the filename of the certificate
103+
truststore to use when verifying the OAUTH service.
104+
:param auth_method: Deprecated. Kept for backwards compatibility, but this is now unused.
105+
"""
82106

83107
audience: str
84-
""" Audience to use when requesting tokens """
85-
86-
token_endpoint: str
87-
""" The domain of the token issuer. """
88-
89-
token_request_data = {}
90-
""" Data to pass in token requests. """
91-
92-
refresh_request_data = {}
93-
""" Data to pass in refresh token requests. """
108+
issuer: Optional[str] = None
109+
token_endpoint: Optional[str] = None
110+
token_request_data: Optional[dict] = field(default_factory=dict)
111+
refresh_request_data: Optional[dict] = field(default_factory=dict)
112+
verify: Optional[bool | str] = None
113+
auth_method: Optional[AuthMethod] = None
94114

95-
verify: Union[bool, str] = True
96-
"""
97-
Passed through to requests.post(). When this is a boolean, it determines whether or not to verify the HTTPS certificate of the OAUTH service.
98-
When this is a string, it is used as the filename of the certificate truststore to use when verifying the OAUTH service.
99-
"""
115+
_request_token: InitVar[Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response]] = None
100116

101-
_request_token: Callable[[Dict, Dict, str, bool, bool], requests.Response] = _fetch_token_generator(False, False)
117+
_access_token: Optional[str] = None
118+
_refresh_token: Optional[str] = None
119+
_token_expiry: Optional[datetime] = datetime.min
120+
token_type: Optional[str] = None
102121

103-
_access_token = None
104-
_refresh_token = None
105-
_token_expiry = datetime.min
106-
_token_type = None
122+
def __post_init__(self, _request_token):
123+
if _request_token is None:
124+
_request_token = _fetch_token_generator(False, False)
125+
self._request_token = _request_token
107126

108-
def __init__(self):
109127
self.token_request_data["audience"] = self.audience
110128
self.refresh_request_data["audience"] = self.audience
111129

@@ -134,7 +152,7 @@ def fetch_token(self) -> str:
134152

135153
return f"{self._token_type} {self._access_token}"
136154

137-
def _fetch_token(self, refresh: bool = False):
155+
def _fetch_token(self, refresh: Optional[bool] = False):
138156
if refresh:
139157
self.refresh_request_data["refresh_token"] = self._refresh_token
140158

@@ -174,11 +192,11 @@ def _fetch_token(self, refresh: bool = False):
174192

175193
def create_token_fetcher(
176194
conf_address: str,
177-
verify_conf: Union[bool, str] = True,
178-
verify_auth: Union[bool, str] = True,
179-
auth_type_field: str = "authType",
180-
audience_field: str = "audience",
181-
issuer_field: str = "issuer",
195+
verify_conf: Optional[bool | str] = True,
196+
verify_auth: Optional[bool | str] = True,
197+
auth_type_field: Optional[str] = "authType",
198+
audience_field: Optional[str] = "audience",
199+
issuer_field: Optional[str] = "issuer",
182200
) -> Optional[ZepbenTokenFetcher]:
183201
"""
184202
Helper method to fetch auth related configuration from `conf_address` and create a :class:`ZepbenTokenFetcher`
@@ -194,6 +212,7 @@ def create_token_fetcher(
194212
195213
:returns: A :class:`ZepbenTokenFetcher` if the server reported authentication was configured, otherwise None.
196214
"""
215+
197216
with warnings.catch_warnings():
198217
if not verify_conf:
199218
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
@@ -264,6 +283,7 @@ def create_token_fetcher_managed_identity(identity_url: str, verify_auth: bool)
264283
"http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5ffcfee6-34cd-4c5c-bb7e-c5261d739341"
265284
:param verify_auth: Whether to verify certificates for the identity_url. Only applies for https URLs.
266285
"""
286+
267287
return ZepbenTokenFetcher(
268288
audience="",
269289
issuer="",

0 commit comments

Comments
 (0)