Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions puppet/backend.pp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
$redis_host = 'video-redis-buster.video.eqiad1.wikimedia.cloud'
$http_host = 'v2c.wmflabs.org'

# Configure encoding05 and encoding06 to use 1 worker and listen to the heavy
# queue so that resource intensive tasks don't bring down workers.
if $facts['hostname'] in ['encoding05', 'encoding06'] {
$celeryd_concurrency = '1'
$celeryd_queues = 'heavy,celery'
} else {
$celeryd_concurrency = '2'
$celeryd_queues = 'celery'
}

## BASIC INSTANCE SETUP

# include role::labs::lvm::srv
Expand Down Expand Up @@ -211,19 +221,19 @@
require => File['/etc/systemd/system/cron.service.d'],
}

$celeryd_config = '# THIS FILE IS MANAGED BY MANUAL PUPPET
$celeryd_config = "# THIS FILE IS MANAGED BY MANUAL PUPPET
CELERYD_NODES=1
CELERYD_OPTS="--concurrency=2"
CELERY_BIN="/srv/v2c/venv/bin/celery"
CELERY_APP="video2commons.backend.worker"
CELERYD_MULTI="multi"
CELERYD_LOG_LEVEL="INFO"
CELERYD_LOG_FILE="/var/log/v2ccelery/%N%I.log"
CELERYD_PID_FILE="/var/run/v2ccelery/%N.pid"
CELERYD_USER="tools.video2commons"
CELERYD_GROUP="tools.video2commons"
CELERYD_OPTS=\"--concurrency=${celeryd_concurrency} -Q ${celeryd_queues}\"
CELERY_BIN=\"/srv/v2c/venv/bin/celery\"
CELERY_APP=\"video2commons.backend.worker\"
CELERYD_MULTI=\"multi\"
CELERYD_LOG_LEVEL=\"INFO\"
CELERYD_LOG_FILE=\"/var/log/v2ccelery/%N%I.log\"
CELERYD_PID_FILE=\"/var/run/v2ccelery/%N.pid\"
CELERYD_USER=\"tools.video2commons\"
CELERYD_GROUP=\"tools.video2commons\"
CELERY_CREATE_DIRS=1
'
"

