diff --git a/alembic/versions/20260524_013_fix_media_file_paths.py b/alembic/versions/20260524_013_fix_media_file_paths.py index 44d05e5..960cffa 100644 --- a/alembic/versions/20260524_013_fix_media_file_paths.py +++ b/alembic/versions/20260524_013_fix_media_file_paths.py @@ -23,7 +23,9 @@ branch_labels = None depends_on = None -_BATCH_SIZE = 5000 +# Can't import from src.web.media_utils in migrations (different runtime context) +# Define locally to keep migration self-contained +_CHANNEL_ID_OFFSET = 1_000_000_000_000 def _derive_stale_folder(chat_id: int) -> str | None: @@ -36,71 +38,46 @@ def _derive_stale_folder(chat_id: int) -> str | None: if chat_id >= 0: return None raw = -chat_id - if raw > 1000000000000: - return str(raw - 1000000000000) + if raw > _CHANNEL_ID_OFFSET: + return str(raw - _CHANNEL_ID_OFFSET) return str(raw) def upgrade(): conn = op.get_bind() - dialect = conn.dialect.name - - if dialect == "postgresql": - # Get all distinct negative chat_ids that have media - result = conn.execute(sa.text("SELECT DISTINCT chat_id FROM media WHERE chat_id < 0 AND file_path IS NOT NULL")) - chat_ids = [row[0] for row in result] - - for chat_id in chat_ids: - stale_folder = _derive_stale_folder(chat_id) - if stale_folder is None: - continue - correct_folder = str(chat_id) - - # Only update rows where file_path contains the stale folder - # Use pattern: ...//... → ...//... - stale_pattern = f"%/{stale_folder}/%" - conn.execute( - sa.text( - "UPDATE media SET file_path = REPLACE(file_path, :old_seg, :new_seg) " - "WHERE chat_id = :cid AND file_path LIKE :pattern" - ), - { - "old_seg": f"/{stale_folder}/", - "new_seg": f"/{correct_folder}/", - "cid": chat_id, - "pattern": stale_pattern, - }, - ) - - elif dialect == "sqlite": - result = conn.execute(sa.text("SELECT DISTINCT chat_id FROM media WHERE chat_id < 0 AND file_path IS NOT NULL")) - chat_ids = [row[0] for row in result] - - for chat_id in chat_ids: - stale_folder = _derive_stale_folder(chat_id) - if stale_folder is None: - continue - correct_folder = str(chat_id) - - stale_pattern = f"%/{stale_folder}/%" - conn.execute( - sa.text( - "UPDATE media SET file_path = REPLACE(file_path, :old_seg, :new_seg) " - "WHERE chat_id = :cid AND file_path LIKE :pattern" - ), - { - "old_seg": f"/{stale_folder}/", - "new_seg": f"/{correct_folder}/", - "cid": chat_id, - "pattern": stale_pattern, - }, - ) + + # Get all distinct negative chat_ids that have media + result = conn.execute(sa.text("SELECT DISTINCT chat_id FROM media WHERE chat_id < 0 AND file_path IS NOT NULL")) + chat_ids = [row[0] for row in result] + + for chat_id in chat_ids: + stale_folder = _derive_stale_folder(chat_id) + if stale_folder is None: + continue + correct_folder = str(chat_id) + + # Only update rows where file_path contains the stale folder + # Use pattern: ...//... → ...//... + stale_pattern = f"%/{stale_folder}/%" + conn.execute( + sa.text( + "UPDATE media SET file_path = REPLACE(file_path, :old_seg, :new_seg) " + "WHERE chat_id = :cid AND file_path LIKE :pattern" + ), + { + "old_seg": f"/{stale_folder}/", + "new_seg": f"/{correct_folder}/", + "cid": chat_id, + "pattern": stale_pattern, + }, + ) def downgrade(): - # Reversible: swap the folder components back + # WARNING: This reverses ALL negative-folder paths to positive, including rows + # created after the upgrade. This is intentional — old code expects positive + # folders in file_path. The runtime fallback handles disk resolution. conn = op.get_bind() - dialect = conn.dialect.name result = conn.execute(sa.text("SELECT DISTINCT chat_id FROM media WHERE chat_id < 0 AND file_path IS NOT NULL")) chat_ids = [row[0] for row in result] diff --git a/pyproject.toml b/pyproject.toml index 7e94774..564fc43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "telegram-archive" -version = "7.10.14" +version = "7.10.15" description = "Automated Telegram backup with Docker. Performs incremental backups of messages and media on a configurable schedule." readme = "README.md" requires-python = ">=3.14" diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh index 5c19933..0600566 100644 --- a/scripts/entrypoint.sh +++ b/scripts/entrypoint.sh @@ -93,15 +93,24 @@ if has_tables and not has_alembic: ); \"\"\") # Check artifact from migration 013: file_path values use negative chat_id folders - # If any media row for a negative chat_id has a correctly-negative folder, 013 has run + # Guard: media table may not exist on very old databases cur.execute(\"\"\" SELECT EXISTS ( - SELECT 1 FROM media - WHERE chat_id < 0 AND file_path LIKE '%/' || CAST(chat_id AS TEXT) || '/%' - LIMIT 1 + SELECT FROM information_schema.tables + WHERE table_name = 'media' ); \"\"\") - has_013_paths = cur.fetchone()[0] + has_media_table = cur.fetchone()[0] + has_013_paths = False + if has_media_table: + cur.execute(\"\"\" + SELECT EXISTS ( + SELECT 1 FROM media + WHERE chat_id < 0 AND file_path LIKE '%/' || CAST(chat_id AS TEXT) || '/%' + LIMIT 1 + ); + \"\"\") + has_013_paths = cur.fetchone()[0] # Check artifact from migration 012: idx_media_chat_type index cur.execute(\"\"\" @@ -300,8 +309,13 @@ if has_tables and not has_alembic: ''') # Check artifact from migration 013: file_path values use negative chat_id folders - cur.execute(\"SELECT EXISTS(SELECT 1 FROM media WHERE chat_id < 0 AND file_path LIKE '%/' || CAST(chat_id AS TEXT) || '/%' LIMIT 1)\") - has_013_paths = cur.fetchone()[0] + # Guard: media table may not exist on very old databases + cur.execute(\"SELECT name FROM sqlite_master WHERE type='table' AND name='media'\") + has_media_table = cur.fetchone() is not None + has_013_paths = False + if has_media_table: + cur.execute(\"SELECT EXISTS(SELECT 1 FROM media WHERE chat_id < 0 AND file_path LIKE '%/' || CAST(chat_id AS TEXT) || '/%' LIMIT 1)\") + has_013_paths = cur.fetchone()[0] # Check artifact from migration 012: idx_media_chat_type index cur.execute(\"SELECT name FROM sqlite_master WHERE type='index' AND name='idx_media_chat_type'\") diff --git a/src/__init__.py b/src/__init__.py index a5cbc1a..e5a6b81 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -2,4 +2,4 @@ Telegram Backup Automation - Main Package """ -__version__ = "7.10.14" +__version__ = "7.10.15" diff --git a/src/web/main.py b/src/web/main.py index 7af3fd1..d2116bd 100644 --- a/src/web/main.py +++ b/src/web/main.py @@ -31,6 +31,7 @@ from ..config import Config from ..db import DatabaseAdapter, close_database, get_db_manager, init_database from ..realtime import RealtimeListener +from .media_utils import legacy_folder_alternates, legacy_marked_chat_ids if TYPE_CHECKING: from .push import PushNotificationManager @@ -821,8 +822,8 @@ def _enforce_media_acl(path: str, user: UserContext, *, thumbnail: bool = False) raise HTTPException(status_code=403, detail="Access denied") if media_chat_id not in user_chat_ids: # Legacy fallback: positive folder may correspond to negative marked ID - if media_chat_id > 0 and (-media_chat_id in user_chat_ids or -(1000000000000 + media_chat_id) in user_chat_ids): - pass + if media_chat_id > 0 and any(mid in user_chat_ids for mid in legacy_marked_chat_ids(media_chat_id)): + logger.debug("ACL legacy grant: positive folder mapped to allowed chat via marked-ID convention") else: raise HTTPException(status_code=403, detail="Access denied") @@ -885,7 +886,7 @@ async def serve_thumbnail(size: int, folder: str, filename: str, user: UserConte if user.no_download and not folder.startswith("avatars/"): raise HTTPException(status_code=403, detail="Downloads disabled for this account") - # Chat-level access check + # Early ACL check on requested path (prevents existence leakage) _enforce_media_acl(f"{folder}/{filename}", user, thumbnail=True) from .thumbnails import ensure_thumbnail, resolve_cache_dir @@ -894,10 +895,15 @@ async def serve_thumbnail(size: int, folder: str, filename: str, user: UserConte if _thumb_cache_dir is None: _thumb_cache_dir = resolve_cache_dir(_media_root) - thumb_path = await ensure_thumbnail(_media_root, size, folder, filename, cache_dir=_thumb_cache_dir) - if not thumb_path: + result = await ensure_thumbnail(_media_root, size, folder, filename, cache_dir=_thumb_cache_dir) + if not result: raise HTTPException(status_code=404, detail="Thumbnail not available") + thumb_path, resolved_folder = result + # Secondary ACL on resolved path if it differs (prevents bypass via legacy fallback) + if resolved_folder != folder: + _enforce_media_acl(f"{resolved_folder}/{filename}", user, thumbnail=True) + return FileResponse(thumb_path, media_type="image/webp", headers={"Cache-Control": "public, max-age=86400"}) @@ -928,23 +934,20 @@ async def serve_media(path: str, download: int = Query(0), user: UserContext = D resolved = None if len(parts) == 2: folder, rest = parts - alt_folders = [] - if not folder.startswith("-"): - alt_folders = [f"-{folder}", f"-100{folder}"] - else: - alt_folders = [folder[1:]] + alt_folders = legacy_folder_alternates(folder) for alt in alt_folders: try: resolved = (_media_root / alt / rest).resolve(strict=True) + logger.debug("Legacy fallback: served media via alternate folder resolution") break - except OSError, ValueError: + except OSError, ValueError, RuntimeError: continue if resolved is None: raise HTTPException(status_code=404, detail="File not found") if not resolved.is_relative_to(_media_root): raise HTTPException(status_code=403, detail="Access denied") - _enforce_media_acl(path, user) + _enforce_media_acl(str(resolved.relative_to(_media_root)), user) if not resolved.is_file(): raise HTTPException(status_code=404, detail="File not found") diff --git a/src/web/media_utils.py b/src/web/media_utils.py new file mode 100644 index 0000000..8bd3753 --- /dev/null +++ b/src/web/media_utils.py @@ -0,0 +1,56 @@ +"""Shared utilities for legacy media path resolution. + +Centralizes the Telegram marked-ID convention so it's defined once +and used consistently across serve_media, thumbnails, and ACL checks. +""" + +CHANNEL_ID_OFFSET: int = 1_000_000_000_000 + + +def legacy_folder_alternates(folder: str) -> list[str]: + """Return alternate folder names for legacy positive/negative ID paths. + + Forward (positive folder → possible negative marked IDs on disk): + "1234567890" → ["-1234567890", "-1001234567890"] + + Reverse (negative folder → possible old positive folder on disk): + "-1234567890" → ["1234567890"] (basic group) + "-1001234567890" → ["1234567890"] (channel) + """ + try: + if not folder.startswith("-"): + folder_int = int(folder) + if folder_int <= 0: + return [] + return [f"-{folder}", str(-(CHANNEL_ID_OFFSET + folder_int))] + folder_int = int(folder) + except ValueError: + return [] + raw = -folder_int + if raw > CHANNEL_ID_OFFSET: + return [str(raw - CHANNEL_ID_OFFSET)] + return [str(raw)] + + +def legacy_marked_chat_ids(positive_id: int) -> list[int]: + """Return possible marked chat_ids for a legacy positive folder ID. + + Used by ACL checks to determine if a user has access to a chat + referenced by its old positive folder name. + """ + return [-positive_id, -(CHANNEL_ID_OFFSET + positive_id)] + + +def derive_stale_folder(chat_id: int) -> str | None: + """Derive the old positive folder name from a marked chat_id. + + Basic groups: chat_id = -X → old folder = "X" + Channels: chat_id = -(10^12 + X) → old folder = "X" + Users: chat_id > 0 → no mismatch possible, return None + """ + if chat_id >= 0: + return None + raw = -chat_id + if raw > CHANNEL_ID_OFFSET: + return str(raw - CHANNEL_ID_OFFSET) + return str(raw) diff --git a/src/web/thumbnails.py b/src/web/thumbnails.py index 54a1358..d1f0a54 100644 --- a/src/web/thumbnails.py +++ b/src/web/thumbnails.py @@ -15,6 +15,8 @@ from PIL import Image +from .media_utils import legacy_folder_alternates + logger = logging.getLogger(__name__) # Limit decompression to prevent pixel-bomb OOM attacks (~50 megapixels) @@ -87,11 +89,12 @@ def _generate_sync(source: Path, dest: Path, size: int) -> bool: async def ensure_thumbnail( media_root: Path, size: int, folder: str, filename: str, *, cache_dir: Path | None = None -) -> Path | None: - """Return the path to a cached thumbnail, generating it if needed. +) -> tuple[Path, str] | None: + """Return (thumb_path, resolved_folder) or None. - Returns None when the request is invalid or generation fails. - Includes path traversal protection. + resolved_folder is the actual folder the source was found in (may differ + from the requested folder due to legacy ID fallback). Callers use this + for ACL enforcement on the resolved path. When cache_dir is provided, thumbnails are written there instead of under {media_root}/.thumbs/ — this supports read-only media volumes. @@ -120,28 +123,29 @@ async def ensure_thumbnail( if not dest.is_relative_to(thumbs_root): return None + resolved_folder = folder + if dest.exists(): - return dest + return dest, resolved_folder if not source.exists(): - # Legacy fallback: pre-v4.0.5 paths used positive IDs, disk uses negative marked IDs. - # Try alternate folder names: X→-X (basic group), X→-100X (channel/supergroup) - alt_folders = [] - if not folder.startswith("-"): - alt_folders = [f"-{folder}", f"-100{folder}"] - else: - alt_folders = [folder[1:]] + alt_folders = legacy_folder_alternates(folder) found = False for alt in alt_folders: - alt_source = (media_root / alt / filename).resolve() - if alt_source.is_relative_to(media_root_resolved) and alt_source.exists(): - source = alt_source - found = True - break + try: + alt_source = (media_root / alt / filename).resolve() + if alt_source.is_relative_to(media_root_resolved) and alt_source.exists(): + logger.debug("Thumbnail legacy fallback resolved via alternate folder") + source = alt_source + resolved_folder = alt + found = True + break + except OSError, RuntimeError: + continue if not found: return None async with _generation_semaphore: loop = asyncio.get_running_loop() ok = await loop.run_in_executor(None, _generate_sync, source, dest, size) - return dest if ok else None + return (dest, resolved_folder) if ok else None diff --git a/tests/test_media_utils.py b/tests/test_media_utils.py new file mode 100644 index 0000000..6a36b6e --- /dev/null +++ b/tests/test_media_utils.py @@ -0,0 +1,192 @@ +"""Tests for shared media path utilities (media_utils.py). + +Verifies the Telegram marked-ID convention logic used for legacy +folder resolution across serve_media, thumbnails, and ACL checks. +""" + +import unittest + +from src.web.media_utils import ( + CHANNEL_ID_OFFSET, + derive_stale_folder, + legacy_folder_alternates, + legacy_marked_chat_ids, +) + + +class TestLegacyFolderAlternatesForward(unittest.TestCase): + """Forward resolution: positive folder -> possible negative marked IDs.""" + + def test_forward_10_digit_channel_id(self): + """Positive 10-digit folder produces basic-group and channel negatives.""" + result = legacy_folder_alternates("1234567890") + self.assertEqual(result, ["-1234567890", "-1001234567890"]) + + def test_forward_9_digit_channel_id_pads_correctly(self): + """9-digit folder must produce -100_0_123456789, not -100_123456789.""" + result = legacy_folder_alternates("123456789") + # CHANNEL_ID_OFFSET is 1_000_000_000_000, so 1000000000000 + 123456789 = 1000123456789 + self.assertEqual(result, ["-123456789", "-1000123456789"]) + + def test_forward_small_basic_group_id(self): + """Small group ID produces correct channel-offset alternate.""" + result = legacy_folder_alternates("54321") + self.assertEqual(result, ["-54321", "-1000000054321"]) + + def test_forward_result_types_are_strings(self): + """All returned alternates are strings.""" + result = legacy_folder_alternates("999") + for item in result: + self.assertIsInstance(item, str) + + def test_forward_always_returns_two_alternates(self): + """Forward resolution always returns exactly two alternates.""" + result = legacy_folder_alternates("42") + self.assertEqual(len(result), 2) + + def test_non_numeric_folder_returns_empty_list(self): + """Non-numeric folder names return empty list (no alternates possible).""" + self.assertEqual(legacy_folder_alternates("chat1"), []) + self.assertEqual(legacy_folder_alternates("photos"), []) + self.assertEqual(legacy_folder_alternates("-abc"), []) + + def test_zero_folder_returns_empty_list(self): + """Folder '0' is not a valid Telegram entity ID.""" + self.assertEqual(legacy_folder_alternates("0"), []) + + +class TestLegacyFolderAlternatesReverse(unittest.TestCase): + """Reverse resolution: negative folder -> possible old positive folder.""" + + def test_reverse_channel_folder(self): + """Channel folder -1001234567890 resolves to positive 1234567890.""" + result = legacy_folder_alternates("-1001234567890") + self.assertEqual(result, ["1234567890"]) + + def test_reverse_basic_group_folder(self): + """Basic group folder -54321 resolves to positive 54321.""" + result = legacy_folder_alternates("-54321") + self.assertEqual(result, ["54321"]) + + def test_reverse_boundary_smallest_channel(self): + """Smallest channel ID -1000000000001 resolves to 1.""" + result = legacy_folder_alternates("-1000000000001") + self.assertEqual(result, ["1"]) + + def test_reverse_exact_offset_treated_as_basic_group(self): + """Exact CHANNEL_ID_OFFSET boundary is treated as basic group (not channel).""" + result = legacy_folder_alternates(str(-CHANNEL_ID_OFFSET)) + self.assertEqual(result, [str(CHANNEL_ID_OFFSET)]) + + def test_reverse_always_returns_one_alternate(self): + """Reverse resolution always returns exactly one alternate.""" + result = legacy_folder_alternates("-999") + self.assertEqual(len(result), 1) + + def test_reverse_result_types_are_strings(self): + """Reverse alternates are strings.""" + result = legacy_folder_alternates("-1001234567890") + for item in result: + self.assertIsInstance(item, str) + + +class TestLegacyMarkedChatIds(unittest.TestCase): + """Marked chat ID generation from positive folder IDs.""" + + def test_returns_basic_group_and_channel_forms(self): + """Returns both -id and -(offset + id) for a given positive ID.""" + result = legacy_marked_chat_ids(1234567890) + self.assertEqual(result, [-1234567890, -1001234567890]) + + def test_small_id_produces_correct_pair(self): + """Small IDs still offset correctly.""" + result = legacy_marked_chat_ids(1) + self.assertEqual(result, [-1, -(CHANNEL_ID_OFFSET + 1)]) + + def test_returns_list_of_two_integers(self): + """Always returns exactly two integers.""" + result = legacy_marked_chat_ids(42) + self.assertEqual(len(result), 2) + for item in result: + self.assertIsInstance(item, int) + + def test_both_results_are_negative(self): + """Both marked IDs are negative.""" + result = legacy_marked_chat_ids(99999) + self.assertTrue(all(x < 0 for x in result)) + + +class TestDeriveStaleFolder(unittest.TestCase): + """Derive old positive folder name from marked chat_id.""" + + def test_positive_chat_id_returns_none(self): + """User chat_id (positive) has no stale folder.""" + self.assertIsNone(derive_stale_folder(12345)) + + def test_zero_returns_none(self): + """Zero chat_id returns None.""" + self.assertIsNone(derive_stale_folder(0)) + + def test_basic_group_negative(self): + """Basic group -54321 derives folder 54321.""" + self.assertEqual(derive_stale_folder(-54321), "54321") + + def test_channel_negative(self): + """Channel -1001234567890 derives folder 1234567890.""" + self.assertEqual(derive_stale_folder(-1001234567890), "1234567890") + + def test_boundary_smallest_channel(self): + """Smallest channel ID -1000000000001 derives folder 1.""" + self.assertEqual(derive_stale_folder(-1000000000001), "1") + + def test_exact_offset_treated_as_basic_group(self): + """Exact -CHANNEL_ID_OFFSET is basic group, not channel (raw == offset, not >).""" + self.assertEqual(derive_stale_folder(-CHANNEL_ID_OFFSET), str(CHANNEL_ID_OFFSET)) + + def test_result_is_string_when_not_none(self): + """Non-None results are strings.""" + result = derive_stale_folder(-999) + self.assertIsInstance(result, str) + + +class TestMediaUtilsConsistency(unittest.TestCase): + """Cross-function consistency: forward and reverse are inverses.""" + + def test_channel_roundtrip_derive_then_reverse(self): + """derive_stale_folder output fed to legacy_folder_alternates reverse + produces the original chat_id's folder as an alternate.""" + chat_id = -1001234567890 + folder = derive_stale_folder(chat_id) + # folder is "1234567890", forward alternates include str(chat_id) + alternates = legacy_folder_alternates(folder) + self.assertIn(str(chat_id), alternates) + + def test_basic_group_roundtrip_derive_then_reverse(self): + """Basic group derive -> forward alternates includes original chat_id.""" + chat_id = -54321 + folder = derive_stale_folder(chat_id) + alternates = legacy_folder_alternates(folder) + self.assertIn(str(chat_id), alternates) + + def test_marked_ids_match_forward_alternates(self): + """legacy_marked_chat_ids output matches legacy_folder_alternates (as ints).""" + positive_id = 7777 + marked = legacy_marked_chat_ids(positive_id) + alternates = legacy_folder_alternates(str(positive_id)) + # alternates are strings of the marked IDs + self.assertEqual(sorted(str(m) for m in marked), sorted(alternates)) + + def test_channel_reverse_then_forward_contains_original(self): + """Reverse a channel folder, then forward it, original appears.""" + channel_folder = "-1001234567890" + reversed_folders = legacy_folder_alternates(channel_folder) + # reversed_folders = ["1234567890"] + forward_again = legacy_folder_alternates(reversed_folders[0]) + self.assertIn(channel_folder, forward_again) + + def test_basic_group_reverse_then_forward_contains_original(self): + """Reverse a basic group folder, then forward it, original appears.""" + group_folder = "-54321" + reversed_folders = legacy_folder_alternates(group_folder) + forward_again = legacy_folder_alternates(reversed_folders[0]) + self.assertIn(group_folder, forward_again) diff --git a/tests/test_web_coverage.py b/tests/test_web_coverage.py index 141c571..6b1658e 100644 --- a/tests/test_web_coverage.py +++ b/tests/test_web_coverage.py @@ -348,6 +348,45 @@ async def test_media_restricts_by_chat_id(self): resp = await client.get("/media/123/photo.jpg", cookies={"viewer_auth": token}) self.assertEqual(resp.status_code, 403) + async def test_media_acl_enforced_on_resolved_path_not_url(self): + """ACL bypass prevention: user requests positive folder, file resolves to + a negative folder the user does NOT have access to — must deny.""" + with tempfile.TemporaryDirectory() as tmpdir: + media_root = web_main.Path(tmpdir).resolve() + web_main._media_root = media_root + # File lives under negative folder -999 (the actual chat folder on disk) + denied_dir = os.path.join(tmpdir, "-999") + os.makedirs(denied_dir) + with open(os.path.join(denied_dir, "secret.jpg"), "w") as f: + f.write("secret") + + web_main.AUTH_ENABLED = True + token = "acl-bypass-test" + # User only has access to chat 555, NOT -999 + web_main._sessions[token] = web_main.SessionData(username="v1", role="viewer", allowed_chat_ids={555}) + # Request via positive folder "999" — legacy fallback resolves to "-999" + async with self._client() as client: + resp = await client.get("/media/999/secret.jpg", cookies={"viewer_auth": token}) + self.assertEqual(resp.status_code, 403) + + async def test_media_acl_allows_resolved_path_when_authorized(self): + """User with access to the resolved negative chat can access via positive folder.""" + with tempfile.TemporaryDirectory() as tmpdir: + media_root = web_main.Path(tmpdir).resolve() + web_main._media_root = media_root + denied_dir = os.path.join(tmpdir, "-999") + os.makedirs(denied_dir) + with open(os.path.join(denied_dir, "photo.jpg"), "w") as f: + f.write("img") + + web_main.AUTH_ENABLED = True + token = "acl-allow-test" + # User has access to -999 (the resolved folder) + web_main._sessions[token] = web_main.SessionData(username="v1", role="viewer", allowed_chat_ids={-999}) + async with self._client() as client: + resp = await client.get("/media/999/photo.jpg", cookies={"viewer_auth": token}) + self.assertEqual(resp.status_code, 200) + async def test_media_rejects_shared_folder_for_restricted_user(self): """Restricted users cannot fetch deduplicated _shared files directly.""" with tempfile.TemporaryDirectory() as tmpdir: @@ -513,7 +552,7 @@ async def test_thumbnail_serves_generated_file(self): with patch( "src.web.thumbnails.ensure_thumbnail", new_callable=AsyncMock, - return_value=web_main.Path(thumb_file), + return_value=(web_main.Path(thumb_file), "123"), ): async with self._client() as client: resp = await client.get("/media/thumb/200/123/photo.jpg") diff --git a/tests/test_web_thumbnails.py b/tests/test_web_thumbnails.py index 0632493..84dd297 100644 --- a/tests/test_web_thumbnails.py +++ b/tests/test_web_thumbnails.py @@ -214,7 +214,9 @@ async def test_returns_cached_thumbnail_if_exists(self): result = await ensure_thumbnail(media_root, 200, folder, "img.jpg") self.assertIsNotNone(result) - self.assertEqual(result, thumb.resolve()) + thumb_path, resolved_folder = result + self.assertEqual(thumb_path, thumb.resolve()) + self.assertEqual(resolved_folder, folder) async def test_returns_none_when_source_does_not_exist(self): """ensure_thumbnail returns None when source file is missing.""" @@ -238,8 +240,10 @@ async def test_generates_thumbnail_for_valid_source(self): result = await ensure_thumbnail(media_root, 200, folder, "photo.png") self.assertIsNotNone(result) - self.assertTrue(result.exists()) - self.assertEqual(result.suffix, ".webp") + thumb_path, resolved_folder = result + self.assertTrue(thumb_path.exists()) + self.assertEqual(thumb_path.suffix, ".webp") + self.assertEqual(resolved_folder, folder) if __name__ == "__main__":