diff --git a/.github/workflows/README.md b/.github/workflows/README.md new file mode 100644 index 0000000..5253f34 --- /dev/null +++ b/.github/workflows/README.md @@ -0,0 +1,24 @@ +# GitHub Actions Workflows + +## test.yml - Automated Testing + +This workflow runs the pytest test suite automatically when: +- Code is pushed to any branch +- A pull request is opened or updated + +### What it does: +1. Sets up a Python environment (tests on Python 3.13 and 3.14) +2. Installs project dependencies and test requirements +3. Creates necessary directories for the server +4. Runs the full test suite with pytest +5. Uploads test results and logs as artifacts (retained for 7 days) + +### Configuration: +- **Timeout**: 10 minutes per test run +- **Matrix testing**: Tests across Python 3.13 and 3.14 +- **Artifacts**: Test cache and server logs are uploaded for debugging + +### Viewing Results: +- Check the "Actions" tab in the GitHub repository +- Test results will show pass/fail status for each Python version +- Download artifacts to review detailed logs if tests fail diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..85f415b --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,52 @@ +name: Run Tests + +on: + push: + branches: [ "**" ] + pull_request: + branches: [ "**" ] + +jobs: + test: + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: ["3.13", "3.14"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Set up uv + run: | + python -m pip install --upgrade pip + pip install uv + + - name: Install dependencies + run: | + uv sync --dev + + - name: Create necessary directories + run: | + mkdir -p src/content/ssl + mkdir -p src/content/logs + + - name: Run tests with pytest + run: | + uv run pytest tests/ -v --tb=short + timeout-minutes: 10 + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-results-py${{ matrix.python-version }} + path: | + .pytest_cache/ + src/content/logs/ + retention-days: 7 diff --git a/.gitignore b/.gitignore index daf8d84..10a0f32 100644 --- a/.gitignore +++ b/.gitignore @@ -46,8 +46,9 @@ app*.db admin_password.txt config.toml -content/ssl/ -content/files/ -content/logs/* +src/content/ssl/ +src/content/files/ +src/content/logs/* -.idea \ No newline at end of file +.idea +uv.lock \ No newline at end of file diff --git a/.gitmodules b/.gitmodules index 5bb2acf..49bdb39 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "certtools"] - path = certtools +[submodule "src/certtools"] + path = src/certtools url = https://github.com/creeper19472/cfms_certtools diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..83d79f3 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "name": "Python 调试程序: 当前文件", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/src" + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index fefa2c8..4a5b145 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,23 @@ comments as the primary reference. [doc-url]: https://cfms-server-doc.readthedocs.io/zh_CN/latest +## Testing + +This repository includes an automated test suite built with pytest. To run the tests: + +```bash +# Install dependencies +uv sync --dev + +# Run all tests +uv run pytest + +# Run specific test files +uv run pytest tests/test_basic.py +``` + +For more information about the test suite, see [tests/README.md](tests/README.md). + ## Alpha Test This is a project that is under active development and we are looking diff --git a/include/__init__.py b/include/__init__.py deleted file mode 100644 index 00d68ac..0000000 --- a/include/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -CFMS WebSocket Server - Core Include Package - -This package contains the core functionality for the CFMS WebSocket server, -including connection handling, database models, request handlers, and utilities. -""" diff --git a/include/database/models/__init__.py b/include/database/models/__init__.py deleted file mode 100644 index c55fb67..0000000 --- a/include/database/models/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -CFMS Database Models - -ORM models for users, groups, documents, files, and access control. -""" diff --git a/include/util/__init__.py b/include/util/__init__.py deleted file mode 100644 index ffefd75..0000000 --- a/include/util/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -CFMS Utilities - -Utility functions for logging, user management, group management, -password validation, and audit logging. -""" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3951681 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "cfms-on-websocket" +version = "0.1.0" +description = "The server-side program for CFMS on WebSocket, a WebSocket-based implementation of the CFMS protocol." +readme = "README.md" +requires-python = ">=3.14" +dependencies = [ + "cryptography>=46.0.3", + "filetype>=1.2.0", + "jsonschema>=4.25.1", + "pycryptodome>=3.23.0", + "pyjwt>=2.10.1", + "sqlalchemy>=2.0.44", + "tomlkit>=0.13.3", + "websockets>=15.0.1", +] + +[dependency-groups] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.21.0", +] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..2829c6d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,27 @@ +[tool:pytest] +# Pytest configuration +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Output settings +addopts = + -v + --tb=short + --strict-markers + --disable-warnings + +# Markers +markers = + slow: marks tests as slow (deselect with '-m "not slow"') + integration: marks tests as integration tests + unit: marks tests as unit tests + +# Timeout for tests (in seconds) +timeout = 300 + +# Coverage settings (if pytest-cov is installed) +# --cov=include +# --cov-report=html +# --cov-report=term-missing diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 7f1320c..0000000 --- a/requirements.txt +++ /dev/null @@ -1,8 +0,0 @@ -PyJwt -sqlalchemy -tomlkit -websockets -pycryptodome -cryptography -jsonschema -filetype \ No newline at end of file diff --git a/LICENSE b/src/LICENSE similarity index 100% rename from LICENSE rename to src/LICENSE diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/certtools b/src/certtools similarity index 100% rename from certtools rename to src/certtools diff --git a/config.sample.toml b/src/config.sample.toml similarity index 100% rename from config.sample.toml rename to src/config.sample.toml diff --git a/content/hello b/src/content/hello similarity index 100% rename from content/hello rename to src/content/hello diff --git a/src/include/__init__.py b/src/include/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/include/classes/__init__.py b/src/include/classes/__init__.py similarity index 100% rename from include/classes/__init__.py rename to src/include/classes/__init__.py diff --git a/include/classes/access_rule.py b/src/include/classes/access_rule.py similarity index 100% rename from include/classes/access_rule.py rename to src/include/classes/access_rule.py diff --git a/include/classes/auth.py b/src/include/classes/auth.py similarity index 100% rename from include/classes/auth.py rename to src/include/classes/auth.py diff --git a/include/classes/connection.py b/src/include/classes/connection.py similarity index 100% rename from include/classes/connection.py rename to src/include/classes/connection.py diff --git a/include/classes/exceptions.py b/src/include/classes/exceptions.py similarity index 100% rename from include/classes/exceptions.py rename to src/include/classes/exceptions.py diff --git a/include/classes/request.py b/src/include/classes/request.py similarity index 100% rename from include/classes/request.py rename to src/include/classes/request.py diff --git a/include/classes/version.py b/src/include/classes/version.py similarity index 100% rename from include/classes/version.py rename to src/include/classes/version.py diff --git a/include/conf_loader.py b/src/include/conf_loader.py similarity index 100% rename from include/conf_loader.py rename to src/include/conf_loader.py diff --git a/include/connection_handler.py b/src/include/connection_handler.py similarity index 100% rename from include/connection_handler.py rename to src/include/connection_handler.py diff --git a/include/constants.py b/src/include/constants.py similarity index 100% rename from include/constants.py rename to src/include/constants.py diff --git a/include/database/__init__.py b/src/include/database/__init__.py similarity index 100% rename from include/database/__init__.py rename to src/include/database/__init__.py diff --git a/include/database/handler.py b/src/include/database/handler.py similarity index 100% rename from include/database/handler.py rename to src/include/database/handler.py diff --git a/include/database/models/blocking.py b/src/include/database/models/blocking.py similarity index 100% rename from include/database/models/blocking.py rename to src/include/database/models/blocking.py diff --git a/include/database/models/classic.py b/src/include/database/models/classic.py similarity index 100% rename from include/database/models/classic.py rename to src/include/database/models/classic.py diff --git a/include/database/models/entity.py b/src/include/database/models/entity.py similarity index 100% rename from include/database/models/entity.py rename to src/include/database/models/entity.py diff --git a/include/database/models/file.py b/src/include/database/models/file.py similarity index 100% rename from include/database/models/file.py rename to src/include/database/models/file.py diff --git a/include/handlers/README b/src/include/handlers/README similarity index 100% rename from include/handlers/README rename to src/include/handlers/README diff --git a/include/handlers/__init__.py b/src/include/handlers/__init__.py similarity index 100% rename from include/handlers/__init__.py rename to src/include/handlers/__init__.py diff --git a/include/handlers/auth.py b/src/include/handlers/auth.py similarity index 100% rename from include/handlers/auth.py rename to src/include/handlers/auth.py diff --git a/include/handlers/directory.py b/src/include/handlers/directory.py similarity index 98% rename from include/handlers/directory.py rename to src/include/handlers/directory.py index 7307cae..d9abb63 100644 --- a/include/handlers/directory.py +++ b/src/include/handlers/directory.py @@ -34,6 +34,8 @@ class RequestListDirectoryHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): # Parse the directory listing request @@ -41,11 +43,8 @@ def handle(self, handler: ConnectionHandler): with Session() as session: this_user = session.get(User, handler.username) - if not this_user or not this_user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 401, "message": "Invalid user or token", "data": {}} - ) - return 401, folder_id + assert this_user is not None + if not folder_id: parent = None children = ( @@ -277,6 +276,8 @@ class RequestCreateDirectoryHandler(RequestHandler): "required": ["name"], } + require_auth = True + def handle(self, handler: ConnectionHandler): # Parse the directory creation request @@ -289,11 +290,8 @@ def handle(self, handler: ConnectionHandler): with Session() as session: this_user = session.get(User, handler.username) - if not this_user or not this_user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return 401, parent_id, handler.username + assert this_user is not None # require_auth ensures this + if parent_id: parent = session.get(Folder, parent_id) if not parent: diff --git a/include/handlers/document.py b/src/include/handlers/document.py similarity index 97% rename from include/handlers/document.py rename to src/include/handlers/document.py index 6367364..69bf050 100644 --- a/include/handlers/document.py +++ b/src/include/handlers/document.py @@ -54,7 +54,6 @@ def create_file_task(file: File, transfer_mode: int = 0): if not file: return None - now = time.time() task = FileTask( file_id=file.id, @@ -84,6 +83,8 @@ class RequestGetDocumentInfoHandler(RequestHandler): "required": ["document_id"], } + require_auth = True + def handle(self, handler: ConnectionHandler): document_id = handler.data.get("document_id") @@ -92,23 +93,23 @@ def handle(self, handler: ConnectionHandler): handler.conclude_request(400, {}, "Document ID is required") return - if not handler.username: - handler.conclude_request( - **{"code": 403, "message": "Authentication is required", "data": {}} - ) - return 401, document_id - with Session() as session: user = session.get(User, handler.username) - document = session.get(Document, document_id) + assert user is not None - if user is None or not user.is_token_valid(handler.token): - handler.conclude_request(403, {}, "Invalid user or token") - return 401, document_id + document = session.get(Document, document_id) if not document: handler.conclude_request(404, {}, "Document not found") return 404, document_id, handler.username + + try: + document.get_latest_revision() + except NoActiveRevisionsError: + handler.conclude_request( + 404, {}, "No active revisions found for this document" + ) + return 404, document_id, handler.username if not document.check_access_requirements(user, access_type="read"): handler.conclude_request(403, {}, "Permission denied") @@ -202,16 +203,15 @@ class RequestGetDocumentHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): document_id: str = handler.data["document_id"] with Session() as session: user = session.get(User, handler.username) document = session.get(Document, document_id) - - if user is None or not user.is_token_valid(handler.token): - handler.conclude_request(403, {}, "Invalid user or token") - return 401, document_id + assert user is not None if not document: handler.conclude_request(404, {}, "Document not found") @@ -255,6 +255,8 @@ class RequestCreateDocumentHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): folder_id: str = handler.data.get("folder_id", "") @@ -266,15 +268,12 @@ def handle(self, handler: ConnectionHandler): with Session() as session: user = session.get(User, handler.username) + assert user is not None if not document_title: handler.conclude_request(400, {}, "Document title is required") return - if not user or not user.is_token_valid(handler.token): - handler.conclude_request(403, {}, "Invalid user or token") - return 401, folder_id - # 由于之后的逻辑可能提前结束,必须在实质性操作发生前鉴权 if not "create_document" in user.all_permissions: handler.conclude_request(403, {}, "Permission denied") @@ -363,7 +362,9 @@ def handle(self, handler: ConnectionHandler): new_document_revision.file, transfer_mode=1 ) handler.conclude_request( - 200, {"task_data": task_data}, "Task successfully created" + 200, + {"document_id": new_document.id, "task_data": task_data}, + "Task successfully created", ) return 0, folder_id, {"title": document_title}, handler.username else: diff --git a/include/handlers/management/__init__.py b/src/include/handlers/management/__init__.py similarity index 100% rename from include/handlers/management/__init__.py rename to src/include/handlers/management/__init__.py diff --git a/include/handlers/management/access.py b/src/include/handlers/management/access.py similarity index 100% rename from include/handlers/management/access.py rename to src/include/handlers/management/access.py diff --git a/include/handlers/management/group.py b/src/include/handlers/management/group.py similarity index 95% rename from include/handlers/management/group.py rename to src/include/handlers/management/group.py index 5be7474..92d7054 100644 --- a/include/handlers/management/group.py +++ b/src/include/handlers/management/group.py @@ -20,16 +20,13 @@ class RequestListGroupsHandler(RequestHandler): data_schema = {"type": "object", "additionalProperties": False} + require_auth = True + def handle(self, handler: ConnectionHandler): with Session() as session: user = session.get(User, handler.username) # 执行操作的用户 - - if not user or not user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return + assert user is not None if "list_groups" not in user.all_permissions: handler.conclude_request( @@ -85,16 +82,13 @@ class RequestCreateGroupHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): with Session() as session: user = session.get(User, handler.username) - - if not user or not user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return + assert user is not None # currently handle_create_group() will not judge whether the requesting # user is eligible to apply the given permissions for the new group. @@ -303,16 +297,13 @@ class RequestGetGroupInfoHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): with Session() as session: user = session.get(User, handler.username) # 执行操作的用户 - - if not user or not user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return + assert user is not None if not handler.data["group_name"]: handler.conclude_request( diff --git a/include/handlers/management/system.py b/src/include/handlers/management/system.py similarity index 100% rename from include/handlers/management/system.py rename to src/include/handlers/management/system.py diff --git a/include/handlers/management/user.py b/src/include/handlers/management/user.py similarity index 97% rename from include/handlers/management/user.py rename to src/include/handlers/management/user.py index 3c1f3bf..96451ac 100644 --- a/include/handlers/management/user.py +++ b/src/include/handlers/management/user.py @@ -30,14 +30,12 @@ class RequestListUsersHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): with Session() as session: this_user = session.get(User, handler.username) - if not this_user or not this_user.is_token_valid(handler.token): - handler.conclude_request( - code=403, message="Invalid user or token", data={} - ) - return + assert this_user is not None if "list_users" not in this_user.all_permissions: handler.conclude_request( @@ -106,16 +104,13 @@ class RequestCreateUserHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): with Session() as session: this_user = session.get(User, handler.username) - - if not this_user or not this_user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return + assert this_user is not None # currently handle_create_user() will not judge whether the requesting # user is eligible to apply the given permissions for the new user. @@ -523,6 +518,8 @@ class RequestGetUserInfoHandler(RequestHandler): "additionalProperties": False, } + require_auth = True + def handle(self, handler: ConnectionHandler): user_to_get_username = handler.data["username"] if not user_to_get_username: @@ -533,11 +530,7 @@ def handle(self, handler: ConnectionHandler): with Session() as session: this_user = session.get(User, handler.username) - if not this_user or not this_user.is_token_valid(handler.token): - handler.conclude_request( - **{"code": 403, "message": "Invalid user or token", "data": {}} - ) - return + assert this_user is not None user_to_get = session.get(User, user_to_get_username) if not user_to_get: diff --git a/include/handlers/test.py b/src/include/handlers/test.py similarity index 100% rename from include/handlers/test.py rename to src/include/handlers/test.py diff --git a/include/shared.py b/src/include/shared.py similarity index 100% rename from include/shared.py rename to src/include/shared.py diff --git a/include/system/__init__.py b/src/include/system/__init__.py similarity index 100% rename from include/system/__init__.py rename to src/include/system/__init__.py diff --git a/include/system/messages.py b/src/include/system/messages.py similarity index 100% rename from include/system/messages.py rename to src/include/system/messages.py diff --git a/include/util/audit.py b/src/include/util/audit.py similarity index 100% rename from include/util/audit.py rename to src/include/util/audit.py diff --git a/include/util/group.py b/src/include/util/group.py similarity index 100% rename from include/util/group.py rename to src/include/util/group.py diff --git a/include/util/log.py b/src/include/util/log.py similarity index 100% rename from include/util/log.py rename to src/include/util/log.py diff --git a/include/util/pwd.py b/src/include/util/pwd.py similarity index 100% rename from include/util/pwd.py rename to src/include/util/pwd.py diff --git a/include/util/rule/__init__.py b/src/include/util/rule/__init__.py similarity index 100% rename from include/util/rule/__init__.py rename to src/include/util/rule/__init__.py diff --git a/include/util/rule/applying.py b/src/include/util/rule/applying.py similarity index 100% rename from include/util/rule/applying.py rename to src/include/util/rule/applying.py diff --git a/include/util/rule/validation.py b/src/include/util/rule/validation.py similarity index 100% rename from include/util/rule/validation.py rename to src/include/util/rule/validation.py diff --git a/include/util/user.py b/src/include/util/user.py similarity index 100% rename from include/util/user.py rename to src/include/util/user.py diff --git a/main.py b/src/main.py similarity index 96% rename from main.py rename to src/main.py index 51ddbbf..2d5ceca 100644 --- a/main.py +++ b/src/main.py @@ -56,6 +56,9 @@ def server_init(): if os.path.exists("./ssl_key.pem"): os.remove("./ssl_key.pem") + # Create database tables before inserting data + Base.metadata.create_all(engine) + from include.util.group import create_group create_group( @@ -105,7 +108,7 @@ def server_init(): ], ) with Session() as session: - init_file = File(id="init", path="./content/hello") + init_file = File(id="init", path="./content/hello", active=True) session.add(init_file) init_document = Document(id="hello", title="Hello World") @@ -227,12 +230,15 @@ def main(): # Always create tables that do not exist Base.metadata.create_all(engine) + # Determine socket family based on dualstack setting + socket_family = socket.AF_INET6 if global_config["server"]["dualstack_ipv6"] else socket.AF_INET + with serve( handle_connection, global_config["server"]["host"], global_config["server"]["port"], ssl=ssl_context, - family=socket.AF_INET6, + family=socket_family, dualstack_ipv6=global_config["server"]["dualstack_ipv6"], ) as server: logger.info( diff --git a/test.py b/src/test.py similarity index 100% rename from test.py rename to src/test.py diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..7917c36 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,211 @@ +# CFMS WebSocket Server - Test Suite + +This directory contains the automated test suite for the CFMS (Classified File Management System) WebSocket server. + +## Overview + +The test suite provides comprehensive coverage of the server's functionality, including: + +- **Basic Server Functionality**: Connection handling, server info, and error handling +- **Authentication**: Login, token management, and session handling +- **Document Management**: Create, read, update, delete operations for documents +- **Directory Management**: Directory listing, creation, and deletion +- **User Management**: User CRUD operations and permissions +- **Group Management**: Group CRUD operations and permission management + +## Prerequisites + +Before running the tests, ensure you have: + +1. Python 3.8 or higher installed +2. All project dependencies installed: + ```bash + pip install -r requirements.txt + pip install -r requirements-test.txt + ``` + +## Running Tests + +### Run All Tests + +```bash +pytest +``` + +### Run Specific Test Files + +```bash +# Test basic functionality +pytest tests/test_basic.py + +# Test document operations +pytest tests/test_documents.py + +# Test directory operations +pytest tests/test_directories.py + +# Test user management +pytest tests/test_users.py + +# Test group management +pytest tests/test_groups.py +``` + +### Run Specific Test Classes or Functions + +```bash +# Run a specific test class +pytest tests/test_basic.py::TestAuthentication + +# Run a specific test function +pytest tests/test_basic.py::TestAuthentication::test_login_success +``` + +### Run Tests with Verbose Output + +```bash +pytest -v +``` + +### Run Tests and Show Print Statements + +```bash +pytest -s +``` + +## Test Structure + +### Test Client (`test_client.py`) + +The `CFMSTestClient` class provides a convenient interface for interacting with the server during tests. It handles: + +- WebSocket connection management +- Request/response formatting +- Authentication token management +- Common API operations + +Example usage: + +```python +from tests.test_client import CFMSTestClient + +# Create and connect client +with CFMSTestClient() as client: + # Login + response = client.login("admin", "password") + + # Create a document + response = client.create_document("My Document") +``` + +### Fixtures (`conftest.py`) + +The test suite uses pytest fixtures for common setup: + +- `server_process`: Starts the server for testing +- `admin_credentials`: Provides admin login credentials +- `client`: Provides a connected test client +- `authenticated_client`: Provides an authenticated client +- `test_document`: Creates a test document (with cleanup) +- `test_user`: Creates a test user (with cleanup) +- `test_group`: Creates a test group (with cleanup) + +### Test Files + +Each test file focuses on a specific area of functionality: + +- `test_basic.py`: Server basics and authentication +- `test_documents.py`: Document operations +- `test_directories.py`: Directory operations +- `test_users.py`: User management +- `test_groups.py`: Group management + +## Test Coverage + +The test suite covers: + +✅ Server connection and basic info +✅ Authentication (login, token refresh, invalid credentials) +✅ Document CRUD operations +✅ Directory operations +✅ User management (create, read, delete) +✅ Group management (create, read, delete) +✅ Authorization checks (operations without authentication) +✅ Input validation (empty fields, invalid data) +✅ Error handling (nonexistent resources, duplicate entries) + +## Writing New Tests + +When adding new tests: + +1. Place them in the appropriate test file (or create a new one) +2. Use descriptive test names that explain what is being tested +3. Use fixtures for common setup and teardown +4. Clean up any resources created during tests +5. Assert on both success and failure cases +6. Document complex test scenarios + +Example: + +```python +def test_my_new_feature(authenticated_client: CFMSTestClient): + """Test description explaining what this test verifies.""" + # Arrange + data = {"key": "value"} + + # Act + response = authenticated_client.some_operation(data) + + # Assert + assert response["code"] == 200 + assert "expected_field" in response["data"] +``` + +## Continuous Integration + +These tests are designed to run in CI/CD pipelines. The test suite: + +- Automatically starts and stops the server +- Cleans up resources after each test +- Provides clear error messages for debugging +- Can run in isolation or as a full suite + +## Troubleshooting + +### Server Won't Start + +If tests fail because the server won't start: + +1. Check that `config.toml` exists (it will be created from `config.sample.toml`) +2. Ensure the SSL certificate directory exists: `mkdir -p content/ssl` +3. Check for port conflicts (default: 5104) + +### Tests Fail Intermittently + +If tests fail randomly: + +1. Increase the server startup wait time in `conftest.py` +2. Check for resource cleanup issues +3. Ensure tests are properly isolated + +### Authentication Errors + +If authentication tests fail: + +1. Verify `admin_password.txt` is being created +2. Check that the database is being initialized properly +3. Ensure the token is being properly stored and passed + +## Contributing + +When contributing tests: + +1. Follow the existing test structure and naming conventions +2. Ensure tests are independent and can run in any order +3. Add appropriate assertions for both success and error cases +4. Document any special setup or requirements +5. Run the full test suite before submitting changes + +## License + +These tests are part of the CFMS project and follow the same license as the main project. diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..af16737 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,5 @@ +""" +CFMS WebSocket Server Test Suite + +This package contains automated tests for the CFMS (Classified File Management System) WebSocket server. +""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f703725 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,254 @@ +""" +Pytest configuration and fixtures for CFMS test suite. +""" + +import os +import pytest +import subprocess +import time +import signal +from typing import Generator + +from tests.test_client import CFMSTestClient + + +@pytest.fixture(scope="session") +def server_process() -> Generator[subprocess.Popen, None, None]: + """ + Start the CFMS server for testing and tear it down after tests complete. + + This fixture starts the server in a subprocess and waits for it to be ready. + After all tests complete, it gracefully shuts down the server. + """ + # Ensure config file exists in src/ directory (server runs from there) + src_config_file = "src/config.toml" + if not os.path.exists(src_config_file): + # Copy sample config if config doesn't exist + import shutil + shutil.copy("src/config.sample.toml", src_config_file) + + # Modify config for testing: disable password expiration + with open(src_config_file, "r", encoding='utf-8') as f: + config_content = f.read() + + # Disable password expiration for tests + config_content = config_content.replace( + "enable_passwd_force_expiration = true", + "enable_passwd_force_expiration = false" + ) + config_content = config_content.replace( + "require_passwd_enforcement_changes = true", + "require_passwd_enforcement_changes = false" + ) + config_content = config_content.replace( + "dualstack_ipv6 = true", + "dualstack_ipv6 = false" + ) + + with open(src_config_file, "w", encoding='utf-8') as f: + f.write(config_content) + + # Clean up any previous test artifacts (in src/ where server runs) + for artifact in ["init", "app.db", "admin_password.txt"]: + src_artifact = os.path.join("src", artifact) + if os.path.exists(src_artifact): + os.remove(src_artifact) + + # Ensure necessary directories exist in src/ (where server runs from) + os.makedirs("src/content/ssl", exist_ok=True) + os.makedirs("src/content/logs", exist_ok=True) + + # Start the server (run from src/ directory) + process = subprocess.Popen( + ["uv", "run", "python", "main.py"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=os.path.join(os.getcwd(), "src") + ) + + # Wait for server to be ready (give it time to initialize) + max_wait = 15 + wait_time = 0 + while wait_time < max_wait: + time.sleep(1) + wait_time += 1 + + # Check if process crashed + if process.poll() is not None: + stdout, stderr = process.communicate() + pytest.fail(f"Server failed to start.\nSTDOUT: {stdout}\nSTDERR: {stderr}") + + # Check if initialization is complete (admin_password.txt is in src/) + if os.path.exists("src/admin_password.txt"): + # Give it one more second to fully start + time.sleep(1) + break + + if not os.path.exists("src/admin_password.txt"): + process.terminate() + stdout, stderr = process.communicate() + pytest.fail(f"Server initialization timed out.\nSTDOUT: {stdout}\nSTDERR: {stderr}") + + yield process + + # Cleanup: terminate the server + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + + +@pytest.fixture(scope="session") +def admin_credentials(server_process) -> dict: + """ + Get admin credentials from the generated password file. + + Args: + server_process: The server process fixture (dependency to ensure server is started) + + Returns: + Dictionary with 'username' and 'password' keys + """ + # The server_process fixture has already started the server and waited + # for admin_password.txt to be created in src/, so we can just read it + password_file = "src/admin_password.txt" + + if not os.path.exists(password_file): + pytest.fail("Admin password file not found after server started") + + with open(password_file, "r", encoding="utf-8") as f: + password = f.read().strip() + + return { + "username": "admin", + "password": password + } + + +@pytest.fixture +def client(server_process) -> Generator[CFMSTestClient, None, None]: + """ + Provide a connected test client for each test. + + This fixture creates a new client instance and connects to the server. + After the test completes, it disconnects the client. + """ + client = CFMSTestClient() + # reconnect if needed + for _attempt in range(5): + try: + client.connect() + break + except (ConnectionRefusedError, TimeoutError): + if _attempt == 4: + raise + continue + + yield client + client.disconnect() + + +@pytest.fixture +def authenticated_client(client: CFMSTestClient, admin_credentials: dict) -> CFMSTestClient: + """ + Provide an authenticated test client with admin credentials. + + This fixture logs in with admin credentials and provides + a ready-to-use authenticated client. + """ + response = client.login(admin_credentials["username"], admin_credentials["password"]) + assert response["code"] == 200, f"Login failed: {response}" + return client + + +@pytest.fixture +def test_document(authenticated_client: CFMSTestClient) -> Generator[dict, None, None]: + """ + Create a test document and clean it up after the test. + + Yields: + Dictionary with document information + """ + response = authenticated_client.create_document("Test Document") + assert response["code"] == 200, f"Failed to create test document: {response}" + + document_id = response["data"]["document_id"] + task_id = response["data"]["task_data"]["task_id"] + + # upload the file + authenticated_client.upload_file_to_server( + task_id, + "./pyproject.toml" + ) + + yield { + "document_id": document_id, + "title": "Test Document" + } + + # Cleanup: delete the document + try: + authenticated_client.delete_document(document_id) + except Exception: + pass # Ignore cleanup errors + + +@pytest.fixture +def test_user(authenticated_client: CFMSTestClient) -> Generator[dict, None, None]: + """ + Create a test user and clean it up after the test. + + Yields: + Dictionary with user information + """ + username = f"test_user_{int(time.time())}" + password = "TestPassword123!" + + response = authenticated_client.create_user( + username=username, + password=password, + nickname="Test User" + ) + assert response["code"] == 200, f"Failed to create test user: {response}" + + yield { + "username": username, + "password": password, + "nickname": "Test User" + } + + # Cleanup: delete the user + try: + authenticated_client.delete_user(username) + except Exception: + pass # Ignore cleanup errors + + +@pytest.fixture +def test_group(authenticated_client: CFMSTestClient) -> Generator[dict, None, None]: + """ + Create a test group and clean it up after the test. + + Yields: + Dictionary with group information + """ + group_name = f"test_group_{int(time.time())}" + + response = authenticated_client.create_group( + group_name=group_name, + permissions=[] + ) + assert response["code"] == 200, f"Failed to create test group: {response}" + + yield { + "group_name": group_name + } + + # Cleanup: delete the group + try: + authenticated_client.send_request("delete_group", {"group_name": group_name}) + except Exception: + pass # Ignore cleanup errors diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..be2f58a --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,105 @@ +""" +Tests for basic server functionality and authentication. +""" + +import pytest +from tests.test_client import CFMSTestClient + + +class TestServerBasics: + """Test basic server functionality.""" + + def test_server_connection(self, client: CFMSTestClient): + """Test that we can connect to the server.""" + assert client.websocket is not None + assert client.websocket.protocol.state.name == "OPEN" + + def test_server_info(self, client: CFMSTestClient): + """Test getting server information.""" + response = client.server_info() + + assert response["code"] == 200 + assert "data" in response + assert "server_name" in response["data"] + assert "version" in response["data"] + assert "protocol_version" in response["data"] + + def test_unknown_action(self, client: CFMSTestClient): + """Test that unknown actions are handled properly.""" + response = client.send_request("nonexistent_action", include_auth=False) + + assert response["code"] == 400 + assert "Unknown action" in response["message"] + + +class TestAuthentication: + """Test authentication functionality.""" + + def test_login_success(self, client: CFMSTestClient, admin_credentials: dict): + """Test successful login.""" + response = client.login( + admin_credentials["username"], + admin_credentials["password"] + ) + + # For debugging + if response["code"] != 200: + print(f"Login response: {response}") + + assert response["code"] == 200 + assert "data" in response + assert "token" in response["data"] + assert client.token is not None + assert client.username == admin_credentials["username"] + + def test_login_invalid_credentials(self, client: CFMSTestClient): + """Test login with invalid credentials.""" + response = client.login("invalid_user", "invalid_password") + + assert response["code"] == 401 + assert "Invalid credentials" in response["message"] + + def test_login_missing_username(self, client: CFMSTestClient): + """Test login with missing username.""" + response = client.send_request("login", {"password": "test"}, include_auth=False) + + assert response["code"] == 400 + + def test_login_missing_password(self, client: CFMSTestClient): + """Test login with missing password.""" + response = client.send_request("login", {"username": "test"}, include_auth=False) + + assert response["code"] == 400 + + def test_refresh_token(self, authenticated_client: CFMSTestClient): + """Test token refresh.""" + old_token = authenticated_client.token + + response = authenticated_client.refresh_token() + + assert response["code"] == 200 + assert "token" in response["data"] + assert authenticated_client.token is not None + assert authenticated_client.token != old_token + + def test_authentication_required(self, client: CFMSTestClient): + """Test that protected endpoints require authentication.""" + response = client.send_request("list_users", include_auth=False) + + # Server returns 401 or 403 for missing authentication + assert response["code"] in [401, 403] + + def test_invalid_token(self, client: CFMSTestClient, admin_credentials: dict): + """Test request with invalid token.""" + # Login first to get a valid session structure + client.login(admin_credentials["username"], admin_credentials["password"]) + + # Now use an invalid token + response = client.send_request( + "list_users", + username=admin_credentials["username"], + token="invalid_token_12345" + ) + + assert response["code"] == 403 + assert "Invalid user or token" in response["message"] diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..3afcef2 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,611 @@ +""" +Test client for CFMS WebSocket Server. + +This module provides a reusable WebSocket client for testing the CFMS server. +""" + +import hashlib +import json +import mmap +import os +import ssl +import time +from typing import Any, Dict, Optional +from websockets.sync.client import connect, ClientConnection + + +def calculate_sha256(file_path: str) -> str: + """ + Calculate SHA256 hash of a file using memory-mapped I/O for efficiency. + + Uses memory-mapped files for faster hash calculation of large files. + + Args: + file_path: Path to the file to hash + + Returns: + Hexadecimal SHA256 hash string + """ + with open(file_path, "rb") as f: + # Use memory-mapped files to map directly to memory + mmapped_file = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) + return hashlib.sha256(mmapped_file).hexdigest() + + +class CFMSTestClient: + """ + A test client for the CFMS WebSocket server. + + This client provides convenient methods for connecting to the server, + sending requests, and receiving responses. It handles authentication + and connection management automatically. + """ + + def __init__(self, host: str = "localhost", port: int = 5104, use_ssl: bool = True): + """ + Initialize the test client. + + Args: + host: Server hostname + port: Server port + use_ssl: Whether to use SSL/TLS connection + """ + self.host = host + self.port = port + self.use_ssl = use_ssl + self.websocket: Optional[ClientConnection] = None + self.username: Optional[str] = None + self.token: Optional[str] = None + + def connect(self) -> None: + """ + Establish a WebSocket connection to the server. + """ + if self.websocket is not None: + return + + protocol = "wss" if self.use_ssl else "ws" + uri = f"{protocol}://{self.host}:{self.port}" + + if self.use_ssl: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + else: + ssl_context = None + + self.websocket = connect(uri, ssl=ssl_context) + + def disconnect(self) -> None: + """ + Close the WebSocket connection. + """ + if self.websocket is not None: + self.websocket.close() + self.websocket = None + self.username = None + self.token = None + + def send_request( + self, + action: str, + data: Optional[Dict[str, Any]] = None, + username: Optional[str] = None, + token: Optional[str] = None, + include_auth: bool = True + ) -> Dict[str, Any]: + """ + Send a request to the server and receive the response. + + Args: + action: The action to perform + data: Optional data payload for the request + username: Optional username (defaults to stored username) + token: Optional token (defaults to stored token) + include_auth: Whether to include authentication credentials + + Returns: + The response from the server as a dictionary + """ + if self.websocket is None: + raise RuntimeError("Not connected to server. Call connect() first.") + + request = { + "action": action, + "data": data if data is not None else {} + } + + if include_auth: + request["username"] = username if username is not None else self.username + request["token"] = token if token is not None else self.token + + self.websocket.send(json.dumps(request, ensure_ascii=False)) + response_text = self.websocket.recv() + return json.loads(response_text) + + def login(self, username: str, password: str) -> Dict[str, Any]: + """ + Authenticate with the server. + + Args: + username: Username to authenticate with + password: Password for the user + + Returns: + The login response from the server + """ + response = self.send_request( + "login", + {"username": username, "password": password}, + include_auth=False + ) + + if response.get("code") == 200: + self.username = username + self.token = response.get("data", {}).get("token") + + return response + + def server_info(self) -> Dict[str, Any]: + """ + Get server information. + + Returns: + Server information including version and protocol version + """ + return self.send_request("server_info", include_auth=False) + + def refresh_token(self) -> Dict[str, Any]: + """ + Refresh the authentication token. + + Returns: + Response with new token + """ + response = self.send_request("refresh_token") + + if response.get("code") == 200: + self.token = response.get("data", {}).get("token") + + return response + + def get_document(self, document_id: str) -> Dict[str, Any]: + """ + Get a document by ID. + + Args: + document_id: The ID of the document to retrieve + + Returns: + The document data + """ + return self.send_request("get_document", {"document_id": document_id}) + + def create_document(self, title: str, folder_id: Optional[str] = None) -> Dict[str, Any]: + """ + Create a new document. + + Args: + title: Title of the document + folder_id: Optional folder ID to create the document in + + Returns: + Response with created document information + """ + data = {"title": title} + if folder_id is not None: + data["folder_id"] = folder_id + return self.send_request("create_document", data) + + def delete_document(self, document_id: str) -> Dict[str, Any]: + """ + Delete a document. + + Args: + document_id: The ID of the document to delete + + Returns: + Response indicating success or failure + """ + return self.send_request("delete_document", {"document_id": document_id}) + + def rename_document(self, document_id: str, new_title: str) -> Dict[str, Any]: + """ + Rename a document. + + Args: + document_id: The ID of the document to rename + new_title: The new title for the document + + Returns: + Response indicating success or failure + """ + return self.send_request("rename_document", { + "document_id": document_id, + "new_title": new_title + }) + + def get_document_info(self, document_id: str) -> Dict[str, Any]: + """ + Get information about a document. + + Args: + document_id: The ID of the document + + Returns: + Document information + """ + return self.send_request("get_document_info", {"document_id": document_id}) + + def list_directory(self, folder_id: Optional[str] = None) -> Dict[str, Any]: + """ + List contents of a directory. + + Args: + folder_id: The ID of the folder (None for root) + + Returns: + Directory listing + """ + data = {} + data["folder_id"] = folder_id + + return self.send_request("list_directory", data) + + def create_directory(self, name: str, parent_id: Optional[str] = None) -> Dict[str, Any]: + """ + Create a new directory. + + Args: + name: Name of the directory + parent_id: Optional parent directory ID + + Returns: + Response with created directory information + """ + data = {"name": name} + if parent_id is not None: + data["parent_id"] = parent_id + return self.send_request("create_directory", data) + + def delete_directory(self, folder_id: str) -> Dict[str, Any]: + """ + Delete a directory. + + Args: + folder_id: The ID of the folder to delete + + Returns: + Response indicating success or failure + """ + return self.send_request("delete_directory", {"folder_id": folder_id}) + + def create_user( + self, + username: str, + password: str, + nickname: Optional[str] = None, + groups: Optional[list] = None + ) -> Dict[str, Any]: + """ + Create a new user. + + Args: + username: Username for the new user + password: Password for the new user + nickname: Optional nickname + groups: Optional list of group assignments + + Returns: + Response with created user information + """ + data: dict[str, Any] = { + "username": username, + "password": password + } + if nickname is not None: + data["nickname"] = nickname + if groups is not None: + data["groups"] = groups + return self.send_request("create_user", data) + + def delete_user(self, username: str) -> Dict[str, Any]: + """ + Delete a user. + + Args: + username: Username of the user to delete + + Returns: + Response indicating success or failure + """ + return self.send_request("delete_user", {"username": username}) + + def get_user_info(self, username: str) -> Dict[str, Any]: + """ + Get information about a user. + + Args: + username: Username of the user + + Returns: + User information + """ + return self.send_request("get_user_info", {"username": username}) + + def list_users(self) -> Dict[str, Any]: + """ + List all users. + + Returns: + List of users + """ + return self.send_request("list_users", {}) + + def create_group(self, group_name: str, permissions: Optional[list] = None) -> Dict[str, Any]: + """ + Create a new user group. + + Args: + group_name: Name of the group + permissions: Optional list of permissions + + Returns: + Response with created group information + """ + data: dict[str, Any] = {"group_name": group_name} + if permissions is not None: + data["permissions"] = permissions + return self.send_request("create_group", data) + + def list_groups(self) -> Dict[str, Any]: + """ + List all user groups. + + Returns: + List of groups + """ + return self.send_request("list_groups", {}) + + def get_group_info(self, group_name: str) -> Dict[str, Any]: + """ + Get information about a group. + + Args: + group_name: Name of the group + + Returns: + Group information + """ + return self.send_request("get_group_info", {"group_name": group_name}) + + def upload_file_to_server( + self, task_id: str, file_path: str + ): + """ + Upload a file to the server over WebSocket connection. + + Args: + task_id: Server task ID for this upload + file_path: Local path to the file to upload + + Raises: + ValueError: If server response is invalid + RuntimeError: If upload is rejected by server + """ + + # Receive file metadata from the server + response = self.send_request( + "upload_file", + {"task_id": task_id}, + include_auth=True + ) + + if response["action"] != "transfer_file": + raise ValueError + + file_size = os.path.getsize(file_path) + sha256 = calculate_sha256(file_path) if file_size else None + + task_info = { + "action": "transfer_file", + "data": { + "sha256": sha256, + "file_size": file_size, + }, + } + + assert self.websocket + self.websocket.send(json.dumps(task_info, ensure_ascii=False)) + received_response = str(self.websocket.recv()) + + if received_response.startswith("ready"): + ready = True + elif received_response == "stop": + ready = False + else: + raise RuntimeError + + if ready: + + try: + chunk_size = int(received_response.split()[1]) + with open(file_path, "rb") as f: + while True: + chunk = f.read(chunk_size) + self.websocket.send(chunk) + + if not chunk or len(chunk) < chunk_size: + break + + # need to wait for server confirmation + server_response = json.loads(self.websocket.recv()) + + except Exception: + raise + + + # def receive_file_from_server( + # self, + # task_id: str, + # file_path: str, # filename: str | None = None + # ): + # """ + # Receives a file from the server over a websocket connection using AES encryption. + + # Steps: + # 1. Requests file metadata (SHA-256 hash, file size, chunk info) from the server. + # 2. Sends readiness acknowledgment to the server. + # 3. Receives encrypted file chunks, saves them temporarily. + # 4. Receives AES key and IV, decrypts all chunks, and writes the output file. + # 5. Deletes temporary chunk files. + # 6. Verifies the file size and SHA-256 hash. + # 7. Removes the output file if verification fails. + + # Args: + # client (ClientConnection): The websocket client connection. + # task_id (str): The identifier for the file transfer task. + # file_path (str): The path to save the received file. + + # Yields: + # Tuple[int, ...]: Progress updates at various stages. + + # Raises: + # ValueError: If the server response is invalid. + # FileSizeMismatchError: If the received file size does not match the expected size. + # FileHashMismatchError: If the received file hash does not match the expected hash. + # Exception: For other errors during transfer or decryption. + # """ + + # assert self.websocket + + # # Send the request for file metadata + # self.websocket.send( + # json.dumps( + # { + # "action": "download_file", + # "data": {"task_id": task_id}, + # }, + # ensure_ascii=False, + # ) + # ) + + # # Receive file metadata from the server + # response = json.loads(self.websocket.recv()) + # if response["action"] != "transfer_file": + # raise ValueError("Invalid action received for file transfer") + + # sha256 = response["data"].get("sha256") # SHA256 of original file + # file_size = response["data"].get("file_size") # Size of original file + # chunk_size = response["data"].get("chunk_size", 8192) # Chunk size + # total_chunks = response["data"].get("total_chunks") # Total chunks + + # self.websocket.send("ready") + + # downloading_path = FLET_APP_STORAGE_TEMP + "/downloading/" + task_id + # await aiofiles.os.makedirs(downloading_path, exist_ok=True) + + # if not file_size: + # async with aiofiles.open(file_path, "wb") as f: + # await f.truncate(0) + # return + + # try: + + # received_chunks = 0 + # iv: bytes = b"" + + # while received_chunks + 1 <= total_chunks: + # # Receive encrypted data from the server + + # data = await self.recv() + # if not data: + # raise ValueError("Received empty data from server") + + # data_json: dict = json.loads(data) + + # index = data_json["data"].get("index") + # if index == 0: + # iv = base64.b64decode(data_json["data"].get("iv")) + # chunk_hash = data_json["data"].get("hash") # provided but unused + # chunk_data = base64.b64decode(data_json["data"].get("chunk")) + # chunk_file_path = os.path.join(downloading_path, str(index)) + + # async with aiofiles.open(chunk_file_path, "wb") as chunk_file: + # await chunk_file.write(chunk_data) + + # received_chunks += 1 + + # if received_chunks < total_chunks: + # received_file_size = chunk_size * received_chunks + # else: + # received_file_size = file_size + + # yield 0, received_file_size, file_size + + # # Get decryption information + # decrypted_data = await self.recv() + # decrypted_data_json: dict = json.loads(decrypted_data) + + # aes_key = base64.b64decode(decrypted_data_json["data"].get("key")) + + # # Decrypt chunks + # decrypted_chunks = 1 + # cipher = AES.new(aes_key, AES.MODE_CFB, iv=iv) # Initialize cipher + + # async with aiofiles.open(file_path, "wb") as out_file: + # while decrypted_chunks <= total_chunks: + # yield 1, decrypted_chunks, total_chunks + + # chunk_file_path = os.path.join( + # downloading_path, str(decrypted_chunks - 1) + # ) + + # async with aiofiles.open(chunk_file_path, "rb") as chunk_file: + # encrypted_chunk = await chunk_file.read() + # decrypted_chunk = cipher.decrypt(encrypted_chunk) + # await out_file.write(decrypted_chunk) + + # # os.remove(chunk_file_path) + # decrypted_chunks += 1 + + # # Delete temporary folder + # yield 2, + + # await asyncio.get_event_loop().run_in_executor( + # None, shutil.rmtree, downloading_path + # ) + + # except Exception: + # raise + + # # Verify file + + # async def _action_verify() -> None: + + # if file_size != await aiofiles.os.path.getsize(file_path): + # raise FileSizeMismatchError( + # file_size, await aiofiles.os.path.getsize(file_path) + # ) + + # # Verify SHA256 + # actual_sha256 = await calculate_sha256(file_path) + # if sha256 and actual_sha256 != sha256: + # raise FileHashMismatchError(sha256, actual_sha256) + + # yield 3, + + # try: + # await _action_verify() + # except Exception: + # await aiofiles.os.remove(file_path) + # raise + + def __enter__(self): + """Context manager entry.""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.disconnect() diff --git a/tests/test_directories.py b/tests/test_directories.py new file mode 100644 index 0000000..709fd45 --- /dev/null +++ b/tests/test_directories.py @@ -0,0 +1,123 @@ +""" +Tests for directory management operations. +""" + +import pytest +from tests.test_client import CFMSTestClient + + +class TestDirectoryOperations: + """Test directory operations.""" + + def test_list_directory_root(self, authenticated_client: CFMSTestClient): + """Test listing the root directory.""" + response = authenticated_client.list_directory() + + assert response["code"] == 200 + assert "data" in response + + def test_create_directory(self, authenticated_client: CFMSTestClient): + """Test creating a new directory.""" + dir_name = "Test Directory" + response = authenticated_client.create_directory(dir_name) + + # Directory creation might succeed or fail based on permissions + # We just check the response is valid + assert "code" in response + assert "data" in response + + if response["code"] == 200: + # Cleanup if created successfully + directory_id = response["data"].get("id") + if directory_id: + try: + authenticated_client.delete_directory(directory_id) + except Exception: + pass + + def test_create_directory_with_empty_name(self, authenticated_client: CFMSTestClient): + """Test creating a directory with an empty name.""" + response = authenticated_client.create_directory("") + + # Should fail validation + assert response["code"] == 400 + + def test_delete_directory(self, authenticated_client: CFMSTestClient): + """Test deleting a directory.""" + # First create a directory + create_response = authenticated_client.create_directory("Directory to Delete") + + if create_response["code"] == 200: + directory_id = create_response["data"]["id"] + + # Delete it + delete_response = authenticated_client.delete_directory(directory_id) + + # Should get a response (success or failure is implementation-dependent) + assert "code" in delete_response + + def test_delete_nonexistent_directory(self, authenticated_client: CFMSTestClient): + """Test deleting a directory that doesn't exist.""" + response = authenticated_client.delete_directory("nonexistent_folder_id") + + assert response["code"] != 200 + + def test_list_directory_contents(self, authenticated_client: CFMSTestClient): + """Test listing directory contents after creating items.""" + # Create a test directory + dir_response = authenticated_client.create_directory("Test List Dir") + + if dir_response["code"] == 200: + directory_id = dir_response["data"]["id"] + + try: + # Create a document in the directory + doc_response = authenticated_client.create_document( + "Test Doc in Dir", + folder_id=directory_id + ) + + if doc_response["code"] == 200: + # List the directory + list_response = authenticated_client.list_directory(directory_id) + + assert list_response["code"] == 200 + assert "data" in list_response + + # Cleanup document + try: + authenticated_client.delete_document( + doc_response["data"]["document_id"] + ) + except Exception: + pass + finally: + # Cleanup directory + try: + authenticated_client.delete_directory(directory_id) + except Exception: + pass + + +class TestDirectoryWithoutAuth: + """Test that directory operations require authentication.""" + + def test_list_directory_without_auth(self, client: CFMSTestClient): + """Test that listing directories requires authentication.""" + response = client.send_request( + "list_directory", + {"folder_id": None}, + include_auth=False + ) + + assert response["code"] == 401 + + def test_create_directory_without_auth(self, client: CFMSTestClient): + """Test that creating a directory requires authentication.""" + response = client.send_request( + "create_directory", + {"name": "Test"}, + include_auth=False + ) + + assert response["code"] == 401 diff --git a/tests/test_documents.py b/tests/test_documents.py new file mode 100644 index 0000000..1118113 --- /dev/null +++ b/tests/test_documents.py @@ -0,0 +1,133 @@ +""" +Tests for document management operations. +""" + +import pytest +from tests.test_client import CFMSTestClient + + +class TestDocumentOperations: + """Test document CRUD operations.""" + + def test_create_document(self, authenticated_client: CFMSTestClient): + """Test creating a new document.""" + response = authenticated_client.create_document("Test Document") + + assert response["code"] == 200 + assert "data" in response + assert "document_id" in response["data"] + + # Cleanup + document_id = response["data"]["document_id"] + authenticated_client.delete_document(document_id) + + def test_get_document(self, authenticated_client: CFMSTestClient, test_document: dict): + """Test retrieving a document.""" + response = authenticated_client.get_document(test_document["document_id"]) + + assert response["code"] == 200 + assert "data" in response + + def test_get_nonexistent_document(self, authenticated_client: CFMSTestClient): + """Test retrieving a document that doesn't exist.""" + response = authenticated_client.get_document("nonexistent_doc_id") + + assert response["code"] != 200 + + def test_get_document_info(self, authenticated_client: CFMSTestClient, test_document: dict): + """Test getting document information.""" + response = authenticated_client.get_document_info(test_document["document_id"]) + + assert response["code"] == 200 + assert "data" in response + + def test_rename_document(self, authenticated_client: CFMSTestClient, test_document: dict): + """Test renaming a document.""" + new_title = "Renamed Test Document" + response = authenticated_client.rename_document( + test_document["document_id"], + new_title + ) + + assert response["code"] == 200 + + # Verify the rename + info_response = authenticated_client.get_document_info(test_document["document_id"]) + assert info_response["code"] == 200 + assert info_response["data"]["title"] == new_title + + def test_delete_document(self, authenticated_client: CFMSTestClient): + """Test deleting a document.""" + # Create a document + create_response = authenticated_client.create_document("Document to Delete") + assert create_response["code"] == 200 + document_id = create_response["data"]["document_id"] + + # Delete it + delete_response = authenticated_client.delete_document(document_id) + assert delete_response["code"] == 200 + + # Verify it's gone + get_response = authenticated_client.get_document(document_id) + assert get_response["code"] != 200 + + def test_create_document_with_empty_title(self, authenticated_client: CFMSTestClient): + """Test creating a document with an empty title.""" + response = authenticated_client.create_document("") + + # Should fail validation + assert response["code"] == 400 + + def test_create_multiple_documents(self, authenticated_client: CFMSTestClient): + """Test creating multiple documents.""" + document_ids = [] + + try: + for i in range(3): + response = authenticated_client.create_document(f"Test Document {i}") + assert response["code"] == 200 + + # upload file to activate the document + task_id = response["data"]["task_data"]["task_id"] + authenticated_client.upload_file_to_server( + task_id, + "./pyproject.toml" + ) + + document_ids.append(response["data"]["document_id"]) + + # Verify all documents exist + for doc_id in document_ids: + response = authenticated_client.get_document_info(doc_id) + assert response["code"] == 200 + finally: + # Cleanup + for doc_id in document_ids: + try: + authenticated_client.delete_document(doc_id) + except Exception: + pass + + +class TestDocumentWithoutAuth: + """Test that document operations require authentication.""" + + def test_create_document_without_auth(self, client: CFMSTestClient): + """Test that creating a document requires authentication.""" + response = client.send_request( + "create_document", + {"title": "Test"}, + include_auth=False + ) + + assert response["code"] == 401 + + def test_get_document_without_auth(self, client: CFMSTestClient): + """Test that getting a document requires authentication.""" + response = client.send_request( + "get_document", + {"document_id": "hello"}, + include_auth=False + ) + + assert response["code"] == 401 diff --git a/tests/test_groups.py b/tests/test_groups.py new file mode 100644 index 0000000..0914494 --- /dev/null +++ b/tests/test_groups.py @@ -0,0 +1,156 @@ +""" +Tests for group management operations. +""" + +import pytest +import time +from tests.test_client import CFMSTestClient + + +class TestGroupOperations: + """Test group management operations.""" + + def test_list_groups(self, authenticated_client: CFMSTestClient): + """Test listing all groups.""" + response = authenticated_client.list_groups() + + assert response["code"] == 200 + assert "data" in response + assert "groups" in response["data"] + assert isinstance(response["data"]["groups"], list) + + # Should have at least the default groups (sysop, user) + group_names = [group["name"] for group in response["data"]["groups"]] + assert "sysop" in group_names + assert "user" in group_names + + def test_create_group(self, authenticated_client: CFMSTestClient): + """Test creating a new group.""" + group_name = f"test_group_{int(time.time())}" + + response = authenticated_client.create_group( + group_name=group_name, + permissions=[] + ) + + assert response["code"] == 200 + + # Cleanup + try: + authenticated_client.send_request("delete_group", {"group_name": group_name}) + except Exception: + pass + + def test_get_group_info(self, authenticated_client: CFMSTestClient, test_group: dict): + """Test getting group information.""" + response = authenticated_client.get_group_info(test_group["group_name"]) + + assert response["code"] == 200 + assert "data" in response + assert response["data"]["name"] == test_group["group_name"] + + def test_get_sysop_group_info(self, authenticated_client: CFMSTestClient): + """Test getting information for the sysop group.""" + response = authenticated_client.get_group_info("sysop") + + assert response["code"] == 200 + assert "data" in response + assert response["data"]["name"] == "sysop" + assert "permissions" in response["data"] + + def test_get_nonexistent_group_info(self, authenticated_client: CFMSTestClient): + """Test getting info for a group that doesn't exist.""" + response = authenticated_client.get_group_info("nonexistent_group_12345") + + assert response["code"] != 200 + + # def test_create_group_with_permissions(self, authenticated_client: CFMSTestClient): + # """Test creating a group with specific permissions.""" + # group_name = f"perm_group_{int(time.time())}" + # permissions = [ + # {"permission": "create_document", "start_time": 0, "end_time": None} + # ] + + # response = authenticated_client.create_group( + # group_name=group_name, + # permissions=permissions + # ) + + # assert response["code"] == 200 + + # # Verify the group has the permissions + # info_response = authenticated_client.get_group_info(group_name) + # if info_response["code"] == 200: + # assert "permissions" in info_response["data"] + + # # Cleanup + # try: + # authenticated_client.send_request("delete_group", {"group_name": group_name}) + # except Exception: + # pass + + def test_create_group_with_empty_name(self, authenticated_client: CFMSTestClient): + """Test creating a group with an empty name.""" + response = authenticated_client.create_group("") + + # Should fail validation + assert response["code"] == 400 + + def test_create_duplicate_group(self, authenticated_client: CFMSTestClient, test_group: dict): + """Test creating a group with a duplicate name.""" + response = authenticated_client.create_group(test_group["group_name"]) + + # Should fail due to duplicate name + assert response["code"] != 200 + + def test_delete_group(self, authenticated_client: CFMSTestClient): + """Test deleting a group.""" + # Create a group + group_name = f"group_to_delete_{int(time.time())}" + create_response = authenticated_client.create_group(group_name) + assert create_response["code"] == 200 + + # Delete it + delete_response = authenticated_client.send_request( + "delete_group", + {"group_name": group_name} + ) + assert delete_response["code"] == 200 + + # Verify it's gone + info_response = authenticated_client.get_group_info(group_name) + assert info_response["code"] != 200 + + +class TestGroupWithoutAuth: + """Test that group operations require authentication.""" + + def test_list_groups_without_auth(self, client: CFMSTestClient): + """Test that listing groups requires authentication.""" + response = client.send_request( + "list_groups", + {}, + include_auth=False + ) + + assert response["code"] == 401 + + def test_create_group_without_auth(self, client: CFMSTestClient): + """Test that creating a group requires authentication.""" + response = client.send_request( + "create_group", + {"group_name": "testgroup"}, + include_auth=False + ) + + assert response["code"] == 401 + + def test_get_group_info_without_auth(self, client: CFMSTestClient): + """Test that getting group info requires authentication.""" + response = client.send_request( + "get_group_info", + {"group_name": "sysop"}, + include_auth=False + ) + + assert response["code"] == 401 diff --git a/tests/test_users.py b/tests/test_users.py new file mode 100644 index 0000000..4ab4c1d --- /dev/null +++ b/tests/test_users.py @@ -0,0 +1,153 @@ +""" +Tests for user management operations. +""" + +import pytest +import time +from tests.test_client import CFMSTestClient + + +class TestUserOperations: + """Test user management operations.""" + + def test_list_users(self, authenticated_client: CFMSTestClient): + """Test listing all users.""" + response = authenticated_client.list_users() + + assert response["code"] == 200 + assert "data" in response + assert "users" in response["data"] + assert isinstance(response["data"]["users"], list) + + # Should at least have the admin user + usernames = [user["username"] for user in response["data"]["users"]] + assert "admin" in usernames + + def test_create_user(self, authenticated_client: CFMSTestClient): + """Test creating a new user.""" + username = f"test_user_{int(time.time())}" + password = "TestPassword123!" + + response = authenticated_client.create_user( + username=username, + password=password, + nickname="Test User" + ) + + assert response["code"] == 200 + + # Cleanup + try: + authenticated_client.delete_user(username) + except Exception: + pass + + def test_get_user_info(self, authenticated_client: CFMSTestClient, test_user: dict): + """Test getting user information.""" + response = authenticated_client.get_user_info(test_user["username"]) + + assert response["code"] == 200 + assert "data" in response + assert response["data"]["username"] == test_user["username"] + + def test_get_nonexistent_user_info(self, authenticated_client: CFMSTestClient): + """Test getting info for a user that doesn't exist.""" + response = authenticated_client.get_user_info("nonexistent_user_12345") + + assert response["code"] != 200 + + def test_delete_user(self, authenticated_client: CFMSTestClient): + """Test deleting a user.""" + # Create a user + username = f"user_to_delete_{int(time.time())}" + create_response = authenticated_client.create_user( + username=username, + password="TestPassword123!" + ) + assert create_response["code"] == 200 + + # Delete it + delete_response = authenticated_client.delete_user(username) + assert delete_response["code"] == 200 + + # Verify it's gone + info_response = authenticated_client.get_user_info(username) + assert info_response["code"] != 200 + + # def test_create_user_with_weak_password(self, authenticated_client: CFMSTestClient): + # """Test creating a user with a weak password.""" + # username = f"weak_pwd_user_{int(time.time())}" + # weak_password = "weak" + + # response = authenticated_client.create_user( + # username=username, + # password=weak_password + # ) + + # # Should fail due to password requirements + # assert response["code"] != 200 + + def test_create_user_with_duplicate_username(self, authenticated_client: CFMSTestClient, test_user: dict): + """Test creating a user with a duplicate username.""" + response = authenticated_client.create_user( + username=test_user["username"], + password="AnotherPassword123!" + ) + + # Should fail due to duplicate username + assert response["code"] != 200 + + def test_create_user_with_empty_username(self, authenticated_client: CFMSTestClient): + """Test creating a user with an empty username.""" + response = authenticated_client.create_user( + username="", + password="TestPassword123!" + ) + + # Should fail validation + assert response["code"] == 400 + + def test_get_admin_user_info(self, authenticated_client: CFMSTestClient): + """Test getting admin user information.""" + response = authenticated_client.get_user_info("admin") + + assert response["code"] == 200 + assert "data" in response + assert response["data"]["username"] == "admin" + + +class TestUserWithoutAuth: + """Test that user operations require authentication.""" + + def test_list_users_without_auth(self, client: CFMSTestClient): + """Test that listing users requires authentication.""" + response = client.send_request( + "list_users", + {}, + include_auth=False + ) + + assert response["code"] == 401 + + def test_create_user_without_auth(self, client: CFMSTestClient): + """Test that creating a user requires authentication.""" + response = client.send_request( + "create_user", + { + "username": "testuser", + "password": "TestPassword123!" + }, + include_auth=False + ) + + assert response["code"] == 401 + + def test_get_user_info_without_auth(self, client: CFMSTestClient): + """Test that getting user info requires authentication.""" + response = client.send_request( + "get_user_info", + {"username": "admin"}, + include_auth=False + ) + + assert response["code"] == 401