|
15 | 15 |
|
16 | 16 | """Agent-enhanced OAuth2 authentication manager for Asgardeo AI.""" |
17 | 17 |
|
| 18 | +import asyncio |
18 | 19 | import logging |
19 | 20 | import base64 |
20 | 21 | import os |
21 | | -from typing import Dict, List, Optional, Tuple, Any |
| 22 | +import time |
| 23 | +from typing import Callable, Dict, List, Optional, Tuple, Any |
22 | 24 | from urllib.parse import urlencode |
23 | 25 | from dataclasses import dataclass |
24 | 26 |
|
25 | 27 | from asgardeo import ( |
26 | | - AsgardeoConfig, |
27 | | - OAuthToken, |
28 | | - FlowStatus, |
29 | | - AsgardeoNativeAuthClient, |
| 28 | + AsgardeoConfig, |
| 29 | + OAuthToken, |
| 30 | + FlowStatus, |
| 31 | + AsgardeoNativeAuthClient, |
30 | 32 | AsgardeoTokenClient, |
31 | 33 | AuthenticationError, |
| 34 | + CIBAAuthenticationError, |
| 35 | + CIBAResponse, |
| 36 | + CIBAStatus, |
32 | 37 | TokenError, |
33 | 38 | ValidationError, |
34 | 39 | generate_pkce_pair, |
@@ -259,6 +264,130 @@ async def get_obo_token( |
259 | 264 | logger.error(f"OBO token exchange failed: {e}") |
260 | 265 | raise TokenError(f"OBO token exchange failed: {e}") |
261 | 266 |
|
| 267 | + async def _poll_for_token( |
| 268 | + self, |
| 269 | + ciba_response: CIBAResponse, |
| 270 | + scope: Optional[str] = None, |
| 271 | + timeout: Optional[int] = None, |
| 272 | + ) -> OAuthToken: |
| 273 | + """Poll the token endpoint until CIBA authentication completes. |
| 274 | +
|
| 275 | + :param ciba_response: CIBA initiation response with auth_req_id, interval, expires_in |
| 276 | + :param scope: Optional scope override |
| 277 | + :param timeout: Optional max wait time in seconds (defaults to ciba_response.expires_in) |
| 278 | + :return: OAuthToken on successful authentication |
| 279 | + :raises CIBAAuthenticationError: If authentication is denied or expires |
| 280 | + """ |
| 281 | + interval = ciba_response.interval |
| 282 | + max_wait = min( |
| 283 | + timeout or ciba_response.expires_in, |
| 284 | + ciba_response.expires_in, |
| 285 | + ) |
| 286 | + start_time = time.monotonic() |
| 287 | + |
| 288 | + while True: |
| 289 | + elapsed = time.monotonic() - start_time |
| 290 | + if elapsed >= max_wait: |
| 291 | + raise CIBAAuthenticationError( |
| 292 | + "CIBA authentication timed out: exceeded maximum wait time." |
| 293 | + ) |
| 294 | + |
| 295 | + await asyncio.sleep(interval) |
| 296 | + |
| 297 | + try: |
| 298 | + token = await self.token_client.get_token( |
| 299 | + "urn:openid:params:grant-type:ciba", |
| 300 | + auth_req_id=ciba_response.auth_req_id, |
| 301 | + scope=scope, |
| 302 | + ) |
| 303 | + return token |
| 304 | + except CIBAAuthenticationError as e: |
| 305 | + error_msg = str(e) |
| 306 | + if CIBAStatus.AUTHORIZATION_PENDING in error_msg: |
| 307 | + logger.debug("CIBA authorization pending, continuing to poll...") |
| 308 | + continue |
| 309 | + elif CIBAStatus.SLOW_DOWN in error_msg: |
| 310 | + interval += 5 |
| 311 | + logger.debug(f"CIBA slow_down received, increasing interval to {interval}s") |
| 312 | + continue |
| 313 | + elif CIBAStatus.EXPIRED_TOKEN in error_msg: |
| 314 | + raise CIBAAuthenticationError( |
| 315 | + "CIBA authentication request expired. The user did not authenticate in time." |
| 316 | + ) |
| 317 | + elif CIBAStatus.ACCESS_DENIED in error_msg: |
| 318 | + raise CIBAAuthenticationError( |
| 319 | + "CIBA authentication denied. The user rejected the authentication request." |
| 320 | + ) |
| 321 | + else: |
| 322 | + raise |
| 323 | + |
| 324 | + async def get_obo_token_with_ciba( |
| 325 | + self, |
| 326 | + login_hint: str, |
| 327 | + agent_token: OAuthToken, |
| 328 | + scopes: Optional[List[str]] = None, |
| 329 | + binding_message: Optional[str] = None, |
| 330 | + notification_channel: Optional[str] = None, |
| 331 | + timeout: Optional[int] = None, |
| 332 | + on_initiated: Optional[Callable[[CIBAResponse], None]] = None, |
| 333 | + ) -> Tuple[CIBAResponse, OAuthToken]: |
| 334 | + """Get on-behalf-of token using CIBA flow. |
| 335 | +
|
| 336 | + Initiates a CIBA request for a user identified by login_hint, |
| 337 | + then polls until the user authenticates. The actor_token is sent |
| 338 | + in the CIBA initiation to establish OBO delegation. |
| 339 | +
|
| 340 | + :param login_hint: Username or identifier of the user to authenticate |
| 341 | + :param agent_token: The agent's OAuthToken (used as actor_token for delegation) |
| 342 | + :param scopes: List of OAuth scopes to request |
| 343 | + :param binding_message: Message displayed to the user during authentication |
| 344 | + :param notification_channel: Notification channel (email, sms, external) |
| 345 | + :param timeout: Maximum time to wait for authentication in seconds |
| 346 | + :param on_initiated: Optional callback invoked with CIBAResponse immediately after |
| 347 | + the CIBA request is accepted and before polling begins. Use this to notify the |
| 348 | + caller that a push/email/SMS has been sent and polling is starting. |
| 349 | + Accepts both sync and async callables. |
| 350 | + :return: Tuple of (CIBAResponse, OAuthToken) |
| 351 | + """ |
| 352 | + if not login_hint: |
| 353 | + raise ValidationError("login_hint is required for CIBA OBO token exchange.") |
| 354 | + if not agent_token: |
| 355 | + raise ValidationError("agent_token is required for CIBA OBO token exchange.") |
| 356 | + |
| 357 | + scope_str = " ".join(scopes) if scopes else None |
| 358 | + |
| 359 | + try: |
| 360 | + ciba_response = await self.token_client.initiate_ciba( |
| 361 | + login_hint=login_hint, |
| 362 | + scope=scope_str, |
| 363 | + binding_message=binding_message, |
| 364 | + notification_channel=notification_channel, |
| 365 | + actor_token=agent_token.access_token, |
| 366 | + ) |
| 367 | + |
| 368 | + logger.info( |
| 369 | + f"CIBA initiated for user '{login_hint}'. auth_req_id: {ciba_response.auth_req_id}, " |
| 370 | + f"expires_in: {ciba_response.expires_in}s" |
| 371 | + ) |
| 372 | + |
| 373 | + if on_initiated is not None: |
| 374 | + result = on_initiated(ciba_response) |
| 375 | + if asyncio.iscoroutine(result): |
| 376 | + await result |
| 377 | + |
| 378 | + token = await self._poll_for_token( |
| 379 | + ciba_response=ciba_response, |
| 380 | + scope=scope_str, |
| 381 | + timeout=timeout, |
| 382 | + ) |
| 383 | + return ciba_response, token |
| 384 | + |
| 385 | + except (CIBAAuthenticationError, ValidationError): |
| 386 | + raise |
| 387 | + except Exception as e: |
| 388 | + logger.error(f"CIBA OBO token exchange failed: {e}") |
| 389 | + raise TokenError(f"CIBA OBO token exchange failed: {e}") |
| 390 | + |
262 | 391 | async def revoke_token( |
263 | 392 | self, |
264 | 393 | token: str, |
|
0 commit comments