Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions backend/adapters/services/screenscraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,34 @@ async def _request(self, url: str, request_timeout: int = 120) -> dict:
) from exc
except aiohttp.ClientResponseError as err:
if err.status == http.HTTPStatus.TOO_MANY_REQUESTS:
# Retry after 2 seconds if rate limit hit
log.warning("ScreenScraper: rate limit hit, retrying after 2s")
await asyncio.sleep(2)
elif err.status == 426:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ScreenScraper has blacklisted this application version. Please update RomM.",
) from err
elif err.status == 430:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily scrape quota exhausted. Try again tomorrow.",
) from err
elif err.status == 431:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily unrecognized-ROM quota exhausted. Try again tomorrow.",
) from err
elif err.status == 423:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="ScreenScraper API is currently offline.",
) from err
elif err.status == http.HTTPStatus.UNAUTHORIZED:
log.warning(
"ScreenScraper API is temporarily unavailable (server CPU >60%)"
)
return {}
else:
# Log the error and return an empty dict if the request fails with a different code
log.error(err)
return {}
except json.JSONDecodeError as exc:
Expand Down Expand Up @@ -117,11 +141,32 @@ async def _request(self, url: str, request_timeout: int = 120) -> dict:
)
return await res.json()
except (aiohttp.ClientResponseError, aiohttp.ServerTimeoutError) as err:
if (
isinstance(err, aiohttp.ClientResponseError)
and err.status == http.HTTPStatus.UNAUTHORIZED
):
return {}
if isinstance(err, aiohttp.ClientResponseError):
if err.status == http.HTTPStatus.UNAUTHORIZED:
log.warning(
"ScreenScraper API is temporarily unavailable (server CPU >60%)"
)
return {}
elif err.status == 426:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="ScreenScraper has blacklisted this application version. Please update RomM.",
) from err
elif err.status == 430:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily scrape quota exhausted. Try again tomorrow.",
) from err
elif err.status == 431:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="ScreenScraper daily unrecognized-ROM quota exhausted. Try again tomorrow.",
) from err
elif err.status == 423:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="ScreenScraper API is currently offline.",
) from err

log.error(err)
return {}
Expand Down
138 changes: 73 additions & 65 deletions backend/handler/metadata/ss_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
import re
from datetime import datetime
from typing import Final, NotRequired, TypedDict
Expand Down Expand Up @@ -26,13 +25,6 @@
MetadataHandler,
)
from .base_handler import UniversalPlatformSlug as UPS
from .base_handler import (
strip_sensitive_query_params,
)

SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode()
SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode()
SENSITIVE_KEYS = {"ssid", "sspassword"}


def get_preferred_regions(rom: Rom | None = None) -> list[str]:
Expand Down Expand Up @@ -82,6 +74,19 @@ def get_preferred_media_types() -> list[MetadataMediaType]:
# Regex to detect ScreenScraper ID tags in filenames like (ssfr-12345)
SS_TAG_REGEX = re.compile(r"\(ssfr-(\d+)\)", re.IGNORECASE)

NOTGAME_NAME_PREFIX: Final = "ZZZ(NOTGAME)"

_ISO_EXTENSIONS: Final = frozenset({"iso", "cue", "chd", "gdi", "cdi", "bin"})

