-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
89 lines (87 loc) · 3.2 KB
/
auth.py
File metadata and controls
89 lines (87 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, HTTPException, APIRouter,Form,Request
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette import status

from database import SessionLocal
from models import User
# Router for the authentication endpoints; every route registered on it is
# served under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["auth"])

# Key used to sign and verify the JWTs issued by this module. It is read from
# the JWT_SECRET_KEY environment variable when that is set to a non-empty
# value; otherwise the literal below is used so existing deployments keep
# working unchanged.
# NOTE(review): the fallback literal is committed to source control, so anyone
# with repository access can forge tokens. Set JWT_SECRET_KEY in every real
# environment, rotate the key, and then remove the fallback.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "63f4945d921d599f27ae4fdf5bada3f1"

# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Lifetime of an issued access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# passlib context used to hash and verify passwords with bcrypt.
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 bearer scheme pointing at the login route.
# NOTE(review): no function visible in this file uses it; authentication here
# reads the token from a cookie instead.
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="/auth/token")
# Pydantic model holding a username/password pair.
# NOTE(review): not referenced by any handler visible in this file; the
# registration route reads its fields from form data instead.
class createuser(BaseModel):
    username: str  # account name
    password: str  # password value; presumably plain text — confirm with callers
# Pydantic model describing an access token together with its type.
# NOTE(review): not referenced by any handler visible in this file.
class token(BaseModel):
    access_token: str  # the token string itself
    token_type: str  # token scheme label (presumably "bearer" — confirm)
def get_db():
    """Yield a fresh database session and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the session
    is created from ``SessionLocal``, handed to the consumer, and closed in
    the ``finally`` clause even if the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, whether or not the consumer succeeded.
        session.close()
def verify_password(plain_password, hashed_password):
    """Check a candidate password against a stored hash.

    Delegates to ``bcrypt_context.verify`` and returns its result.

    Args:
        plain_password: The password as submitted by the user.
        hashed_password: The stored hash to compare against.
    """
    is_match = bcrypt_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(username: str) -> str:
    """Build a signed JWT identifying ``username``.

    The token carries the username in the ``sub`` claim and an ``exp`` claim
    set ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.

    Args:
        username: Value stored in the ``sub`` claim.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Use a timezone-aware UTC timestamp: datetime.utcnow() returns a naive
    # value and is deprecated since Python 3.12. The resulting "exp" claim is
    # the same instant either way.
    expires_at = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {"sub": username, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# Annotated alias bundling the Session type with the get_db dependency.
# NOTE(review): the handlers visible in this file spell out
# ``db: Session = Depends(get_db)`` instead of using this alias.
db_dependency = Annotated[Session, Depends(get_db)]
@router.post("/", status_code=status.HTTP_201_CREATED)
def register_user(
    user: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create a new user account from submitted form fields.

    Args:
        user: Desired username, read from the ``user`` form field.
        password: Plain-text password, read from the ``password`` form field.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict containing a confirmation message.

    Raises:
        HTTPException: 400 when a user with the same username already exists.
    """
    existing = db.query(User).filter(User.username == user).first()
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    # ``user`` and ``password`` are plain strings taken from the form, not an
    # object with attributes; reading ``user.password`` / ``user.username``
    # raised AttributeError on every registration of a new username.
    new_user = User(
        username=user,
        hashed_password=bcrypt_context.hash(password),
    )
    db.add(new_user)
    db.commit()
    return {"message": "User created successfully"}
@router.post("/token")
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Authenticate a user and start a cookie-based session.

    Looks up the user by the submitted username and verifies the password.
    On success, issues an access token, stores it in an HTTP-only
    ``access_token`` cookie, and redirects to ``/dashboard``.

    Args:
        form_data: Submitted login form (username and password).
        db: Database session supplied by ``get_db``.

    Returns:
        A 302 redirect response to ``/dashboard`` with the cookie set.

    Raises:
        HTTPException: 401 when the user does not exist or the password does
            not match.
    """
    account = db.query(User).filter(User.username == form_data.username).first()

    # A single check covers both "no such user" and "wrong password", so the
    # caller gets the same error in either case.
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )

    jwt_value = create_access_token(account.username)
    redirect = RedirectResponse(url="/dashboard", status_code=302)
    redirect.set_cookie(key="access_token", value=jwt_value, httponly=True)
    return redirect
def get_current_user(request: Request):
    """Return the username stored in the request's ``access_token`` cookie.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the cookie is missing or empty, the token
            fails to decode (``JWTError``), or it has no ``sub`` claim.
    """

    def _unauthenticated():
        # Every failure path reports the same 401 response.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
        )

    raw_token = request.cookies.get("access_token")
    if not raw_token:
        raise _unauthenticated()

    try:
        claims = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise _unauthenticated()

    subject = claims.get("sub")
    if subject is None:
        raise _unauthenticated()
    return subject