Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
570 changes: 570 additions & 0 deletions common/lib/archive_metadata.py

Large diffs are not rendered by default.

109 changes: 50 additions & 59 deletions common/lib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2305,77 +2305,41 @@ def get_media_from_children(self, item_ids=[]) -> dict:
:param list item_ids: A list of item IDs to limit the filename retrieval to.
returns dict: item_id as key and a list of tuples (child dataset key -> filename) as items
"""
children = self.get_children()
from common.lib.exceptions import MetadataException

children = self.get_children()
if not children:
return {}

media_map = {}

# Get children that are image/video downloaders
media_datasets = [
p for p in children
if ("video-downloader" in p.type or "image-downloader" in p.type) and p.data.get("num_rows", 0) > 0 and p.is_finished()
]

# Loop through media datasets and create a map of dataset key -> filenames
# Skip files that were downloaded multiple times
media_map = {}
seen_files = set()
item_ids = [str(i_id) for i_id in item_ids]

for media_dataset in media_datasets:
for item in media_dataset.iterate_items():

if item.file.name == ".metadata.json":
with item.file.open() as infile:
metadata = json.load(infile)

for url_key, item_metadata in metadata.items():
media_items = set()
post_ids = item_metadata.get("post_ids", []) # Required

if not post_ids:
continue

# Make sure we're matching and passing strings
post_ids = [str(p_id) for p_id in post_ids]
item_ids = [str(i_id) for i_id in item_ids]

# Skip items that are not in the requested item_ids
if item_ids and not any(p_id in item_ids for p_id in post_ids):
continue

# Single file (images usually format like this)
is_success = item_metadata.get("success", True)

if is_success and "filename" in item_metadata:
media_info = (media_dataset.key, item_metadata["filename"])
media_items.add(media_info)

# Multiple files (videos with the 'files' array)
if item_metadata.get("files"):
for file in item_metadata["files"]:
if file.get("success") and "filename" in file:
media_info = (media_dataset.key, file["filename"])
media_items.add(media_info)

if not media_items:
continue

# Append to post_id list
for post_id in post_ids:
if post_id not in media_map:
media_map[post_id] = []

for media_item in media_items:
if media_item not in media_map[post_id]:
# Don't add post_id -> filename couplings that we've already seen
media_ref = (post_id, media_item[1])
if media_ref not in seen_files:
media_map[post_id].append(media_item)
seen_files.add(media_ref)

# break after .metadata.json
break
try:
metadata = media_dataset.read_media_metadata()
except (FileNotFoundError, MetadataException):
continue

for filename, entry in metadata.iter_entries():
post_ids = [str(p_id) for p_id in entry.get("post_ids", [])]
if not post_ids:
continue
if item_ids and not any(p_id in item_ids for p_id in post_ids):
continue

media_info = (media_dataset.key, filename)
for post_id in post_ids:
media_ref = (post_id, filename)
if media_ref in seen_files:
continue
seen_files.add(media_ref)
media_map.setdefault(post_id, []).append(media_info)

return media_map

Expand All @@ -2395,6 +2359,33 @@ def get_metadata(self):
metadata["current_4CAT_version"] = get_software_version()
return metadata

def read_media_metadata(self, filename=".metadata.json"):
"""
Load this dataset's media-archive `.metadata.json`.

Returns a `MediaArchiveMetadata` instance. Legacy formats are
normalized on load so callers always see the current schema.

Raises `FileNotFoundError` if no metadata file is present and
`MetadataException` if the dataset is unfinished or the file is
malformed.
"""
from common.lib.archive_metadata import MediaArchiveMetadata
return MediaArchiveMetadata.read(self, filename=filename)

def new_media_metadata(self, from_dataset, processor_type, filename=".metadata.json"):
"""
Empty `MediaArchiveMetadata` container for a processor producing a
media archive for this dataset.

