Skip to content

Commit be4ae06

Browse files
committed
Create 'heavy' queue for large conversion tasks
1 parent f912531 commit be4ae06

7 files changed

Lines changed: 147 additions & 21 deletions

File tree

puppet/backend.pp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@
44
$redis_host = 'video-redis-buster.video.eqiad1.wikimedia.cloud'
55
$http_host = 'v2c.wmflabs.org'
66

7+
# Configure encoding05 and encoding06 to use 1 worker and listen to the heavy
8+
# queue so that resource intensive tasks don't bring down workers.
9+
if $facts['hostname'] in ['encoding05', 'encoding06'] {
10+
$celeryd_concurrency = '1'
11+
$celeryd_queues = 'heavy,celery'
12+
} else {
13+
$celeryd_concurrency = '2'
14+
$celeryd_queues = 'celery'
15+
}
16+
717
## BASIC INSTANCE SETUP
818

919
# include role::labs::lvm::srv
@@ -211,19 +221,19 @@
211221
require => File['/etc/systemd/system/cron.service.d'],
212222
}
213223

214-
$celeryd_config = '# THIS FILE IS MANAGED BY MANUAL PUPPET
224+
$celeryd_config = "# THIS FILE IS MANAGED BY MANUAL PUPPET
215225
CELERYD_NODES=1
216-
CELERYD_OPTS="--concurrency=2"
217-
CELERY_BIN="/srv/v2c/venv/bin/celery"
218-
CELERY_APP="video2commons.backend.worker"
219-
CELERYD_MULTI="multi"
220-
CELERYD_LOG_LEVEL="INFO"
221-
CELERYD_LOG_FILE="/var/log/v2ccelery/%N%I.log"
222-
CELERYD_PID_FILE="/var/run/v2ccelery/%N.pid"
223-
CELERYD_USER="tools.video2commons"
224-
CELERYD_GROUP="tools.video2commons"
226+
CELERYD_OPTS=\"--concurrency=${celeryd_concurrency} -Q ${celeryd_queues}\"
227+
CELERY_BIN=\"/srv/v2c/venv/bin/celery\"
228+
CELERY_APP=\"video2commons.backend.worker\"
229+
CELERYD_MULTI=\"multi\"
230+
CELERYD_LOG_LEVEL=\"INFO\"
231+
CELERYD_LOG_FILE=\"/var/log/v2ccelery/%N%I.log\"
232+
CELERYD_PID_FILE=\"/var/run/v2ccelery/%N.pid\"
233+
CELERYD_USER=\"tools.video2commons\"
234+
CELERYD_GROUP=\"tools.video2commons\"
225235
CELERY_CREATE_DIRS=1
226-
'
236+
"
227237

228238
file { '/etc/default/v2ccelery':
229239
ensure => file,

video2commons/backend/worker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import celery
2626
from celery.contrib.abortable import AbortableTask
2727
from celery.exceptions import Ignore
28+
from kombu import Queue
2829
from redis import Redis
2930
import pywikibot
3031

@@ -49,6 +50,10 @@
4950

5051
app.conf.accept_content = ["json"]
5152
app.conf.worker_prefetch_multiplier = 1
53+
app.conf.task_queues = [
54+
Queue("celery"),
55+
Queue("heavy"),
56+
]
5257

5358
redisconnection = Redis(host=redis_host, db=3, password=redis_pw)
5459

video2commons/frontend/api.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"""video2commons web API."""
2121

2222
import json
23+
import os
2324
import traceback
2425
import re
2526

@@ -45,6 +46,9 @@
4546
do_validate_filename,
4647
do_validate_filedesc,
4748
sanitize,
49+
predict_task_type_ffprobe,
50+
DEFAULT_QUEUE,
51+
HEAVY_QUEUE,
4852
)
4953
from video2commons.frontend.upload import upload as _upload, status as _uploadstatus
5054
from video2commons.shared import stats
@@ -56,6 +60,16 @@
5660
r"(watch\?.*?(?=v=)v=|embed/|v/|.+\?v=)?([^&=%\?]{11})"
5761
)
5862

