Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cd7644a
feat(dispatch): scaffold dispatch package with config and redis_keys
as535364 Apr 27, 2026
c4d3ccc
refactor(dispatch): rename runner_token_hash_key to runner_token_key
as535364 Apr 27, 2026
da5b200
feat(dispatch): add runner registration and liveness tracking
as535364 Apr 27, 2026
dec564f
chore(dispatch): address Task 2 code review nits
as535364 Apr 27, 2026
e6d0eca
feat(dispatch): add atomic Lua reclaim script for orphan jobs
as535364 Apr 27, 2026
9207e03
feat(dispatch): add job enqueue and claim from pending queue
as535364 Apr 27, 2026
ea17b82
feat(dispatch): add orphan reclaim and complete_job paths
as535364 Apr 27, 2026
7bc6ae3
refactor(dispatch): remove redundant Python-side script cache
as535364 Apr 27, 2026
ca0c9d2
feat(api): add runner API request schemas
as535364 Apr 27, 2026
c391d47
feat(api): add require_runner_token decorator
as535364 Apr 27, 2026
fa522b8
feat(api): add runner API blueprint with 4 endpoints
as535364 Apr 27, 2026
a42c954
feat(dispatch): mark Submission as JE when reclaim attempts exhausted
as535364 Apr 27, 2026
2ce7915
refactor(submission): submit() enqueues to Redis instead of POSTing t…
as535364 Apr 27, 2026
957d8da
refactor(submission): rejudge() enqueues to Redis; add Redis cleanup …
as535364 Apr 27, 2026
785ecc0
refactor(submission): remove push-based sandbox dispatch methods
as535364 Apr 27, 2026
baf8a66
refactor(api): remove old PUT /<submission>/complete callback endpoint
as535364 Apr 27, 2026
2ad54be
refactor(api): drop sandbox_instances handling from PUT /submission/c…
as535364 Apr 27, 2026
84e7202
refactor(engine): deprecate Sandbox EmbeddedDocument; keep field as u…
as535364 Apr 27, 2026
8213928
style: fix pre-existing yapf formatting in dispatch and tests
as535364 Apr 27, 2026
3a6de2f
refactor: migrate problem endpoints from SANDBOX_TOKEN to per-runner …
as535364 Apr 27, 2026
bf62532
test(integration): add end-to-end mock-runner integration tests
as535364 Apr 27, 2026
4ff3474
fix(submission): get_detailed_result uses task['cases'] not task.cases
as535364 Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def app():
(copycat_api, '/copycat'),
(health_api, '/health'),
(user_api, '/user'),
(runner_api, '/runners'),
]
for api, prefix in api2prefix:
app.register_blueprint(api, url_prefix=prefix)
Expand Down
4 changes: 4 additions & 0 deletions dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Dispatch module: Redis-backed runner registry and job queue.

Public API re-exported from submodules.
"""
22 changes: 22 additions & 0 deletions dispatch/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Configuration for dispatch module — env vars and tuning constants."""
import os

# Shared secret used by runners to register with backend.
# In production, set via env var (RUNNER_REGISTRATION_TOKEN).
# Default value is for tests/dev only.
RUNNER_REGISTRATION_TOKEN: str = os.getenv(
"RUNNER_REGISTRATION_TOKEN",
"dev-only-registration-token-change-me",
)

# Heartbeat / lease parameters
HEARTBEAT_INTERVAL_SEC: int = 15
RUNNER_ALIVE_TTL_SEC: int = 30 # 2x heartbeat
POLL_INTERVAL_SEC: int = 3
MAX_CONCURRENT_JOBS_PER_RUNNER: int = 8

# Job retry policy
MAX_ATTEMPTS: int = 3 # 1 initial + 2 reclaims, then mark JE

# Presigned URL TTL for code download (seconds)
CODE_PRESIGNED_URL_TTL_SEC: int = 3600 # 1 hour
180 changes: 180 additions & 0 deletions dispatch/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Job lifecycle: enqueue, claim (pending + orphan reclaim), complete."""
import json
from datetime import datetime, timezone
from typing import Optional

from ulid import ULID

from mongo.utils import RedisCache
from .config import MAX_ATTEMPTS
from .redis_keys import job_key, JOBS_PENDING, JOBS_LEASED
from .runner import is_alive
from .scripts import reclaim_orphan_atomic


def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()


def _build_tasks_meta(submission) -> list[dict]:
"""Extract testcase metadata from submission for runner."""
tasks = submission.problem.test_case_info.get("tasks", [])
return [{
"task_id": idx,
"case_count": t.get("caseCount", 0),
"memory_limit": t.get("memoryLimit", 0),
"time_limit": t.get("timeLimit", 0),
} for idx, t in enumerate(tasks)]


def enqueue_job(submission) -> str:
"""Create a Job from a Submission and push to pending queue. Returns jb_id."""
jb_id = f"jb_{ULID()}"
rds = RedisCache().client

rds.hset(job_key(jb_id),
mapping={
"submission_id": str(submission.id),
"problem_id": submission.problem_id,
"language": submission.language,
"code_minio_path": submission.code_minio_path,
"checker": 'print("not implement yet. qaq")',
"tasks_meta_json": json.dumps(_build_tasks_meta(submission)),
"attempts": 0,
"created_at": _now_iso(),
})
rds.lpush(JOBS_PENDING, jb_id)
return jb_id


def claim_next_job(rn_id: str) -> Optional[dict]:
"""Try to claim next job for this runner. Returns None if no job available.