:param str from_dataset: key of the *source* dataset whose posts are
being downloaded from (this is generally not the same as `self`).
:param str processor_type: type of the processor, used for metadata.
"""
from common.lib.archive_metadata import MediaArchiveMetadata
return MediaArchiveMetadata.new(self, processor_type=processor_type,
from_dataset=from_dataset, filename=filename)

def get_result_url(self):
"""
Gets the 4CAT frontend URL of a dataset file.
Expand Down
6 changes: 6 additions & 0 deletions common/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,9 @@ class MediaSignatureException(FourcatException):
Raise in media processors when the media cannot be read
"""
pass

class MetadataException(FourcatException):
"""
Raise when there is an issue with metadata
"""
pass
160 changes: 160 additions & 0 deletions common/lib/media_archive_library.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""
Look up media that previous downloader runs already fetched.

When a downloader (currently the video downloader) runs, other downloaders of
the same kind may have already fetched some of the same URLs from the same
source data. `MediaArchiveLibrary` aggregates the `MediaArchiveMetadata` of
those previous runs
"""
from common.lib.exceptions import MetadataException, DataSetException


class MediaLibraryHit:
"""
Result of a `MediaArchiveLibrary.find()` lookup.

A success hit carries the archive the files live in (`metadata`, whose
`.dataset` locates the zip) and the matching `(filename, item)` entries.
A failure hit carries the set of failure `reasons` seen for the URL
across all previous archives; the consumer decides what they mean.
"""

def __init__(self, is_success, metadata=None, entries=None, reasons=None):
self.is_success = is_success
self.metadata = metadata
self.entries = entries or []
self.reasons = reasons or set()


class MediaArchiveLibrary:
"""
Aggregate of `MediaArchiveMetadata` from previous downloader datasets.

Construct via `collect()` inside a processor; the bare constructor takes
metadata objects directly and is intended for testing.
"""

def __init__(self, metadata_objects, current_dataset=None):
self.metadata_objects = list(metadata_objects)
self.current_dataset = current_dataset
self._url_index = None

@classmethod
def collect(cls, current_dataset, modules, compatible_types):
"""
Build a library from finished downloader datasets that share
`current_dataset`'s source data.

:param current_dataset: the dataset being produced now (excluded
from the result).
:param modules: module registry, used to resolve the original
dataset of a filtered set.
:param list compatible_types: processor types whose archives count,
e.g. `["video-downloader"]`.
"""
datasets = cls._collect_previous_downloaders(current_dataset, modules, compatible_types)
metadata_objects = []
for dataset in datasets:
try:
metadata_objects.append(dataset.read_media_metadata())
except (FileNotFoundError, MetadataException):
# no metadata, or unfinished/malformed — nothing to reuse
continue

if current_dataset is not None:
current_dataset.log(
f"Media library: {len(metadata_objects)} previous "
f"{'/'.join(compatible_types)} archive(s) available for reuse"
)
return cls(metadata_objects, current_dataset=current_dataset)

@staticmethod
def _collect_previous_downloaders(current_dataset, modules, compatible_types):
"""
Sibling datasets of a compatible processor type, plus — if the
current dataset's parent is a filtered copy — the downloaders of the
dataset it was copied from. Excludes the current dataset itself.
"""
from common.lib.dataset import DataSet

# kids from the parent
parent_dataset = current_dataset.get_parent()
downloaders = [
child for child in parent_dataset.get_children()
if child.type in compatible_types and child.key != current_dataset.key
]

# kids from the original (if filtered dataset)
if "copied_from" in parent_dataset.parameters and parent_dataset.is_top_dataset():
try:
original = DataSet(key=parent_dataset.parameters["copied_from"],
db=current_dataset.db, modules=modules)
downloaders += [
child for child in original.top_parent().get_children()
if child.type in compatible_types and child.key != current_dataset.key
]
except DataSetException:
# the original dataset no longer exists
pass

return downloaders

def _build_index(self) -> dict:
"""
`{url: {"items": [(metadata, filename, item), ...],
"failures": [(metadata, failure), ...]}}`

A URL can map to entries from several archives (downloaded more than
once) and to several files within one archive (e.g. a playlist).
"""
index = {}
for metadata in self.metadata_objects:
for filename, item in metadata.iter_entries():
url = item.get("url")
if not url:
continue
index.setdefault(url, {"items": [], "failures": []})
index[url]["items"].append((metadata, filename, item))
for failure in metadata.iter_failures():
url = failure.get("url")
if not url:
continue
index.setdefault(url, {"items": [], "failures": []})
index[url]["failures"].append((metadata, failure))
return index