63+
FILEKEY_REGEX = re.compile(
64+
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
65+
)
66+
67+
UPLOADS_DIR = os.path.join(
68+
os.path.dirname(os.path.realpath(__file__)), "static/uploads"
69+
)
70+
71+
VALID_QUEUES = {DEFAULT_QUEUE, HEAVY_QUEUE}
72+
5973
api = Blueprint("api", __name__)
6074

6175

@@ -426,6 +440,22 @@ def run_task():
426440
downloadkey, convertkey = get_backend_keys(request.form["format"])
427441
username = session["username"]
428442
oauth = (session["access_token_key"], session["access_token_secret"])
443+
queue = request.form.get("queue")
444+
445+
# If no queue is specified, as is the case with manually uploaded files
446+
# due to yt-dlp not being used and extracturl not being called, determine
447+
# which queue to process the file on based on file metadata such as bitrate
448+
# and resolution. We do this to avoid OOM errors with heavy files.
449+
if not queue:
450+
if url.startswith("uploads:"):
451+
filekey = url.split(":", 1)[1]
452+
if not FILEKEY_REGEX.match(filekey):
453+
return jsonify(error="Invalid file key format"), 400
454+
455+
filepath = os.path.join(UPLOADS_DIR, filekey)
456+
queue = predict_task_type_ffprobe(filepath)
457+
else:
458+
queue = DEFAULT_QUEUE
429459

430460
taskid = run_task_internal(
431461
filename,
@@ -440,17 +470,23 @@ def run_task():
440470
username,
441471
oauth,
442472
),
473+
queue,
443474
)
444475

445476
return jsonify(id=taskid, step="success")
446477

447478

448-
def run_task_internal(filename, params):
479+
def run_task_internal(filename, params, queue):
449480
"""Internal run task function to accept whatever params given."""
450481
banned = check_banned()
451482
assert not banned, "You are banned from using this tool! Reason: " + banned
452483

453-
res = worker.main.delay(*params)
484+
# Validate queue to prevent tasks being sent to non-existent queues.
485+
# Unfortunately Celery tries to be helpful and creates queues on demand.
486+
if queue not in VALID_QUEUES:
487+
queue = DEFAULT_QUEUE
488+
489+
res = worker.main.apply_async(args=params, queue=queue)
454490
taskid = res.id
455491

456492
expire = 14 * 24 * 3600 # 2 weeks
@@ -491,7 +527,9 @@ def restart_task():
491527
params = redisconnection.get("params:" + id)
492528
assert params, "Could not extract the task parameters."
493529

494-
newid = run_task_internal(filename, json.loads(params))
530+
# Always restart failed tasks on the heavy queue as a failsafe in case the
531+
# task failed earlier due to being misprioritized.
532+
newid = run_task_internal(filename, json.loads(params), HEAVY_QUEUE)
495533
redisconnection.set("restarted:" + id, newid)
496534

497535
redis_publish("update", {"taskid": id, "data": _status(id)})

video2commons/frontend/static/video2commons.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@
365365
"filedesc",
366366
"filename",
367367
"videos",
368+
"queue",
368369
])
369370
.then(() => {
370371
newTaskData.initialFilenameValidated = true;
@@ -1317,6 +1318,7 @@
13171318
filename: video.filename,
13181319
filedesc: filedesc,
13191320
format: video.format,
1321+
queue: video.queue,
13201322
};
13211323
});
13221324
} else {
@@ -1335,6 +1337,7 @@
13351337
filename: newTaskData.filename,
13361338
filedesc: filedesc,
13371339
format: newTaskData.format,
1340+
queue: newTaskData.queue,
13381341
});
13391342
}
13401343

video2commons/frontend/static/video2commons.min.js

Lines changed: 0 additions & 5 deletions
This file was deleted.

video2commons/frontend/urlextract.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
from video2commons.backend.encode.transcode import WebVideoTranscode
2424
from video2commons.config import tooldir, youtube_user, youtube_pass
2525

26+
import json
2627
import re
28+
import subprocess
2729
import emoji
2830
import guess_language
2931
import pywikibot
@@ -65,6 +67,78 @@
6567
# size supported by MediaWiki for uploads.
6668
MAX_FILENAME_SIZE = 228
6769

