diff --git a/src/core/services/musicbrainz.py b/src/core/services/musicbrainz.py
new file mode 100644
index 0000000..270816e
--- /dev/null
+++ b/src/core/services/musicbrainz.py
@@ -0,0 +1,299 @@
+"""
+MusicBrainz API client for fetching music album metadata.
+
+API Documentation: https://musicbrainz.org/doc/MusicBrainz_API
+Cover Art Archive: https://coverartarchive.org/
+
+This API is free and does not require authentication.
+Rate limiting: 1 request per second with a proper User-Agent header.
+Note: this client sends the User-Agent but does not throttle requests itself.
+"""
+
+import logging
+import re
+from dataclasses import dataclass
+from http import HTTPStatus
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+MUSICBRAINZ_BASE_URL = "https://musicbrainz.org/ws/2/"
+COVERART_BASE_URL = "https://coverartarchive.org/"
+
+# User-Agent is required by MusicBrainz API
+USER_AGENT = "Datakult/1.0 (personal media tracker)"
+
+# Minimum query length for search
+MIN_QUERY_LENGTH = 2
+
+# The MusicBrainz search API only accepts limits between 1 and 100
+MAX_SEARCH_LIMIT = 100
+
+# MusicBrainz IDs are UUIDs; nothing else may be interpolated into a request path
+MBID_PATTERN = re.compile(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE)
+
+# Pattern for valid Cover Art Archive URLs
+COVERART_PATTERN = re.compile(r"^https://coverartarchive\.org/release/[a-f0-9-]+/")
+
+# Minimum size in bytes to consider a cover valid
+MIN_COVER_SIZE_BYTES = 1000
+
+
+class MusicBrainzError(Exception):
+    """Exception raised when MusicBrainz API request fails."""
+
+
+def _is_valid_mbid(mbid: object) -> bool:
+    """Return True if mbid is a string shaped like a MusicBrainz UUID."""
+    return isinstance(mbid, str) and MBID_PATTERN.fullmatch(mbid) is not None
+
+
+def _extract_artists(data: dict) -> list[str]:
+    """Extract artist names from artist-credit data."""
+    # "or []" also covers an explicit null in the API response
+    return [ac["name"] for ac in data.get("artist-credit") or [] if isinstance(ac, dict) and "name" in ac]
+
+
+def _extract_year(date_str: str) -> int | None:
+    """Extract the leading four-digit year from a date string, or None."""
+    if not date_str:
+        return None
+    year_match = re.match(r"(\d{4})", date_str)
+    return int(year_match.group(1)) if year_match else None
+
+
+def _extract_label(label_info: list) -> str | None:
+    """Return the first non-empty label name found in label-info data."""
+    for entry in label_info or []:
+        label_data = entry.get("label") if isinstance(entry, dict) else None
+        # Entries without a label (null) are skipped instead of ending the search
+        if isinstance(label_data, dict) and label_data.get("name"):
+            return label_data["name"]
+    return None
+
+
+def _extract_genres_and_tags(data: dict) -> list[str]:
+    """Extract unique genre and tag names, genres first, in first-seen order."""
+    names = [
+        item["name"]
+        for key in ("genres", "tags")
+        for item in data.get(key) or []
+        if isinstance(item, dict) and "name" in item
+    ]
+    # dict.fromkeys removes duplicates while keeping insertion order
+    return list(dict.fromkeys(names))
+
+
+@dataclass
+class MusicBrainzResult:
+    """Represents a search result from MusicBrainz."""
+
+    mbid: str  # MusicBrainz ID (UUID)
+    title: str
+    artists: list[str]
+    year: int | None
+    country: str | None
+    label: str | None
+
+    def _cover(self, variant: str) -> str | None:
+        """Build the Cover Art Archive URL for a variant, or None without an MBID."""
+        if not self.mbid:
+            return None
+        return f"{COVERART_BASE_URL}release/{self.mbid}/{variant}"
+
+    @property
+    def cover_url(self) -> str | None:
+        """Returns the URL for the cover image (front, 500px)."""
+        return self._cover("front-500")
+
+    @property
+    def cover_url_small(self) -> str | None:
+        """Returns a smaller cover URL for thumbnails (250px)."""
+        return self._cover("front-250")
+
+    @property
+    def cover_url_large(self) -> str | None:
+        """Returns the full-size cover URL."""
+        return self._cover("front")
+
+
+class MusicBrainzClient:
+    """Client for interacting with the MusicBrainz API."""
+
+    def __init__(self):
+        self.session = requests.Session()
+        self.session.headers.update(
+            {
+                "User-Agent": USER_AGENT,
+                "Accept": "application/json",
+            }
+        )
+
+    def _request(self, endpoint: str, params: dict | None = None) -> dict:
+        """
+        Make a GET request to the MusicBrainz API and return the decoded JSON.
+
+        Raises:
+            requests.RequestException: on network errors or non-2xx responses
+        """
+        # Build a fresh dict so the caller's params are never mutated
+        query_params = {**(params or {}), "fmt": "json"}
+        url = f"{MUSICBRAINZ_BASE_URL}{endpoint}"
+
+        try:
+            response = self.session.get(url, params=query_params, timeout=10)
+            response.raise_for_status()
+        except requests.RequestException:
+            logger.exception("MusicBrainz API request failed")
+            raise
+
+        return response.json()
+
+    def search_releases(self, query: str, limit: int = 10) -> list[MusicBrainzResult]:
+        """
+        Search for music releases (albums).
+
+        Args:
+            query: The search query
+            limit: Maximum number of results (clamped to 1..MAX_SEARCH_LIMIT)
+
+        Returns:
+            List of MusicBrainzResult objects
+
+        Raises:
+            requests.RequestException: if the API request fails
+        """
+        if not query or len(query) < MIN_QUERY_LENGTH:
+            return []
+
+        limit = max(1, min(limit, MAX_SEARCH_LIMIT))
+        data = self._request("release", {"query": query, "limit": limit})
+
+        return [
+            MusicBrainzResult(
+                mbid=release.get("id", ""),
+                title=release.get("title", ""),
+                artists=_extract_artists(release),
+                year=_extract_year(release.get("date", "")),
+                country=release.get("country"),
+                label=_extract_label(release.get("label-info", [])),
+            )
+            for release in data.get("releases") or []
+        ]
+
+    def get_release_details(self, mbid: str) -> dict:
+        """
+        Get detailed information about a release (album).
+
+        Args:
+            mbid: The MusicBrainz release ID
+
+        Returns a dict with:
+            - title, year, overview
+            - artists: list of artist names
+            - contributors: same list as artists
+            - genres: list of genre/tag names
+            - cover_url: full URL for cover image
+            - musicbrainz_url: URL to MusicBrainz page
+            - media_type: "music"
+
+        Raises:
+            requests.RequestException: if the MBID is malformed (InvalidURL)
+                or the API request fails
+        """
+        # mbid may come straight from a query string; refuse anything that is
+        # not a UUID before it is interpolated into the request path.
+        if not _is_valid_mbid(mbid):
+            msg = f"Invalid MusicBrainz release ID: {mbid!r}"
+            raise requests.exceptions.InvalidURL(msg)
+
+        data = self._request(f"release/{mbid}", {"inc": "artists+labels+tags+genres+release-groups"})
+
+        artists = _extract_artists(data)
+        year = _extract_year(data.get("date", ""))
+        genres = _extract_genres_and_tags(data)
+        label = _extract_label(data.get("label-info", []))
+
+        # Build overview/description
+        overview_parts = []
+        if label:
+            overview_parts.append(f"Label: {label}")
+        if country := data.get("country"):
+            overview_parts.append(f"Country: {country}")
+        # "release-group" can be null, so fall back to an empty dict
+        if primary_type := (data.get("release-group") or {}).get("primary-type"):
+            overview_parts.append(f"Type: {primary_type}")
+
+        return {
+            "title": data.get("title", ""),
+            "year": year,
+            "overview": " | ".join(overview_parts),
+            "artists": artists,
+            "contributors": artists,
+            "genres": genres,
+            "cover_url": f"{COVERART_BASE_URL}release/{mbid}/front-500",
+            "musicbrainz_url": f"https://musicbrainz.org/release/{mbid}",
+            "media_type": "music",
+        }
+
+    def check_cover_exists(self, mbid: str) -> bool:
+        """
+        Check if a cover exists in the Cover Art Archive.
+
+        Args:
+            mbid: The MusicBrainz release ID
+
+        Returns:
+            True if cover exists, False otherwise (including malformed IDs
+            and network errors)
+        """
+        if not _is_valid_mbid(mbid):
+            return False
+
+        try:
+            response = self.session.head(
+                f"{COVERART_BASE_URL}release/{mbid}/front",
+                timeout=5,
+                allow_redirects=True,
+            )
+        except requests.RequestException:
+            return False
+        return response.status_code == HTTPStatus.OK
+
+    def download_cover(self, cover_url: str) -> bytes | None:
+        """Download cover image and return bytes, or None if unavailable."""
+        if not cover_url:
+            return None
+
+        # Basic validation - ensure it's from Cover Art Archive
+        if not COVERART_PATTERN.match(cover_url):
+            logger.warning("Invalid Cover Art Archive URL: %s", cover_url)
+            return None
+
+        try:
+            response = self.session.get(cover_url, timeout=15, allow_redirects=True)
+            # Cover Art Archive returns 404 if no cover exists
+            if response.status_code == HTTPStatus.NOT_FOUND:
+                logger.info("No cover art available for: %s", cover_url)
+                return None
+            response.raise_for_status()
+        except requests.RequestException:
+            logger.exception("Failed to download cover from %s", cover_url)
+            return None
+
+        # Check minimum size
+        if len(response.content) < MIN_COVER_SIZE_BYTES:
+            logger.warning("Cover too small (placeholder?): %s", cover_url)
+            return None
+
+        return response.content
+
+
+def get_musicbrainz_client() -> MusicBrainzClient:
+    """
+    Factory function to get a MusicBrainz client instance.
+
+    MusicBrainz doesn't require authentication, so this always returns a client.
+    """
+    return MusicBrainzClient()
diff --git a/src/core/urls.py b/src/core/urls.py
index e025734..b49f6cd 100644
--- a/src/core/urls.py
+++ b/src/core/urls.py
@@ -18,6 +18,7 @@
     path("tmdb-search/", views.tmdb_search_htmx, name="tmdb_search_htmx"),
     path("igdb-search/", views.igdb_search_htmx, name="igdb_search_htmx"),
     path("openlibrary-search/", views.openlibrary_search_htmx, name="openlibrary_search_htmx"),