_HTML_ENTITIES: Final[dict[str, str]] = {
"&": "&",
"&": "&",
"'": "'",
" ": " ",
""": '"',
"©": "©",
}

ACCEPTABLE_FILE_EXTENSIONS_BY_PLATFORM_SLUG = {
UPS.DC: ["cue", "chd", "gdi", "cdi"],
UPS.SEGACD: ["cue", "chd", "bin"],
Expand Down Expand Up @@ -150,6 +155,29 @@ class SSRom(BaseRom):
ss_metadata: NotRequired[SSMetadata]


def _is_notgame(game: SSGame) -> bool:
if game.get("notgame") == "true":
return True
return any(
name.get("text", "").upper().startswith(NOTGAME_NAME_PREFIX)
for name in game.get("noms", [])
)


def _get_rom_type(file: RomFile) -> str:
if not file.is_top_level:
return "dossier"
if file.file_extension.lower() in _ISO_EXTENSIONS:
return "iso"
return "rom"


def _decode_html_entities(text: str) -> str:
for entity, char in _HTML_ENTITIES.items():
text = text.replace(entity, char)
return text


def extract_media_from_ss_game(rom: Rom, game: SSGame) -> SSMetadataMedia:
preferred_media_types = get_preferred_media_types()

Expand Down Expand Up @@ -190,62 +218,44 @@ def extract_media_from_ss_game(rom: Rom, game: SSGame) -> SSMetadataMedia:
continue

if media.get("type") == "box-2D-back" and not ss_media["box2d_back_url"]:
ss_media["box2d_back_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["box2d_back_url"] = media["url"]
if MetadataMediaType.BOX2D_BACK in preferred_media_types:
ss_media["box2d_back_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.BOX2D_BACK)}/box2d_back.png"
)
elif media.get("type") == "bezel-16-9" and not ss_media["bezel_url"]:
ss_media["bezel_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["bezel_url"] = media["url"]
if MetadataMediaType.BEZEL in preferred_media_types:
ss_media["bezel_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.BEZEL)}/bezel.png"
)
elif media.get("type") == "box-2D" and not ss_media["box2d_url"]:
ss_media["box2d_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["box2d_url"] = media["url"]
elif media.get("type") == "fanart" and not ss_media["fanart_url"]:
ss_media["fanart_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["fanart_url"] = media["url"]
if MetadataMediaType.FANART in preferred_media_types:
ss_media["fanart_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.FANART)}/fanart.png"
)
elif media.get("type") == "box-texture" and not ss_media["fullbox_url"]:
ss_media["fullbox_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["fullbox_url"] = media["url"]
elif media.get("type") == "wheel-hd" and not ss_media["logo_url"]:
ss_media["logo_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["logo_url"] = media["url"]

if MetadataMediaType.LOGO in preferred_media_types:
ss_media["logo_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.LOGO)}/logo.png"
)
elif media.get("type") == "wheel" and not ss_media["logo_url"]:
ss_media["logo_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["logo_url"] = media["url"]
if MetadataMediaType.LOGO in preferred_media_types:
ss_media["logo_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.LOGO)}/logo.png"
)
elif media.get("type") == "manuel" and not ss_media["manual_url"]:
ss_media["manual_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["manual_url"] = media["url"]
elif media.get("type") == "screenmarquee" and not ss_media["marquee_url"]:
ss_media["marquee_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["marquee_url"] = media["url"]
if MetadataMediaType.MARQUEE in preferred_media_types:
ss_media["marquee_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.MARQUEE)}/marquee.png"
Expand All @@ -256,53 +266,37 @@ def extract_media_from_ss_game(rom: Rom, game: SSGame) -> SSMetadataMedia:
or media.get("type") == "mixrbv1"
or media.get("type") == "mixrbv2"
) and not ss_media["miximage_url"]:
ss_media["miximage_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["miximage_url"] = media["url"]
if MetadataMediaType.MIXIMAGE in preferred_media_types:
ss_media["miximage_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.MIXIMAGE)}/miximage.png"
)
elif media.get("type") == "support-2D" and not ss_media["physical_url"]:
ss_media["physical_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["physical_url"] = media["url"]
if MetadataMediaType.PHYSICAL in preferred_media_types:
ss_media["physical_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.PHYSICAL)}/physical.png"
)
elif media.get("type") == "ss" and not ss_media["screenshot_url"]:
ss_media["screenshot_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["screenshot_url"] = media["url"]
elif media.get("type") == "box-2D-side" and not ss_media["box2d_side_url"]:
ss_media["box2d_side_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["box2d_side_url"] = media["url"]
elif media.get("type") == "steamgrid" and not ss_media["steamgrid_url"]:
ss_media["steamgrid_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["steamgrid_url"] = media["url"]
elif media.get("type") == "box-3D" and not ss_media["box3d_url"]:
ss_media["box3d_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["box3d_url"] = media["url"]
if MetadataMediaType.BOX3D in preferred_media_types:
ss_media["box3d_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.BOX3D)}/box3d.png"
)
elif media.get("type") == "sstitle" and not ss_media["title_screen_url"]:
ss_media["title_screen_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["title_screen_url"] = media["url"]
if MetadataMediaType.TITLE_SCREEN in preferred_media_types:
ss_media["title_screen_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.TITLE_SCREEN)}/title_screen.png"
)
elif media.get("type") == "video" and not ss_media["video_url"]:
ss_media["video_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["video_url"] = media["url"]
if MetadataMediaType.VIDEO in preferred_media_types:
ss_media["video_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.VIDEO)}/video.mp4"
Expand All @@ -311,9 +305,7 @@ def extract_media_from_ss_game(rom: Rom, game: SSGame) -> SSMetadataMedia:
media.get("type") == "video-normalized"
and not ss_media["video_normalized_url"]
):
ss_media["video_normalized_url"] = strip_sensitive_query_params(
media["url"], SENSITIVE_KEYS
)
ss_media["video_normalized_url"] = media["url"]
if MetadataMediaType.VIDEO_NORMALIZED in preferred_media_types:
ss_media["video_normalized_path"] = (
f"{fs_resource_handler.get_media_resources_path(rom.platform_id, rom.id, MetadataMediaType.VIDEO_NORMALIZED)}/video-normalized.mp4"
Expand Down Expand Up @@ -482,8 +474,8 @@ def build_ss_game(rom: Rom, game: SSGame) -> SSRom:
ss_id = int(game["id"]) if game.get("id") is not None else None
game_rom: SSRom = {
"ss_id": ss_id,
"name": res_name.replace(" : ", ": "), # Normalize colons
"summary": res_summary,
"name": _decode_html_entities(res_name.replace(" : ", ": ")),
"summary": _decode_html_entities(res_summary),
"url_cover": str(url_cover) if url_cover else "",
"url_manual": str(url_manual) if url_manual else "",
"url_screenshots": url_screenshots,
Expand Down Expand Up @@ -534,6 +526,11 @@ async def _search_rom(

games_by_name: dict[str, SSGame] = {}
for rom in roms:
if _is_notgame(rom):
log.warning(
"ScreenScraper: Received notgame entry in search results, ignoring"
)
continue
for name in rom.get("noms", []):
if name["text"] not in games_by_name or int(rom["id"]) < int(
games_by_name[name["text"]]["id"]
Expand Down Expand Up @@ -612,10 +609,18 @@ async def lookup_rom(
sha1=sha1_hash,
crc=crc_hash,
rom_size_bytes=fs_size_bytes,
rom_name=first_file.file_name,
rom_type=_get_rom_type(first_file),
)
if not res:
return SSRom(ss_id=None)

if _is_notgame(res):
log.warning(
"ScreenScraper: Received notgame entry from hash lookup, ignoring"
)
return SSRom(ss_id=None)

return build_ss_game(rom, res)

async def get_rom(self, rom: Rom, file_name: str, platform_ss_id: int) -> SSRom:
Expand Down Expand Up @@ -645,6 +650,9 @@ async def get_rom(self, rom: Rom, file_name: str, platform_ss_id: int) -> SSRom:
search_term = fs_rom_handler.get_file_name_with_no_tags(file_name)
fallback_rom = SSRom(ss_id=None)

if not search_term:
return fallback_rom

# Support for PS2 OPL filename format
match = PS2_OPL_REGEX.match(file_name)
if platform_ss_id == PS2_SS_ID and match:
Expand Down Expand Up @@ -766,7 +774,7 @@ def _is_ss_region(game: SSGame) -> bool:
return [
build_ss_game(rom, game)
for game in matched_games
if _is_ss_region(game) and game.get("id")
if not _is_notgame(game) and _is_ss_region(game) and game.get("id")
]


Expand Down
8 changes: 7 additions & 1 deletion backend/logger/formatter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from pprint import pformat

from colorama import Fore, Style, init
Expand Down Expand Up @@ -55,6 +56,11 @@
}


_CREDENTIAL_PATTERN = re.compile(
r"(ssid|sspassword|devid|devpassword)=[^&\s\"]*", re.IGNORECASE
)


def should_strip_ansi() -> bool:
"""Determine if ANSI escape codes should be stripped."""
# Check if an explicit environment variable is set to control color behavior
Expand Down Expand Up @@ -116,7 +122,7 @@ def format(self, record: logging.LogRecord) -> str:
}
log_fmt = formats.get(record.levelno)
formatter = logging.Formatter(fmt=log_fmt, datefmt="%Y-%m-%d %H:%M:%S")
return formatter.format(record)
return _CREDENTIAL_PATTERN.sub(r"\1=***", formatter.format(record))


def highlight(msg: str = "", color=YELLOW) -> str:
Expand Down
Loading
Loading