Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
2 changes: 2 additions & 0 deletions src/business_logic/common/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class InvalidClientIdError(Exception):
...
6 changes: 6 additions & 0 deletions src/business_logic/common/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Protocol, Any


class ValidatorProtocol(Protocol):
async def __call__(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError
17 changes: 17 additions & 0 deletions src/business_logic/common/validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from src.business_logic.common.errors import InvalidClientIdError

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.data_access.postgresql.repositories import ClientRepository


class ValidateClient:
def __init__(
self,
client_repo: 'ClientRepository'
) -> None:
self._client_repo = client_repo

async def __call__(self, client_id: str) -> None:
if not await self._client_repo.exists(client_id=client_id):
raise InvalidClientIdError('Client with specified client_id doesn\'t exists')
Empty file.
3 changes: 3 additions & 0 deletions src/business_logic/endsession/dto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .request import RequestEndSessionModel

__all__ = ["RequestEndSessionModel"]
15 changes: 15 additions & 0 deletions src/business_logic/endsession/dto/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Optional

from pydantic import BaseModel


class RequestEndSessionModel(BaseModel):
id_token_hint: str
post_logout_redirect_uri: Optional[str]
state: Optional[str]

class Config:
orm_mode = True

def __repr__(self) -> str: # pragma: no cover
return f"Model {self.__class__.__name__}"
54 changes: 54 additions & 0 deletions src/business_logic/endsession/endsession_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from src.data_access.postgresql.repositories.client import ClientRepository
from src.data_access.postgresql.repositories.persistent_grant import PersistentGrantRepository
from src.business_logic.dependencies.database import get_repository_no_depends
from src.business_logic.services.jwt_token import JWTService

from .dto.request import RequestEndSessionModel
from .validators import ValidateLogoutRedirectUri

from typing import Union, Optional, Any
from src.business_logic.common.interfaces import ValidatorProtocol


class EndSessionService:
"""
Service for endsession endpoint ......
"""
def __init__(
self,
client_repo: ClientRepository,
persistent_grant_repo: PersistentGrantRepository,
jwt_service: JWTService,
) -> None:
self.client_repo = client_repo
self.persistent_grant_repo = persistent_grant_repo
self.jwt_service = jwt_service
self.logout_redirect_uri_validator: ValidatorProtocol = ValidateLogoutRedirectUri(self.client_repo)

async def end_session(self, request_model: RequestEndSessionModel) -> Optional[str]:
decoded_id_token_hint = await self._decode_id_token_hint(id_token_hint=request_model.id_token_hint)

await self._logout(
client_id=decoded_id_token_hint['client_id'],
user_id=decoded_id_token_hint['sub']
)

if request_model.post_logout_redirect_uri:
await self.logout_redirect_uri_validator(request_model=request_model,
client_id=decoded_id_token_hint["client_id"])
logout_redirect_uri = request_model.post_logout_redirect_uri
if request_model.state:
logout_redirect_uri += f"&state={request_model.state}"
return logout_redirect_uri
return None

async def _decode_id_token_hint(self, id_token_hint: str) -> dict[str, Any]:
decoded_data = await self.jwt_service.decode_token(token=id_token_hint)
return decoded_data

async def _logout(self, client_id: str, user_id: int) -> None:
await self.persistent_grant_repo.delete_persistent_grant_by_client_and_user_id(
client_id=client_id,
user_id=user_id
)

2 changes: 2 additions & 0 deletions src/business_logic/endsession/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class InvalidLogoutRedirectUriError(Exception):
...
11 changes: 11 additions & 0 deletions src/business_logic/endsession/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Protocol, Optional, Any
from typing import TYPE_CHECKING
# if TYPE_CHECKING:
from src.business_logic.endsession.dto import RequestEndSessionModel

class EndSessionServiceProtocol(Protocol):

async def end_session(self) -> Optional[str]:
raise NotImplementedError


Empty file.
1 change: 1 addition & 0 deletions src/business_logic/endsession/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .validate_logout_redirect_uri import ValidateLogoutRedirectUri
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from src.data_access.postgresql.repositories.client import ClientRepository
from src.business_logic.endsession.dto import RequestEndSessionModel
from src.data_access.postgresql.repositories import PersistentGrantRepository
from src.business_logic.endsession.errors import InvalidLogoutRedirectUriError
from typing import Any

class ValidateLogoutRedirectUri:
"""
Checks that id_token_hint exists.
"""
def __init__(
self,
client_repo: ClientRepository
):
self._client_repo = client_repo

async def __call__(self, request_model: RequestEndSessionModel,
client_id: str) -> None:
if not await self._client_repo.validate_post_logout_redirect_uri(
client_id,
request_model.post_logout_redirect_uri,
):
raise InvalidLogoutRedirectUriError("Invalid post logout redirect uri")
5 changes: 5 additions & 0 deletions src/business_logic/get_tokens/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .factory import TokenServiceFactory
from .interfaces import TokenServiceProtocol


__all__ = ['TokenServiceFactory', 'TokenServiceProtocol']
5 changes: 5 additions & 0 deletions src/business_logic/get_tokens/dto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .request import RequestTokenModel
from .response import ResponseTokenModel


__all__ = ['RequestTokenModel', 'ResponseTokenModel']
39 changes: 39 additions & 0 deletions src/business_logic/get_tokens/dto/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from dataclasses import dataclass
from typing import Optional
from fastapi import Form
from pydantic import BaseModel


class RequestTokenModel(BaseModel):
client_id: Optional[str]
client_secret: Optional[str]
grant_type: str
scope: Optional[str]
redirect_uri: Optional[str]
code: Optional[str]
code_verifier: Optional[str]
username: Optional[str]
password: Optional[str]
acr_values: Optional[str]
refresh_token: Optional[str]
device_code: Optional[str]

@classmethod
def as_form(
cls,
client_id: Optional[str] = Form(...),
client_secret: Optional[str] = Form(None),
grant_type: str = Form(...),
scope: str = Form(None),
redirect_uri: Optional[str] = Form(None),
code: Optional[str] = Form(None),
code_verifier: Optional[str] = Form(None),
username: Optional[str] = Form(None),
password: Optional[str] = Form(None),
acr_values: Optional[str] = Form(None),
refresh_token: Optional[str] = Form(None),
device_code: Optional[str] = Form(None)
) -> 'RequestTokenModel':
return cls(client_id=client_id, client_secret=client_secret, grant_type=grant_type, scope=scope,
redirect_uri=redirect_uri, code=code, code_verifier=code_verifier, username=username,
password=password, acr_values=acr_values, refresh_token=refresh_token, device_code=device_code)
17 changes: 17 additions & 0 deletions src/business_logic/get_tokens/dto/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from pydantic import BaseModel
from typing import Optional


class ResponseTokenModel(BaseModel):

access_token: Optional[str]
token_type: Optional[str]
refresh_token: Optional[str]
expires_in: Optional[int]
id_token: Optional[str]
refresh_expires_in : Optional[int]
not_before_policy : Optional[int] = None
scope : Optional[str] = None

class Config:
orm_mode = True
10 changes: 10 additions & 0 deletions src/business_logic/get_tokens/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class InvalidGrantError(Exception):
...


class InvalidRedirectUriError(Exception):
...


class UnsupportedGrantTypeError(Exception):
...
67 changes: 67 additions & 0 deletions src/business_logic/get_tokens/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations
from src.business_logic.get_tokens.service_impls import (
AuthorizationCodeTokenService,
RefreshTokenGrantService,
)
from src.business_logic.get_tokens.validators import (
ValidatePersistentGrant,
ValidateRedirectUri,
ValidateGrantByClient,
ValidateGrantExpired,
)
from src.business_logic.get_tokens.errors import UnsupportedGrantTypeError
from src.business_logic.common.validators import ValidateClient


from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .interfaces import TokenServiceProtocol
from src.data_access.postgresql.repositories import (
BlacklistedTokenRepository,
ClientRepository,
DeviceRepository,
PersistentGrantRepository,
UserRepository,
)
from src.business_logic.jwt_manager.interfaces import JWTManagerProtocol


class TokenServiceFactory:
def __init__(
self,
client_repo: ClientRepository,
persistent_grant_repo: PersistentGrantRepository,
user_repo: UserRepository,
device_repo: DeviceRepository,
jwt_manager: JWTManagerProtocol,
blacklisted_repo: BlacklistedTokenRepository,
) -> None:
self._client_repo = client_repo
self._persistent_grant_repo = persistent_grant_repo
self._user_repo = user_repo
self._device_repo = device_repo
self._jwt_manager = jwt_manager
self._blacklisted_repo = blacklisted_repo

def get_service_impl(self, grant_type: str) -> TokenServiceProtocol:
if grant_type == 'authorization_code':
return AuthorizationCodeTokenService(
grant_validator=ValidatePersistentGrant(persistent_grant_repo=self._persistent_grant_repo),
redirect_uri_validator=ValidateRedirectUri(client_repo=self._client_repo),
client_validator=ValidateClient(client_repo=self._client_repo),
code_validator=ValidateGrantByClient(persistent_grant_repo=self._persistent_grant_repo),
grant_exp_validator=ValidateGrantExpired(),
jwt_manager=self._jwt_manager,
persistent_grant_repo=self._persistent_grant_repo
)
elif grant_type == 'refresh_token':
return RefreshTokenGrantService(
grant_validator=ValidatePersistentGrant(persistent_grant_repo=self._persistent_grant_repo),
redirect_uri_validator=ValidateRedirectUri(client_repo=self._client_repo),
client_validator=ValidateClient(client_repo=self._client_repo),
code_validator=ValidateGrantByClient(persistent_grant_repo=self._persistent_grant_repo),
jwt_manager=self._jwt_manager,
persistent_grant_repo=self._persistent_grant_repo
)
else:
raise UnsupportedGrantTypeError
12 changes: 12 additions & 0 deletions src/business_logic/get_tokens/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Protocol
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from src.business_logic.get_tokens.dto import (
RequestTokenModel,
ResponseTokenModel,
)


class TokenServiceProtocol(Protocol):
async def get_tokens(self, request_data: 'RequestTokenModel') -> 'ResponseTokenModel':
raise NotImplementedError
5 changes: 5 additions & 0 deletions src/business_logic/get_tokens/service_impls/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .auth_code import AuthorizationCodeTokenService
from .refresh_token import RefreshTokenGrantService


__all__ = ['AuthorizationCodeTokenService', 'RefreshTokenGrantService']
Loading