-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthentication.py
More file actions
115 lines (89 loc) · 3.62 KB
/
authentication.py
File metadata and controls
115 lines (89 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from argon2 import exceptions
from flask import Blueprint, jsonify, request
from pymysql import MySQLError
from config.ratelimit import limiter
from jwt_helper import (
TokenError,
extract_token_from_header,
generate_access_token,
generate_refresh_token,
verify_token,
)
from utility.database import database_cursor
from utility.encryption import encrypt_email, hash_email, hash_password, verify_password
from utility.validation import validate_email, validate_password
# Blueprint that the /register, /login and /refresh routes in this module attach to.
authentication_blueprint = Blueprint("authentication", __name__)
def login_person_by_email(email):
    """Fetch the login record associated with an email address.

    The address is first passed through ``hash_email``; the resulting hash is
    the single argument given to the ``login_person_by_email`` stored
    procedure.

    Args:
        email: Email address to look up.

    Returns:
        The result of ``cursor.fetchone()`` for the procedure call. The
        ``login`` view treats a falsy result as "no such account".
    """
    procedure_args = (hash_email(email),)
    with database_cursor() as cur:
        cur.callproc("login_person_by_email", procedure_args)
        return cur.fetchone()
def update_last_login(person_id):
    """Call the ``update_last_login`` stored procedure for one person.

    Args:
        person_id: Identifier forwarded as the procedure's only argument.
    """
    procedure_args = (person_id,)
    with database_cursor() as cur:
        cur.callproc("update_last_login", procedure_args)
@authentication_blueprint.route("/register", methods=["POST"])
@limiter.limit("5 per minute")
def register():
    """Create a new account from a JSON request body.

    Expected body keys: ``username``, ``email``, ``password`` and, optionally,
    ``language_code`` (defaults to ``"en"`` when the key is absent).

    Returns:
        A ``(json_response, status)`` pair:
        * 201 when the ``register_person`` procedure completes.
        * 400 when the body is not a JSON object, a required field is missing
          or not a string, the email/password fail validation, or the
          database reports a duplicate user name or email.
        * 500 for any other ``MySQLError`` raised during registration.
    """
    data = request.get_json()
    # A syntactically valid JSON body need not be an object (``null``, ``[]``,
    # ``"text"`` ...). Calling ``.get`` on those would raise and surface as a
    # 500, so reject them explicitly.
    if not isinstance(data, dict):
        return jsonify(message="Request body must be a JSON object"), 400

    name = data.get("username")
    email = data.get("email")
    password = data.get("password")
    language_code = data.get("language_code", "en")

    if not name or not email or not password:
        return jsonify(message="Username, email, and password are required"), 400
    # JSON allows numbers, lists, etc. as values; the validation and hashing
    # helpers are assumed to expect text, so stop non-string values here
    # instead of letting them fail deeper down.
    if not all(isinstance(value, str) for value in (name, email, password)):
        return jsonify(message="Username, email, and password must be strings"), 400
    if not validate_email(email):
        return jsonify(message="Invalid email address"), 400
    if not validate_password(password):
        return jsonify(message="Password does not meet security requirements"), 400

    hashed_password = hash_password(password)
    # The email is sent to the procedure in two derived forms (hash, then
    # encrypted value); keep them in separate names rather than rebinding
    # ``email`` to a tuple.
    email_hash = hash_email(email)
    encrypted_email = encrypt_email(email)

    try:
        with database_cursor() as cursor:
            cursor.callproc(
                "register_person",
                (name, email_hash, encrypted_email, hashed_password, language_code),
            )
    except MySQLError as error:
        # Duplicate detection relies on the error text raised by the database.
        detail = str(error)
        if "User name already exists" in detail:
            return jsonify(message="User name already exists"), 400
        if "Email already exists" in detail:
            return jsonify(message="Email already exists"), 400
        return jsonify(message="An error occurred during registration"), 500

    return jsonify(message="User created successfully"), 201
@authentication_blueprint.route("/login", methods=["POST"])
@limiter.limit("10 per minute")
def login():
    """Authenticate a user by email and password and issue tokens.

    Expected body keys: ``email`` and ``password``.

    Returns:
        * 200 (Flask's default status) with ``access_token`` and
          ``refresh_token`` on success.
        * 400 when the body is not a JSON object or a field is missing or
          not a string.
        * 401 when no record matches the email or the password does not
          match.
        * 500 when password verification fails for any other reason.
    """
    data = request.get_json()
    # Valid JSON that is not an object (``null``, ``[]`` ...) has no ``.get``;
    # answer with a 400 instead of crashing with a 500.
    if not isinstance(data, dict):
        return jsonify(message="Request body must be a JSON object"), 400

    email = data.get("email")
    password = data.get("password")

    if not email or not password:
        return jsonify(message="Email and password are required"), 400
    # The hashing/verification helpers are assumed to expect text; reject
    # other JSON value types up front.
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify(message="Email and password must be strings"), 400

    person = login_person_by_email(email)
    if not person:
        return jsonify(message="Invalid credentials"), 401

    try:
        verify_password(password, person["hashed_password"])
    except exceptions.VerifyMismatchError:
        # Same message as the unknown-email case so the response does not
        # reveal which part of the credentials was wrong.
        return jsonify(message="Invalid credentials"), 401
    except Exception:
        return jsonify(message="An internal error occurred"), 500

    person_id = person["person_id"]
    access_token = generate_access_token(person_id)
    # Named ``issued_refresh_token`` so the local does not shadow the
    # module-level ``refresh_token`` view function.
    issued_refresh_token = generate_refresh_token(person_id)
    update_last_login(person_id)

    return jsonify(
        message="Login successful",
        access_token=access_token,
        refresh_token=issued_refresh_token,
    )
@authentication_blueprint.route("/refresh", methods=["POST"])
@limiter.limit("5 per hour")
def refresh_token():
    """Exchange a refresh token for a new access token.

    The token is taken from the request via ``extract_token_from_header`` and
    checked with ``verify_token(..., required_type="refresh")``.

    Returns:
        * 200 with ``access_token`` when the token is accepted.
        * The status code and message carried by the ``TokenError`` when any
          step inside the ``try`` raises one.
    """
    try:
        claims = verify_token(extract_token_from_header(), required_type="refresh")
        return jsonify(access_token=generate_access_token(claims["person_id"])), 200
    except TokenError as token_error:
        return jsonify(message=token_error.message), token_error.status_code