file { '/etc/default/v2ccelery':
ensure => file,
Expand Down
5 changes: 5 additions & 0 deletions video2commons/backend/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import celery
from celery.contrib.abortable import AbortableTask
from celery.exceptions import Ignore
from kombu import Queue
from redis import Redis
import pywikibot

Expand All @@ -49,6 +50,10 @@

app.conf.accept_content = ["json"]
app.conf.worker_prefetch_multiplier = 1
app.conf.task_queues = [
Queue("celery"),
Queue("heavy"),
]

redisconnection = Redis(host=redis_host, db=3, password=redis_pw)

Expand Down
44 changes: 41 additions & 3 deletions video2commons/frontend/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""video2commons web API."""

import json
import os
import traceback
import re

Expand All @@ -45,6 +46,9 @@
do_validate_filename,
do_validate_filedesc,
sanitize,
predict_task_type_ffprobe,
DEFAULT_QUEUE,
HEAVY_QUEUE,
)
from video2commons.frontend.upload import upload as _upload, status as _uploadstatus
from video2commons.shared import stats
Expand All @@ -56,6 +60,16 @@
r"(watch\?.*?(?=v=)v=|embed/|v/|.+\?v=)?([^&=%\?]{11})"
)

FILEKEY_REGEX = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)

UPLOADS_DIR = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "static/uploads"
)

VALID_QUEUES = {DEFAULT_QUEUE, HEAVY_QUEUE}

api = Blueprint("api", __name__)


Expand Down Expand Up @@ -426,6 +440,22 @@ def run_task():
downloadkey, convertkey = get_backend_keys(request.form["format"])
username = session["username"]
oauth = (session["access_token_key"], session["access_token_secret"])
queue = request.form.get("queue")

# If no queue is specified, such is the case with manually uploaded files
# due to yt-dlp not being used and extracturl not being called, determine
# which queue to process the file on based on file metadata such as bitrate
# and resolution. We do this to avoid OOM errors with heavy files.
if not queue:
if url.startswith("uploads:"):
filekey = url.split(":", 1)[1]
if not FILEKEY_REGEX.match(filekey):
return jsonify(error="Invalid file key format"), 400

filepath = os.path.join(UPLOADS_DIR, filekey)
queue = predict_task_type_ffprobe(filepath)
else:
queue = DEFAULT_QUEUE

taskid = run_task_internal(
filename,
Expand All @@ -440,17 +470,23 @@ def run_task():
username,
oauth,
),
queue,
)

return jsonify(id=taskid, step="success")


def run_task_internal(filename, params):
def run_task_internal(filename, params, queue):
"""Internal run task function to accept whatever params given."""
banned = check_banned()
assert not banned, "You are banned from using this tool! Reason: " + banned

res = worker.main.delay(*params)
# Validate queue to prevent tasks being sent to non-existent queues.
# Unfortunately Celery tries to be helpful and creates queues on demand.
if queue not in VALID_QUEUES:
queue = DEFAULT_QUEUE

res = worker.main.apply_async(args=params, queue=queue)
taskid = res.id

expire = 14 * 24 * 3600 # 2 weeks
Expand Down Expand Up @@ -491,7 +527,9 @@ def restart_task():
params = redisconnection.get("params:" + id)
assert params, "Could not extract the task parameters."

newid = run_task_internal(filename, json.loads(params))
# Always restart failed tasks on the heavy queue as a failsafe in case the
# task failed earlier due to being misprioritized.
newid = run_task_internal(filename, json.loads(params), HEAVY_QUEUE)
redisconnection.set("restarted:" + id, newid)

redis_publish("update", {"taskid": id, "data": _status(id)})
Expand Down
3 changes: 3 additions & 0 deletions video2commons/frontend/static/video2commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@
"filedesc",
"filename",
"videos",
"queue",
])
.then(() => {
newTaskData.initialFilenameValidated = true;
Expand Down Expand Up @@ -1317,6 +1318,7 @@
filename: video.filename,
filedesc: filedesc,
format: video.format,
queue: video.queue,
};
});
} else {
Expand All @@ -1335,6 +1337,7 @@
filename: newTaskData.filename,
filedesc: filedesc,
format: newTaskData.format,
queue: newTaskData.queue,
});
}

Expand Down
6 changes: 3 additions & 3 deletions video2commons/frontend/static/video2commons.min.js

Large diffs are not rendered by default.

75 changes: 75 additions & 0 deletions video2commons/frontend/urlextract.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from video2commons.backend.encode.transcode import WebVideoTranscode
from video2commons.config import tooldir, youtube_user, youtube_pass

import json
import re
import subprocess
import emoji
import guess_language
import pywikibot
Expand Down Expand Up @@ -65,6 +67,78 @@
# size supported by MediaWiki for uploads.
MAX_FILENAME_SIZE = 228

# Task queue routing thresholds for the default and heavy queues.
# Videos at 4K resolution (>= 3840 px on either axis) or with a total
# bitrate of >= 20 Mbps (expressed in kbps below) are routed to the
# "heavy" queue so that resource-intensive transcodes don't OOM the
# regular workers.
HEAVY_RESOLUTION_THRESHOLD = 3840  # pixels, applies to width or height
HEAVY_BITRATE_THRESHOLD = 20000  # kbps (20 Mbps)
DEFAULT_QUEUE = "celery"  # Celery's default queue name
HEAVY_QUEUE = "heavy"  # queue consumed by the high-capacity workers


def predict_task_type(metadata):
    """Predict how resource-intensive a task is based on yt-dlp metadata.

    Returns HEAVY_QUEUE for 4K-or-larger videos, for videos at or above
    the heavy bitrate threshold, and for sources reporting no bitrate at
    all; DEFAULT_QUEUE otherwise.
    """
    # Route to the heavy queue when either dimension reaches the 4K mark.
    largest_dimension = max(
        metadata.get("width") or 0,
        metadata.get("height") or 0,
    )
    if largest_dimension >= HEAVY_RESOLUTION_THRESHOLD:
        return HEAVY_QUEUE

    # Bitrate check: TBR is the total bitrate, VBR is video-only (kbps).
    tbr = metadata.get("tbr")
    vbr = metadata.get("vbr")
    if (tbr or vbr or 0) >= HEAVY_BITRATE_THRESHOLD:
        return HEAVY_QUEUE

    # No bitrate information at all: assume heavy as a fallback. Sources
    # without yt-dlp extractors (e.g. NASA Scientific Visualization
    # Studio) typically host very large files.
    if tbr is None and vbr is None:
        return HEAVY_QUEUE

    return DEFAULT_QUEUE


def predict_task_type_ffprobe(filepath):
    """Predict how resource-intensive a task is based on ffprobe metadata.

    Returns HEAVY_QUEUE when the file has a 4K-or-larger video stream or
    a total bitrate at/above the heavy threshold; DEFAULT_QUEUE otherwise
    or whenever probing fails (missing ffprobe binary, unreadable file,
    or malformed ffprobe output).
    """
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_streams",
                "-show_format",
                filepath,
            ],
            capture_output=True,
            text=True,
        )
    except OSError:
        # ffprobe missing or not executable: fail open to the default
        # queue instead of erroring out the whole upload request.
        return DEFAULT_QUEUE

    if result.returncode != 0:
        return DEFAULT_QUEUE

    try:
        probe = json.loads(result.stdout)
    except ValueError:
        # ffprobe exited 0 but produced empty or invalid JSON.
        return DEFAULT_QUEUE

    # Check video streams for resolution.
    for stream in probe.get("streams", []):
        if stream.get("codec_type") == "video":
            width = stream.get("width") or 0
            height = stream.get("height") or 0
            if (
                width >= HEAVY_RESOLUTION_THRESHOLD
                or height >= HEAVY_RESOLUTION_THRESHOLD
            ):
                return HEAVY_QUEUE

    # Check bitrate from format (ffprobe reports bits/sec; convert to kbps).
    format_info = probe.get("format", {})
    try:
        bitrate_bps = int(format_info.get("bit_rate") or 0)
    except (TypeError, ValueError):
        # bit_rate can be absent or a non-numeric string such as "N/A".
        bitrate_bps = 0
    bitrate_kbps = bitrate_bps / 1000
    if bitrate_kbps >= HEAVY_BITRATE_THRESHOLD:
        return HEAVY_QUEUE

    return DEFAULT_QUEUE


def make_dummy_desc(filename):
filedesc = FILEDESC_TEMPLATE % {
Expand Down Expand Up @@ -158,6 +232,7 @@ def _extract_info(info):
"filedesc": filedesc.strip(),
"filename": sanitize(title),
"date": _date(url, ie_key, title, info),
"queue": predict_task_type(info),
}


Expand Down
4 changes: 2 additions & 2 deletions video2commons/shared/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def collect_worker_stats(conn, inspector):


def get_queue_length(conn):
"""Get the number of messages waiting in the broker queue."""
return conn.llen("celery") + conn.hlen("unacked")
"""Get the number of messages waiting in the broker queues."""
return conn.llen("celery") + conn.llen("heavy") + conn.hlen("unacked")


def update_task_stats(conn, task_id, remove=False):
Expand Down