Strategy:
1. Try pending queue (RPOP for FIFO; LPUSH+RPOP gives FIFO).
2. Scan leased jobs for orphans (owner not alive) and try to reclaim
one atomically.
3. If neither yields a job, return None.
"""
rds = RedisCache().client

# Step 1: pending queue
jb_id_bytes = rds.rpop(JOBS_PENDING)
if jb_id_bytes is not None:
jb_id = jb_id_bytes.decode()
_assign_to_runner(jb_id, rn_id)
return _build_payload(jb_id)

# Step 2: orphan reclaim — scan leased jobs for dead owners
leased_ids = rds.smembers(JOBS_LEASED)
for orphan_id_bytes in leased_ids:
orphan_id = orphan_id_bytes.decode()
owner_bytes = rds.hget(job_key(orphan_id), "leased_by")
if owner_bytes is None:
continue
owner = owner_bytes.decode()
if is_alive(owner):
continue
# Owner is dead. Try to atomically reclaim.
reclaim_result = reclaim_orphan_atomic(
jb_id=orphan_id,
expected_owner=owner,
new_owner=rn_id,
max_attempts=MAX_ATTEMPTS,
)
if reclaim_result == 1:
return _build_payload(orphan_id)
if reclaim_result == -1:
_on_attempts_exhausted(orphan_id)
# continue to look at other orphans
# 0 = lost the race, try next orphan
return None


def _assign_to_runner(jb_id: str, rn_id: str) -> None:
"""Mark job as leased to this runner (called after RPOP from pending)."""
rds = RedisCache().client
rds.hset(job_key(jb_id),
mapping={
"leased_by": rn_id,
"leased_at": _now_iso(),
})
rds.hincrby(job_key(jb_id), "attempts", 1)
rds.sadd(JOBS_LEASED, jb_id)


def _on_attempts_exhausted(jb_id: str) -> None:
"""When a job exhausts max_attempts, mark its Submission as Judge Error.

Lua script already removed jb_id from JOBS_LEASED set. This function
cleans the job hash and updates the Submission's status field.
"""
# Local import to avoid circular dependency at module load time.
from mongo import Submission

rds = RedisCache().client
submission_id_bytes = rds.hget(job_key(jb_id), "submission_id")
if submission_id_bytes is None:
rds.delete(job_key(jb_id))
return
submission_id = submission_id_bytes.decode()
try:
sub = Submission(submission_id)
if sub:
sub.update(status=6) # JE
except Exception:
pass
finally:
rds.delete(job_key(jb_id))


def _build_payload(jb_id: str) -> dict:
"""Build the payload returned to the runner via next-job endpoint.

Note: code_minio_path is returned as-is here. The blueprint layer is
responsible for converting it to a presigned URL before sending to runner.
"""
rds = RedisCache().client
h = rds.hgetall(job_key(jb_id))
# Decode bytes to str
h = {k.decode(): v.decode() for k, v in h.items()}
return {
"job_id": jb_id,
"submission_id": h["submission_id"],
"problem_id": int(h["problem_id"]),
"language": int(h["language"]),
"code_minio_path": h["code_minio_path"],
"checker": h["checker"],
"tasks": json.loads(h["tasks_meta_json"]),
}


def complete_job(
rn_id: str,
jb_id: str,
tasks: list,
process_result,
) -> str:
"""Process a completed job. Returns one of: 'ok', 'wrong_owner', 'not_found'.

Args:
rn_id: The runner claiming completion.
jb_id: The job id.
tasks: Result tasks array (passed through to process_result).
process_result: Callable(submission_id_str, tasks) -> None. Injected so
this module doesn't depend directly on mongo.Submission.
"""
rds = RedisCache().client
h = rds.hgetall(job_key(jb_id))
if not h:
return "not_found"
leased_by = h.get(b"leased_by")
if leased_by is None or leased_by.decode() != rn_id:
return "wrong_owner"

submission_id = h[b"submission_id"].decode()
process_result(submission_id, tasks)

rds.delete(job_key(jb_id))
rds.srem(JOBS_LEASED, jb_id)
return "ok"
26 changes: 26 additions & 0 deletions dispatch/redis_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Centralized Redis key naming. All Redis keys used by dispatch live here."""

# Runner namespace
RUNNERS_REGISTERED = "runners:registered"


def runner_meta_key(rn_id: str) -> str:
return f"runner:{rn_id}:meta"


def runner_token_key(rn_id: str) -> str:
"""STRING key holding SHA-256 digest of the runner's bearer token."""
return f"runner:{rn_id}:token_hash"


def runner_alive_key(rn_id: str) -> str:
return f"runner:{rn_id}:alive"