+    path("musicbrainz-search/", views.musicbrainz_search_htmx, name="musicbrainz_search_htmx"),
     path("media/validate_field/", validate_media_field, name="media_validate_field"),
     path("media//review-full/", views.media_review_full_htmx, name="media_review_full_htmx"),
     path("media//review-clamped/", views.media_review_clamped_htmx, name="media_review_clamped_htmx"),
diff --git a/src/core/views.py b/src/core/views.py
index bd115cf..45b8525 100644
--- a/src/core/views.py
+++ b/src/core/views.py
@@ -20,6 +20,7 @@
 from .models import Agent, Media, SavedView, Tag
 from .queries import build_media_context
 from .services.igdb import get_igdb_client
+from .services.musicbrainz import get_musicbrainz_client
 from .services.openlibrary import get_openlibrary_client
 from .services.tmdb import get_tmdb_client
 from .utils import create_backup, delete_orphan_agents_by_ids
@@ -147,6 +148,9 @@ def _download_cover(cover_url: str) -> bytes | None:
     elif "covers.openlibrary.org" in cover_url:
         client = get_openlibrary_client()
         return client.download_cover(cover_url)
+    elif "coverartarchive.org" in cover_url:
+        client = get_musicbrainz_client()
+        return client.download_cover(cover_url)
     return None
 
 
