From 73732939497534fb5904a5bf8aa8e52d937d4ec3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 05:15:49 +0000 Subject: [PATCH 1/8] Initial plan From ea7f892c88a44b8168458c9c8718aacf945f95ea Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 05:21:31 +0000 Subject: [PATCH 2/8] Add password protection database model and handlers Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/connection_handler.py | 9 + src/include/database/models/entity.py | 61 +++++ src/include/database/models/protection.py | 83 +++++++ src/include/handlers/directory.py | 37 ++- src/include/handlers/document.py | 30 ++- src/include/handlers/protection.py | 267 ++++++++++++++++++++++ src/main.py | 1 + 7 files changed, 482 insertions(+), 6 deletions(-) create mode 100644 src/include/database/models/protection.py create mode 100644 src/include/handlers/protection.py diff --git a/src/include/connection_handler.py b/src/include/connection_handler.py index 31f974a..ef3531e 100644 --- a/src/include/connection_handler.py +++ b/src/include/connection_handler.py @@ -74,6 +74,11 @@ ) from include.handlers.debugging.throw import RequestThrowExceptionHandler from include.handlers.search import RequestSearchHandler +from include.handlers.protection import ( + RequestEnablePasswordProtectionHandler, + RequestRemovePasswordProtectionHandler, + RequestVerifyPasswordHandler, +) from include.constants import CORE_VERSION, PROTOCOL_VERSION from include.shared import connected_listeners, lockdown_enabled @@ -193,6 +198,10 @@ def handle_request(websocket: websockets.sync.server.ServerConnection, message: # 系统类 "lockdown": RequestLockdownHandler, "view_audit_logs": RequestViewAuditLogsHandler, + # 密码保护类 + "enable_password_protection": RequestEnablePasswordProtectionHandler, + "remove_password_protection": RequestRemovePasswordProtectionHandler, + "verify_password": RequestVerifyPasswordHandler, } # Debugging diff --git a/src/include/database/models/entity.py b/src/include/database/models/entity.py index abc51c4..82378a1 100644 --- a/src/include/database/models/entity.py +++ b/src/include/database/models/entity.py @@ -26,6 +26,67 @@ class BaseObject(Base): id: Mapped[str] access_rules: Mapped[List] + def get_password_protection(self): + """ + Get the password protection entry for this object, if any. + + Returns: + PasswordProtection object if password protected, None otherwise + """ + from include.database.models.protection import PasswordProtection + + _TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} + + session = object_session(self) + if not session: + raise RuntimeError("No active session found for object") + + target_type = _TARGET_TYPE_MAPPING[self.__tablename__] + + protection = ( + session.query(PasswordProtection) + .filter( + PasswordProtection.target_type == target_type, + PasswordProtection.target_id == self.id + ) + .first() + ) + + return protection + + def is_password_protected(self) -> bool: + """ + Check if this object is password protected. + + Returns: + True if password protected, False otherwise + """ + return self.get_password_protection() is not None + + def verify_password(self, password: Optional[str]) -> bool: + """ + Verify the provided password against the stored password. + + Args: + password: The password to verify (None if not provided) + + Returns: + True if password is correct or object is not protected, + False if password is incorrect or missing when required + """ + protection = self.get_password_protection() + + if not protection: + # Not password protected, access granted + return True + + if password is None: + # Password required but not provided + return False + + # Verify the password + return protection.verify_password(password) + def check_access_requirements(self, user: User, access_type: str = "read") -> bool: """ Checks if a given user meets the access requirements for a specific access type based on defined access rules. diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py new file mode 100644 index 0000000..a8422f3 --- /dev/null +++ b/src/include/database/models/protection.py @@ -0,0 +1,83 @@ +""" +Database models for object protection (passwords, etc.) + +This module provides a scalable protection system that can be extended +with additional protection types in the future. +""" + +import hashlib +import secrets +from typing import Optional + +from sqlalchemy import VARCHAR, Integer, Text +from sqlalchemy.orm import Mapped +from sqlalchemy.orm import mapped_column + +from include.database.handler import Base + + +class PasswordProtection(Base): + """ + Model for password-based protection on documents and directories. + + This table is separate from the main entity tables to maintain scalability + and allow for future protection types (e.g., encryption, biometric, etc.) + """ + __tablename__ = "password_protections" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + + # Target object identification + target_type: Mapped[str] = mapped_column( + VARCHAR(64), nullable=False, comment="Type: 'document' or 'directory'" + ) + target_id: Mapped[str] = mapped_column( + VARCHAR(255), nullable=False, comment="ID of the protected object" + ) + + # Password storage (hashed) + password_hash: Mapped[str] = mapped_column(Text, nullable=False) + salt: Mapped[str] = mapped_column(Text, nullable=False) + + # Reserved for future protection types + protection_type: Mapped[str] = mapped_column( + VARCHAR(64), nullable=False, default="password", + comment="Protection type: 'password' (reserved for future types like 'encryption', 'biometric')" + ) + + # Additional metadata (reserved for future use) + metadata: Mapped[Optional[str]] = mapped_column( + Text, nullable=True, + comment="JSON metadata for future extensions" + ) + + def set_password(self, plain_password: str) -> None: + """ + Set a new password for this protection entry. + + Args: + plain_password: The plain text password to hash and store + """ + self.salt = secrets.token_hex(16) + salted = plain_password + self.salt + self.password_hash = hashlib.sha256(salted.encode("utf-8")).hexdigest() + + def verify_password(self, plain_password: str) -> bool: + """ + Verify a password against the stored hash. + + Args: + plain_password: The plain text password to verify + + Returns: + True if the password matches, False otherwise + """ + salted = plain_password + self.salt + password_hash = hashlib.sha256(salted.encode("utf-8")).hexdigest() + return password_hash == self.password_hash + + def __repr__(self) -> str: + return ( + f"PasswordProtection(id={self.id!r}, " + f"target_type={self.target_type!r}, target_id={self.target_id!r})" + ) diff --git a/src/include/handlers/directory.py b/src/include/handlers/directory.py index c3bc8a5..ffb8cb9 100644 --- a/src/include/handlers/directory.py +++ b/src/include/handlers/directory.py @@ -21,15 +21,19 @@ class RequestListDirectoryHandler(RequestHandler): handler (ConnectionHandler): The connection handler containing request data and methods for responding. Response Codes: 200 - Directory listing successful, returns a list of files and directories in the response data. + 202 - Password required for access. 400 - Invalid request. - 403 - Invalid user or token. + 403 - Invalid user or token, or incorrect password. 404 - Directory not found. 500 - Internal server error, with the exception message. """ data_schema = { "type": "object", - "properties": {"folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}}, + "properties": { + "folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}, + "password": {"type": "string"} + }, "required": ["folder_id"], "additionalProperties": False, } @@ -40,6 +44,7 @@ def handle(self, handler: ConnectionHandler): # Parse the directory listing request folder_id: Optional[str] = handler.data.get("folder_id") + password = handler.data.get("password") with Session() as session: this_user = session.get(User, handler.username) @@ -72,6 +77,16 @@ def handle(self, handler: ConnectionHandler): **{"code": 403, "message": "Access denied", "data": {}} ) return 403, folder_id, handler.username + + # Check password protection + if folder.is_password_protected(): + if password is None: + handler.conclude_request(202, {}, "Password required") + return 202, folder_id, handler.username + if not folder.verify_password(password): + handler.conclude_request(403, {}, "Incorrect password") + return 403, folder_id, handler.username + parent = folder.parent children = folder.children documents = folder.documents @@ -129,15 +144,19 @@ class RequestGetDirectoryInfoHandler(RequestHandler): handler (ConnectionHandler): The connection handler containing request data and methods for responding. Response Codes: 200 - Directory info successful, returns directory info in the response data. + 202 - Password required for access. 400 - Invalid request. - 403 - Invalid user or token. + 403 - Invalid user or token, or incorrect password. 404 - Directory not found. 500 - Internal server error, with the exception message. """ data_schema = { "type": "object", - "properties": {"directory_id": {"type": "string", "minLength": 1}}, + "properties": { + "directory_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["directory_id"], "additionalProperties": False, } @@ -145,6 +164,7 @@ class RequestGetDirectoryInfoHandler(RequestHandler): def handle(self, handler: ConnectionHandler): directory_id: str = handler.data["directory_id"] + password = handler.data.get("password") if not directory_id: handler.conclude_request(400, {}, "Directory ID is required") @@ -171,6 +191,15 @@ def handle(self, handler: ConnectionHandler): if not directory.check_access_requirements(user, access_type="read"): handler.conclude_request(403, {}, "Permission denied") return 403, directory_id, handler.username + + # Check password protection + if directory.is_password_protected(): + if password is None: + handler.conclude_request(202, {}, "Password required") + return 202, directory_id, handler.username + if not directory.verify_password(password): + handler.conclude_request(403, {}, "Incorrect password") + return 403, directory_id, handler.username info_code = 0 ### generate access_rules text diff --git a/src/include/handlers/document.py b/src/include/handlers/document.py index 57191d2..4bbc206 100644 --- a/src/include/handlers/document.py +++ b/src/include/handlers/document.py @@ -79,7 +79,10 @@ class RequestGetDocumentInfoHandler(RequestHandler): data_schema = { "type": "object", - "properties": {"document_id": {"type": "string", "minLength": 1}}, + "properties": { + "document_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["document_id"], } @@ -88,6 +91,7 @@ class RequestGetDocumentInfoHandler(RequestHandler): def handle(self, handler: ConnectionHandler): document_id = handler.data.get("document_id") + password = handler.data.get("password") if not document_id: handler.conclude_request(400, {}, "Document ID is required") @@ -114,6 +118,15 @@ def handle(self, handler: ConnectionHandler): if not document.check_access_requirements(user, access_type="read"): handler.conclude_request(403, {}, "Permission denied") return 403, document_id, handler.username + + # Check password protection + if document.is_password_protected(): + if password is None: + handler.conclude_request(202, {}, "Password required") + return 202, document_id, handler.username + if not document.verify_password(password): + handler.conclude_request(403, {}, "Incorrect password") + return 403, document_id, handler.username info_code = 0 ### generate access_rules text @@ -198,7 +211,10 @@ class RequestGetDocumentHandler(RequestHandler): data_schema = { "type": "object", - "properties": {"document_id": {"type": "string", "minLength": 1}}, + "properties": { + "document_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["document_id"], "additionalProperties": False, } @@ -207,6 +223,7 @@ class RequestGetDocumentHandler(RequestHandler): def handle(self, handler: ConnectionHandler): document_id: str = handler.data["document_id"] + password = handler.data.get("password") with Session() as session: user = session.get(User, handler.username) @@ -220,6 +237,15 @@ def handle(self, handler: ConnectionHandler): if not document.check_access_requirements(user): handler.conclude_request(403, {}, "Access denied to the document") return 403, document_id, handler.username + + # Check password protection + if document.is_password_protected(): + if password is None: + handler.conclude_request(202, {}, "Password required") + return 202, document_id, handler.username + if not document.verify_password(password): + handler.conclude_request(403, {}, "Incorrect password") + return 403, document_id, handler.username try: latest_revision = document.get_latest_revision() diff --git a/src/include/handlers/protection.py b/src/include/handlers/protection.py new file mode 100644 index 0000000..0030e67 --- /dev/null +++ b/src/include/handlers/protection.py @@ -0,0 +1,267 @@ +""" +Handlers for password protection management. + +This module provides handlers for enabling, removing, and verifying +password protection on documents and directories. +""" + +from typing import Optional + +from include.classes.connection import ConnectionHandler +from include.classes.request import RequestHandler +from include.database.handler import Session +from include.database.models.classic import User +from include.database.models.entity import Document, Folder +from include.database.models.protection import PasswordProtection + +__all__ = [ + "RequestEnablePasswordProtectionHandler", + "RequestRemovePasswordProtectionHandler", + "RequestVerifyPasswordHandler", +] + + +class RequestEnablePasswordProtectionHandler(RequestHandler): + """ + Enable password protection on a document or directory. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1}, + "password": {"type": "string", "minLength": 1} + }, + "required": ["target_type", "target_id", "password"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + password: str = handler.data["password"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this document") + return 403, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this directory") + return 403, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Check if protection already exists + existing_protection = ( + session.query(PasswordProtection) + .filter( + PasswordProtection.target_type == target_type, + PasswordProtection.target_id == target_id + ) + .first() + ) + + if existing_protection: + # Update existing protection + existing_protection.set_password(password) + session.commit() + handler.conclude_request( + 200, {}, "Password protection updated successfully" + ) + return 0, target_id, handler.username + else: + # Create new protection + protection = PasswordProtection( + target_type=target_type, + target_id=target_id, + protection_type="password" + ) + protection.set_password(password) + session.add(protection) + session.commit() + + handler.conclude_request( + 200, {}, "Password protection enabled successfully" + ) + return 0, target_id, handler.username + + +class RequestRemovePasswordProtectionHandler(RequestHandler): + """ + Remove password protection from a document or directory. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1} + }, + "required": ["target_type", "target_id"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this document") + return 403, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this directory") + return 403, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Find and delete the protection + protection = ( + session.query(PasswordProtection) + .filter( + PasswordProtection.target_type == target_type, + PasswordProtection.target_id == target_id + ) + .first() + ) + + if not protection: + handler.conclude_request( + 404, {}, "Password protection not found for this object" + ) + return 404, target_id, handler.username + + session.delete(protection) + session.commit() + + handler.conclude_request( + 200, {}, "Password protection removed successfully" + ) + return 0, target_id, handler.username + + +class RequestVerifyPasswordHandler(RequestHandler): + """ + Verify a password for a protected document or directory. + This is useful for client-side validation before attempting access. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, + "required": ["target_type", "target_id", "password"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + password: str = handler.data["password"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Find the protection + protection = ( + session.query(PasswordProtection) + .filter( + PasswordProtection.target_type == target_type, + PasswordProtection.target_id == target_id + ) + .first() + ) + + if not protection: + handler.conclude_request( + 404, {}, "Password protection not found for this object" + ) + return 404, target_id, handler.username + + # Verify the password + if protection.verify_password(password): + handler.conclude_request( + 200, {"verified": True}, "Password verified successfully" + ) + return 0, target_id, handler.username + else: + handler.conclude_request( + 403, {"verified": False}, "Incorrect password" + ) + return 403, target_id, handler.username diff --git a/src/main.py b/src/main.py index 09d26d8..a094f98 100644 --- a/src/main.py +++ b/src/main.py @@ -41,6 +41,7 @@ from include.database.models.classic import User, UserGroup from include.database.models.entity import Document, DocumentRevision from include.database.models.file import File +from include.database.models.protection import PasswordProtection from include.util.log import getCustomLogger From c81662314251b36d430f29dcfe046277ee2fcee3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 05:24:15 +0000 Subject: [PATCH 3/8] Fix metadata column name conflict in PasswordProtection model Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/database/models/protection.py | 2 +- tests/test_password_protection.py | 405 ++++++++++++++++++++++ 2 files changed, 406 insertions(+), 1 deletion(-) create mode 100644 tests/test_password_protection.py diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py index a8422f3..ebf212d 100644 --- a/src/include/database/models/protection.py +++ b/src/include/database/models/protection.py @@ -46,7 +46,7 @@ class PasswordProtection(Base): ) # Additional metadata (reserved for future use) - metadata: Mapped[Optional[str]] = mapped_column( + protection_metadata: Mapped[Optional[str]] = mapped_column( Text, nullable=True, comment="JSON metadata for future extensions" ) diff --git a/tests/test_password_protection.py b/tests/test_password_protection.py new file mode 100644 index 0000000..4b5427b --- /dev/null +++ b/tests/test_password_protection.py @@ -0,0 +1,405 @@ +""" +Tests for password protection functionality. +""" + +import pytest +from tests.test_client import CFMSTestClient + + +class TestPasswordProtection: + """Test password protection on documents and directories.""" + + @pytest.mark.asyncio + async def test_enable_password_protection_on_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test enabling password protection on a document.""" + document_id = test_document["document_id"] + password = "test_password_123" + + # Enable password protection + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Failed to enable password protection: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_without_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document without providing password.""" + document_id = test_document["document_id"] + password = "test_password_456" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Try to access without password - should return 202 + response = await authenticated_client.send_request( + action="get_document_info", + data={"document_id": document_id} + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 202, \ + f"Expected 202 (Password required), got {response.get('code')}: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_with_wrong_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document with incorrect password.""" + document_id = test_document["document_id"] + correct_password = "correct_password_789" + wrong_password = "wrong_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": correct_password + } + ) + + # Try to access with wrong password - should return 403 + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": wrong_password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 403, \ + f"Expected 403 (Incorrect password), got {response.get('code')}: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_with_correct_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document with correct password.""" + document_id = test_document["document_id"] + password = "correct_password_abc" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Access with correct password - should return 200 + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Expected 200 (Success), got {response.get('code')}: {response.get('message', '')}" + assert "data" in response, "Response missing 'data'" + + @pytest.mark.asyncio + async def test_remove_password_protection_from_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test removing password protection from a document.""" + document_id = test_document["document_id"] + password = "temp_password_xyz" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Remove password protection + response = await authenticated_client.send_request( + action="remove_password_protection", + data={ + "target_type": "document", + "target_id": document_id + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Failed to remove password protection: {response.get('message', '')}" + + # Now access without password should work (200) + response = await authenticated_client.send_request( + action="get_document_info", + data={"document_id": document_id} + ) + + assert response["code"] == 200, \ + f"Expected 200 after removing protection, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_verify_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test password verification endpoint.""" + document_id = test_document["document_id"] + password = "verify_test_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Verify correct password + response = await authenticated_client.send_request( + action="verify_password", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + assert response["code"] == 200, \ + f"Expected 200 for correct password, got {response.get('code')}" + assert response.get("data", {}).get("verified") == True, \ + "Expected verified=True for correct password" + + # Verify incorrect password + response = await authenticated_client.send_request( + action="verify_password", + data={ + "target_type": "document", + "target_id": document_id, + "password": "wrong_password" + } + ) + + assert response["code"] == 403, \ + f"Expected 403 for incorrect password, got {response.get('code')}" + assert response.get("data", {}).get("verified") == False, \ + "Expected verified=False for incorrect password" + + @pytest.mark.asyncio + async def test_password_protection_on_directory( + self, authenticated_client: CFMSTestClient + ): + """Test password protection on directories.""" + # Create a test directory + create_response = await authenticated_client.create_directory("Test Protected Dir") + assert create_response["code"] == 200, "Failed to create test directory" + directory_id = create_response["data"]["id"] + + try: + password = "dir_password_123" + + # Enable password protection + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "directory", + "target_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, "Failed to enable password protection on directory" + + # Try to access without password - should return 202 + response = await authenticated_client.send_request( + action="get_directory_info", + data={"directory_id": directory_id} + ) + assert response["code"] == 202, \ + f"Expected 202 (Password required), got {response.get('code')}" + + # Access with correct password - should return 200 + response = await authenticated_client.send_request( + action="get_directory_info", + data={ + "directory_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 with correct password, got {response.get('code')}" + + finally: + # Cleanup + try: + await authenticated_client.delete_directory(directory_id) + except Exception: + pass + + @pytest.mark.asyncio + async def test_update_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test updating password on a protected document.""" + document_id = test_document["document_id"] + old_password = "old_password_123" + new_password = "new_password_456" + + # Enable password protection with old password + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": old_password + } + ) + + # Update to new password + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": new_password + } + ) + assert response["code"] == 200, "Failed to update password" + + # Old password should no longer work + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": old_password + } + ) + assert response["code"] == 403, \ + f"Old password should not work, got {response.get('code')}" + + # New password should work + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": new_password + } + ) + assert response["code"] == 200, \ + f"New password should work, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_password_protection_on_get_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test password protection on get_document action.""" + document_id = test_document["document_id"] + password = "get_doc_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Try get_document without password - should return 202 + response = await authenticated_client.send_request( + action="get_document", + data={"document_id": document_id} + ) + assert response["code"] == 202, \ + f"Expected 202 for get_document without password, got {response.get('code')}" + + # Try with password - should return 200 + response = await authenticated_client.send_request( + action="get_document", + data={ + "document_id": document_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 for get_document with password, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_password_protection_on_list_directory( + self, authenticated_client: CFMSTestClient + ): + """Test password protection on list_directory action.""" + # Create a test directory + create_response = await authenticated_client.create_directory("Test List Protected Dir") + assert create_response["code"] == 200, "Failed to create test directory" + directory_id = create_response["data"]["id"] + + try: + password = "list_dir_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "directory", + "target_id": directory_id, + "password": password + } + ) + + # Try list_directory without password - should return 202 + response = await authenticated_client.send_request( + action="list_directory", + data={"folder_id": directory_id} + ) + assert response["code"] == 202, \ + f"Expected 202 for list_directory without password, got {response.get('code')}" + + # Try with password - should return 200 + response = await authenticated_client.send_request( + action="list_directory", + data={ + "folder_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 for list_directory with password, got {response.get('code')}" + + finally: + # Cleanup + try: + await authenticated_client.delete_directory(directory_id) + except Exception: + pass From 6dd4f5d971aafd2cbf708c15fa5498ecaeead58c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 05:27:09 +0000 Subject: [PATCH 4/8] Address code review feedback - use constants and improve test assertions Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/constants.py | 4 ++++ src/include/database/models/entity.py | 11 +++++------ tests/test_password_protection.py | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/include/constants.py b/src/include/constants.py index 68a3d64..49d5d82 100644 --- a/src/include/constants.py +++ b/src/include/constants.py @@ -11,6 +11,7 @@ "FILE_TRANSFER_MAX_CHUNK_SIZE", "FILE_TRANSFER_MIN_CHUNK_SIZE", "FILE_TASK_DEFAULT_DURATION_SECONDS", + "TARGET_TYPE_MAPPING", ] CORE_VERSION = Version("0.1.0.251226_alpha") @@ -19,6 +20,9 @@ AVAILABLE_ACCESS_TYPES = ["read", "write", "move", "manage"] AVAILABLE_BLOCK_TYPES: set = {"read", "write", "move"} +# Target type mapping for database table names to API types +TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} + # Authentication and Security Constants DEFAULT_TOKEN_EXPIRY_SECONDS = 3600 # 1 hour FAILED_LOGIN_DELAY_SECONDS = 3 # Delay after failed login attempt diff --git a/src/include/database/models/entity.py b/src/include/database/models/entity.py index 82378a1..51f8670 100644 --- a/src/include/database/models/entity.py +++ b/src/include/database/models/entity.py @@ -34,14 +34,13 @@ def get_password_protection(self): PasswordProtection object if password protected, None otherwise """ from include.database.models.protection import PasswordProtection - - _TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} + from include.constants import TARGET_TYPE_MAPPING session = object_session(self) if not session: raise RuntimeError("No active session found for object") - target_type = _TARGET_TYPE_MAPPING[self.__tablename__] + target_type = TARGET_TYPE_MAPPING[self.__tablename__] protection = ( session.query(PasswordProtection) @@ -104,7 +103,7 @@ def check_access_requirements(self, user: User, access_type: str = "read") -> bo - If no access rules are defined, access is granted by default. """ - _TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} + from include.constants import TARGET_TYPE_MAPPING def match_rights(sub_rights_group): if not sub_rights_group: @@ -225,7 +224,7 @@ def match_primary_sub_group(per_match_group): ObjectAccessEntry.entity_type == "user", ObjectAccessEntry.entity_identifier == user.username, ObjectAccessEntry.target_type - == _TARGET_TYPE_MAPPING[self.__tablename__], + == TARGET_TYPE_MAPPING[self.__tablename__], ObjectAccessEntry.target_identifier == self.id, ObjectAccessEntry.access_type == access_type, ObjectAccessEntry.start_time <= now, @@ -246,7 +245,7 @@ def match_primary_sub_group(per_match_group): ObjectAccessEntry.entity_type == "group", ObjectAccessEntry.entity_identifier == group.group_name, ObjectAccessEntry.target_type - == _TARGET_TYPE_MAPPING[self.__tablename__], + == TARGET_TYPE_MAPPING[self.__tablename__], ObjectAccessEntry.target_identifier == self.id, ObjectAccessEntry.access_type == access_type, ObjectAccessEntry.start_time <= now, diff --git a/tests/test_password_protection.py b/tests/test_password_protection.py index 4b5427b..1202c23 100644 --- a/tests/test_password_protection.py +++ b/tests/test_password_protection.py @@ -198,7 +198,7 @@ async def test_verify_password( assert response["code"] == 200, \ f"Expected 200 for correct password, got {response.get('code')}" - assert response.get("data", {}).get("verified") == True, \ + assert response.get("data", {}).get("verified") is True, \ "Expected verified=True for correct password" # Verify incorrect password @@ -213,7 +213,7 @@ async def test_verify_password( assert response["code"] == 403, \ f"Expected 403 for incorrect password, got {response.get('code')}" - assert response.get("data", {}).get("verified") == False, \ + assert response.get("data", {}).get("verified") is False, \ "Expected verified=False for incorrect password" @pytest.mark.asyncio From a409837efe43839e60eb908bb85e23b27fdecb11 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 05:30:15 +0000 Subject: [PATCH 5/8] Improve password hashing security with PBKDF2 and constant-time comparison Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/database/models/entity.py | 10 ++++++---- src/include/database/models/protection.py | 23 ++++++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/include/database/models/entity.py b/src/include/database/models/entity.py index 51f8670..a7d9795 100644 --- a/src/include/database/models/entity.py +++ b/src/include/database/models/entity.py @@ -5,7 +5,7 @@ import secrets from sqlalchemy import VARCHAR, Float, ForeignKey, Integer from include.classes.exceptions import NoActiveRevisionsError -from include.constants import AVAILABLE_ACCESS_TYPES, AVAILABLE_BLOCK_TYPES +from include.constants import AVAILABLE_ACCESS_TYPES, AVAILABLE_BLOCK_TYPES, TARGET_TYPE_MAPPING from include.database.handler import Base from include.classes.access_rule import AccessRuleBase from sqlalchemy.orm import Mapped @@ -19,6 +19,10 @@ from include.database.models.classic import User, ObjectAccessEntry from include.database.models.blocking import UserBlockEntry, UserBlockSubEntry +# Import protection model - placed here to avoid circular imports +if TYPE_CHECKING: + from include.database.models.protection import PasswordProtection + class BaseObject(Base): __abstract__ = True @@ -33,8 +37,8 @@ def get_password_protection(self): Returns: PasswordProtection object if password protected, None otherwise """ + # Import here to avoid circular dependency at module load time from include.database.models.protection import PasswordProtection - from include.constants import TARGET_TYPE_MAPPING session = object_session(self) if not session: @@ -103,8 +107,6 @@ def check_access_requirements(self, user: User, access_type: str = "read") -> bo - If no access rules are defined, access is granted by default. """ - from include.constants import TARGET_TYPE_MAPPING - def match_rights(sub_rights_group): if not sub_rights_group: return True diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py index ebf212d..8d9841b 100644 --- a/src/include/database/models/protection.py +++ b/src/include/database/models/protection.py @@ -55,26 +55,39 @@ def set_password(self, plain_password: str) -> None: """ Set a new password for this protection entry. + Uses PBKDF2-HMAC-SHA256 with 600,000 iterations for secure password hashing, + following OWASP recommendations for password storage. + Args: plain_password: The plain text password to hash and store """ self.salt = secrets.token_hex(16) - salted = plain_password + self.salt - self.password_hash = hashlib.sha256(salted.encode("utf-8")).hexdigest() + # Use PBKDF2-HMAC for secure password hashing (OWASP recommended) + # 600,000 iterations is the OWASP recommended minimum as of 2023 + password_bytes = plain_password.encode('utf-8') + salt_bytes = self.salt.encode('utf-8') + key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) + self.password_hash = key.hex() def verify_password(self, plain_password: str) -> bool: """ Verify a password against the stored hash. + Uses constant-time comparison to prevent timing attacks. + Args: plain_password: The plain text password to verify Returns: True if the password matches, False otherwise """ - salted = plain_password + self.salt - password_hash = hashlib.sha256(salted.encode("utf-8")).hexdigest() - return password_hash == self.password_hash + password_bytes = plain_password.encode('utf-8') + salt_bytes = self.salt.encode('utf-8') + key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) + computed_hash = key.hex() + + # Use constant-time comparison to prevent timing attacks + return secrets.compare_digest(computed_hash, self.password_hash) def __repr__(self) -> str: return ( From 09e892baece6e87bdba71d1dfb6289c6d9eefbb1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 06:04:15 +0000 Subject: [PATCH 6/8] Refactor to unified ObjectProtection table and move logic to handlers - Renamed PasswordProtection to ObjectProtection for single table design - Changed schema: protection_type column distinguishes types, protection_data stores type-specific JSON - Removed password-specific methods from BaseObject to keep it clean - Created check_password_protection helper function in protection handlers - Updated all handlers to use helper function instead of BaseObject methods - Table now supports multiple protection types (password, encryption, biometric, etc.) Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/database/models/entity.py | 64 ------------------- src/include/database/models/protection.py | 77 +++++++++++++++-------- src/include/handlers/directory.py | 23 +++---- src/include/handlers/document.py | 23 +++---- src/include/handlers/protection.py | 76 ++++++++++++++++++---- src/main.py | 2 +- 6 files changed, 134 insertions(+), 131 deletions(-) diff --git a/src/include/database/models/entity.py b/src/include/database/models/entity.py index a7d9795..ed9a6f8 100644 --- a/src/include/database/models/entity.py +++ b/src/include/database/models/entity.py @@ -19,10 +19,6 @@ from include.database.models.classic import User, ObjectAccessEntry from include.database.models.blocking import UserBlockEntry, UserBlockSubEntry -# Import protection model - placed here to avoid circular imports -if TYPE_CHECKING: - from include.database.models.protection import PasswordProtection - class BaseObject(Base): __abstract__ = True @@ -30,66 +26,6 @@ class BaseObject(Base): id: Mapped[str] access_rules: Mapped[List] - def get_password_protection(self): - """ - Get the password protection entry for this object, if any. - - Returns: - PasswordProtection object if password protected, None otherwise - """ - # Import here to avoid circular dependency at module load time - from include.database.models.protection import PasswordProtection - - session = object_session(self) - if not session: - raise RuntimeError("No active session found for object") - - target_type = TARGET_TYPE_MAPPING[self.__tablename__] - - protection = ( - session.query(PasswordProtection) - .filter( - PasswordProtection.target_type == target_type, - PasswordProtection.target_id == self.id - ) - .first() - ) - - return protection - - def is_password_protected(self) -> bool: - """ - Check if this object is password protected. - - Returns: - True if password protected, False otherwise - """ - return self.get_password_protection() is not None - - def verify_password(self, password: Optional[str]) -> bool: - """ - Verify the provided password against the stored password. - - Args: - password: The password to verify (None if not provided) - - Returns: - True if password is correct or object is not protected, - False if password is incorrect or missing when required - """ - protection = self.get_password_protection() - - if not protection: - # Not password protected, access granted - return True - - if password is None: - # Password required but not provided - return False - - # Verify the password - return protection.verify_password(password) - def check_access_requirements(self, user: User, access_type: str = "read") -> bool: """ Checks if a given user meets the access requirements for a specific access type based on defined access rules. diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py index 8d9841b..95c8c50 100644 --- a/src/include/database/models/protection.py +++ b/src/include/database/models/protection.py @@ -1,8 +1,8 @@ """ -Database models for object protection (passwords, etc.) +Database models for object protection (passwords, encryption, biometric, etc.) -This module provides a scalable protection system that can be extended -with additional protection types in the future. +This module provides a unified protection system that can handle multiple +protection types in a single table. """ import hashlib @@ -16,14 +16,14 @@ from include.database.handler import Base -class PasswordProtection(Base): +class ObjectProtection(Base): """ - Model for password-based protection on documents and directories. + Unified model for all types of protection on documents and directories. - This table is separate from the main entity tables to maintain scalability - and allow for future protection types (e.g., encryption, biometric, etc.) + This single table handles all protection types (password, encryption, biometric, etc.) + distinguished by the protection_type column. """ - __tablename__ = "password_protections" + __tablename__ = "object_protections" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) @@ -35,14 +35,18 @@ class PasswordProtection(Base): VARCHAR(255), nullable=False, comment="ID of the protected object" ) - # Password storage (hashed) - password_hash: Mapped[str] = mapped_column(Text, nullable=False) - salt: Mapped[str] = mapped_column(Text, nullable=False) - - # Reserved for future protection types + # Protection type identifier protection_type: Mapped[str] = mapped_column( - VARCHAR(64), nullable=False, default="password", - comment="Protection type: 'password' (reserved for future types like 'encryption', 'biometric')" + VARCHAR(64), nullable=False, + comment="Protection type: 'password', 'encryption', 'biometric', etc." + ) + + # Protection data (type-specific, stored as JSON or text) + # For password: contains password_hash and salt + # For other types: contains type-specific data + protection_data: Mapped[str] = mapped_column( + Text, nullable=False, + comment="Protection-specific data (JSON format for flexibility)" ) # Additional metadata (reserved for future use) @@ -53,7 +57,7 @@ class PasswordProtection(Base): def set_password(self, plain_password: str) -> None: """ - Set a new password for this protection entry. + Set password protection data for this entry. Uses PBKDF2-HMAC-SHA256 with 600,000 iterations for secure password hashing, following OWASP recommendations for password storage. @@ -61,19 +65,29 @@ def set_password(self, plain_password: str) -> None: Args: plain_password: The plain text password to hash and store """ - self.salt = secrets.token_hex(16) - # Use PBKDF2-HMAC for secure password hashing (OWASP recommended) - # 600,000 iterations is the OWASP recommended minimum as of 2023 + import json + + self.protection_type = "password" + + # Generate salt and hash + salt = secrets.token_hex(16) password_bytes = plain_password.encode('utf-8') - salt_bytes = self.salt.encode('utf-8') + salt_bytes = salt.encode('utf-8') key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) - self.password_hash = key.hex() + password_hash = key.hex() + + # Store as JSON + self.protection_data = json.dumps({ + "password_hash": password_hash, + "salt": salt + }) def verify_password(self, plain_password: str) -> bool: """ Verify a password against the stored hash. Uses constant-time comparison to prevent timing attacks. + Only works if protection_type is "password". Args: plain_password: The plain text password to verify @@ -81,16 +95,29 @@ def verify_password(self, plain_password: str) -> bool: Returns: True if the password matches, False otherwise """ + import json + + if self.protection_type != "password": + return False + + try: + data = json.loads(self.protection_data) + password_hash = data["password_hash"] + salt = data["salt"] + except (json.JSONDecodeError, KeyError): + return False + password_bytes = plain_password.encode('utf-8') - salt_bytes = self.salt.encode('utf-8') + salt_bytes = salt.encode('utf-8') key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) computed_hash = key.hex() # Use constant-time comparison to prevent timing attacks - return secrets.compare_digest(computed_hash, self.password_hash) + return secrets.compare_digest(computed_hash, password_hash) def __repr__(self) -> str: return ( - f"PasswordProtection(id={self.id!r}, " - f"target_type={self.target_type!r}, target_id={self.target_id!r})" + f"ObjectProtection(id={self.id!r}, " + f"target_type={self.target_type!r}, target_id={self.target_id!r}, " + f"protection_type={self.protection_type!r})" ) diff --git a/src/include/handlers/directory.py b/src/include/handlers/directory.py index ffb8cb9..045059d 100644 --- a/src/include/handlers/directory.py +++ b/src/include/handlers/directory.py @@ -7,6 +7,7 @@ from include.database.handler import Session from include.database.models.classic import User from include.database.models.entity import Folder, Document, FolderAccessRule +from include.handlers.protection import check_password_protection from include.util.audit import log_audit from include.util.rule.applying import apply_access_rules import include.system.messages as smsg @@ -79,13 +80,10 @@ def handle(self, handler: ConnectionHandler): return 403, folder_id, handler.username # Check password protection - if folder.is_password_protected(): - if password is None: - handler.conclude_request(202, {}, "Password required") - return 202, folder_id, handler.username - if not folder.verify_password(password): - handler.conclude_request(403, {}, "Incorrect password") - return 403, folder_id, handler.username + protection_code, protection_msg = check_password_protection(folder, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, folder_id, handler.username parent = folder.parent children = folder.children @@ -193,13 +191,10 @@ def handle(self, handler: ConnectionHandler): return 403, directory_id, handler.username # Check password protection - if directory.is_password_protected(): - if password is None: - handler.conclude_request(202, {}, "Password required") - return 202, directory_id, handler.username - if not directory.verify_password(password): - handler.conclude_request(403, {}, "Incorrect password") - return 403, directory_id, handler.username + protection_code, protection_msg = check_password_protection(directory, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, directory_id, handler.username info_code = 0 ### generate access_rules text diff --git a/src/include/handlers/document.py b/src/include/handlers/document.py index 4bbc206..8cfdd18 100644 --- a/src/include/handlers/document.py +++ b/src/include/handlers/document.py @@ -19,6 +19,7 @@ NoActiveRevisionsError, ) from include.database.models.file import File, FileTask +from include.handlers.protection import check_password_protection from include.util.rule.applying import apply_access_rules import include.system.messages as smsg @@ -120,13 +121,10 @@ def handle(self, handler: ConnectionHandler): return 403, document_id, handler.username # Check password protection - if document.is_password_protected(): - if password is None: - handler.conclude_request(202, {}, "Password required") - return 202, document_id, handler.username - if not document.verify_password(password): - handler.conclude_request(403, {}, "Incorrect password") - return 403, document_id, handler.username + protection_code, protection_msg = check_password_protection(document, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, document_id, handler.username info_code = 0 ### generate access_rules text @@ -239,13 +237,10 @@ def handle(self, handler: ConnectionHandler): return 403, document_id, handler.username # Check password protection - if document.is_password_protected(): - if password is None: - handler.conclude_request(202, {}, "Password required") - return 202, document_id, handler.username - if not document.verify_password(password): - handler.conclude_request(403, {}, "Incorrect password") - return 403, document_id, handler.username + protection_code, protection_msg = check_password_protection(document, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, document_id, handler.username try: latest_revision = document.get_latest_revision() diff --git a/src/include/handlers/protection.py b/src/include/handlers/protection.py index 0030e67..1d457ed 100644 --- a/src/include/handlers/protection.py +++ b/src/include/handlers/protection.py @@ -9,18 +9,66 @@ from include.classes.connection import ConnectionHandler from include.classes.request import RequestHandler +from include.constants import TARGET_TYPE_MAPPING from include.database.handler import Session from include.database.models.classic import User from include.database.models.entity import Document, Folder -from include.database.models.protection import PasswordProtection +from include.database.models.protection import ObjectProtection __all__ = [ "RequestEnablePasswordProtectionHandler", "RequestRemovePasswordProtectionHandler", "RequestVerifyPasswordHandler", + "check_password_protection", ] +def check_password_protection(target, password: Optional[str], session) -> tuple[int, str]: + """ + Check password protection for a document or directory. + + This is a helper function to be used by request handlers to check + if an object has password protection and verify the password if provided. + + Args: + target: The Document or Folder object to check + password: The password provided by the user (None if not provided) + session: The database session + + Returns: + Tuple of (code, message): + - (0, "success") if access granted (no protection or correct password) + - (202, "Password required") if protected but no password provided + - (403, "Incorrect password") if protected and wrong password provided + """ + target_type = TARGET_TYPE_MAPPING[target.__tablename__] + + # Check if object has password protection + protection = ( + session.query(ObjectProtection) + .filter( + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target.id, + ObjectProtection.protection_type == "password" + ) + .first() + ) + + if not protection: + # No password protection, access granted + return (0, "success") + + if password is None: + # Password required but not provided + return (202, "Password required") + + # Verify the password + if protection.verify_password(password): + return (0, "success") + else: + return (403, "Incorrect password") + + class RequestEnablePasswordProtectionHandler(RequestHandler): """ Enable password protection on a document or directory. @@ -79,10 +127,11 @@ def handle(self, handler: ConnectionHandler): # Check if protection already exists existing_protection = ( - session.query(PasswordProtection) + session.query(ObjectProtection) .filter( - PasswordProtection.target_type == target_type, - PasswordProtection.target_id == target_id + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" ) .first() ) @@ -97,10 +146,9 @@ def handle(self, handler: ConnectionHandler): return 0, target_id, handler.username else: # Create new protection - protection = PasswordProtection( + protection = ObjectProtection( target_type=target_type, - target_id=target_id, - protection_type="password" + target_id=target_id ) protection.set_password(password) session.add(protection) @@ -168,10 +216,11 @@ def handle(self, handler: ConnectionHandler): # Find and delete the protection protection = ( - session.query(PasswordProtection) + session.query(ObjectProtection) .filter( - PasswordProtection.target_type == target_type, - PasswordProtection.target_id == target_id + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" ) .first() ) @@ -240,10 +289,11 @@ def handle(self, handler: ConnectionHandler): # Find the protection protection = ( - session.query(PasswordProtection) + session.query(ObjectProtection) .filter( - PasswordProtection.target_type == target_type, - PasswordProtection.target_id == target_id + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" ) .first() ) diff --git a/src/main.py b/src/main.py index a094f98..3e50ea5 100644 --- a/src/main.py +++ b/src/main.py @@ -41,7 +41,7 @@ from include.database.models.classic import User, UserGroup from include.database.models.entity import Document, DocumentRevision from include.database.models.file import File -from include.database.models.protection import PasswordProtection +from include.database.models.protection import ObjectProtection from include.util.log import getCustomLogger From 5c4adda66b07ab2e1a6f3edad21cd40a4f993a8c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 06:05:45 +0000 Subject: [PATCH 7/8] Move json import to module level for performance Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/database/models/protection.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py index 95c8c50..64daf53 100644 --- a/src/include/database/models/protection.py +++ b/src/include/database/models/protection.py @@ -6,6 +6,7 @@ """ import hashlib +import json import secrets from typing import Optional @@ -65,8 +66,6 @@ def set_password(self, plain_password: str) -> None: Args: plain_password: The plain text password to hash and store """ - import json - self.protection_type = "password" # Generate salt and hash @@ -95,8 +94,6 @@ def verify_password(self, plain_password: str) -> bool: Returns: True if the password matches, False otherwise """ - import json - if self.protection_type != "password": return False From d64e638feb9a3f433df9b6a100f5b61b1d40f3f8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 06:06:41 +0000 Subject: [PATCH 8/8] Add default value for protection_type column Co-authored-by: Creeper19472 <38857196+Creeper19472@users.noreply.github.com> --- src/include/database/models/protection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py index 64daf53..a50a760 100644 --- a/src/include/database/models/protection.py +++ b/src/include/database/models/protection.py @@ -38,7 +38,7 @@ class ObjectProtection(Base): # Protection type identifier protection_type: Mapped[str] = mapped_column( - VARCHAR(64), nullable=False, + VARCHAR(64), nullable=False, default="password", comment="Protection type: 'password', 'encryption', 'biometric', etc." )