# Job namespace
JOBS_PENDING = "jobs:pending"
JOBS_LEASED = "jobs:leased"


def job_key(jb_id: str) -> str:
return f"job:{jb_id}"
89 changes: 89 additions & 0 deletions dispatch/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Runner lifecycle: registration, token verification, liveness tracking."""
import hashlib
import secrets
from datetime import datetime, timezone

from ulid import ULID

from mongo.utils import RedisCache
from .config import RUNNER_ALIVE_TTL_SEC
from .redis_keys import (
RUNNERS_REGISTERED,
runner_alive_key,
runner_meta_key,
runner_token_key,
)


def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()


def _hash_token(rk_token: str) -> str:
return hashlib.sha256(rk_token.encode()).hexdigest()


def register(name: str, registration_ip: str) -> tuple[str, str]:
"""Register a new runner. Returns (rn_id, rk_token).

rk_token is returned in plaintext only here — backend stores only the hash.
"""
rn_id = f"rn_{ULID()}"
rk_token = f"rk_{secrets.token_urlsafe(32)}"
rds = RedisCache().client

rds.hset(
runner_meta_key(rn_id),
mapping={
"name": name,
"registered_at": _now_iso(),
"registration_ip": registration_ip,
},
)
rds.set(runner_token_key(rn_id), _hash_token(rk_token))
rds.sadd(RUNNERS_REGISTERED, rn_id)
rds.set(runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC)
return rn_id, rk_token


def verify_token(rn_id: str, rk_token: str) -> bool:
"""Verify a runner's token. Returns False if rn_id unknown or token mismatch."""
rds = RedisCache().client
stored = rds.get(runner_token_key(rn_id))
if stored is None:
return False
expected_hash = stored.decode() if isinstance(stored, bytes) else stored
actual_hash = _hash_token(rk_token)
return secrets.compare_digest(expected_hash, actual_hash)


def renew_alive(rn_id: str) -> None:
"""Refresh runner alive TTL. Called on every heartbeat.

Caller MUST verify the runner's token before calling this — does not
check existence and will create the key for any rn_id string.
"""
RedisCache().client.set(runner_alive_key(rn_id),
"1",
ex=RUNNER_ALIVE_TTL_SEC)


def is_alive(rn_id: str) -> bool:
"""True if this runner has a non-expired alive key."""
return bool(RedisCache().client.exists(runner_alive_key(rn_id)))


def verify_any_token(rk_token: str) -> bool:
"""Check if rk_token belongs to any registered runner.

Used by endpoints that accept rk_token via query string (legacy auth shape
inherited from old SANDBOX_TOKEN endpoints — testdata fetching).
Linear scan over registered runners; fine for small N. If runner count
grows, add a reverse-index of token_hash -> rn_id.
"""
rds = RedisCache().client
for rn_id_bytes in rds.smembers(RUNNERS_REGISTERED):
rn_id = rn_id_bytes.decode()
if verify_token(rn_id, rk_token):
return True
return False
61 changes: 61 additions & 0 deletions dispatch/scripts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Lua scripts for atomic Redis operations."""
from datetime import datetime, timezone

from mongo.utils import RedisCache
from .redis_keys import job_key, JOBS_LEASED

# Lua reclaim script:
# KEYS[1] = job:<jb_id>
# ARGV[1] = expected_owner (rn_id we expect to currently hold the lease)
# ARGV[2] = new_owner (rn_id taking over)
# ARGV[3] = leased_at (ISO timestamp string)
# ARGV[4] = max_attempts (string-encoded int)
# ARGV[5] = jobs_leased_set_name
#
# Return:
# 1 = success (lease transferred, attempts incremented)
# 0 = failed (owner changed before we could reclaim)
# -1 = exhausted (attempts >= max_attempts; job removed from leased set)
_RECLAIM_LUA = """
local current_owner = redis.call('HGET', KEYS[1], 'leased_by')
if current_owner ~= ARGV[1] then
return 0
end

local attempts = tonumber(redis.call('HGET', KEYS[1], 'attempts')) or 0
local max_attempts = tonumber(ARGV[4])
if attempts >= max_attempts then
redis.call('SREM', ARGV[5], string.sub(KEYS[1], 5))
return -1
end

redis.call('HSET', KEYS[1], 'leased_by', ARGV[2], 'leased_at', ARGV[3])
redis.call('HINCRBY', KEYS[1], 'attempts', 1)
return 1
"""


def reclaim_orphan_atomic(
jb_id: str,
expected_owner: str,
new_owner: str,
max_attempts: int,
) -> int:
"""Atomically transfer lease from expected_owner to new_owner.

Returns:
1 if reclaimed
0 if owner changed before we could reclaim
-1 if attempts exhausted (job removed from JOBS_LEASED set)
"""
rds = RedisCache().client
script = rds.register_script(
_RECLAIM_LUA) # redis-py caches EVALSHA internally
leased_at = datetime.now(timezone.utc).isoformat()
return int(
script(
keys=[job_key(jb_id)],
args=[
expected_owner, new_owner, leased_at, max_attempts, JOBS_LEASED
],
))
Loading
Loading