Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ class Settings(BaseSettings):
s3_port: int = Field(alias='S3_PORT')
minio_root_user: str = Field(alias='MINIO_ROOT_USER')
minio_root_password: str = Field(alias='MINIO_ROOT_PASSWORD')
jwt_algorithm: str = Field(default='HS256', alias='JWT_ALGORITHM')
access_token_expire_minutes: int = Field(
default=30, alias='ACCESS_TOKEN_EXPIRE_MINUTES'
)
refresh_token_expire_days: int = Field(default=7, alias='REFRESH_TOKEN_EXPIRE_DAYS')
secret_key: str = Field(alias='SECRET_KEY')
debug_mode: bool = Field(default=False, alias='DEBUG_MODE')

Expand Down
9 changes: 6 additions & 3 deletions app/core/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from app.core.config import settings

pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
ALGORITHM = 'HS256'


def verify_password(plain_password: str, hashed_password: str) -> bool:
Expand All @@ -22,7 +21,11 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None) -> s
if expires_delta:
expire = datetime.now(UTC) + expires_delta
else:
expire = datetime.now(UTC) + timedelta(minutes=30)
expire = datetime.now(UTC) + timedelta(
minutes=settings.access_token_expire_minutes
)
to_encode.update({'exp': expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=ALGORITHM)
encoded_jwt = jwt.encode(
to_encode, settings.secret_key, algorithm=settings.jwt_algorithm
)
return str(encoded_jwt)
5 changes: 3 additions & 2 deletions app/shared/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from app.core.config import settings
from app.core.database import get_session
from app.core.exceptions import CredentialsError
from app.core.security import ALGORITHM
from app.services.user.models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/v1/auth/token')
Expand All @@ -20,7 +19,9 @@ async def get_current_user(
session: Annotated[AsyncSession, Depends(get_session)],
) -> User:
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
payload = jwt.decode(
token, settings.secret_key, algorithms=[settings.jwt_algorithm]
)
email: str | None = payload.get('sub')
if email is None:
raise CredentialsError()
Expand Down