diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index cfb2d539..9f4ca8e4 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -13,6 +13,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +permissions: + contents: read + pull-requests: read + jobs: build: if: | diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index de488e51..76eae45f 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -6,6 +6,10 @@ on: tags: - "v*.*.*" +permissions: + contents: write + id-token: write + jobs: build: runs-on: ubuntu-latest diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 38cf23ef..72f7f701 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -13,6 +13,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +permissions: + contents: read + pull-requests: read + jobs: build: if: | diff --git a/.gitignore b/.gitignore index 439e736f..42ce7a49 100644 --- a/.gitignore +++ b/.gitignore @@ -147,6 +147,7 @@ venv.bak/ dmypy.json .DS_Store +.vibe/ data storage config/ diff --git a/supernote/cli/admin.py b/supernote/cli/admin.py index acc6682b..44005000 100644 --- a/supernote/cli/admin.py +++ b/supernote/cli/admin.py @@ -7,6 +7,7 @@ import getpass import hashlib import sys +import warnings from supernote.client.admin import AdminClient from supernote.client.exceptions import ApiException @@ -14,6 +15,16 @@ from .client import create_session +def _warn_md5_usage() -> None: + """Warn about MD5 usage for security awareness.""" + warnings.warn( + "MD5 is used for Supernote protocol compatibility but is cryptographically weak. " + "This is required for device compatibility, not a security best practice.", + UserWarning, + stacklevel=3, + ) + + async def add_user_async( url: str, email: str, password: str, display_name: str | None = None ) -> None: @@ -24,6 +35,7 @@ async def add_user_async( admin_client = AdminClient(session.client) # Hash password + _warn_md5_usage() password_md5 = hashlib.md5(password.encode()).hexdigest() # Try Public Registration @@ -110,6 +122,7 @@ async def reset_password_async(url: str, email: str, password: str) -> None: print(f"Attempting to reset password for '{email}' on {url}...") admin_client = AdminClient(session.client) + _warn_md5_usage() password_md5 = hashlib.md5(password.encode()).hexdigest() try: diff --git a/supernote/client/hashing.py b/supernote/client/hashing.py index d7bcf327..b7f30c5f 100644 --- a/supernote/client/hashing.py +++ b/supernote/client/hashing.py @@ -1,18 +1,40 @@ -"""Module for hashing utilities.""" +"""Module for hashing utilities. + +Security Note: +This module implements the Supernote protocol's password hashing scheme which uses MD5. +MD5 is considered cryptographically broken and unsuitable for security purposes. +However, it is required for compatibility with official Supernote devices. +The protocol uses: MD5(password) + SHA256(MD5(password) + salt) +""" import hashlib import logging +import warnings logger = logging.getLogger(__name__) +def _warn_md5_usage() -> None: + """Warn about MD5 usage for security awareness.""" + warnings.warn( + "MD5 is used for Supernote protocol compatibility but is cryptographically weak. " + "This is required for device compatibility, not a security best practice.", + UserWarning, + stacklevel=3, + ) + + def _sha256_string(s: str) -> str: """Return SHA256 hex digest of a string.""" return hashlib.sha256(s.encode("utf-8")).hexdigest() def _md5_string(s: str) -> str: - """Return MD5 hex digest of a string.""" + """Return MD5 hex digest of a string. + + Security Warning: MD5 is cryptographically broken but required for Supernote protocol compatibility. + """ + _warn_md5_usage() return hashlib.md5(s.encode("utf-8")).hexdigest() diff --git a/supernote/server/config.py b/supernote/server/config.py index 37c2cdc7..09d240a2 100644 --- a/supernote/server/config.py +++ b/supernote/server/config.py @@ -330,7 +330,7 @@ def load( if gemini_api_key := os.getenv("SUPERNOTE_GEMINI_API_KEY"): config.gemini_api_key = gemini_api_key - logger.info("Using SUPERNOTE_GEMINI_API_KEY") + logger.info("Using SUPERNOTE_GEMINI_API_KEY: ***") if gemini_ocr_model := os.getenv("SUPERNOTE_GEMINI_OCR_MODEL"): config.gemini_ocr_model = gemini_ocr_model @@ -367,7 +367,7 @@ def load( if mistral_api_key := os.getenv("SUPERNOTE_MISTRAL_API_KEY"): config.mistral_api_key = mistral_api_key - logger.info("Using SUPERNOTE_MISTRAL_API_KEY") + logger.info("Using SUPERNOTE_MISTRAL_API_KEY: ***") if mistral_ocr_model := os.getenv("SUPERNOTE_MISTRAL_OCR_MODEL"): config.mistral_ocr_model = mistral_ocr_model diff --git a/supernote/server/services/mistral.py b/supernote/server/services/mistral.py index 64c14cdb..3fdb8855 100644 --- a/supernote/server/services/mistral.py +++ b/supernote/server/services/mistral.py @@ -105,7 +105,8 @@ async def generate_json(self, prompt: str, schema: dict[str, Any]) -> str: ), response_format={"type": "json_object"}, ) - content = response.choices[0].message.content if response.choices else "" + message = response.choices[0].message if response.choices else None + content = message.content if message else "" return ( content if isinstance(content, str) diff --git a/supernote/server/static/index.html b/supernote/server/static/index.html index 8a5a3707..26058373 100644 --- a/supernote/server/static/index.html +++ b/supernote/server/static/index.html @@ -14,7 +14,7 @@ if (dark) document.documentElement.classList.add('dark'); })(); - + diff --git a/tests/cli/test_admin.py b/tests/cli/test_admin.py index 4f8445fc..02ba6de4 100644 --- a/tests/cli/test_admin.py +++ b/tests/cli/test_admin.py @@ -4,7 +4,11 @@ from contextlib import asynccontextmanager from unittest.mock import AsyncMock, MagicMock, patch -from supernote.cli.admin import list_users_async +from supernote.cli.admin import ( + add_user_async, + list_users_async, + reset_password_async, +) async def test_list_users_async_calls_get_json() -> None: @@ -28,3 +32,147 @@ async def mock_create_session( await list_users_async() mock_client.get_json.assert_called_once() + + +async def test_list_users_async_successful_listing() -> None: + """list_users_async successfully lists users when API call succeeds.""" + # Mock user data + mock_users = [ + {"email": "user1@example.com", "userName": "User One", "totalCapacity": "100"}, + {"email": "user2@example.com", "userName": "User Two", "totalCapacity": "200"}, + ] + + mock_client = AsyncMock() + mock_client.get_json.side_effect = Exception("blocked") # First call fails + mock_resp = AsyncMock() + mock_resp.json = AsyncMock(return_value=mock_users) + mock_client.get = AsyncMock(return_value=mock_resp) + + mock_session = MagicMock() + mock_session.client = mock_client + + @asynccontextmanager + async def mock_create_session( + *args: object, **kwargs: object + ) -> AsyncGenerator[MagicMock, None]: + yield mock_session + + with patch("supernote.cli.admin.create_session", mock_create_session): + # Capture print output + import io + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + await list_users_async() + + output = f.getvalue() + + # Verify the output contains user data + assert "Total Users: 2" in output + assert "user1@example.com" in output + assert "user2@example.com" in output + assert "User One" in output + assert "User Two" in output + + +async def test_add_user_async_success() -> None: + """Test add_user_async successful user creation.""" + # Mock the AdminClient methods directly + with patch("supernote.cli.admin.AdminClient") as mock_admin_client_class: + mock_admin_instance = AsyncMock() + mock_admin_instance.register = AsyncMock() # Public registration succeeds + mock_admin_client_class.return_value = mock_admin_instance + + mock_session = MagicMock() + mock_session.client = MagicMock() + + @asynccontextmanager + async def mock_create_session( + *args: object, **kwargs: object + ) -> AsyncGenerator[MagicMock, None]: + yield mock_session + + with patch("supernote.cli.admin.create_session", mock_create_session): + # Capture print output + import io + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + await add_user_async( + "http://test.com", "test@example.com", "password123", "Test User" + ) + + output = f.getvalue() + assert "Success! User created (Public Registration)" in output + + +async def test_add_user_async_admin_fallback() -> None: + """Test add_user_async falls back to admin creation when public registration fails.""" + # Mock the AdminClient methods directly + from supernote.client.exceptions import ApiException + + with patch("supernote.cli.admin.AdminClient") as mock_admin_client_class: + mock_admin_instance = AsyncMock() + mock_admin_instance.register = AsyncMock( + side_effect=ApiException("Public registration disabled") + ) + mock_admin_instance.admin_create_user = AsyncMock() # Admin creation succeeds + mock_admin_client_class.return_value = mock_admin_instance + + mock_session = MagicMock() + mock_session.client = MagicMock() + + @asynccontextmanager + async def mock_create_session( + *args: object, **kwargs: object + ) -> AsyncGenerator[MagicMock, None]: + yield mock_session + + with patch("supernote.cli.admin.create_session", mock_create_session): + # Capture print output + import io + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + await add_user_async( + "http://test.com", "test@example.com", "password123", "Test User" + ) + + output = f.getvalue() + assert "Public registration failed or disabled" in output + assert "Success! User created (Admin API)" in output + + +async def test_reset_password_async_success() -> None: + """Test reset_password_async successful password reset.""" + # Mock the AdminClient methods directly + with patch("supernote.cli.admin.AdminClient") as mock_admin_client_class: + mock_admin_instance = AsyncMock() + mock_admin_instance.admin_reset_password = AsyncMock() + mock_admin_client_class.return_value = mock_admin_instance + + mock_session = MagicMock() + mock_session.client = MagicMock() + + @asynccontextmanager + async def mock_create_session( + *args: object, **kwargs: object + ) -> AsyncGenerator[MagicMock, None]: + yield mock_session + + with patch("supernote.cli.admin.create_session", mock_create_session): + # Capture print output + import io + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + await reset_password_async( + "http://test.com", "test@example.com", "newpassword123" + ) + + output = f.getvalue() + assert "Success! Password Reset" in output diff --git a/tests/client/test_login.py b/tests/client/test_login.py index 18614813..07df9a9f 100644 --- a/tests/client/test_login.py +++ b/tests/client/test_login.py @@ -1,6 +1,7 @@ """Tests for the login flow.""" import hashlib +import warnings from typing import Awaitable, Callable import aiohttp @@ -13,6 +14,15 @@ from supernote.models.auth import LoginVO, RandomCodeVO +def _warn_md5_usage() -> None: + """Warn about MD5 usage for security awareness.""" + warnings.warn( + "MD5 is used for Supernote protocol compatibility but is cryptographically weak.", + UserWarning, + stacklevel=2, + ) + + # Mock SHA-256 implementation matching the JS one def sha256(data: str) -> str: return hashlib.sha256(data.encode()).hexdigest() diff --git a/tests/server/conftest.py b/tests/server/conftest.py index 4f75f347..bbfb009f 100644 --- a/tests/server/conftest.py +++ b/tests/server/conftest.py @@ -7,6 +7,7 @@ import json import logging import socket +import warnings from collections.abc import AsyncGenerator, Generator from pathlib import Path from unittest.mock import AsyncMock, patch @@ -35,6 +36,16 @@ ) from supernote.server.services.user import JWT_ALGORITHM, UserService + +def _warn_md5_usage() -> None: + """Warn about MD5 usage for security awareness.""" + warnings.warn( + "MD5 is used for Supernote protocol compatibility but is cryptographically weak.", + UserWarning, + stacklevel=2, + ) + + TEST_USERNAME = "test@example.com" TEST_PASSWORD = "testpassword" @@ -72,7 +83,7 @@ def proxy_mode() -> str | None: def pick_port() -> int: """Find a free port on the host.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) + s.bind(("127.0.0.1", 0)) return int(s.getsockname()[1]) @@ -152,6 +163,7 @@ async def create_test_user( if await user_service.check_user_exists(test_user): await user_service.unregister(test_user) + _warn_md5_usage() result = await user_service.create_user( UserRegisterDTO( email=test_user, diff --git a/tests/server/equipment/test_binding.py b/tests/server/equipment/test_binding.py index d9776c6f..0281e811 100644 --- a/tests/server/equipment/test_binding.py +++ b/tests/server/equipment/test_binding.py @@ -1,4 +1,5 @@ import hashlib +import warnings from typing import Any from aiohttp.test_utils import TestClient @@ -6,6 +7,15 @@ from tests.server.conftest import TEST_PASSWORD, TEST_USERNAME +def _warn_md5_usage() -> None: + """Warn about MD5 usage for security awareness.""" + warnings.warn( + "MD5 is used for Supernote protocol compatibility but is cryptographically weak.", + UserWarning, + stacklevel=2, + ) + + async def _login(client: TestClient, equipment_no: str) -> Any: # 1. Get Random Code resp = await client.post( @@ -16,6 +26,7 @@ async def _login(client: TestClient, equipment_no: str) -> Any: timestamp = data["timestamp"] # 2. Login + _warn_md5_usage() pwd_md5 = hashlib.md5(TEST_PASSWORD.encode()).hexdigest() concat = pwd_md5 + code password_hash = hashlib.sha256(concat.encode()).hexdigest() diff --git a/tests/server/services/test_mistral.py b/tests/server/services/test_mistral.py index 89fac8b1..ec628e81 100644 --- a/tests/server/services/test_mistral.py +++ b/tests/server/services/test_mistral.py @@ -131,6 +131,18 @@ async def test_generate_json_not_configured() -> None: await svc.generate_json("prompt", schema={}) +async def test_generate_json_none_message(service: MistralService) -> None: + mock_choice = MagicMock() + mock_choice.message = None # This is the case that was causing the mypy error + mock_response = MagicMock() + mock_response.choices = [mock_choice] + service._client.chat.complete_async = AsyncMock(return_value=mock_response) # type: ignore[union-attr, method-assign] + + result = await service.generate_json("prompt", schema={}) + + assert result == "" + + async def test_ocr_image_empty_pages(service: MistralService) -> None: mock_response = MagicMock() mock_response.pages = [] diff --git a/tests/server/test_app.py b/tests/server/test_app.py index 6c6c15ab..d9f727e5 100644 --- a/tests/server/test_app.py +++ b/tests/server/test_app.py @@ -43,13 +43,14 @@ async def test_proxy_headers_ignored_by_default( assert full_upload_url is not None # Should NOT use forwarded headers, should use actual test client host - assert not full_upload_url.startswith("https://malicious-domain.com"), ( + from urllib.parse import urlparse + + parsed_url = urlparse(full_upload_url) + assert parsed_url.netloc != "malicious-domain.com", ( f"Proxy headers should be ignored by default, got: {full_upload_url}" ) # Should use the test server's actual scheme and host - assert ( - "http://127.0.0.1" in full_upload_url or "http://localhost" in full_upload_url - ) + assert parsed_url.hostname == "127.0.0.1" or parsed_url.hostname == "localhost" @pytest.mark.parametrize("proxy_mode", ["relaxed"]) @@ -83,9 +84,10 @@ async def test_upload_url_proxy_headers_relaxed( assert full_upload_url is not None # Verification: Should use forwarded headers - assert full_upload_url.startswith("https://my-public-domain.com"), ( - f"Got URL: {full_upload_url}" - ) + from urllib.parse import urlparse + + parsed_url = urlparse(full_upload_url) + assert parsed_url.netloc == "my-public-domain.com", f"Got URL: {full_upload_url}" @pytest.mark.parametrize("proxy_mode", ["relaxed"]) diff --git a/uv.lock b/uv.lock index d4babc0a..b3889072 100644 --- a/uv.lock +++ b/uv.lock @@ -1447,7 +1447,7 @@ wheels = [ [[package]] name = "supernote" -version = "1.6.2" +version = "1.6.4" source = { editable = "." } dependencies = [ { name = "colour" },