@@ -163,12 +167,18 @@ def _build_import_initial_data(import_data: dict, media=None) -> dict:
         media_type = "GAME"
     elif source_media_type == "book":
         media_type = "BOOK"
+    elif source_media_type == "music":
+        media_type = "MUSIC"
     else:
         media_type = ""
 
     # Determine external URI
     external_uri = (
-        import_data.get("tmdb_url") or import_data.get("igdb_url") or import_data.get("openlibrary_url") or ""
+        import_data.get("tmdb_url")
+        or import_data.get("igdb_url")
+        or import_data.get("openlibrary_url")
+        or import_data.get("musicbrainz_url")
+        or ""
     )
 
     initial_data = {
@@ -186,6 +196,28 @@ def _build_import_initial_data(import_data: dict, media=None) -> dict:
     return initial_data
 
 
+def _get_import_data_from_request(request) -> dict | None:
+    """Fetch import data based on request parameters."""
+    tmdb_id = request.GET.get("tmdb_id")
+    media_type = request.GET.get("media_type")
+    lang = request.GET.get("lang", DEFAULT_TMDB_LANGUAGE)
+    igdb_id = request.GET.get("igdb_id")
+    openlibrary_key = request.GET.get("openlibrary_key")
+    musicbrainz_id = request.GET.get("musicbrainz_id")
+
+    if tmdb_id and media_type in ("movie", "tv"):
+        return _fetch_tmdb_data(tmdb_id, media_type, language=lang)
+    if igdb_id:
+        return _fetch_igdb_data(igdb_id)
+    if openlibrary_key:
+        openlibrary_year = request.GET.get("year")
+        year = int(openlibrary_year) if openlibrary_year and openlibrary_year.isdigit() else None
+        return _fetch_openlibrary_data(openlibrary_key, year=year)
+    if musicbrainz_id:
+        return _fetch_musicbrainz_data(musicbrainz_id)
+    return None
+
+
 @login_required
 def media_edit(request, pk=None):
     media = get_object_or_404(Media, pk=pk) if pk else None
@@ -219,23 +251,7 @@ def media_edit(request, pk=None):
             messages.success(request, _(msg_key) % {"title": instance.title})
             return redirect("media_detail", pk=instance.pk)
     else:
-        # Check for import parameters from different sources
-        tmdb_id = request.GET.get("tmdb_id")
-        media_type = request.GET.get("media_type")
-        lang = request.GET.get("lang", DEFAULT_TMDB_LANGUAGE)
-        igdb_id = request.GET.get("igdb_id")
-        openlibrary_key = request.GET.get("openlibrary_key")
-
-        if tmdb_id and media_type in ("movie", "tv"):
-            import_data = _fetch_tmdb_data(tmdb_id, media_type, language=lang)
-        elif igdb_id:
-            import_data = _fetch_igdb_data(igdb_id)
-        elif openlibrary_key:
-            # Get year from search results if available
-            openlibrary_year = request.GET.get("year")
-            year = int(openlibrary_year) if openlibrary_year and openlibrary_year.isdigit() else None
-            import_data = _fetch_openlibrary_data(openlibrary_key, year=year)
-
+        import_data = _get_import_data_from_request(request)
         if import_data:
             initial_data = _build_import_initial_data(import_data, media)
             form = MediaForm(initial=initial_data, instance=media)
@@ -310,6 +326,19 @@ def _fetch_openlibrary_data(work_key: str, year: int | None = None) -> dict | No
     return details
 
 
+def _fetch_musicbrainz_data(mbid: str) -> dict | None:
+    """Fetch MusicBrainz data for pre-filling the form."""
+    client = get_musicbrainz_client()
+
+    try:
+        details = client.get_release_details(mbid)
+    except requests.RequestException:
+        logger.exception("Failed to fetch MusicBrainz data for release %s", mbid)
+        return None
+
+    return details
+
+
 @login_required
 def media_import(request):
     """Display TMDB search page for importing media."""
@@ -478,6 +507,32 @@ def openlibrary_search_htmx(request):
     return render(request, "partials/openlibrary/openlibrary_suggestions.html", {**base_context, "results": results})
 
 
+@login_required
+def musicbrainz_search_htmx(request):
+    """HTMX view: search MusicBrainz for music albums."""
+    query = request.GET.get("q", "").strip()
+    media_id = request.GET.get("media_id")
+
+    base_context = {"results": [], "media_id": media_id, "query": query}
+
+    if len(query) < MIN_SEARCH_QUERY_LENGTH:
+        return render(request, "partials/musicbrainz/musicbrainz_suggestions.html", base_context)
+
+    client = get_musicbrainz_client()
+
+    try:
+        results = client.search_releases(query, limit=MAX_SEARCH_RESULTS)
+    except requests.RequestException:
+        logger.exception("MusicBrainz search failed")
+        return render(
+            request,
+            "partials/musicbrainz/musicbrainz_suggestions.html",
+            {**base_context, "error": "Search failed"},
+        )
+
+    return render(request, "partials/musicbrainz/musicbrainz_suggestions.html", {**base_context, "results": results})
+
+
 @login_required
 def media_review_clamped_htmx(request, pk):
     """HTMX view: return clamped review for a media item (for table cell collapse)."""
diff --git a/src/templates/base/media_import.html b/src/templates/base/media_import.html
index 9e3544d..16ddbd8 100644
--- a/src/templates/base/media_import.html
+++ b/src/templates/base/media_import.html
@@ -20,19 +20,25 @@

{% translate "Import metadata" %}

aria-label="{% translate 'Movies & TV' %}" id="tab-tmdb" checked - hx-on:click="document.getElementById('source-tmdb').classList.remove('hidden'); document.getElementById('source-igdb').classList.add('hidden'); document.getElementById('source-openlibrary').classList.add('hidden'); document.getElementById('import-results').innerHTML='';"> + hx-on:click="document.getElementById('source-tmdb').classList.remove('hidden'); document.getElementById('source-igdb').classList.add('hidden'); document.getElementById('source-openlibrary').classList.add('hidden'); document.getElementById('source-musicbrainz').classList.add('hidden'); document.getElementById('import-results').innerHTML='';"> + hx-on:click="document.getElementById('source-igdb').classList.remove('hidden'); document.getElementById('source-tmdb').classList.add('hidden'); document.getElementById('source-openlibrary').classList.add('hidden'); document.getElementById('source-musicbrainz').classList.add('hidden'); document.getElementById('import-results').innerHTML='';"> + hx-on:click="document.getElementById('source-openlibrary').classList.remove('hidden'); document.getElementById('source-tmdb').classList.add('hidden'); document.getElementById('source-igdb').classList.add('hidden'); document.getElementById('source-musicbrainz').classList.add('hidden'); document.getElementById('import-results').innerHTML='';"> + {# TMDB Search (Movies & TV) #}
@@ -132,6 +138,35 @@

{% translate "Import metadata" %}

+ {# MusicBrainz Search (Music) #} + {# Loading indicator #}
diff --git a/src/templates/partials/musicbrainz/musicbrainz_suggestions.html b/src/templates/partials/musicbrainz/musicbrainz_suggestions.html new file mode 100644 index 0000000..41fe31f --- /dev/null +++ b/src/templates/partials/musicbrainz/musicbrainz_suggestions.html @@ -0,0 +1,55 @@ +{% load i18n %} +{% if error %} +
+ {% lucide "circle-alert" class="w-5 h-5" %} + {{ error }} +
+{% elif results %} + +{% elif query %} +
+ {% lucide "search-x" class="w-5 h-5" %} + {% translate "No results found" %} +
+{% endif %}