-
Notifications
You must be signed in to change notification settings - Fork 2
Add password protection for documents and directories #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7373293
ea7f892
c816623
6dd4f5d
a409837
09e892b
5c4adda
d64e638
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| """ | ||
| Database models for object protection (passwords, encryption, biometric, etc.) | ||
|
|
||
| This module provides a unified protection system that can handle multiple | ||
| protection types in a single table. | ||
| """ | ||
|
|
||
| import hashlib | ||
| import json | ||
| import secrets | ||
| from typing import Optional | ||
|
|
||
| from sqlalchemy import VARCHAR, Integer, Text | ||
| from sqlalchemy.orm import Mapped | ||
| from sqlalchemy.orm import mapped_column | ||
|
|
||
| from include.database.handler import Base | ||
|
|
||
|
|
||
| class ObjectProtection(Base): | ||
| """ | ||
| Unified model for all types of protection on documents and directories. | ||
|
|
||
| This single table handles all protection types (password, encryption, biometric, etc.) | ||
| distinguished by the protection_type column. | ||
| """ | ||
| __tablename__ = "object_protections" | ||
|
|
||
| id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) | ||
|
|
||
| # Target object identification | ||
| target_type: Mapped[str] = mapped_column( | ||
| VARCHAR(64), nullable=False, comment="Type: 'document' or 'directory'" | ||
| ) | ||
| target_id: Mapped[str] = mapped_column( | ||
| VARCHAR(255), nullable=False, comment="ID of the protected object" | ||
| ) | ||
|
|
||
| # Protection type identifier | ||
| protection_type: Mapped[str] = mapped_column( | ||
| VARCHAR(64), nullable=False, default="password", | ||
| comment="Protection type: 'password', 'encryption', 'biometric', etc." | ||
| ) | ||
|
|
||
| # Protection data (type-specific, stored as JSON or text) | ||
| # For password: contains password_hash and salt | ||
| # For other types: contains type-specific data | ||
| protection_data: Mapped[str] = mapped_column( | ||
| Text, nullable=False, | ||
| comment="Protection-specific data (JSON format for flexibility)" | ||
| ) | ||
|
|
||
| # Additional metadata (reserved for future use) | ||
| protection_metadata: Mapped[Optional[str]] = mapped_column( | ||
| Text, nullable=True, | ||
| comment="JSON metadata for future extensions" | ||
| ) | ||
|
|
||
| def set_password(self, plain_password: str) -> None: | ||
| """ | ||
| Set password protection data for this entry. | ||
|
|
||
| Uses PBKDF2-HMAC-SHA256 with 600,000 iterations for secure password hashing, | ||
| following OWASP recommendations for password storage. | ||
|
|
||
| Args: | ||
| plain_password: The plain text password to hash and store | ||
| """ | ||
| self.protection_type = "password" | ||
|
|
||
| # Generate salt and hash | ||
| salt = secrets.token_hex(16) | ||
| password_bytes = plain_password.encode('utf-8') | ||
| salt_bytes = salt.encode('utf-8') | ||
| key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) | ||
| password_hash = key.hex() | ||
|
|
||
| # Store as JSON | ||
| self.protection_data = json.dumps({ | ||
| "password_hash": password_hash, | ||
| "salt": salt | ||
| }) | ||
|
|
||
| def verify_password(self, plain_password: str) -> bool: | ||
| """ | ||
| Verify a password against the stored hash. | ||
|
|
||
| Uses constant-time comparison to prevent timing attacks. | ||
| Only works if protection_type is "password". | ||
|
|
||
| Args: | ||
| plain_password: The plain text password to verify | ||
|
|
||
| Returns: | ||
| True if the password matches, False otherwise | ||
| """ | ||
| if self.protection_type != "password": | ||
| return False | ||
|
|
||
| try: | ||
| data = json.loads(self.protection_data) | ||
| password_hash = data["password_hash"] | ||
| salt = data["salt"] | ||
| except (json.JSONDecodeError, KeyError): | ||
| return False | ||
|
|
||
| password_bytes = plain_password.encode('utf-8') | ||
| salt_bytes = salt.encode('utf-8') | ||
| key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) | ||
| computed_hash = key.hex() | ||
|
|
||
| # Use constant-time comparison to prevent timing attacks | ||
| return secrets.compare_digest(computed_hash, password_hash) | ||
|
|
||
| def __repr__(self) -> str: | ||
| return ( | ||
| f"ObjectProtection(id={self.id!r}, " | ||
| f"target_type={self.target_type!r}, target_id={self.target_id!r}, " | ||
| f"protection_type={self.protection_type!r})" | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
| from include.database.handler import Session | ||
| from include.database.models.classic import User | ||
| from include.database.models.entity import Folder, Document, FolderAccessRule | ||
| from include.handlers.protection import check_password_protection | ||
| from include.util.audit import log_audit | ||
| from include.util.rule.applying import apply_access_rules | ||
| import include.system.messages as smsg | ||
|
|
@@ -21,15 +22,19 @@ class RequestListDirectoryHandler(RequestHandler): | |
| handler (ConnectionHandler): The connection handler containing request data and methods for responding. | ||
| Response Codes: | ||
| 200 - Directory listing successful, returns a list of files and directories in the response data. | ||
| 202 - Password required for access. | ||
| 400 - Invalid request. | ||
| 403 - Invalid user or token. | ||
| 403 - Invalid user or token, or incorrect password. | ||
| 404 - Directory not found. | ||
| 500 - Internal server error, with the exception message. | ||
| """ | ||
|
|
||
| data_schema = { | ||
| "type": "object", | ||
| "properties": {"folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}}, | ||
| "properties": { | ||
| "folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}, | ||
| "password": {"type": "string"} | ||
| }, | ||
| "required": ["folder_id"], | ||
| "additionalProperties": False, | ||
| } | ||
|
|
@@ -40,6 +45,7 @@ def handle(self, handler: ConnectionHandler): | |
|
|
||
| # Parse the directory listing request | ||
| folder_id: Optional[str] = handler.data.get("folder_id") | ||
| password = handler.data.get("password") | ||
|
|
||
| with Session() as session: | ||
| this_user = session.get(User, handler.username) | ||
|
|
@@ -72,6 +78,13 @@ def handle(self, handler: ConnectionHandler): | |
| **{"code": 403, "message": "Access denied", "data": {}} | ||
| ) | ||
| return 403, folder_id, handler.username | ||
|
|
||
| # Check password protection | ||
| protection_code, protection_msg = check_password_protection(folder, password, session) | ||
| if protection_code != 0: | ||
| handler.conclude_request(protection_code, {}, protection_msg) | ||
| return protection_code, folder_id, handler.username | ||
|
|
||
| parent = folder.parent | ||
| children = folder.children | ||
| documents = folder.documents | ||
|
|
@@ -129,22 +142,27 @@ class RequestGetDirectoryInfoHandler(RequestHandler): | |
| handler (ConnectionHandler): The connection handler containing request data and methods for responding. | ||
| Response Codes: | ||
| 200 - Directory info successful, returns directory info in the response data. | ||
| 202 - Password required for access. | ||
| 400 - Invalid request. | ||
| 403 - Invalid user or token. | ||
| 403 - Invalid user or token, or incorrect password. | ||
| 404 - Directory not found. | ||
| 500 - Internal server error, with the exception message. | ||
| """ | ||
|
|
||
| data_schema = { | ||
| "type": "object", | ||
| "properties": {"directory_id": {"type": "string", "minLength": 1}}, | ||
| "properties": { | ||
| "directory_id": {"type": "string", "minLength": 1}, | ||
| "password": {"type": "string"} | ||
| }, | ||
| "required": ["directory_id"], | ||
| "additionalProperties": False, | ||
| } | ||
|
|
||
| def handle(self, handler: ConnectionHandler): | ||
|
||
|
|
||
| directory_id: str = handler.data["directory_id"] | ||
| password = handler.data.get("password") | ||
|
|
||
| if not directory_id: | ||
| handler.conclude_request(400, {}, "Directory ID is required") | ||
|
|
@@ -171,6 +189,12 @@ def handle(self, handler: ConnectionHandler): | |
| if not directory.check_access_requirements(user, access_type="read"): | ||
| handler.conclude_request(403, {}, "Permission denied") | ||
| return 403, directory_id, handler.username | ||
|
|
||
| # Check password protection | ||
| protection_code, protection_msg = check_password_protection(directory, password, session) | ||
| if protection_code != 0: | ||
| handler.conclude_request(protection_code, {}, protection_msg) | ||
| return protection_code, directory_id, handler.username | ||
|
|
||
| info_code = 0 | ||
| ### generate access_rules text | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The database model is missing a unique constraint to prevent duplicate protection entries for the same target. Without a unique constraint on (target_type, target_id, protection_type), multiple password protection entries could be created for the same document or directory, leading to undefined behavior during verification. Consider adding a unique constraint or composite index on these three columns.