70+
# Queue names and the thresholds used to route a task between them. A video
# whose width or height reaches 4k (3840 px), or whose bitrate reaches 20 Mbps
# (expressed here in kbps), is sent to the heavy queue to avoid OOM errors.
HEAVY_RESOLUTION_THRESHOLD = 3840
HEAVY_BITRATE_THRESHOLD = 20000
DEFAULT_QUEUE = "celery"
HEAVY_QUEUE = "heavy"


def predict_task_type(metadata):
    """Choose the queue for a task from its yt-dlp metadata.

    ``metadata`` is a mapping that may carry ``width``, ``height``, ``tbr``
    (total bitrate) and ``vbr`` (video-only bitrate); the bitrates are in
    kbps. Missing or ``None`` dimensions count as 0.

    Returns ``HEAVY_QUEUE`` when either dimension or the bitrate reaches its
    threshold, or when neither bitrate is known; otherwise ``DEFAULT_QUEUE``.
    """
    # Either dimension reaching the threshold is enough to go heavy.
    dimensions = (metadata.get("width") or 0, metadata.get("height") or 0)
    if any(side >= HEAVY_RESOLUTION_THRESHOLD for side in dimensions):
        return HEAVY_QUEUE

    total_kbps = metadata.get("tbr")
    video_kbps = metadata.get("vbr")

    # No bitrate at all: assume the worst. This covers sources without a
    # yt-dlp extractor, such as NASA Scientific Visualization Studio, which
    # typically serves large files.
    if total_kbps is None and video_kbps is None:
        return HEAVY_QUEUE

    # Prefer the total bitrate and fall back to the video-only one.
    if (total_kbps or video_kbps or 0) >= HEAVY_BITRATE_THRESHOLD:
        return HEAVY_QUEUE

    return DEFAULT_QUEUE
98+
99+
100+
def predict_task_type_ffprobe(filepath):
101+
"""Predict how resource-intensive a task is based on ffprobe metadata."""
102+
result = subprocess.run(
103+
[
104+
"ffprobe",
105+
"-v",
106+
"quiet",
107+
"-print_format",
108+
"json",
109+
"-show_streams",
110+
"-show_format",
111+
filepath,
112+
],
113+
capture_output=True,
114+
text=True,
115+
)
116+
117+
if result.returncode != 0:
118+
return DEFAULT_QUEUE
119+
120+
probe = json.loads(result.stdout)
121+
122+
# Check video streams for resolution.
123+
for stream in probe.get("streams", []):
124+
if stream.get("codec_type") == "video":
125+
width = stream.get("width") or 0
126+
height = stream.get("height") or 0
127+
if (
128+
width >= HEAVY_RESOLUTION_THRESHOLD
129+
or height >= HEAVY_RESOLUTION_THRESHOLD
130+
):
131+
return HEAVY_QUEUE
132+
133+
# Check bitrate from format (in bits/sec, convert to kbps).
134+
format_info = probe.get("format", {})
135+
bitrate_bps = int(format_info.get("bit_rate") or 0)
136+
bitrate_kbps = bitrate_bps / 1000
137+
if bitrate_kbps >= HEAVY_BITRATE_THRESHOLD:
138+
return HEAVY_QUEUE
139+
140+
return DEFAULT_QUEUE
141+
68142

69143
def make_dummy_desc(filename):
70144
filedesc = FILEDESC_TEMPLATE % {
@@ -158,6 +232,7 @@ def _extract_info(info):
158232
"filedesc": filedesc.strip(),
159233
"filename": sanitize(title),
160234
"date": _date(url, ie_key, title, info),
235+
"queue": predict_task_type(info),
161236
}
162237

163238

video2commons/shared/stats.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ def collect_worker_stats(conn, inspector):
4444

4545

4646
def get_queue_length(conn):
47-
"""Get the number of messages waiting in the broker queue."""
48-
return conn.llen("celery") + conn.hlen("unacked")
47+
"""Get the number of messages waiting in the broker queues."""
48+
return conn.llen("celery") + conn.llen("heavy") + conn.hlen("unacked")
4949

5050

5151
def update_task_stats(conn, task_id, remove=False):

0 commit comments

Comments
 (0)