@property
def url_index(self) -> dict:
if self._url_index is None:
self._url_index = self._build_index()
return self._url_index

def find(self, url: str):
"""
Look up a URL across all previous downloader archives.

Returns a `MediaLibraryHit` — a success hit if any archive
downloaded it, otherwise a failure hit carrying every failure
`reason` seen — or `None` if the URL was never seen.
"""
bucket = self.url_index.get(url)
if not bucket:
return None

if bucket["items"]:
# success beats failure; take the first archive that has the URL
# and copy all of its files for that URL (one URL may yield many,
# e.g. a playlist)
first_metadata = bucket["items"][0][0]
entries = [(filename, item) for metadata, filename, item in bucket["items"]
if metadata is first_metadata]
return MediaLibraryHit(is_success=True, metadata=first_metadata, entries=entries)

# pass all the failures (e.g. in case one is "error", but later is "not_a_video", or
# something else the consumer wants to interpret)
reasons = {failure.get("reason") for _, failure in bucket["failures"]
if failure.get("reason")}
return MediaLibraryHit(is_success=False, reasons=reasons)

def __len__(self) -> int:
return len(self.metadata_objects)
40 changes: 36 additions & 4 deletions processors/audio/audio_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import oslex

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import ProcessorInterruptedException
from common.lib.exceptions import ProcessorInterruptedException, MetadataException

__author__ = "Dale Wahl"
__credits__ = ["Dale Wahl"]
Expand Down Expand Up @@ -89,6 +89,20 @@ def process(self):
if max_files != 0:
total_possible_videos = min(total_possible_videos, max_files)

# Read the source video archive's metadata so each extracted audio
# file can carry its video's provenance (source posts, URL).
try:
source_metadata = self.source_dataset.read_media_metadata()
except (FileNotFoundError, MetadataException):
source_metadata = None

# Build our own metadata describing the audio files we produce,
# rather than passing the (video-keyed) source metadata through.
metadata = self.dataset.new_media_metadata(
processor_type=self.type,
from_dataset=(source_metadata.from_dataset if source_metadata else self.source_dataset.key),
)

processed_videos = 0
written = 0

Expand All @@ -97,9 +111,9 @@ def process(self):
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while determining image wall order")

# Check for 4CAT's metadata JSON and copy it
# the source archive's metadata describes videos; we write our
# own (audio-keyed) metadata below, so skip the original
if item.file.name == '.metadata.json':
shutil.copy(item.file, output_dir.joinpath(".metadata.json"))
continue

if max_files != 0 and processed_videos >= max_files:
Expand All @@ -123,9 +137,23 @@ def process(self):
ffmpeg_output = result.stdout.decode("utf-8")
ffmpeg_error = result.stderr.decode("utf-8")

audio_file = output_dir.joinpath(f"{vid_name}.wav")
audio_filename = f"{vid_name}.wav"
audio_file = output_dir.joinpath(audio_filename)

# carry the source video's provenance onto the extracted audio
video_item = source_metadata.get_entry(item.file.name) if source_metadata else None
post_ids = video_item.get("post_ids", []) if video_item else []
source_url = video_item.get("url") if video_item else None
extra = dict(video_item.get("extra") or {}) if video_item else {}

if audio_file.exists():
written += 1
metadata.add_item(audio_filename, post_ids=post_ids, url=source_url,
extra=extra, replace=True)
else:
metadata.add_failure(post_ids=post_ids, reason="extraction_failed",
reason_description=f"ffmpeg exited with code {result.returncode}",
url=source_url)

if ffmpeg_output:
with open(str(output_dir.joinpath(f"{vid_name}_stdout.log")), 'w', encoding="utf-8") as outfile:
Expand All @@ -143,6 +171,10 @@ def process(self):
self.dataset.update_status(f"Extracted audio from {written} of {processed_videos} attempted videos")
self.dataset.update_progress(min(1, processed_videos / max(total_possible_videos, 1)))

# Write our own metadata describing the extracted audio files (and
# any extraction failures), keyed by audio filename.
metadata.write(output_dir)

# Finish up
warning = f"Extracted {written}/{processed_videos} audio files, check the logs for errors." \
if written < processed_videos else None
Expand Down
Loading
Loading