diff --git a/README.md b/README.md index ff5e46c..0c63cfe 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,12 @@ The SDK handles per-domain OIDC discovery, JWKS fetching, issuer validation, and For more details and examples, see [examples/MultipleCustomDomains.md](examples/MultipleCustomDomains.md). +### 6. Session Expiry from the Upstream IdP + +For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. + +For more details and examples, see [examples/RetrievingData.md](examples/RetrievingData.md#session-expiry-from-the-upstream-idp). + ## Feedback ### Contributing diff --git a/examples/RetrievingData.md b/examples/RetrievingData.md index 88fb610..221c1a8 100644 --- a/examples/RetrievingData.md +++ b/examples/RetrievingData.md @@ -70,6 +70,39 @@ access_token = await server_client.get_access_token(store_options=store_options) Read more above in [Configuring the Store](./ConfigureStore.md). +## Session Expiry from the Upstream IdP + +For enterprise connections, the upstream identity provider can impose a ceiling on how long the user's session may live. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim (an absolute Unix timestamp, in seconds) in the ID token. The SDK reads this value at login, stores it with the session, and enforces it on every subsequent read. + +Once the ceiling is reached, the read methods behave as follows: + +- `get_user()` returns `None`, as if no session exists. +- `get_session()` returns `None`, as if no session exists. +- `get_access_token()` raises an `AccessTokenError` with code `session_expired`. + +```python +from auth0_server_python.error import AccessTokenError, AccessTokenErrorCode + +try: + access_token = await server_client.get_access_token(store_options=store_options) +except AccessTokenError as error: + if error.code == AccessTokenErrorCode.SESSION_EXPIRED: + # The upstream session ceiling has been reached; start a new login. + ... +``` + +When the ceiling is reached, the SDK deletes the stored session before returning, so the next request starts clean. + +The `session_expiry` value is also surfaced through the user claims, so you can read it without triggering enforcement: + +```python +user = await server_client.get_user(store_options=store_options) +session_expires_at = (user or {}).get("session_expiry") +``` + +> [!NOTE] +> Enforcement applies a small negative leeway (30 seconds) to account for clock skew, so a session is treated as expired slightly before the exact `session_expiry` timestamp. The refresh-token grant preserves the original ceiling - refreshing an access token does not extend the upstream session. + ## Multi-Resource Refresh Tokens (MRRT) Multi-Resource Refresh Tokens allow using a single refresh token to obtain access tokens for multiple audiences, simplifying token management in applications that interact with multiple backend services. diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 91de45d..0ba698b 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -638,9 +638,14 @@ async def complete_interactive_login( user_info = token_response.get("userinfo") user_claims = None id_token = token_response.get("id_token") + # IPSIE session_expiry ceiling, read from the verified ID token claims. + session_expires_at = None if user_info: user_claims = UserClaims.parse_obj(user_info) + # authlib populates `userinfo` from parsed ID token claims, so the + # IPSIE session_expiry claim may surface here. + session_expires_at = State.extract_session_expiry(user_info) elif id_token: # Fetch JWKS for signature verification jwks = await self._get_jwks_cached(origin_domain, metadata) @@ -657,6 +662,8 @@ async def complete_interactive_login( raise IssuerValidationError("ID token issuer mismatch. Ensure your Auth0 domain is configured correctly.") user_claims = UserClaims.parse_obj(claims) + # IPSIE session_expiry ceiling from the verified ID token. + session_expires_at = State.extract_session_expiry(claims) except ValueError as e: raise ApiError("jwks_key_not_found", str(e)) except jwt.InvalidSignatureError as e: @@ -708,7 +715,8 @@ async def complete_interactive_login( domain=origin_domain, internal={ "sid": sid, - "created_at": int(time.time()) + "created_at": int(time.time()), + "session_expires_at": session_expires_at } ) @@ -734,6 +742,23 @@ async def complete_interactive_login( # Methods for retrieving user information, session data, and logout operations. # ============================================================================ + async def _is_session_expired_by_ceiling( + self, state_data_dict: dict, store_options: Optional[dict[str, Any]] = None + ) -> bool: + """ + Enforce the IPSIE session_expiry ceiling on a session read. + + Returns True (and deletes the stored session) when the upstream + IdP-asserted ceiling has been reached. Sessions without a + session_expires_at value are never expired on this basis. + """ + internal = state_data_dict.get("internal") or {} + session_expires_at = internal.get("session_expires_at") + if State.is_session_expiry_reached(session_expires_at): + await self._state_store.delete(self._state_identifier, options=store_options) + return True + return False + async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Optional[dict[str, Any]]: """ Retrieves the user from the store, or None if no user found. @@ -760,6 +785,10 @@ async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Opti if self._normalize_url(session_domain) != self._normalize_url(current_domain): return None + # IPSIE: force re-auth once the upstream IdP session ceiling passes. + if await self._is_session_expired_by_ceiling(state_data, store_options): + return None + return state_data.get("user") return None @@ -789,6 +818,10 @@ async def get_session(self, store_options: Optional[dict[str, Any]] = None) -> O if self._normalize_url(session_domain) != self._normalize_url(current_domain): return None + # IPSIE: force re-auth once the upstream IdP session ceiling passes. + if await self._is_session_expired_by_ceiling(state_data, store_options): + return None + session_data = {k: v for k, v in state_data.items() if k != "internal"} return session_data @@ -972,6 +1005,19 @@ async def get_access_token( merged_scope = self._merge_scope_with_defaults(scope, audience) + # IPSIE: once the upstream IdP session ceiling has passed, the session + # is expired. Surface "session expired" and do NOT serve a cached token + # or attempt a refresh-token exchange (which would race the platform's + # session revocation). + internal = (state_data_dict or {}).get("internal") or {} + if State.is_session_expiry_reached(internal.get("session_expires_at")): + await self._state_store.delete(self._state_identifier, options=store_options) + raise AccessTokenError( + AccessTokenErrorCode.SESSION_EXPIRED, + "The session has expired because the upstream identity provider's " + "session_expiry was reached. The user needs to re-authenticate." + ) + # Find matching token set token_set = None if state_data_dict and "token_sets" in state_data_dict: diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 055103a..9fb6ca2 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -22,6 +22,8 @@ class UserClaims(BaseModel): email: Optional[str] = None email_verified: Optional[bool] = None org_id: Optional[str] = None + # IPSIE SL1 claim: upstream IdP-asserted RP session ceiling (Unix seconds). + session_expiry: Optional[int] = None class Config: extra = "allow" # Allow additional fields not defined in the model @@ -54,6 +56,10 @@ class InternalStateData(BaseModel): """ sid: str created_at: int + # IPSIE session_expiry ceiling (Unix seconds), stamped at session creation + # from the ID token's session_expiry claim. None when the upstream IdP did + # not assert one — in which case existing session behavior is unchanged. + session_expires_at: Optional[int] = None class SessionData(BaseModel): diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index db4f28e..9de7615 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -198,6 +198,7 @@ class AccessTokenErrorCode: INCORRECT_AUDIENCE = "incorrect_audience" MISSING_SESSION_DOMAIN = "missing_session_domain" DOMAIN_MISMATCH = "domain_mismatch" + SESSION_EXPIRED = "session_expired" class AccessTokenForConnectionErrorCode: diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 47ba774..19ad3ee 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -46,7 +46,7 @@ PollingApiError, StartLinkUserError, ) -from auth0_server_python.utils import PKCE +from auth0_server_python.utils import PKCE, State @pytest.mark.asyncio @@ -4816,3 +4816,208 @@ async def _fake_fetch(self, domain): assert exc.value.mfa_requirements is not None finally: ServerClient._fetch_oidc_metadata = original_fetch + + +# ============================================================================= +# IPSIE session_expiry enforcement +# ============================================================================= + + +def test_extract_session_expiry_valid(): + assert State.extract_session_expiry({"session_expiry": 1893456000}) == 1893456000 + + +def test_extract_session_expiry_absent_or_empty(): + assert State.extract_session_expiry(None) is None + assert State.extract_session_expiry({}) is None + assert State.extract_session_expiry({"session_expiry": None}) is None + + +def test_extract_session_expiry_rejects_non_int_and_non_positive(): + # bool is an int subclass but must be rejected + assert State.extract_session_expiry({"session_expiry": True}) is None + assert State.extract_session_expiry({"session_expiry": "1893456000"}) is None + assert State.extract_session_expiry({"session_expiry": 1893456000.0}) is None + assert State.extract_session_expiry({"session_expiry": 0}) is None + assert State.extract_session_expiry({"session_expiry": -5}) is None + + +def test_is_session_expiry_reached_none_never_expires(): + assert State.is_session_expiry_reached(None) is False + + +def test_is_session_expiry_reached_future_and_past(): + now = int(time.time()) + # Comfortably in the future (beyond the leeway window) -> not reached. + assert State.is_session_expiry_reached(now + 3600) is False + # In the past -> reached. + assert State.is_session_expiry_reached(now - 10) is True + + +def test_is_session_expiry_reached_applies_negative_leeway(): + now = int(time.time()) + # Ceiling is 10s away but leeway is 30s, so it's treated as already reached. + assert State.is_session_expiry_reached(now + 10) is True + + +@pytest.mark.asyncio +async def test_get_session_expired_by_ceiling_returns_none_and_deletes(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "id_token": "token123", + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + session_data = await client.get_session() + assert session_data is None + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_session_within_ceiling_ok(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "id_token": "token123", + "internal": {"sid": "some_sid", "created_at": now, "session_expires_at": now + 3600}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + session_data = await client.get_session() + assert session_data is not None + assert session_data["user"] == {"sub": "user123"} + mock_state_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_user_expired_by_ceiling_returns_none_and_deletes(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + user = await client.get_user() + assert user is None + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_user_no_ceiling_unaffected(): + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "internal": {"sid": "some_sid", "created_at": int(time.time())}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + user = await client.get_user() + assert user == {"sub": "user123"} + mock_state_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_access_token_expired_by_ceiling_raises_without_refresh(mocker): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "refresh_token": "refresh_xyz", + "token_sets": [ + { + "audience": "default", + "access_token": "cached_token", + "expires_at": now + 500, # still valid, but ceiling overrides + } + ], + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + # If the refresh path is reached, that's a bug — make it explode. + refresh_spy = mocker.patch.object( + client, "get_token_by_refresh_token", new_callable=AsyncMock, + side_effect=AssertionError("refresh must not be attempted after ceiling"), + ) + + with pytest.raises(AccessTokenError) as exc: + await client.get_access_token() + + assert exc.value.code == AccessTokenErrorCode.SESSION_EXPIRED + refresh_spy.assert_not_awaited() + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_access_token_within_ceiling_serves_cached(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "refresh_token": "refresh_xyz", + "token_sets": [ + { + "audience": "default", + "access_token": "cached_token", + "expires_at": now + 500, + } + ], + "internal": {"sid": "some_sid", "created_at": now, "session_expires_at": now + 3600}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + token = await client.get_access_token() + assert token == "cached_token" + mock_state_store.delete.assert_not_awaited() diff --git a/src/auth0_server_python/utils/helpers.py b/src/auth0_server_python/utils/helpers.py index 05cb0f8..11d8fbf 100644 --- a/src/auth0_server_python/utils/helpers.py +++ b/src/auth0_server_python/utils/helpers.py @@ -37,6 +37,48 @@ def generate_code_challenge(cls, code_verifier: str) -> str: class State: + # IPSIE session_expiry: clock-skew leeway (seconds). The session is treated + # as expired slightly *before* the wall-clock ceiling so the SDK never + # serves a session the Auth0 platform has already revoked. Per SDK Product + # Spec guidance. + SESSION_EXPIRY_LEEWAY_SECONDS = 30 + + @classmethod + def extract_session_expiry(cls, claims: Optional[dict[str, Any]]) -> Optional[int]: + """ + Read the IPSIE `session_expiry` claim (Unix seconds) from decoded ID + token claims. Returns None when absent or invalid so existing session + behavior is preserved (the feature is opt-in via the upstream + connection option). + + The IPSIE SL1 profile defines `session_expiry` as a JSON integer of + seconds since epoch. Non-integer or non-positive values are rejected + rather than trusted as a session ceiling. + """ + if not claims: + return None + value = claims.get("session_expiry") + if value is None: + return None + # bool is an int subclass — exclude it explicitly. + if isinstance(value, bool) or not isinstance(value, int): + return None + if value <= 0: + return None + return value + + @classmethod + def is_session_expiry_reached(cls, session_expires_at: Optional[int]) -> bool: + """ + True when the IPSIE session ceiling has been reached (applying negative + leeway for clock skew). None means no ceiling was asserted, so the + session is never expired on this basis. + """ + if session_expires_at is None: + return False + now = int(time.time()) + return now >= (session_expires_at - cls.SESSION_EXPIRY_LEEWAY_SECONDS) + @classmethod def update_state_data( cls, @@ -91,12 +133,20 @@ def update_state_data( else ts for ts in token_sets ] + # Preserve the IPSIE session_expiry ceiling stamped at login. The + # platform does not re-emit session_expiry on a refresh-token grant + # (it doesn't round-trip the upstream IdP), so the value from the + # refreshed ID token must NOT overwrite or erase the original + # ceiling — doing so would let the session outlive its bound. + internal = dict(state_data_dict.get("internal") or {}) + # Return updated state data return { **state_data_dict, "id_token": token_endpoint_response.get("id_token"), "refresh_token": token_endpoint_response.get("refresh_token") or state_data_dict.get("refresh_token"), - "token_sets": token_sets + "token_sets": token_sets, + "internal": internal } else: # Create completely new state data