diff --git a/src/include/connection_handler.py b/src/include/connection_handler.py index 31f974a..ef3531e 100644 --- a/src/include/connection_handler.py +++ b/src/include/connection_handler.py @@ -74,6 +74,11 @@ ) from include.handlers.debugging.throw import RequestThrowExceptionHandler from include.handlers.search import RequestSearchHandler +from include.handlers.protection import ( + RequestEnablePasswordProtectionHandler, + RequestRemovePasswordProtectionHandler, + RequestVerifyPasswordHandler, +) from include.constants import CORE_VERSION, PROTOCOL_VERSION from include.shared import connected_listeners, lockdown_enabled @@ -193,6 +198,10 @@ def handle_request(websocket: websockets.sync.server.ServerConnection, message: # 系统类 "lockdown": RequestLockdownHandler, "view_audit_logs": RequestViewAuditLogsHandler, + # 密码保护类 + "enable_password_protection": RequestEnablePasswordProtectionHandler, + "remove_password_protection": RequestRemovePasswordProtectionHandler, + "verify_password": RequestVerifyPasswordHandler, } # Debugging diff --git a/src/include/constants.py b/src/include/constants.py index 68a3d64..49d5d82 100644 --- a/src/include/constants.py +++ b/src/include/constants.py @@ -11,6 +11,7 @@ "FILE_TRANSFER_MAX_CHUNK_SIZE", "FILE_TRANSFER_MIN_CHUNK_SIZE", "FILE_TASK_DEFAULT_DURATION_SECONDS", + "TARGET_TYPE_MAPPING", ] CORE_VERSION = Version("0.1.0.251226_alpha") @@ -19,6 +20,9 @@ AVAILABLE_ACCESS_TYPES = ["read", "write", "move", "manage"] AVAILABLE_BLOCK_TYPES: set = {"read", "write", "move"} +# Target type mapping for database table names to API types +TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} + # Authentication and Security Constants DEFAULT_TOKEN_EXPIRY_SECONDS = 3600 # 1 hour FAILED_LOGIN_DELAY_SECONDS = 3 # Delay after failed login attempt diff --git a/src/include/database/models/entity.py b/src/include/database/models/entity.py index abc51c4..ed9a6f8 100644 --- a/src/include/database/models/entity.py +++ b/src/include/database/models/entity.py @@ -5,7 +5,7 @@ import secrets from sqlalchemy import VARCHAR, Float, ForeignKey, Integer from include.classes.exceptions import NoActiveRevisionsError -from include.constants import AVAILABLE_ACCESS_TYPES, AVAILABLE_BLOCK_TYPES +from include.constants import AVAILABLE_ACCESS_TYPES, AVAILABLE_BLOCK_TYPES, TARGET_TYPE_MAPPING from include.database.handler import Base from include.classes.access_rule import AccessRuleBase from sqlalchemy.orm import Mapped @@ -43,8 +43,6 @@ def check_access_requirements(self, user: User, access_type: str = "read") -> bo - If no access rules are defined, access is granted by default. """ - _TARGET_TYPE_MAPPING = {"folders": "directory", "documents": "document"} - def match_rights(sub_rights_group): if not sub_rights_group: return True @@ -164,7 +162,7 @@ def match_primary_sub_group(per_match_group): ObjectAccessEntry.entity_type == "user", ObjectAccessEntry.entity_identifier == user.username, ObjectAccessEntry.target_type - == _TARGET_TYPE_MAPPING[self.__tablename__], + == TARGET_TYPE_MAPPING[self.__tablename__], ObjectAccessEntry.target_identifier == self.id, ObjectAccessEntry.access_type == access_type, ObjectAccessEntry.start_time <= now, @@ -185,7 +183,7 @@ def match_primary_sub_group(per_match_group): ObjectAccessEntry.entity_type == "group", ObjectAccessEntry.entity_identifier == group.group_name, ObjectAccessEntry.target_type - == _TARGET_TYPE_MAPPING[self.__tablename__], + == TARGET_TYPE_MAPPING[self.__tablename__], ObjectAccessEntry.target_identifier == self.id, ObjectAccessEntry.access_type == access_type, ObjectAccessEntry.start_time <= now, diff --git a/src/include/database/models/protection.py b/src/include/database/models/protection.py new file mode 100644 index 0000000..a50a760 --- /dev/null +++ b/src/include/database/models/protection.py @@ -0,0 +1,120 @@ +""" +Database models for object protection (passwords, encryption, biometric, etc.) + +This module provides a unified protection system that can handle multiple +protection types in a single table. +""" + +import hashlib +import json +import secrets +from typing import Optional + +from sqlalchemy import VARCHAR, Integer, Text +from sqlalchemy.orm import Mapped +from sqlalchemy.orm import mapped_column + +from include.database.handler import Base + + +class ObjectProtection(Base): + """ + Unified model for all types of protection on documents and directories. + + This single table handles all protection types (password, encryption, biometric, etc.) + distinguished by the protection_type column. + """ + __tablename__ = "object_protections" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + + # Target object identification + target_type: Mapped[str] = mapped_column( + VARCHAR(64), nullable=False, comment="Type: 'document' or 'directory'" + ) + target_id: Mapped[str] = mapped_column( + VARCHAR(255), nullable=False, comment="ID of the protected object" + ) + + # Protection type identifier + protection_type: Mapped[str] = mapped_column( + VARCHAR(64), nullable=False, default="password", + comment="Protection type: 'password', 'encryption', 'biometric', etc." + ) + + # Protection data (type-specific, stored as JSON or text) + # For password: contains password_hash and salt + # For other types: contains type-specific data + protection_data: Mapped[str] = mapped_column( + Text, nullable=False, + comment="Protection-specific data (JSON format for flexibility)" + ) + + # Additional metadata (reserved for future use) + protection_metadata: Mapped[Optional[str]] = mapped_column( + Text, nullable=True, + comment="JSON metadata for future extensions" + ) + + def set_password(self, plain_password: str) -> None: + """ + Set password protection data for this entry. + + Uses PBKDF2-HMAC-SHA256 with 600,000 iterations for secure password hashing, + following OWASP recommendations for password storage. + + Args: + plain_password: The plain text password to hash and store + """ + self.protection_type = "password" + + # Generate salt and hash + salt = secrets.token_hex(16) + password_bytes = plain_password.encode('utf-8') + salt_bytes = salt.encode('utf-8') + key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) + password_hash = key.hex() + + # Store as JSON + self.protection_data = json.dumps({ + "password_hash": password_hash, + "salt": salt + }) + + def verify_password(self, plain_password: str) -> bool: + """ + Verify a password against the stored hash. + + Uses constant-time comparison to prevent timing attacks. + Only works if protection_type is "password". + + Args: + plain_password: The plain text password to verify + + Returns: + True if the password matches, False otherwise + """ + if self.protection_type != "password": + return False + + try: + data = json.loads(self.protection_data) + password_hash = data["password_hash"] + salt = data["salt"] + except (json.JSONDecodeError, KeyError): + return False + + password_bytes = plain_password.encode('utf-8') + salt_bytes = salt.encode('utf-8') + key = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 600000) + computed_hash = key.hex() + + # Use constant-time comparison to prevent timing attacks + return secrets.compare_digest(computed_hash, password_hash) + + def __repr__(self) -> str: + return ( + f"ObjectProtection(id={self.id!r}, " + f"target_type={self.target_type!r}, target_id={self.target_id!r}, " + f"protection_type={self.protection_type!r})" + ) diff --git a/src/include/handlers/directory.py b/src/include/handlers/directory.py index c3bc8a5..045059d 100644 --- a/src/include/handlers/directory.py +++ b/src/include/handlers/directory.py @@ -7,6 +7,7 @@ from include.database.handler import Session from include.database.models.classic import User from include.database.models.entity import Folder, Document, FolderAccessRule +from include.handlers.protection import check_password_protection from include.util.audit import log_audit from include.util.rule.applying import apply_access_rules import include.system.messages as smsg @@ -21,15 +22,19 @@ class RequestListDirectoryHandler(RequestHandler): handler (ConnectionHandler): The connection handler containing request data and methods for responding. Response Codes: 200 - Directory listing successful, returns a list of files and directories in the response data. + 202 - Password required for access. 400 - Invalid request. - 403 - Invalid user or token. + 403 - Invalid user or token, or incorrect password. 404 - Directory not found. 500 - Internal server error, with the exception message. """ data_schema = { "type": "object", - "properties": {"folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}}, + "properties": { + "folder_id": {"anyOf": [{"type": "string"}, {"type": "null"}]}, + "password": {"type": "string"} + }, "required": ["folder_id"], "additionalProperties": False, } @@ -40,6 +45,7 @@ def handle(self, handler: ConnectionHandler): # Parse the directory listing request folder_id: Optional[str] = handler.data.get("folder_id") + password = handler.data.get("password") with Session() as session: this_user = session.get(User, handler.username) @@ -72,6 +78,13 @@ def handle(self, handler: ConnectionHandler): **{"code": 403, "message": "Access denied", "data": {}} ) return 403, folder_id, handler.username + + # Check password protection + protection_code, protection_msg = check_password_protection(folder, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, folder_id, handler.username + parent = folder.parent children = folder.children documents = folder.documents @@ -129,15 +142,19 @@ class RequestGetDirectoryInfoHandler(RequestHandler): handler (ConnectionHandler): The connection handler containing request data and methods for responding. Response Codes: 200 - Directory info successful, returns directory info in the response data. + 202 - Password required for access. 400 - Invalid request. - 403 - Invalid user or token. + 403 - Invalid user or token, or incorrect password. 404 - Directory not found. 500 - Internal server error, with the exception message. """ data_schema = { "type": "object", - "properties": {"directory_id": {"type": "string", "minLength": 1}}, + "properties": { + "directory_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["directory_id"], "additionalProperties": False, } @@ -145,6 +162,7 @@ class RequestGetDirectoryInfoHandler(RequestHandler): def handle(self, handler: ConnectionHandler): directory_id: str = handler.data["directory_id"] + password = handler.data.get("password") if not directory_id: handler.conclude_request(400, {}, "Directory ID is required") @@ -171,6 +189,12 @@ def handle(self, handler: ConnectionHandler): if not directory.check_access_requirements(user, access_type="read"): handler.conclude_request(403, {}, "Permission denied") return 403, directory_id, handler.username + + # Check password protection + protection_code, protection_msg = check_password_protection(directory, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, directory_id, handler.username info_code = 0 ### generate access_rules text diff --git a/src/include/handlers/document.py b/src/include/handlers/document.py index 57191d2..8cfdd18 100644 --- a/src/include/handlers/document.py +++ b/src/include/handlers/document.py @@ -19,6 +19,7 @@ NoActiveRevisionsError, ) from include.database.models.file import File, FileTask +from include.handlers.protection import check_password_protection from include.util.rule.applying import apply_access_rules import include.system.messages as smsg @@ -79,7 +80,10 @@ class RequestGetDocumentInfoHandler(RequestHandler): data_schema = { "type": "object", - "properties": {"document_id": {"type": "string", "minLength": 1}}, + "properties": { + "document_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["document_id"], } @@ -88,6 +92,7 @@ class RequestGetDocumentInfoHandler(RequestHandler): def handle(self, handler: ConnectionHandler): document_id = handler.data.get("document_id") + password = handler.data.get("password") if not document_id: handler.conclude_request(400, {}, "Document ID is required") @@ -114,6 +119,12 @@ def handle(self, handler: ConnectionHandler): if not document.check_access_requirements(user, access_type="read"): handler.conclude_request(403, {}, "Permission denied") return 403, document_id, handler.username + + # Check password protection + protection_code, protection_msg = check_password_protection(document, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, document_id, handler.username info_code = 0 ### generate access_rules text @@ -198,7 +209,10 @@ class RequestGetDocumentHandler(RequestHandler): data_schema = { "type": "object", - "properties": {"document_id": {"type": "string", "minLength": 1}}, + "properties": { + "document_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, "required": ["document_id"], "additionalProperties": False, } @@ -207,6 +221,7 @@ class RequestGetDocumentHandler(RequestHandler): def handle(self, handler: ConnectionHandler): document_id: str = handler.data["document_id"] + password = handler.data.get("password") with Session() as session: user = session.get(User, handler.username) @@ -220,6 +235,12 @@ def handle(self, handler: ConnectionHandler): if not document.check_access_requirements(user): handler.conclude_request(403, {}, "Access denied to the document") return 403, document_id, handler.username + + # Check password protection + protection_code, protection_msg = check_password_protection(document, password, session) + if protection_code != 0: + handler.conclude_request(protection_code, {}, protection_msg) + return protection_code, document_id, handler.username try: latest_revision = document.get_latest_revision() diff --git a/src/include/handlers/protection.py b/src/include/handlers/protection.py new file mode 100644 index 0000000..1d457ed --- /dev/null +++ b/src/include/handlers/protection.py @@ -0,0 +1,317 @@ +""" +Handlers for password protection management. + +This module provides handlers for enabling, removing, and verifying +password protection on documents and directories. +""" + +from typing import Optional + +from include.classes.connection import ConnectionHandler +from include.classes.request import RequestHandler +from include.constants import TARGET_TYPE_MAPPING +from include.database.handler import Session +from include.database.models.classic import User +from include.database.models.entity import Document, Folder +from include.database.models.protection import ObjectProtection + +__all__ = [ + "RequestEnablePasswordProtectionHandler", + "RequestRemovePasswordProtectionHandler", + "RequestVerifyPasswordHandler", + "check_password_protection", +] + + +def check_password_protection(target, password: Optional[str], session) -> tuple[int, str]: + """ + Check password protection for a document or directory. + + This is a helper function to be used by request handlers to check + if an object has password protection and verify the password if provided. + + Args: + target: The Document or Folder object to check + password: The password provided by the user (None if not provided) + session: The database session + + Returns: + Tuple of (code, message): + - (0, "success") if access granted (no protection or correct password) + - (202, "Password required") if protected but no password provided + - (403, "Incorrect password") if protected and wrong password provided + """ + target_type = TARGET_TYPE_MAPPING[target.__tablename__] + + # Check if object has password protection + protection = ( + session.query(ObjectProtection) + .filter( + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target.id, + ObjectProtection.protection_type == "password" + ) + .first() + ) + + if not protection: + # No password protection, access granted + return (0, "success") + + if password is None: + # Password required but not provided + return (202, "Password required") + + # Verify the password + if protection.verify_password(password): + return (0, "success") + else: + return (403, "Incorrect password") + + +class RequestEnablePasswordProtectionHandler(RequestHandler): + """ + Enable password protection on a document or directory. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1}, + "password": {"type": "string", "minLength": 1} + }, + "required": ["target_type", "target_id", "password"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + password: str = handler.data["password"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this document") + return 403, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this directory") + return 403, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Check if protection already exists + existing_protection = ( + session.query(ObjectProtection) + .filter( + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" + ) + .first() + ) + + if existing_protection: + # Update existing protection + existing_protection.set_password(password) + session.commit() + handler.conclude_request( + 200, {}, "Password protection updated successfully" + ) + return 0, target_id, handler.username + else: + # Create new protection + protection = ObjectProtection( + target_type=target_type, + target_id=target_id + ) + protection.set_password(password) + session.add(protection) + session.commit() + + handler.conclude_request( + 200, {}, "Password protection enabled successfully" + ) + return 0, target_id, handler.username + + +class RequestRemovePasswordProtectionHandler(RequestHandler): + """ + Remove password protection from a document or directory. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1} + }, + "required": ["target_type", "target_id"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this document") + return 403, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + + # Check if user has manage permissions + if not target.check_access_requirements(user, access_type="manage"): + handler.conclude_request(403, {}, "Access denied to manage this directory") + return 403, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Find and delete the protection + protection = ( + session.query(ObjectProtection) + .filter( + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" + ) + .first() + ) + + if not protection: + handler.conclude_request( + 404, {}, "Password protection not found for this object" + ) + return 404, target_id, handler.username + + session.delete(protection) + session.commit() + + handler.conclude_request( + 200, {}, "Password protection removed successfully" + ) + return 0, target_id, handler.username + + +class RequestVerifyPasswordHandler(RequestHandler): + """ + Verify a password for a protected document or directory. + This is useful for client-side validation before attempting access. + """ + + data_schema = { + "type": "object", + "properties": { + "target_type": { + "type": "string", + "enum": ["document", "directory"] + }, + "target_id": {"type": "string", "minLength": 1}, + "password": {"type": "string"} + }, + "required": ["target_type", "target_id", "password"], + "additionalProperties": False + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + target_type: str = handler.data["target_type"] + target_id: str = handler.data["target_id"] + password: str = handler.data["password"] + + with Session() as session: + user = session.get(User, handler.username) + assert user is not None + + # Get the target object + if target_type == "document": + target = session.get(Document, target_id) + if not target: + handler.conclude_request(404, {}, "Document not found") + return 404, target_id, handler.username + + elif target_type == "directory": + target = session.get(Folder, target_id) + if not target: + handler.conclude_request(404, {}, "Directory not found") + return 404, target_id, handler.username + else: + handler.conclude_request(400, {}, "Invalid target type") + return 400, target_id, handler.username + + # Find the protection + protection = ( + session.query(ObjectProtection) + .filter( + ObjectProtection.target_type == target_type, + ObjectProtection.target_id == target_id, + ObjectProtection.protection_type == "password" + ) + .first() + ) + + if not protection: + handler.conclude_request( + 404, {}, "Password protection not found for this object" + ) + return 404, target_id, handler.username + + # Verify the password + if protection.verify_password(password): + handler.conclude_request( + 200, {"verified": True}, "Password verified successfully" + ) + return 0, target_id, handler.username + else: + handler.conclude_request( + 403, {"verified": False}, "Incorrect password" + ) + return 403, target_id, handler.username diff --git a/src/main.py b/src/main.py index 09d26d8..3e50ea5 100644 --- a/src/main.py +++ b/src/main.py @@ -41,6 +41,7 @@ from include.database.models.classic import User, UserGroup from include.database.models.entity import Document, DocumentRevision from include.database.models.file import File +from include.database.models.protection import ObjectProtection from include.util.log import getCustomLogger diff --git a/tests/test_password_protection.py b/tests/test_password_protection.py new file mode 100644 index 0000000..1202c23 --- /dev/null +++ b/tests/test_password_protection.py @@ -0,0 +1,405 @@ +""" +Tests for password protection functionality. +""" + +import pytest +from tests.test_client import CFMSTestClient + + +class TestPasswordProtection: + """Test password protection on documents and directories.""" + + @pytest.mark.asyncio + async def test_enable_password_protection_on_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test enabling password protection on a document.""" + document_id = test_document["document_id"] + password = "test_password_123" + + # Enable password protection + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Failed to enable password protection: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_without_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document without providing password.""" + document_id = test_document["document_id"] + password = "test_password_456" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Try to access without password - should return 202 + response = await authenticated_client.send_request( + action="get_document_info", + data={"document_id": document_id} + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 202, \ + f"Expected 202 (Password required), got {response.get('code')}: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_with_wrong_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document with incorrect password.""" + document_id = test_document["document_id"] + correct_password = "correct_password_789" + wrong_password = "wrong_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": correct_password + } + ) + + # Try to access with wrong password - should return 403 + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": wrong_password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 403, \ + f"Expected 403 (Incorrect password), got {response.get('code')}: {response.get('message', '')}" + + @pytest.mark.asyncio + async def test_access_password_protected_document_with_correct_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test accessing a password-protected document with correct password.""" + document_id = test_document["document_id"] + password = "correct_password_abc" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Access with correct password - should return 200 + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": password + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Expected 200 (Success), got {response.get('code')}: {response.get('message', '')}" + assert "data" in response, "Response missing 'data'" + + @pytest.mark.asyncio + async def test_remove_password_protection_from_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test removing password protection from a document.""" + document_id = test_document["document_id"] + password = "temp_password_xyz" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Remove password protection + response = await authenticated_client.send_request( + action="remove_password_protection", + data={ + "target_type": "document", + "target_id": document_id + } + ) + + assert isinstance(response, dict), "Response should be a dictionary" + assert "code" in response, "Response missing 'code' field" + assert response["code"] == 200, \ + f"Failed to remove password protection: {response.get('message', '')}" + + # Now access without password should work (200) + response = await authenticated_client.send_request( + action="get_document_info", + data={"document_id": document_id} + ) + + assert response["code"] == 200, \ + f"Expected 200 after removing protection, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_verify_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test password verification endpoint.""" + document_id = test_document["document_id"] + password = "verify_test_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Verify correct password + response = await authenticated_client.send_request( + action="verify_password", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + assert response["code"] == 200, \ + f"Expected 200 for correct password, got {response.get('code')}" + assert response.get("data", {}).get("verified") is True, \ + "Expected verified=True for correct password" + + # Verify incorrect password + response = await authenticated_client.send_request( + action="verify_password", + data={ + "target_type": "document", + "target_id": document_id, + "password": "wrong_password" + } + ) + + assert response["code"] == 403, \ + f"Expected 403 for incorrect password, got {response.get('code')}" + assert response.get("data", {}).get("verified") is False, \ + "Expected verified=False for incorrect password" + + @pytest.mark.asyncio + async def test_password_protection_on_directory( + self, authenticated_client: CFMSTestClient + ): + """Test password protection on directories.""" + # Create a test directory + create_response = await authenticated_client.create_directory("Test Protected Dir") + assert create_response["code"] == 200, "Failed to create test directory" + directory_id = create_response["data"]["id"] + + try: + password = "dir_password_123" + + # Enable password protection + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "directory", + "target_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, "Failed to enable password protection on directory" + + # Try to access without password - should return 202 + response = await authenticated_client.send_request( + action="get_directory_info", + data={"directory_id": directory_id} + ) + assert response["code"] == 202, \ + f"Expected 202 (Password required), got {response.get('code')}" + + # Access with correct password - should return 200 + response = await authenticated_client.send_request( + action="get_directory_info", + data={ + "directory_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 with correct password, got {response.get('code')}" + + finally: + # Cleanup + try: + await authenticated_client.delete_directory(directory_id) + except Exception: + pass + + @pytest.mark.asyncio + async def test_update_password( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test updating password on a protected document.""" + document_id = test_document["document_id"] + old_password = "old_password_123" + new_password = "new_password_456" + + # Enable password protection with old password + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": old_password + } + ) + + # Update to new password + response = await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": new_password + } + ) + assert response["code"] == 200, "Failed to update password" + + # Old password should no longer work + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": old_password + } + ) + assert response["code"] == 403, \ + f"Old password should not work, got {response.get('code')}" + + # New password should work + response = await authenticated_client.send_request( + action="get_document_info", + data={ + "document_id": document_id, + "password": new_password + } + ) + assert response["code"] == 200, \ + f"New password should work, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_password_protection_on_get_document( + self, authenticated_client: CFMSTestClient, test_document: dict + ): + """Test password protection on get_document action.""" + document_id = test_document["document_id"] + password = "get_doc_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "document", + "target_id": document_id, + "password": password + } + ) + + # Try get_document without password - should return 202 + response = await authenticated_client.send_request( + action="get_document", + data={"document_id": document_id} + ) + assert response["code"] == 202, \ + f"Expected 202 for get_document without password, got {response.get('code')}" + + # Try with password - should return 200 + response = await authenticated_client.send_request( + action="get_document", + data={ + "document_id": document_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 for get_document with password, got {response.get('code')}" + + @pytest.mark.asyncio + async def test_password_protection_on_list_directory( + self, authenticated_client: CFMSTestClient + ): + """Test password protection on list_directory action.""" + # Create a test directory + create_response = await authenticated_client.create_directory("Test List Protected Dir") + assert create_response["code"] == 200, "Failed to create test directory" + directory_id = create_response["data"]["id"] + + try: + password = "list_dir_password" + + # Enable password protection + await authenticated_client.send_request( + action="enable_password_protection", + data={ + "target_type": "directory", + "target_id": directory_id, + "password": password + } + ) + + # Try list_directory without password - should return 202 + response = await authenticated_client.send_request( + action="list_directory", + data={"folder_id": directory_id} + ) + assert response["code"] == 202, \ + f"Expected 202 for list_directory without password, got {response.get('code')}" + + # Try with password - should return 200 + response = await authenticated_client.send_request( + action="list_directory", + data={ + "folder_id": directory_id, + "password": password + } + ) + assert response["code"] == 200, \ + f"Expected 200 for list_directory with password, got {response.get('code')}" + + finally: + # Cleanup + try: + await authenticated_client.delete_directory(directory_id) + except Exception: + pass