From cd7644af2a363ef07f43f1394611adc9920b7412 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:04:40 +0800 Subject: [PATCH 01/22] feat(dispatch): scaffold dispatch package with config and redis_keys --- dispatch/__init__.py | 4 ++++ dispatch/config.py | 22 ++++++++++++++++++++++ dispatch/redis_keys.py | 20 ++++++++++++++++++++ tests/unittest/dispatch/__init__.py | 0 4 files changed, 46 insertions(+) create mode 100644 dispatch/__init__.py create mode 100644 dispatch/config.py create mode 100644 dispatch/redis_keys.py create mode 100644 tests/unittest/dispatch/__init__.py diff --git a/dispatch/__init__.py b/dispatch/__init__.py new file mode 100644 index 00000000..e09f713d --- /dev/null +++ b/dispatch/__init__.py @@ -0,0 +1,4 @@ +"""Dispatch module: Redis-backed runner registry and job queue. + +Public API re-exported from submodules. +""" diff --git a/dispatch/config.py b/dispatch/config.py new file mode 100644 index 00000000..8362a700 --- /dev/null +++ b/dispatch/config.py @@ -0,0 +1,22 @@ +"""Configuration for dispatch module — env vars and tuning constants.""" +import os + +# Shared secret used by runners to register with backend. +# In production, set via env var (RUNNER_REGISTRATION_TOKEN). +# Default value is for tests/dev only. +RUNNER_REGISTRATION_TOKEN: str = os.getenv( + "RUNNER_REGISTRATION_TOKEN", + "dev-only-registration-token-change-me", +) + +# Heartbeat / lease parameters +HEARTBEAT_INTERVAL_SEC: int = 15 +RUNNER_ALIVE_TTL_SEC: int = 30 # 2x heartbeat +POLL_INTERVAL_SEC: int = 3 +MAX_CONCURRENT_JOBS_PER_RUNNER: int = 8 + +# Job retry policy +MAX_ATTEMPTS: int = 3 # 1 initial + 2 reclaims, then mark JE + +# Presigned URL TTL for code download (seconds) +CODE_PRESIGNED_URL_TTL_SEC: int = 3600 # 1 hour diff --git a/dispatch/redis_keys.py b/dispatch/redis_keys.py new file mode 100644 index 00000000..b2cef7a4 --- /dev/null +++ b/dispatch/redis_keys.py @@ -0,0 +1,20 @@ +"""Centralized Redis key naming. 
All Redis keys used by dispatch live here.""" + +# Runner namespace +RUNNERS_REGISTERED = "runners:registered" + +def runner_meta_key(rn_id: str) -> str: + return f"runner:{rn_id}:meta" + +def runner_token_hash_key(rn_id: str) -> str: + return f"runner:{rn_id}:token_hash" + +def runner_alive_key(rn_id: str) -> str: + return f"runner:{rn_id}:alive" + +# Job namespace +JOBS_PENDING = "jobs:pending" +JOBS_LEASED = "jobs:leased" + +def job_key(jb_id: str) -> str: + return f"job:{jb_id}" diff --git a/tests/unittest/dispatch/__init__.py b/tests/unittest/dispatch/__init__.py new file mode 100644 index 00000000..e69de29b From c4d3cccc7b1cd156e8b51522540e7808c7caab26 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:13:28 +0800 Subject: [PATCH 02/22] refactor(dispatch): rename runner_token_hash_key to runner_token_key The 'hash' in the old name suggested a Redis HASH data structure, but the key actually addresses a STRING storing a SHA-256 hex digest. Rename removes ambiguity. Caught in code review before Task 2 wrote tests against the old name. --- dispatch/redis_keys.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dispatch/redis_keys.py b/dispatch/redis_keys.py index b2cef7a4..e35f0209 100644 --- a/dispatch/redis_keys.py +++ b/dispatch/redis_keys.py @@ -6,7 +6,8 @@ def runner_meta_key(rn_id: str) -> str: return f"runner:{rn_id}:meta" -def runner_token_hash_key(rn_id: str) -> str: +def runner_token_key(rn_id: str) -> str: + """STRING key holding SHA-256 digest of the runner's bearer token.""" return f"runner:{rn_id}:token_hash" def runner_alive_key(rn_id: str) -> str: From da5b200d73e8453ecefb87c5ba0ae83ae39809b5 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:18:31 +0800 Subject: [PATCH 03/22] feat(dispatch): add runner registration and liveness tracking Implement register, verify_token, renew_alive, is_alive in dispatch/runner.py with TDD (8 test cases). 
Fix RedisCache fakeredis singleton so cross-instance state is shared in tests (shared FakeServer). --- dispatch/runner.py | 69 ++++++++++++++++++++ mongo/utils.py | 5 +- tests/unittest/dispatch/test_runner.py | 89 ++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 dispatch/runner.py create mode 100644 tests/unittest/dispatch/test_runner.py diff --git a/dispatch/runner.py b/dispatch/runner.py new file mode 100644 index 00000000..b232b1b5 --- /dev/null +++ b/dispatch/runner.py @@ -0,0 +1,69 @@ +"""Runner lifecycle: registration, token verification, liveness tracking.""" +import hashlib +import secrets +from datetime import datetime, timezone + +from ulid import ULID + +from mongo.utils import RedisCache +from .config import RUNNER_ALIVE_TTL_SEC +from .redis_keys import ( + RUNNERS_REGISTERED, + runner_alive_key, + runner_meta_key, + runner_token_key, +) + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _hash_token(rk_token: str) -> str: + return hashlib.sha256(rk_token.encode()).hexdigest() + + +def register(name: str, registration_ip: str) -> tuple[str, str]: + """Register a new runner. Returns (rn_id, rk_token). + + rk_token is returned in plaintext only here — backend stores only the hash. + """ + rn_id = f"rn_{ULID()}" + rk_token = f"rk_{secrets.token_urlsafe(32)}" + rds = RedisCache().client + + rds.hset( + runner_meta_key(rn_id), + mapping={ + "name": name, + "registered_at": _now_iso(), + "registration_ip": registration_ip, + }, + ) + rds.set(runner_token_key(rn_id), _hash_token(rk_token)) + rds.sadd(RUNNERS_REGISTERED, rn_id) + rds.set(runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC) + return rn_id, rk_token + + +def verify_token(rn_id: str, rk_token: str) -> bool: + """Verify a runner's token. 
Returns False if rn_id unknown or token mismatch.""" + rds = RedisCache().client + stored = rds.get(runner_token_key(rn_id)) + if stored is None: + return False + expected_hash = stored.decode() if isinstance(stored, bytes) else stored + actual_hash = _hash_token(rk_token) + return secrets.compare_digest(expected_hash, actual_hash) + + +def renew_alive(rn_id: str) -> None: + """Refresh runner alive TTL. Called on every heartbeat.""" + RedisCache().client.set( + runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC + ) + + +def is_alive(rn_id: str) -> bool: + """True if this runner has a non-expired alive key.""" + return bool(RedisCache().client.exists(runner_alive_key(rn_id))) diff --git a/mongo/utils.py b/mongo/utils.py index 67ba1f8e..474af4f8 100644 --- a/mongo/utils.py +++ b/mongo/utils.py @@ -92,7 +92,10 @@ def client(self): if self._client is None: if self.PORT is None: import fakeredis - self._client = fakeredis.FakeStrictRedis() + if not hasattr(RedisCache, '_FAKE_SERVER'): + RedisCache._FAKE_SERVER = fakeredis.FakeServer() + self._client = fakeredis.FakeStrictRedis( + server=RedisCache._FAKE_SERVER) else: self._client = redis.Redis(connection_pool=self.POOL) return self._client diff --git a/tests/unittest/dispatch/test_runner.py b/tests/unittest/dispatch/test_runner.py new file mode 100644 index 00000000..030f06a7 --- /dev/null +++ b/tests/unittest/dispatch/test_runner.py @@ -0,0 +1,89 @@ +import pytest +from mongo.utils import RedisCache +from dispatch import runner as runner_mod +from dispatch.redis_keys import ( + RUNNERS_REGISTERED, + runner_meta_key, + runner_token_key, + runner_alive_key, +) + + +@pytest.fixture(autouse=True) +def clear_redis(): + """Clear Redis state between tests (fakeredis persists across tests).""" + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + +def test_register_returns_id_and_token(): + rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4") + assert 
rn_id.startswith("rn_") + assert rk_token.startswith("rk_") + assert len(rk_token) > 30 # actually random + + +def test_register_persists_to_redis(): + rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4") + rds = RedisCache().client + + # Meta hash exists with name + ip + meta = rds.hgetall(runner_meta_key(rn_id)) + assert meta[b"name"] == b"my-runner" + assert meta[b"registration_ip"] == b"1.2.3.4" + assert b"registered_at" in meta + + # Token hash exists (NOT plaintext token) + stored_hash = rds.get(runner_token_key(rn_id)) + assert stored_hash is not None + assert stored_hash.decode() != rk_token # plaintext NOT stored + + # In registered set + assert rds.sismember(RUNNERS_REGISTERED, rn_id) + + # Initial alive key exists with TTL + assert rds.exists(runner_alive_key(rn_id)) + assert 0 < rds.ttl(runner_alive_key(rn_id)) <= 30 + + +def test_register_each_call_unique_token(): + _, t1 = runner_mod.register(name="a", registration_ip="1.1.1.1") + _, t2 = runner_mod.register(name="b", registration_ip="1.1.1.1") + assert t1 != t2 + + +def test_verify_token_returns_true_for_correct_token(): + rn_id, rk_token = runner_mod.register(name="x", registration_ip="1.1.1.1") + assert runner_mod.verify_token(rn_id, rk_token) is True + + +def test_verify_token_returns_false_for_wrong_token(): + rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1") + assert runner_mod.verify_token(rn_id, "rk_wrongtoken") is False + + +def test_verify_token_returns_false_for_unknown_runner(): + assert runner_mod.verify_token("rn_nonexistent", "rk_anything") is False + + +def test_renew_alive_resets_ttl(): + rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1") + rds = RedisCache().client + # Manually set short TTL + rds.expire(runner_alive_key(rn_id), 5) + assert rds.ttl(runner_alive_key(rn_id)) <= 5 + + runner_mod.renew_alive(rn_id) + assert 25 < rds.ttl(runner_alive_key(rn_id)) <= 30 + + +def test_is_alive_returns_true_when_key_exists(): + 
rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1") + assert runner_mod.is_alive(rn_id) is True + + +def test_is_alive_returns_false_when_key_expired(): + rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1") + RedisCache().client.delete(runner_alive_key(rn_id)) + assert runner_mod.is_alive(rn_id) is False From dec564f206afa6b8b145f12ee23fa03a02baf3df Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:23:10 +0800 Subject: [PATCH 04/22] chore(dispatch): address Task 2 code review nits - tests use RUNNER_ALIVE_TTL_SEC constant instead of hardcoded 30 - renew_alive() docstring documents caller's verify_token responsibility --- dispatch/runner.py | 6 +++++- tests/unittest/dispatch/test_runner.py | 5 +++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dispatch/runner.py b/dispatch/runner.py index b232b1b5..7a0423a8 100644 --- a/dispatch/runner.py +++ b/dispatch/runner.py @@ -58,7 +58,11 @@ def verify_token(rn_id: str, rk_token: str) -> bool: def renew_alive(rn_id: str) -> None: - """Refresh runner alive TTL. Called on every heartbeat.""" + """Refresh runner alive TTL. Called on every heartbeat. + + Caller MUST verify the runner's token before calling this — does not + check existence and will create the key for any rn_id string. 
+ """ RedisCache().client.set( runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC ) diff --git a/tests/unittest/dispatch/test_runner.py b/tests/unittest/dispatch/test_runner.py index 030f06a7..498e6b35 100644 --- a/tests/unittest/dispatch/test_runner.py +++ b/tests/unittest/dispatch/test_runner.py @@ -1,6 +1,7 @@ import pytest from mongo.utils import RedisCache from dispatch import runner as runner_mod +from dispatch.config import RUNNER_ALIVE_TTL_SEC from dispatch.redis_keys import ( RUNNERS_REGISTERED, runner_meta_key, @@ -44,7 +45,7 @@ def test_register_persists_to_redis(): # Initial alive key exists with TTL assert rds.exists(runner_alive_key(rn_id)) - assert 0 < rds.ttl(runner_alive_key(rn_id)) <= 30 + assert 0 < rds.ttl(runner_alive_key(rn_id)) <= RUNNER_ALIVE_TTL_SEC def test_register_each_call_unique_token(): @@ -75,7 +76,7 @@ def test_renew_alive_resets_ttl(): assert rds.ttl(runner_alive_key(rn_id)) <= 5 runner_mod.renew_alive(rn_id) - assert 25 < rds.ttl(runner_alive_key(rn_id)) <= 30 + assert (RUNNER_ALIVE_TTL_SEC - 5) < rds.ttl(runner_alive_key(rn_id)) <= RUNNER_ALIVE_TTL_SEC def test_is_alive_returns_true_when_key_exists(): From e6d0eca9b7e3a875d30eb9c8b08fddb6a9c423bc Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:24:35 +0800 Subject: [PATCH 05/22] feat(dispatch): add atomic Lua reclaim script for orphan jobs Adds reclaim_orphan_atomic() backed by a single Lua script so that only one runner can claim an orphaned job; also enforces max-attempts policy. Adds lupa dev-dep to enable fakeredis Lua execution in tests. 
--- dispatch/scripts.py | 69 +++++++++++++++ poetry.lock | 79 ++++++++++++++++- pyproject.toml | 1 + .../unittest/dispatch/test_reclaim_script.py | 88 +++++++++++++++++++ 4 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 dispatch/scripts.py create mode 100644 tests/unittest/dispatch/test_reclaim_script.py diff --git a/dispatch/scripts.py b/dispatch/scripts.py new file mode 100644 index 00000000..94f2dace --- /dev/null +++ b/dispatch/scripts.py @@ -0,0 +1,69 @@ +"""Lua scripts for atomic Redis operations.""" +from datetime import datetime, timezone + +from mongo.utils import RedisCache +from .redis_keys import job_key, JOBS_LEASED + + +# Lua reclaim script: +# KEYS[1] = job: +# ARGV[1] = expected_owner (rn_id we expect to currently hold the lease) +# ARGV[2] = new_owner (rn_id taking over) +# ARGV[3] = leased_at (ISO timestamp string) +# ARGV[4] = max_attempts (string-encoded int) +# ARGV[5] = jobs_leased_set_name +# +# Return: +# 1 = success (lease transferred, attempts incremented) +# 0 = failed (owner changed before we could reclaim) +# -1 = exhausted (attempts >= max_attempts; job removed from leased set) +_RECLAIM_LUA = """ +local current_owner = redis.call('HGET', KEYS[1], 'leased_by') +if current_owner ~= ARGV[1] then + return 0 +end + +local attempts = tonumber(redis.call('HGET', KEYS[1], 'attempts')) or 0 +local max_attempts = tonumber(ARGV[4]) +if attempts >= max_attempts then + redis.call('SREM', ARGV[5], string.sub(KEYS[1], 5)) + return -1 +end + +redis.call('HSET', KEYS[1], 'leased_by', ARGV[2], 'leased_at', ARGV[3]) +redis.call('HINCRBY', KEYS[1], 'attempts', 1) +return 1 +""" + +_script_cache = {} + + +def _get_reclaim_script(): + """Lazily register the script (so we get a fresh script per Redis client).""" + rds = RedisCache().client + cache_key = id(rds) + if cache_key not in _script_cache: + _script_cache[cache_key] = rds.register_script(_RECLAIM_LUA) + return _script_cache[cache_key] + + +def reclaim_orphan_atomic( + jb_id: str, 
+ expected_owner: str, + new_owner: str, + max_attempts: int, +) -> int: + """Atomically transfer lease from expected_owner to new_owner. + + Returns: + 1 if reclaimed + 0 if owner changed before we could reclaim + -1 if attempts exhausted (job removed from JOBS_LEASED set) + """ + script = _get_reclaim_script() + leased_at = datetime.now(timezone.utc).isoformat() + return int( + script( + keys=[job_key(jb_id)], + args=[expected_owner, new_owner, leased_at, max_attempts, JOBS_LEASED], + )) diff --git a/poetry.lock b/poetry.lock index 09c0495e..81cbf063 100644 --- a/poetry.lock +++ b/poetry.lock @@ -632,6 +632,83 @@ MarkupSafe = ">=2.0" [package.extras] i18n = ["Babel (>=2.7)"] +[[package]] +name = "lupa" +version = "2.8" +description = "Python wrapper around Lua and LuaJIT" +optional = false +python-versions = ">=3.8" +groups = ["dev"] +files = [ + {file = "lupa-2.8-cp310-abi3-win32.whl", hash = "sha256:c2a5fd15dc62374e1661a55f01744c9ec1c56f291ba4a0749d3af2174556e78f"}, + {file = "lupa-2.8-cp310-abi3-win_arm64.whl", hash = "sha256:9e304fb1c50cf23fd8882afbe1aa87525ef8a72667bcab3b37b2bbb2bc542269"}, + {file = "lupa-2.8-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:97bd01e90b8031e56a5fd5bb70605aea09f1dba675c1140308a52780f93d06f1"}, + {file = "lupa-2.8-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0b5ebe1a13c45767919c86750b84fe2da9f6288b6f3cea4ce7660bb2abc9d921"}, + {file = "lupa-2.8-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:097e7d0f1719a88020b67c82e05d53d7973c166952393afcecfd8434c7e19a15"}, + {file = "lupa-2.8-cp310-cp310-win_amd64.whl", hash = "sha256:7bb223ee8f72d0dc076b0d65296ee72f1c69450f9d2fed5315f7707d98c4a03d"}, + {file = "lupa-2.8-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b12e43c1fb787189dfc28cd604aef0baa2cb95e27da19498d520361d0ace070a"}, + {file = 
"lupa-2.8-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f6f603391dffb256e36a79fd2044084d5f4b8a0a4c0e5ad291cd3ab3aaf1fd0a"}, + {file = "lupa-2.8-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9f6f41c91366e7d0d474f87d81c1274af861f40812bf729c9f97ab4c8f3c7ac8"}, + {file = "lupa-2.8-cp311-cp311-win_amd64.whl", hash = "sha256:f5a6af145b0ea818f01d27bfe2583a4b538570bef61d22c8773e0eccf011234c"}, + {file = "lupa-2.8-cp312-abi3-macosx_10_13_x86_64.whl", hash = "sha256:f4342f4de76ae7ce2ab0672d36003bdb7e1a33252f293b569298ddd792e70e33"}, + {file = "lupa-2.8-cp312-abi3-manylinux2010_i686.manylinux_2_12_i686.manylinux_2_28_i686.whl", hash = "sha256:4203fa1659315e939a5304e75001b8cc14234fb3cbb3ed86c049b0cc5d90fcee"}, + {file = "lupa-2.8-cp312-abi3-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:81f2d843ce668b653146c007467570210ae44be51dac6926666c51d49536f307"}, + {file = "lupa-2.8-cp312-abi3-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:d3d0cde2c77588d1c60875a4f34f059513476c6e1775351897195b51e0f3df08"}, + {file = "lupa-2.8-cp312-abi3-manylinux_2_34_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:9e0d11b8f3a8dac6413f704fef7161d048bb10c58bdac6cbffa5e60efa56e9a3"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:54cff414f21f8cd8c6be4aae52541f3b9cd39602b59e3a3db9b5c9f9f674ff18"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:24b4d8af5558e549b70daf1547f5c1c1d664ecea9fc790f83efe5d75e9a93797"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_i686.whl", hash = "sha256:ce86dff1ee7f7cf45f5622065ae991949dd7bb1703581cbc58a630137bb7ccf9"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_ppc64le.whl", hash = "sha256:f4d01b2a08c70bbb883a9e082b6b36b89121ed5910b710f1ba11c73295ff4fba"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_riscv64.whl", hash = 
"sha256:7f210d5a8353e510ea1199c42cf3cbdd630553bf2bc8fb4c00fea06fdec7c798"}, + {file = "lupa-2.8-cp312-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:4f81a02806e7c7ad26d8c6fa222c8bef1b0c1b124347c879be880b41339d41e4"}, + {file = "lupa-2.8-cp312-abi3-win32.whl", hash = "sha256:360056453a7a4eaa4ac5a204c31a5a014b1eb2ee5490603234d2ba831684f1f2"}, + {file = "lupa-2.8-cp312-abi3-win_arm64.whl", hash = "sha256:1628371c6592a6d5650497a9e31fb2bb3a7e9883c1f301d1111265e484045af9"}, + {file = "lupa-2.8-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:450650f91c48c2415b0d59ab3abfcfda3b6efb5b858205f4d4bda8ad141fa529"}, + {file = "lupa-2.8-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:27044f3363047f946b3d3aab9157cbd172b3538ada9ec1baef43432bf7d03a78"}, + {file = "lupa-2.8-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8cf4f064a0e5531afce2d7d750120c10c10f9529139af6ca6150d13151034398"}, + {file = "lupa-2.8-cp312-cp312-win_amd64.whl", hash = "sha256:281bedc5deb92d31e649a3552edd662449365a635904fa4d5cb4509c7245e34e"}, + {file = "lupa-2.8-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:45fc9da0145ecb0083ef5ff9975116cc784bd0258bdc2bd131ba15483ce18398"}, + {file = "lupa-2.8-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:58e18afed57955b41130e269c78f53d4123ab86e236b53816f4cbffa25cb5d30"}, + {file = "lupa-2.8-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fc47f536ac13a79cef47d29a2b205576a22841f042a2bcec1676b95806e7706a"}, + {file = "lupa-2.8-cp313-cp313-win_amd64.whl", hash = "sha256:ce9404c661dbac65cc9bed351ad45e797af93d30d70be309a3fa8209ac86d93b"}, + {file = "lupa-2.8-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:348c3f8ecabb6324dcbc05c2740d762ef8fcec7b06c79e45262ab97a217684e3"}, + {file = 
"lupa-2.8-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:951496471056061598a7d1729a6cdf48d662fec777a9f2d8aa5a1e62fd30e5a5"}, + {file = "lupa-2.8-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a591b9947ca347b41a63370e121d6e2b1458fe6dde9ae065029ec10a37f25ff4"}, + {file = "lupa-2.8-cp314-cp314-win_amd64.whl", hash = "sha256:3903c9cf628dae2f56405503247b77a61a3a61bd2dda470e336950c74776d55d"}, + {file = "lupa-2.8-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:f711a8ab0486b9ac6fdda94a22ddcfbc9f0d4a27e3a8cf1bf79c6e48b33017c1"}, + {file = "lupa-2.8-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:dc51250e76367a3e27fcd01dc769b9bfcbbc34f48df48dde53d6af6e75b7eaa5"}, + {file = "lupa-2.8-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f8a22088a552828958603323f0a5c4b3e11e03b75d0bf4c965ef879de9b60a8d"}, + {file = "lupa-2.8-cp314-cp314t-win32.whl", hash = "sha256:4f7c553c1d8cfffbe85d81daef730d12cae4b6002d457542914da0ac8a1145b3"}, + {file = "lupa-2.8-cp314-cp314t-win_amd64.whl", hash = "sha256:d8766aff03a78c80ad2d188a8bdb216de5ec838359cd87e05bbdfa56394a6105"}, + {file = "lupa-2.8-cp314-cp314t-win_arm64.whl", hash = "sha256:91d622777febda3ab1bed1d45295f2f32a4680c7b3d7caf8c669998ed5c44118"}, + {file = "lupa-2.8-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:81b283bfb13cc43fa4910fc98ec110ab861bcb39680f48b266f99d6e3be1049e"}, + {file = "lupa-2.8-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5caf45d15d424cee52fd67341e96e2b1dde0658ae90eb156ac56aa0d8330bc38"}, + {file = "lupa-2.8-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:33e7e5aebca64b154b0a1679caf79e19254ff37bba51e87abab6848f97cb2de1"}, + {file = "lupa-2.8-cp38-cp38-win32.whl", hash = 
"sha256:e8d4f4dd4acf4a0e42adc6b1ad220e1c86fe3028402c2f78bd0728a6d241bbe9"}, + {file = "lupa-2.8-cp38-cp38-win_amd64.whl", hash = "sha256:1ac2b1ec7504e6148cba1bc35ac36c74d18a0ca6d367ffe7e78a3773c2694c0e"}, + {file = "lupa-2.8-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:b036738282a5acd2e71fdddb317c9df8b87c1673aa57f403d05fcc2be8abc4ba"}, + {file = "lupa-2.8-cp39-abi3-manylinux2010_i686.manylinux_2_12_i686.manylinux_2_28_i686.whl", hash = "sha256:ac6b6e8d0e617e26a98cbb44880bcd75de5d32b3ad7b3b3793583909292b47ed"}, + {file = "lupa-2.8-cp39-abi3-manylinux2014_armv7l.manylinux_2_17_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:ba3a7dd839f90c3d2e53bebe3c192b1f3f9fd720a6781256405123211fd0dce6"}, + {file = "lupa-2.8-cp39-abi3-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:d7edb13a7a5250b5c6c22d1495d9e842b5c9fc5081c8fe6b5efe2112fe3e41f9"}, + {file = "lupa-2.8-cp39-abi3-manylinux_2_34_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:891f72e0bffbed1e4175f975aeb2a083956586a100066525e1be485f617f7b25"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:a295f87b5b7ebbfd5191932e8cb0e51df3c7769101ac6b6c7d7c9fb27bfd1307"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:4fe5d7a810b64ea8511eb885fc8cdde042ee5ff7b7d08ae78f32449756acb177"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_i686.whl", hash = "sha256:bfc470012ef66ad064c7bd77416af03a3452ef630b04b9012595ea13f2e54518"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_ppc64le.whl", hash = "sha256:250e035fdaffe8c87093e3ebc206ac29a26131b1568ea711d780c26001ce96e7"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_riscv64.whl", hash = "sha256:b9bddb09acfffb4f828f790f444b11dc0cca591afea1a244d9329eea2d20c003"}, + {file = "lupa-2.8-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:2e64acbbd47e9b82a64405a39e0d2b36a5a7dad8ab41c0f3437f572f7d282ba3"}, + {file = "lupa-2.8-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:f6ddca4774d5ca451768a95e378a3aa041076e29f4613b8562f8e98efb6690fd"}, + {file = "lupa-2.8-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ffcfd8e19f943ad459136b3f60f085ae4948f024192a93ca4b4ac3023ec88d8"}, + {file = "lupa-2.8-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9f3f3955f65f9fde2dc6eda3041ccd394cf54d4bf083f0cdf6feb3d58e5f38d3"}, + {file = "lupa-2.8-cp39-cp39-win32.whl", hash = "sha256:9e76e45057cfcaa20ee3422c2289a91f9d51783d020da3570ee226de8f6e71cd"}, + {file = "lupa-2.8-cp39-cp39-win_amd64.whl", hash = "sha256:6fbcc9911f05c67affbd225fc024268e61e98a18ad1b1c2aed6c8796e4056554"}, + {file = "lupa-2.8-cp39-cp39-win_arm64.whl", hash = "sha256:6c817d5421094507662e5f8feb8cd1e154c10879921c06079b6063be9d8f33c5"}, + {file = "lupa-2.8-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:32e4e5103bbddcdd2458fb2ccae6c8ba11c9997c711d7e379e0d45551d109c76"}, + {file = "lupa-2.8-pp311-pypy311_pp73-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7667001804657496dee9feced2daae5000b4604a3218dd8e6b7b754982ba88b8"}, + {file = "lupa-2.8-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:86f6f668966965b15247dc32d064cfe7be67b71e584ccfacbe2f637575296878"}, + {file = "lupa-2.8.tar.gz", hash = "sha256:d8022641b9ec8ecf2c5ecbe9f47e5a70e0b87c4b5ae921b92cb02a638e0acd08"}, +] + [[package]] name = "lxml" version = "6.0.0" @@ -1779,4 +1856,4 @@ platformdirs = ">=3.5.1" [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "f826d021f8e0c57c0c61f13cda4ff141c4ff2ecd0c34b79c1ab0c1a8615d51b1" +content-hash = "923184d86c4c96cc186e61675c8f1f932ccafb84cbecc24a3b7674b04a10a489" diff --git a/pyproject.toml b/pyproject.toml index 30b0a836..378d59e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ fakeredis = "^1.9" toml = "^0.10" pytest-cov = "^3.0" testcontainers = 
{extras = ["minio"], version = "^4.10.0"} +lupa = "^2.8" [build-system] requires = ["poetry-core"] diff --git a/tests/unittest/dispatch/test_reclaim_script.py b/tests/unittest/dispatch/test_reclaim_script.py new file mode 100644 index 00000000..ed9e3fa3 --- /dev/null +++ b/tests/unittest/dispatch/test_reclaim_script.py @@ -0,0 +1,88 @@ +"""Tests for the atomic reclaim Lua script. + +The script must be atomic: if two runners try to reclaim the same orphan job, +only one should succeed. It must also enforce the max-attempts limit. +""" +import pytest +from datetime import datetime, timezone +from mongo.utils import RedisCache +from dispatch.scripts import reclaim_orphan_atomic +from dispatch.redis_keys import job_key, JOBS_LEASED + + +@pytest.fixture(autouse=True) +def clear_redis(): + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + +def _seed_leased_job(jb_id: str, owner: str, attempts: int = 1): + rds = RedisCache().client + rds.hset(job_key(jb_id), mapping={ + "leased_by": owner, + "leased_at": datetime.now(timezone.utc).isoformat(), + "attempts": attempts, + }) + rds.sadd(JOBS_LEASED, jb_id) + + +def test_reclaim_succeeds_when_owner_matches(): + _seed_leased_job("jb_1", owner="rn_old", attempts=1) + + result = reclaim_orphan_atomic( + jb_id="jb_1", + expected_owner="rn_old", + new_owner="rn_new", + max_attempts=3, + ) + + assert result == 1 # success + rds = RedisCache().client + assert rds.hget(job_key("jb_1"), "leased_by") == b"rn_new" + assert int(rds.hget(job_key("jb_1"), "attempts")) == 2 # incremented + + +def test_reclaim_fails_when_owner_changed(): + """Another runner already reclaimed it before us.""" + _seed_leased_job("jb_1", owner="rn_someone_else", attempts=1) + + result = reclaim_orphan_atomic( + jb_id="jb_1", + expected_owner="rn_old", # we expected rn_old + new_owner="rn_new", + max_attempts=3, + ) + + assert result == 0 # not reclaimed + assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_someone_else" 
+ + +def test_reclaim_returns_negative_when_max_attempts_reached(): + _seed_leased_job("jb_1", owner="rn_old", attempts=3) + + result = reclaim_orphan_atomic( + jb_id="jb_1", + expected_owner="rn_old", + new_owner="rn_new", + max_attempts=3, + ) + + assert result == -1 # exhausted + # Job removed from leased set so caller can mark Submission JE + assert not RedisCache().client.sismember(JOBS_LEASED, "jb_1") + + +def test_reclaim_is_atomic_under_concurrent_calls(): + """Simulate two runners reclaiming the same orphan at once. + + Lua scripts are atomic in Redis, so even back-to-back calls can't both succeed. + """ + _seed_leased_job("jb_1", owner="rn_old", attempts=1) + + r1 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new1", max_attempts=3) + r2 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new2", max_attempts=3) + + assert r1 == 1 # first wins + assert r2 == 0 # second sees owner already changed + assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_new1" From 9207e0365b622bcc7081ba4d18a86cc93fdd5297 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:26:48 +0800 Subject: [PATCH 06/22] feat(dispatch): add job enqueue and claim from pending queue --- dispatch/job.py | 99 +++++++++++++++++++++++++++++ tests/unittest/dispatch/test_job.py | 89 ++++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 dispatch/job.py create mode 100644 tests/unittest/dispatch/test_job.py diff --git a/dispatch/job.py b/dispatch/job.py new file mode 100644 index 00000000..dddf5c6e --- /dev/null +++ b/dispatch/job.py @@ -0,0 +1,99 @@ +"""Job lifecycle: enqueue, claim (pending + orphan reclaim), complete.""" +import json +from datetime import datetime, timezone +from typing import Optional + +from ulid import ULID + +from mongo.utils import RedisCache +from .redis_keys import job_key, JOBS_PENDING, JOBS_LEASED + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _build_tasks_meta(submission) -> 
list[dict]: + """Extract testcase metadata from submission for runner.""" + tasks = submission.problem.test_case_info.get("tasks", []) + return [ + { + "task_id": idx, + "case_count": t.get("caseCount", 0), + "memory_limit": t.get("memoryLimit", 0), + "time_limit": t.get("timeLimit", 0), + } + for idx, t in enumerate(tasks) + ] + + +def enqueue_job(submission) -> str: + """Create a Job from a Submission and push to pending queue. Returns jb_id.""" + jb_id = f"jb_{ULID()}" + rds = RedisCache().client + + rds.hset(job_key(jb_id), mapping={ + "submission_id": str(submission.id), + "problem_id": submission.problem_id, + "language": submission.language, + "code_minio_path": submission.code_minio_path, + "checker": 'print("not implement yet. qaq")', + "tasks_meta_json": json.dumps(_build_tasks_meta(submission)), + "attempts": 0, + "created_at": _now_iso(), + }) + rds.lpush(JOBS_PENDING, jb_id) + return jb_id + + +def claim_next_job(rn_id: str) -> Optional[dict]: + """Try to claim next job for this runner. Returns None if no job available. + + Strategy: + 1. Try pending queue (RPOP for FIFO; LPUSH+RPOP gives FIFO). + 2. (Task 5 will add orphan reclaim here.) + 3. If neither yields a job, return None. + """ + rds = RedisCache().client + + # Step 1: pending queue + jb_id_bytes = rds.rpop(JOBS_PENDING) + if jb_id_bytes is not None: + jb_id = jb_id_bytes.decode() + _assign_to_runner(jb_id, rn_id) + return _build_payload(jb_id) + + # Step 2: orphan reclaim — implemented in Task 5 + return None + + +def _assign_to_runner(jb_id: str, rn_id: str) -> None: + """Mark job as leased to this runner (called after RPOP from pending).""" + rds = RedisCache().client + rds.hset(job_key(jb_id), mapping={ + "leased_by": rn_id, + "leased_at": _now_iso(), + }) + rds.hincrby(job_key(jb_id), "attempts", 1) + rds.sadd(JOBS_LEASED, jb_id) + + +def _build_payload(jb_id: str) -> dict: + """Build the payload returned to the runner via next-job endpoint. 
+ + Note: code_minio_path is returned as-is here. The blueprint layer is + responsible for converting it to a presigned URL before sending to runner. + """ + rds = RedisCache().client + h = rds.hgetall(job_key(jb_id)) + # Decode bytes to str + h = {k.decode(): v.decode() for k, v in h.items()} + return { + "job_id": jb_id, + "submission_id": h["submission_id"], + "problem_id": int(h["problem_id"]), + "language": int(h["language"]), + "code_minio_path": h["code_minio_path"], + "checker": h["checker"], + "tasks": json.loads(h["tasks_meta_json"]), + } diff --git a/tests/unittest/dispatch/test_job.py b/tests/unittest/dispatch/test_job.py new file mode 100644 index 00000000..a80342c2 --- /dev/null +++ b/tests/unittest/dispatch/test_job.py @@ -0,0 +1,89 @@ +import json +from unittest.mock import MagicMock + +import pytest +from mongo.utils import RedisCache +from dispatch import job as job_mod +from dispatch.redis_keys import job_key, JOBS_PENDING, JOBS_LEASED + + +@pytest.fixture(autouse=True) +def clear_redis(): + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + +def _make_submission(submission_id="sub_1", problem_id=42, language=1, + code_minio_path="submissions/sub_1.zip"): + """Build a minimal submission-like object for enqueue_job tests.""" + sub = MagicMock() + sub.id = submission_id + sub.problem_id = problem_id + sub.language = language + sub.code_minio_path = code_minio_path + sub.problem.test_case_info = { + "tasks": [{"caseCount": 3, "memoryLimit": 65536, "timeLimit": 1000}] + } + return sub + + +def test_enqueue_job_creates_hash_and_pushes_to_pending(): + sub = _make_submission() + + jb_id = job_mod.enqueue_job(sub) + + assert jb_id.startswith("jb_") + rds = RedisCache().client + + # Hash created with all expected fields + h = rds.hgetall(job_key(jb_id)) + assert h[b"submission_id"] == b"sub_1" + assert h[b"problem_id"] == b"42" + assert h[b"language"] == b"1" + assert h[b"code_minio_path"] == b"submissions/sub_1.zip" + assert 
h[b"attempts"] == b"0" + assert b"created_at" in h + assert b"tasks_meta_json" in h + + # Pushed to pending queue + assert rds.lrange(JOBS_PENDING, 0, -1) == [jb_id.encode()] + + +def test_claim_next_job_from_empty_queue_returns_none(): + assert job_mod.claim_next_job(rn_id="rn_1") is None + + +def test_claim_next_job_from_pending_queue(): + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + + payload = job_mod.claim_next_job(rn_id="rn_1") + + assert payload is not None + assert payload["job_id"] == jb_id + assert payload["submission_id"] == "sub_1" + assert payload["problem_id"] == 42 + assert payload["language"] == 1 + assert payload["code_minio_path"] == "submissions/sub_1.zip" + assert "tasks" in payload # parsed from tasks_meta_json + + # Side effects in Redis + rds = RedisCache().client + assert rds.hget(job_key(jb_id), "leased_by") == b"rn_1" + assert rds.sismember(JOBS_LEASED, jb_id) + assert int(rds.hget(job_key(jb_id), "attempts")) == 1 + # Removed from pending + assert rds.llen(JOBS_PENDING) == 0 + + +def test_claim_next_job_is_fifo(): + """First enqueued = first claimed.""" + j1 = job_mod.enqueue_job(_make_submission(submission_id="sub_1")) + j2 = job_mod.enqueue_job(_make_submission(submission_id="sub_2")) + + p1 = job_mod.claim_next_job(rn_id="rn_a") + p2 = job_mod.claim_next_job(rn_id="rn_b") + + assert p1["job_id"] == j1 + assert p2["job_id"] == j2 From ea17b82ef8bc381c884c0e0506bbda0c7a29b7de Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:29:44 +0800 Subject: [PATCH 07/22] feat(dispatch): add orphan reclaim and complete_job paths --- dispatch/job.py | 104 ++++++++++++++++------ tests/unittest/dispatch/test_job.py | 129 +++++++++++++++++++++++++++- 2 files changed, 206 insertions(+), 27 deletions(-) diff --git a/dispatch/job.py b/dispatch/job.py index dddf5c6e..07d8148b 100644 --- a/dispatch/job.py +++ b/dispatch/job.py @@ -6,7 +6,10 @@ from ulid import ULID from mongo.utils import RedisCache +from .config import 
MAX_ATTEMPTS from .redis_keys import job_key, JOBS_PENDING, JOBS_LEASED +from .runner import is_alive +from .scripts import reclaim_orphan_atomic def _now_iso() -> str: @@ -16,15 +19,12 @@ def _now_iso() -> str: def _build_tasks_meta(submission) -> list[dict]: """Extract testcase metadata from submission for runner.""" tasks = submission.problem.test_case_info.get("tasks", []) - return [ - { - "task_id": idx, - "case_count": t.get("caseCount", 0), - "memory_limit": t.get("memoryLimit", 0), - "time_limit": t.get("timeLimit", 0), - } - for idx, t in enumerate(tasks) - ] + return [{ + "task_id": idx, + "case_count": t.get("caseCount", 0), + "memory_limit": t.get("memoryLimit", 0), + "time_limit": t.get("timeLimit", 0), + } for idx, t in enumerate(tasks)] def enqueue_job(submission) -> str: @@ -32,16 +32,17 @@ def enqueue_job(submission) -> str: jb_id = f"jb_{ULID()}" rds = RedisCache().client - rds.hset(job_key(jb_id), mapping={ - "submission_id": str(submission.id), - "problem_id": submission.problem_id, - "language": submission.language, - "code_minio_path": submission.code_minio_path, - "checker": 'print("not implement yet. qaq")', - "tasks_meta_json": json.dumps(_build_tasks_meta(submission)), - "attempts": 0, - "created_at": _now_iso(), - }) + rds.hset(job_key(jb_id), + mapping={ + "submission_id": str(submission.id), + "problem_id": submission.problem_id, + "language": submission.language, + "code_minio_path": submission.code_minio_path, + "checker": 'print("not implement yet. qaq")', + "tasks_meta_json": json.dumps(_build_tasks_meta(submission)), + "attempts": 0, + "created_at": _now_iso(), + }) rds.lpush(JOBS_PENDING, jb_id) return jb_id @@ -51,7 +52,8 @@ def claim_next_job(rn_id: str) -> Optional[dict]: Strategy: 1. Try pending queue (RPOP for FIFO; LPUSH+RPOP gives FIFO). - 2. (Task 5 will add orphan reclaim here.) + 2. Scan leased jobs for orphans (owner not alive) and try to reclaim + one atomically. 3. If neither yields a job, return None. 
""" rds = RedisCache().client @@ -63,17 +65,38 @@ def claim_next_job(rn_id: str) -> Optional[dict]: _assign_to_runner(jb_id, rn_id) return _build_payload(jb_id) - # Step 2: orphan reclaim — implemented in Task 5 + # Step 2: orphan reclaim — scan leased jobs for dead owners + leased_ids = rds.smembers(JOBS_LEASED) + for orphan_id_bytes in leased_ids: + orphan_id = orphan_id_bytes.decode() + owner_bytes = rds.hget(job_key(orphan_id), "leased_by") + if owner_bytes is None: + continue + owner = owner_bytes.decode() + if is_alive(owner): + continue + # Owner is dead. Try to atomically reclaim. + reclaim_result = reclaim_orphan_atomic( + jb_id=orphan_id, + expected_owner=owner, + new_owner=rn_id, + max_attempts=MAX_ATTEMPTS, + ) + if reclaim_result == 1: + return _build_payload(orphan_id) + # 0 = lost the race, try next orphan + # -1 = exhausted (already cleaned up by Lua); Task 9 will hook in JE marking return None def _assign_to_runner(jb_id: str, rn_id: str) -> None: """Mark job as leased to this runner (called after RPOP from pending).""" rds = RedisCache().client - rds.hset(job_key(jb_id), mapping={ - "leased_by": rn_id, - "leased_at": _now_iso(), - }) + rds.hset(job_key(jb_id), + mapping={ + "leased_by": rn_id, + "leased_at": _now_iso(), + }) rds.hincrby(job_key(jb_id), "attempts", 1) rds.sadd(JOBS_LEASED, jb_id) @@ -97,3 +120,34 @@ def _build_payload(jb_id: str) -> dict: "checker": h["checker"], "tasks": json.loads(h["tasks_meta_json"]), } + + +def complete_job( + rn_id: str, + jb_id: str, + tasks: list, + process_result, +) -> str: + """Process a completed job. Returns one of: 'ok', 'wrong_owner', 'not_found'. + + Args: + rn_id: The runner claiming completion. + jb_id: The job id. + tasks: Result tasks array (passed through to process_result). + process_result: Callable(submission_id_str, tasks) -> None. Injected so + this module doesn't depend directly on mongo.Submission. 
+ """ + rds = RedisCache().client + h = rds.hgetall(job_key(jb_id)) + if not h: + return "not_found" + leased_by = h.get(b"leased_by") + if leased_by is None or leased_by.decode() != rn_id: + return "wrong_owner" + + submission_id = h[b"submission_id"].decode() + process_result(submission_id, tasks) + + rds.delete(job_key(jb_id)) + rds.srem(JOBS_LEASED, jb_id) + return "ok" diff --git a/tests/unittest/dispatch/test_job.py b/tests/unittest/dispatch/test_job.py index a80342c2..6d9755b7 100644 --- a/tests/unittest/dispatch/test_job.py +++ b/tests/unittest/dispatch/test_job.py @@ -14,7 +14,9 @@ def clear_redis(): RedisCache().client.flushdb() -def _make_submission(submission_id="sub_1", problem_id=42, language=1, +def _make_submission(submission_id="sub_1", + problem_id=42, + language=1, code_minio_path="submissions/sub_1.zip"): """Build a minimal submission-like object for enqueue_job tests.""" sub = MagicMock() @@ -23,7 +25,11 @@ def _make_submission(submission_id="sub_1", problem_id=42, language=1, sub.language = language sub.code_minio_path = code_minio_path sub.problem.test_case_info = { - "tasks": [{"caseCount": 3, "memoryLimit": 65536, "timeLimit": 1000}] + "tasks": [{ + "caseCount": 3, + "memoryLimit": 65536, + "timeLimit": 1000 + }] } return sub @@ -87,3 +93,122 @@ def test_claim_next_job_is_fifo(): assert p1["job_id"] == j1 assert p2["job_id"] == j2 + + +# Append after existing tests in test_job.py + +from dispatch.redis_keys import runner_alive_key +from dispatch import runner as runner_mod + + +def test_claim_next_job_reclaims_orphan_when_owner_dead(): + """Job leased to a runner whose alive key expired should be reclaimable.""" + # Setup: enqueue + claim by runner1, then expire runner1's alive key + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + rn_id, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1") + job_mod.claim_next_job(rn_id=rn_id) # rn1 takes the job + # Simulate runner1 dying: + 
RedisCache().client.delete(runner_alive_key(rn_id)) + + # Now another runner polls + rn2_id, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2") + payload = job_mod.claim_next_job(rn_id=rn2_id) + + assert payload is not None + assert payload["job_id"] == jb_id + rds = RedisCache().client + assert rds.hget(job_key(jb_id), "leased_by") == rn2_id.encode() + assert int(rds.hget(job_key(jb_id), "attempts")) == 2 # incremented + + +def test_claim_next_job_skips_orphan_with_alive_owner(): + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1") + job_mod.claim_next_job(rn_id=rn1) # rn1 takes the job; rn1 still alive + + rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2") + payload = job_mod.claim_next_job(rn_id=rn2) + + assert payload is None # no work for rn2 — rn1 is still alive + # rn1 still owns the job + assert RedisCache().client.hget(job_key(jb_id), + "leased_by") == rn1.encode() + + +def test_claim_next_job_reclaim_at_max_attempts_returns_signal(): + """When attempts == MAX_ATTEMPTS, reclaim should signal exhaustion.""" + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1") + job_mod.claim_next_job(rn_id=rn1) + # Manually set attempts to MAX_ATTEMPTS (3), then kill rn1 + RedisCache().client.hset(job_key(jb_id), "attempts", 3) + RedisCache().client.delete(runner_alive_key(rn1)) + + rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2") + payload = job_mod.claim_next_job(rn_id=rn2) + + # The exhausted job is removed from JOBS_LEASED but the caller (blueprint) + # is responsible for marking Submission JE separately. claim_next_job itself + # returns None (Task 9 will hook in the JE marking). 
+ assert payload is None + assert not RedisCache().client.sismember(JOBS_LEASED, jb_id) + + +def test_complete_job_with_correct_owner_succeeds(): + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + rn_id, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1") + job_mod.claim_next_job(rn_id=rn_id) + + process_calls = [] + + def fake_process(submission_id, tasks): + process_calls.append((submission_id, tasks)) + + result = job_mod.complete_job( + rn_id=rn_id, + jb_id=jb_id, + tasks=[{ + "status": "AC" + }], + process_result=fake_process, + ) + + assert result == "ok" + assert process_calls == [("sub_1", [{"status": "AC"}])] + rds = RedisCache().client + # Job cleaned up + assert rds.hgetall(job_key(jb_id)) == {} + assert not rds.sismember(JOBS_LEASED, jb_id) + + +def test_complete_job_with_wrong_owner_returns_wrong_owner(): + sub = _make_submission() + jb_id = job_mod.enqueue_job(sub) + rn1, _ = runner_mod.register(name="r1", registration_ip="1.1.1.1") + job_mod.claim_next_job(rn_id=rn1) + + rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2") + result = job_mod.complete_job( + rn_id=rn2, + jb_id=jb_id, + tasks=[], + process_result=lambda *a, **k: None, + ) + + assert result == "wrong_owner" + # Job NOT cleaned up — still belongs to rn1 + assert RedisCache().client.exists(job_key(jb_id)) + + +def test_complete_job_with_unknown_id_returns_not_found(): + result = job_mod.complete_job( + rn_id="rn_x", + jb_id="jb_nonexistent", + tasks=[], + process_result=lambda *a, **k: None, + ) + assert result == "not_found" From 7bc6ae3c3851b7a8e1cd687a909147d940917146 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:30:03 +0800 Subject: [PATCH 08/22] refactor(dispatch): remove redundant Python-side script cache redis-py's register_script() already handles EVALSHA caching internally. The Python-side _script_cache + _get_reclaim_script() helpers added unnecessary complexity. Caught in code review. 
--- dispatch/scripts.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/dispatch/scripts.py b/dispatch/scripts.py index 94f2dace..2e11f849 100644 --- a/dispatch/scripts.py +++ b/dispatch/scripts.py @@ -4,7 +4,6 @@ from mongo.utils import RedisCache from .redis_keys import job_key, JOBS_LEASED - # Lua reclaim script: # KEYS[1] = job: # ARGV[1] = expected_owner (rn_id we expect to currently hold the lease) @@ -35,17 +34,6 @@ return 1 """ -_script_cache = {} - - -def _get_reclaim_script(): - """Lazily register the script (so we get a fresh script per Redis client).""" - rds = RedisCache().client - cache_key = id(rds) - if cache_key not in _script_cache: - _script_cache[cache_key] = rds.register_script(_RECLAIM_LUA) - return _script_cache[cache_key] - def reclaim_orphan_atomic( jb_id: str, @@ -60,10 +48,14 @@ def reclaim_orphan_atomic( 0 if owner changed before we could reclaim -1 if attempts exhausted (job removed from JOBS_LEASED set) """ - script = _get_reclaim_script() + rds = RedisCache().client + script = rds.register_script( + _RECLAIM_LUA) # redis-py caches EVALSHA internally leased_at = datetime.now(timezone.utc).isoformat() return int( script( keys=[job_key(jb_id)], - args=[expected_owner, new_owner, leased_at, max_attempts, JOBS_LEASED], + args=[ + expected_owner, new_owner, leased_at, max_attempts, JOBS_LEASED + ], )) From ca0c9d2c52ddc6eb9b47fa201f2c0b641381a56c Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:31:56 +0800 Subject: [PATCH 09/22] feat(api): add runner API request schemas --- model/schemas/__init__.py | 4 ++++ model/schemas/runner.py | 11 +++++++++++ 2 files changed, 15 insertions(+) create mode 100644 model/schemas/runner.py diff --git a/model/schemas/__init__.py b/model/schemas/__init__.py index 83e41411..2980fe44 100644 --- a/model/schemas/__init__.py +++ b/model/schemas/__init__.py @@ -45,3 +45,7 @@ CloneProblemBody, PublishProblemBody, ) +from .runner import ( + RegisterRunnerBody, 
+ CompleteJobBody, +) diff --git a/model/schemas/runner.py b/model/schemas/runner.py new file mode 100644 index 00000000..ef7eabf5 --- /dev/null +++ b/model/schemas/runner.py @@ -0,0 +1,11 @@ +from typing import Any, List, Optional +from .base import BaseSchema + + +class RegisterRunnerBody(BaseSchema): + registration_token: str + name: Optional[str] = None + + +class CompleteJobBody(BaseSchema): + tasks: List[Any] From c391d475ec1d4b384cb28c7d06fc214a03d54152 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:33:46 +0800 Subject: [PATCH 10/22] feat(api): add require_runner_token decorator --- model/utils/__init__.py | 6 ++- model/utils/runner_auth.py | 29 ++++++++++++++ tests/unittest/test_runner_auth.py | 63 ++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 model/utils/runner_auth.py create mode 100644 tests/unittest/test_runner_auth.py diff --git a/model/utils/__init__.py b/model/utils/__init__.py index 017b7dc6..99dd90c1 100644 --- a/model/utils/__init__.py +++ b/model/utils/__init__.py @@ -1,9 +1,13 @@ from . import request from . import response +from . import runner_auth from . import smtp from .request import * from .response import * +from .runner_auth import * from .smtp import * -__all__ = [*request.__all__, *response.__all__, *smtp.__all__] +__all__ = [ + *request.__all__, *response.__all__, *runner_auth.__all__, *smtp.__all__ +] diff --git a/model/utils/runner_auth.py b/model/utils/runner_auth.py new file mode 100644 index 00000000..e6826710 --- /dev/null +++ b/model/utils/runner_auth.py @@ -0,0 +1,29 @@ +"""@require_runner_token decorator for runner API endpoints.""" +from functools import wraps +from flask import request + +from dispatch import runner as runner_mod +from .response import HTTPError + +__all__ = ['require_runner_token'] + +_BEARER_PREFIX = "Bearer " + + +def require_runner_token(f): + """Verify Authorization: Bearer rk_xxx against the runner_id in URL. 
+ + Routes using this decorator must accept a `runner_id` URL path parameter. + """ + + @wraps(f) + def wrapper(runner_id, *args, **kwargs): + auth = request.headers.get("Authorization", "") + if not auth.startswith(_BEARER_PREFIX): + return HTTPError("missing or malformed Authorization header", 401) + token = auth[len(_BEARER_PREFIX):] + if not runner_mod.verify_token(runner_id, token): + return HTTPError("invalid runner token", 401) + return f(runner_id, *args, **kwargs) + + return wrapper diff --git a/tests/unittest/test_runner_auth.py b/tests/unittest/test_runner_auth.py new file mode 100644 index 00000000..da3c51f0 --- /dev/null +++ b/tests/unittest/test_runner_auth.py @@ -0,0 +1,63 @@ +import pytest +from flask import Flask +from mongo.utils import RedisCache +from model.utils.runner_auth import require_runner_token +from dispatch import runner as runner_mod + + +@pytest.fixture(autouse=True) +def clear_redis(): + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + +@pytest.fixture +def app_with_protected_route(): + app = Flask(__name__) + + @app.get("/protected/<runner_id>") + @require_runner_token + def protected(runner_id): + return {"runner_id": runner_id} + + return app + + +def test_protected_route_with_valid_token_passes(app_with_protected_route): + rn_id, rk_token = runner_mod.register(name="r", registration_ip="1.1.1.1") + client = app_with_protected_route.test_client() + rv = client.get(f"/protected/{rn_id}", + headers={"Authorization": f"Bearer {rk_token}"}) + assert rv.status_code == 200 + assert rv.get_json() == {"runner_id": rn_id} + + +def test_protected_route_missing_header_returns_401(app_with_protected_route): + rn_id, _ = runner_mod.register(name="r", registration_ip="1.1.1.1") + client = app_with_protected_route.test_client() + rv = client.get(f"/protected/{rn_id}") + assert rv.status_code == 401 + + +def test_protected_route_wrong_token_returns_401(app_with_protected_route): + rn_id, _ = runner_mod.register(name="r", 
registration_ip="1.1.1.1") + client = app_with_protected_route.test_client() + rv = client.get(f"/protected/{rn_id}", + headers={"Authorization": "Bearer rk_wrong"}) + assert rv.status_code == 401 + + +def test_protected_route_unknown_runner_returns_401(app_with_protected_route): + client = app_with_protected_route.test_client() + rv = client.get("/protected/rn_nonexistent", + headers={"Authorization": "Bearer rk_anything"}) + assert rv.status_code == 401 + + +def test_protected_route_non_bearer_scheme_returns_401( + app_with_protected_route): + rn_id, rk_token = runner_mod.register(name="r", registration_ip="1.1.1.1") + client = app_with_protected_route.test_client() + rv = client.get(f"/protected/{rn_id}", headers={"Authorization": rk_token}) + assert rv.status_code == 401 From fa522b839c7894696d4d91514550f0a00a484799 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:43:09 +0800 Subject: [PATCH 11/22] feat(api): add runner API blueprint with 4 endpoints Implements POST /runners/register, POST /runners/<runner_id>/heartbeat, GET /runners/<runner_id>/next-job, PUT /runners/<runner_id>/jobs/<job_id>/complete. Wires blueprint into app.py and adds test_case_info property to engine.Problem so dispatch layer can read task metadata via duck-typing interface used in both production and mock tests. 
--- app.py | 1 + model/__init__.py | 3 + model/runner.py | 92 +++++++++++++ mongo/engine.py | 12 ++ tests/unittest/test_runner_api.py | 207 ++++++++++++++++++++++++++++++ 5 files changed, 315 insertions(+) create mode 100644 model/runner.py create mode 100644 tests/unittest/test_runner_api.py diff --git a/app.py b/app.py index eca7d693..c6a9d180 100644 --- a/app.py +++ b/app.py @@ -26,6 +26,7 @@ def app(): (copycat_api, '/copycat'), (health_api, '/health'), (user_api, '/user'), + (runner_api, '/runners'), ] for api, prefix in api2prefix: app.register_blueprint(api, url_prefix=prefix) diff --git a/model/__init__.py b/model/__init__.py index 485c706c..0ae39305 100644 --- a/model/__init__.py +++ b/model/__init__.py @@ -11,6 +11,7 @@ from . import copycat from . import health from . import user +from . import runner from .auth import * from .profile import * @@ -25,6 +26,7 @@ from .copycat import * from .health import * from .user import * +from .runner import * __all__ = [ *auth.__all__, @@ -40,4 +42,5 @@ *copycat.__all__, *health.__all__, *user.__all__, + *runner.__all__, ] diff --git a/model/runner.py b/model/runner.py new file mode 100644 index 00000000..73257363 --- /dev/null +++ b/model/runner.py @@ -0,0 +1,92 @@ +"""Runner API blueprint: registration, heartbeat, job pickup, completion.""" +import secrets +from datetime import timedelta + +from flask import Blueprint, request + +from mongo import Submission +from mongo.utils import MinioClient +from .schemas import RegisterRunnerBody, CompleteJobBody +from .utils import HTTPError, HTTPResponse, parse_body, require_runner_token + +from dispatch import runner as runner_mod +from dispatch import job as job_mod +from dispatch.config import ( + RUNNER_REGISTRATION_TOKEN, + HEARTBEAT_INTERVAL_SEC, + POLL_INTERVAL_SEC, + MAX_CONCURRENT_JOBS_PER_RUNNER, + CODE_PRESIGNED_URL_TTL_SEC, +) + +__all__ = ["runner_api"] +runner_api = Blueprint("runner_api", __name__) + + +@runner_api.post("/register") 
+@parse_body(RegisterRunnerBody) +def register(body: RegisterRunnerBody): + if not secrets.compare_digest(body.registration_token, + RUNNER_REGISTRATION_TOKEN): + return HTTPError("invalid registration token", 401) + rn_id, rk_token = runner_mod.register( + name=body.name or "unnamed", + registration_ip=request.remote_addr or "unknown", + ) + return HTTPResponse( + data={ + "runner_id": rn_id, + "token": rk_token, + "config": { + "heartbeat_interval_sec": HEARTBEAT_INTERVAL_SEC, + "poll_interval_sec": POLL_INTERVAL_SEC, + "max_concurrent_jobs": MAX_CONCURRENT_JOBS_PER_RUNNER, + }, + }, + status_code=201, + ) + + +@runner_api.post("/<runner_id>/heartbeat") +@require_runner_token +def heartbeat(runner_id): + runner_mod.renew_alive(runner_id) + return "", 204 + + +@runner_api.get("/<runner_id>/next-job") +@require_runner_token +def next_job(runner_id): + payload = job_mod.claim_next_job(runner_id) + if payload is None: + return "", 204 + # Convert code_minio_path to presigned URL just before sending + minio_path = payload.pop("code_minio_path") + minio = MinioClient() + payload["code_url"] = minio.client.presigned_get_object( + minio.bucket, + minio_path, + expires=timedelta(seconds=CODE_PRESIGNED_URL_TTL_SEC), + ) + return HTTPResponse(data=payload) + + +@runner_api.put("/<runner_id>/jobs/<job_id>/complete") +@require_runner_token +@parse_body(CompleteJobBody) +def complete(runner_id, job_id, body: CompleteJobBody): + + def process(submission_id_str: str, tasks: list) -> None: + Submission(submission_id_str).process_result(tasks) + + result = job_mod.complete_job( + rn_id=runner_id, + jb_id=job_id, + tasks=body.tasks, + process_result=process, + ) + if result == "wrong_owner": + return HTTPError("job has been reclaimed by another runner", 409) + if result == "not_found": + return HTTPError("job not found", 404) + return "", 204 diff --git a/mongo/engine.py b/mongo/engine.py index ff66a37f..09ca7f1e 100644 --- a/mongo/engine.py +++ b/mongo/engine.py @@ -310,6 +310,18 @@ class Visibility: default='', ) + 
@property + def test_case_info(self) -> dict: + """Return test case metadata as a plain dict (used by dispatch layer).""" + tc = self.test_case + return { + "tasks": [{ + "caseCount": t.case_count, + "memoryLimit": t.memory_limit, + "timeLimit": t.time_limit, + } for t in (tc.tasks if tc else [])], + } + class CaseResult(EmbeddedDocument): status = IntField(required=True) diff --git a/tests/unittest/test_runner_api.py b/tests/unittest/test_runner_api.py new file mode 100644 index 00000000..69817cf4 --- /dev/null +++ b/tests/unittest/test_runner_api.py @@ -0,0 +1,207 @@ +"""End-to-end HTTP tests for the runner API blueprint. + +Uses the standard `client` fixture from conftest.py (Flask test client) and +exercises the blueprint via real HTTP-style calls. +""" +import pytest +from mongo.utils import RedisCache +from dispatch import runner as runner_mod +from dispatch.redis_keys import runner_alive_key +from dispatch.config import RUNNER_REGISTRATION_TOKEN, RUNNER_ALIVE_TTL_SEC + +# first_admin is the seed admin created by app.py on first boot. 
+_ADMIN = "first_admin" + +_TEST_CASE_INFO = { + 'language': + 0, + 'fillInTemplate': + '', + 'tasks': [{ + 'caseCount': 1, + 'taskScore': 100, + 'memoryLimit': 32768, + 'timeLimit': 1000, + }], +} + +# Minimal valid tasks payload: 1 task × 1 case (matches _TEST_CASE_INFO above) +_VALID_TASKS = [[{ + "exitCode": 0, + "status": "AC", + "stdout": "", + "stderr": "", + "execTime": 0, + "memoryUsage": 0, +}]] + + +@pytest.fixture(autouse=True) +def clear_redis(): + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + +# --- POST /runners/register --- + + +def test_register_with_valid_token_returns_201_with_credentials(client): + rv = client.post("/runners/register", + json={ + "registration_token": RUNNER_REGISTRATION_TOKEN, + "name": "test-runner", + }) + assert rv.status_code == 201 + body = rv.get_json()["data"] + assert body["runner_id"].startswith("rn_") + assert body["token"].startswith("rk_") + assert "config" in body + assert body["config"]["heartbeat_interval_sec"] == 15 + + +def test_register_with_invalid_token_returns_401(client): + rv = client.post("/runners/register", + json={ + "registration_token": "wrong-token", + "name": "test-runner", + }) + assert rv.status_code == 401 + + +# --- POST /runners//heartbeat --- + + +def test_heartbeat_with_valid_token_returns_204_and_renews(client): + rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") + rds = RedisCache().client + rds.expire(runner_alive_key(rn_id), 5) + rv = client.post(f"/runners/{rn_id}/heartbeat", + headers={"Authorization": f"Bearer {rk}"}) + assert rv.status_code == 204 + assert (RUNNER_ALIVE_TTL_SEC - 5) < rds.ttl( + runner_alive_key(rn_id)) <= RUNNER_ALIVE_TTL_SEC + + +def test_heartbeat_with_invalid_token_returns_401(client): + rn_id, _ = runner_mod.register(name="r", registration_ip="1.1.1.1") + rv = client.post(f"/runners/{rn_id}/heartbeat", + headers={"Authorization": "Bearer wrong"}) + assert rv.status_code == 401 + + +# --- GET /runners//next-job --- 
+ + +def test_next_job_returns_204_when_no_jobs(client): + rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") + rv = client.get(f"/runners/{rn_id}/next-job", + headers={"Authorization": f"Bearer {rk}"}) + assert rv.status_code == 204 + + +def test_next_job_returns_200_with_payload_when_pending( + client, + app, + save_source, +): + """Submit a real submission, enqueue it, then have a runner pull via next-job.""" + from tests.utils.submission import create_submission + from tests.utils.problem import create_problem + from dispatch import job as job_mod + rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") + + with app.app_context(): + problem = create_problem( + owner=_ADMIN, + test_case_info=_TEST_CASE_INFO, + ) + save_source("base", b"int main(){}", lang=0) + sub = create_submission(user=_ADMIN, problem=problem, lang=0) + job_mod.enqueue_job(sub) + + rv = client.get(f"/runners/{rn_id}/next-job", + headers={"Authorization": f"Bearer {rk}"}) + assert rv.status_code == 200 + body = rv.get_json()["data"] + assert body["job_id"].startswith("jb_") + assert body["submission_id"] == str(sub.id) + # code_url is presigned (contains amz signing params or testcontainers minio host) + assert "code_url" in body and len(body["code_url"]) > 0 + + +# --- PUT /runners//jobs//complete --- + + +def test_complete_with_valid_owner_returns_204( + client, + app, + save_source, +): + from tests.utils.submission import create_submission + from tests.utils.problem import create_problem + from dispatch import job as job_mod + rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") + with app.app_context(): + problem = create_problem( + owner=_ADMIN, + test_case_info=_TEST_CASE_INFO, + ) + save_source("base", b"int main(){}", lang=0) + sub = create_submission(user=_ADMIN, problem=problem, lang=0) + job_mod.enqueue_job(sub) + + # Pull job + rv = client.get(f"/runners/{rn_id}/next-job", + headers={"Authorization": f"Bearer {rk}"}) + jb_id = 
rv.get_json()["data"]["job_id"] + + # Complete + rv = client.put( + f"/runners/{rn_id}/jobs/{jb_id}/complete", + headers={"Authorization": f"Bearer {rk}"}, + json={"tasks": _VALID_TASKS}, + ) + assert rv.status_code == 204 + + +def test_complete_with_wrong_owner_returns_409( + client, + app, + save_source, +): + from tests.utils.submission import create_submission + from tests.utils.problem import create_problem + from dispatch import job as job_mod + rn1, rk1 = runner_mod.register(name="r1", registration_ip="1.1.1.1") + rn2, rk2 = runner_mod.register(name="r2", registration_ip="1.1.1.2") + with app.app_context(): + problem = create_problem( + owner=_ADMIN, + test_case_info=_TEST_CASE_INFO, + ) + save_source("base", b"int main(){}", lang=0) + sub = create_submission(user=_ADMIN, problem=problem, lang=0) + job_mod.enqueue_job(sub) + + rv = client.get(f"/runners/{rn1}/next-job", + headers={"Authorization": f"Bearer {rk1}"}) + jb_id = rv.get_json()["data"]["job_id"] + + # rn2 tries to complete rn1's job + rv = client.put( + f"/runners/{rn2}/jobs/{jb_id}/complete", + headers={"Authorization": f"Bearer {rk2}"}, + json={"tasks": _VALID_TASKS}, + ) + assert rv.status_code == 409 + + +def test_complete_with_unknown_job_returns_404(client): + rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") + rv = client.put( + f"/runners/{rn_id}/jobs/jb_nonexistent/complete", + headers={"Authorization": f"Bearer {rk}"}, + json={"tasks": _VALID_TASKS}, + ) + assert rv.status_code == 404 From a42c954fd29a5474a2f86d48cf02de84110835f7 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:48:59 +0800 Subject: [PATCH 12/22] feat(dispatch): mark Submission as JE when reclaim attempts exhausted Add _on_attempts_exhausted helper that updates Submission.status to 6 (JE) and deletes the orphaned job hash when Lua reclaim returns -1. 
--- dispatch/job.py | 29 ++++++++++++++++- tests/unittest/dispatch/test_job.py | 48 +++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/dispatch/job.py b/dispatch/job.py index 07d8148b..1c06ce8a 100644 --- a/dispatch/job.py +++ b/dispatch/job.py @@ -84,8 +84,10 @@ def claim_next_job(rn_id: str) -> Optional[dict]: ) if reclaim_result == 1: return _build_payload(orphan_id) + if reclaim_result == -1: + _on_attempts_exhausted(orphan_id) + # continue to look at other orphans # 0 = lost the race, try next orphan - # -1 = exhausted (already cleaned up by Lua); Task 9 will hook in JE marking return None @@ -101,6 +103,31 @@ def _assign_to_runner(jb_id: str, rn_id: str) -> None: rds.sadd(JOBS_LEASED, jb_id) +def _on_attempts_exhausted(jb_id: str) -> None: + """When a job exhausts max_attempts, mark its Submission as Judge Error. + + Lua script already removed jb_id from JOBS_LEASED set. This function + cleans the job hash and updates the Submission's status field. + """ + # Local import to avoid circular dependency at module load time. + from mongo import Submission + + rds = RedisCache().client + submission_id_bytes = rds.hget(job_key(jb_id), "submission_id") + if submission_id_bytes is None: + rds.delete(job_key(jb_id)) + return + submission_id = submission_id_bytes.decode() + try: + sub = Submission(submission_id) + if sub: + sub.update(status=6) # JE + except Exception: + pass + finally: + rds.delete(job_key(jb_id)) + + def _build_payload(jb_id: str) -> dict: """Build the payload returned to the runner via next-job endpoint. 
diff --git a/tests/unittest/dispatch/test_job.py b/tests/unittest/dispatch/test_job.py index 6d9755b7..3feafce1 100644 --- a/tests/unittest/dispatch/test_job.py +++ b/tests/unittest/dispatch/test_job.py @@ -212,3 +212,51 @@ def test_complete_job_with_unknown_id_returns_not_found(): process_result=lambda *a, **k: None, ) assert result == "not_found" + + +def test_claim_next_job_marks_submission_je_when_exhausted(app): + """When orphan reclaim hits max_attempts, Submission must be marked JE (status=6).""" + from bson import ObjectId + from datetime import datetime + from dispatch.config import MAX_ATTEMPTS + + rds = RedisCache().client + + with app.app_context(): + from mongo import engine + + # Create a minimal Submission document in MongoDB (mongomock allows + # fake ObjectId refs — no real problem/user needed for this test). + sub_doc = engine.Submission( + problem=ObjectId(), + user=ObjectId(), + language=0, + status=-1, + timestamp=datetime.now(), + ) + sub_doc.save() + submission_id = str(sub_doc.id) + + # Manually write the job hash (bypasses enqueue_job's problem access). + jb_id = "jb_exhaustion_test" + rds.hset( + job_key(jb_id), + mapping={ + "submission_id": submission_id, + "attempts": MAX_ATTEMPTS, + "leased_by": "rn_dead", + }, + ) + rds.sadd(JOBS_LEASED, jb_id) + # rn_dead has no alive key — it is already dead. + + rn2, _ = runner_mod.register(name="r2", registration_ip="1.1.1.2") + result = job_mod.claim_next_job(rn_id=rn2) + + assert result is None + # Submission status must be 6 (JE). + # Query only the status field to avoid ref dereference on fake ObjectIds. + refreshed = engine.Submission.objects.only("status").get(id=sub_doc.id) + assert refreshed.status == 6 + # Job hash should also be cleaned up. 
+ assert rds.hgetall(job_key(jb_id)) == {} From 2ce7915c44db9461326369ce4c0a8a7307e5dc08 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:01:33 +0800 Subject: [PATCH 13/22] refactor(submission): submit() enqueues to Redis instead of POSTing to sandbox - mongo/submission.py::submit() removes TESTING shortcut and old send() call; now calls dispatch.job.enqueue_job() to push to Redis queue - Tests updated: removed manual enqueue_job() calls in test_runner_api.py that were workarounds before submit() was wired - New test added: test_submit_enqueues_job_to_redis_pending verifies submit() pushes exactly one job to JOBS_PENDING with correct submission_id --- mongo/submission.py | 8 ++++--- tests/test_submission.py | 37 +++++++++++++++++++++++++++++++ tests/unittest/test_runner_api.py | 11 ++++----- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/mongo/submission.py b/mongo/submission.py index 953ab1de..e004130b 100644 --- a/mongo/submission.py +++ b/mongo/submission.py @@ -415,10 +415,12 @@ def submit(self, code_file) -> bool: stat['submissionIds'] = [] homework.save() submission.delete() - # we no need to actually send code to sandbox during testing - if current_app.config['TESTING'] or self.handwritten: + if self.handwritten: return True - return self.send() + # Enqueue to Redis (replaces the old self.send() / sandbox POST) + from dispatch.job import enqueue_job + enqueue_job(self) + return True def send(self) -> bool: ''' diff --git a/tests/test_submission.py b/tests/test_submission.py index 65e74cfa..f09c2c78 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -1084,3 +1084,40 @@ def test_cannot_view_output_out_of_index(app, forge_client): rv = client.get(f'/submission/{submission.id}/output/100/100') assert rv.status_code == 400, rv.get_json() assert rv.get_json()['message'] == 'task not exist' + + +def test_submit_enqueues_job_to_redis_pending(app): + """After submit(), a job hash should appear in Redis pending 
queue.""" + from mongo.utils import RedisCache + from dispatch.redis_keys import JOBS_PENDING, job_key + from tests.utils.submission import create_submission + + rds = RedisCache().client + rds.flushdb() + + with app.app_context(): + problem = utils.problem.create_problem(test_case_info={ + 'language': + 0, + 'fillInTemplate': + '', + 'tasks': [{ + 'caseCount': 1, + 'taskScore': 100, + 'memoryLimit': 32768, + 'timeLimit': 1000, + }], + }, ) + sub = create_submission( + user=problem.owner, + problem=problem, + lang=0, + ) + + # One job in pending queue + pending = rds.lrange(JOBS_PENDING, 0, -1) + assert len(pending) == 1, f"expected 1 pending job, got {len(pending)}" + jb_id = pending[0].decode() + # Job hash references our submission + assert rds.hget(job_key(jb_id), + "submission_id") == str(sub.id).encode() diff --git a/tests/unittest/test_runner_api.py b/tests/unittest/test_runner_api.py index 69817cf4..0a85f652 100644 --- a/tests/unittest/test_runner_api.py +++ b/tests/unittest/test_runner_api.py @@ -105,10 +105,9 @@ def test_next_job_returns_200_with_payload_when_pending( app, save_source, ): - """Submit a real submission, enqueue it, then have a runner pull via next-job.""" + """Submit a real submission; submit() enqueues it so runner can pull via next-job.""" from tests.utils.submission import create_submission from tests.utils.problem import create_problem - from dispatch import job as job_mod rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") with app.app_context(): @@ -118,7 +117,7 @@ def test_next_job_returns_200_with_payload_when_pending( ) save_source("base", b"int main(){}", lang=0) sub = create_submission(user=_ADMIN, problem=problem, lang=0) - job_mod.enqueue_job(sub) + # enqueue_job is now called inside submit(); no manual call needed rv = client.get(f"/runners/{rn_id}/next-job", headers={"Authorization": f"Bearer {rk}"}) @@ -140,7 +139,6 @@ def test_complete_with_valid_owner_returns_204( ): from tests.utils.submission import 
create_submission from tests.utils.problem import create_problem - from dispatch import job as job_mod rn_id, rk = runner_mod.register(name="r", registration_ip="1.1.1.1") with app.app_context(): problem = create_problem( @@ -149,7 +147,7 @@ def test_complete_with_valid_owner_returns_204( ) save_source("base", b"int main(){}", lang=0) sub = create_submission(user=_ADMIN, problem=problem, lang=0) - job_mod.enqueue_job(sub) + # enqueue_job is now called inside submit(); no manual call needed # Pull job rv = client.get(f"/runners/{rn_id}/next-job", @@ -172,7 +170,6 @@ def test_complete_with_wrong_owner_returns_409( ): from tests.utils.submission import create_submission from tests.utils.problem import create_problem - from dispatch import job as job_mod rn1, rk1 = runner_mod.register(name="r1", registration_ip="1.1.1.1") rn2, rk2 = runner_mod.register(name="r2", registration_ip="1.1.1.2") with app.app_context(): @@ -182,7 +179,7 @@ def test_complete_with_wrong_owner_returns_409( ) save_source("base", b"int main(){}", lang=0) sub = create_submission(user=_ADMIN, problem=problem, lang=0) - job_mod.enqueue_job(sub) + # enqueue_job is now called inside submit(); no manual call needed rv = client.get(f"/runners/{rn1}/next-job", headers={"Authorization": f"Bearer {rk1}"}) From 957d8da7890ff69fb69a7b00b77a4936631b29ac Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:04:41 +0800 Subject: [PATCH 14/22] refactor(submission): rejudge() enqueues to Redis; add Redis cleanup fixture - mongo/submission.py::rejudge() now calls dispatch.job.enqueue_job() (consistent with submit() refactor in previous commit) - tests/conftest.py: add autouse fixture that flushdb between every test to prevent fakeredis state leakage across tests (was causing pre-existing flakiness in TestTeacherGetSubmission) --- mongo/submission.py | 8 +++----- tests/conftest.py | 13 +++++++++++++ tests/test_submission.py | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 5 
deletions(-) diff --git a/mongo/submission.py b/mongo/submission.py index e004130b..eec2e0b7 100644 --- a/mongo/submission.py +++ b/mongo/submission.py @@ -347,17 +347,15 @@ def rejudge(self) -> bool: ''' rejudge this submission ''' - # delete output file self.delete_output() - # turn back to haven't be judged self.update( status=-1, last_send=datetime.now(), tasks=[], ) - if current_app.config['TESTING']: - return True - return self.send() + from dispatch.job import enqueue_job + enqueue_job(self) + return True def _generate_code_minio_path(self): return f'submissions/{self.id}_{ULID()}.zip' diff --git a/tests/conftest.py b/tests/conftest.py index 3b4e2ec7..b362ef9b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -202,6 +202,19 @@ def problem_ids( return problem_ids +@pytest.fixture(autouse=True) +def clear_redis_between_tests(): + """Clear Redis (fakeredis) state between every test to prevent leakage. + + Without this, tests that write to Redis (e.g., dispatch jobs, permission + cache) can affect other tests via the shared _FAKE_SERVER singleton. 
+ """ + from mongo.utils import RedisCache + RedisCache().client.flushdb() + yield + RedisCache().client.flushdb() + + @pytest.fixture def save_source(tmp_path): diff --git a/tests/test_submission.py b/tests/test_submission.py index f09c2c78..8e3fdb7a 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -1086,6 +1086,43 @@ def test_cannot_view_output_out_of_index(app, forge_client): assert rv.get_json()['message'] == 'task not exist' +def test_rejudge_enqueues_new_job_to_pending(app): + from mongo.utils import RedisCache + from dispatch.redis_keys import JOBS_PENDING + from tests.utils.submission import create_submission + + rds = RedisCache().client + rds.flushdb() + + with app.app_context(): + problem = utils.problem.create_problem( + test_case_info={ + 'language': + 0, + 'fillInTemplate': + '', + 'tasks': [{ + 'caseCount': 1, + 'taskScore': 100, + 'memoryLimit': 32768, + 'timeLimit': 1000, + }], + }) + sub = create_submission(user=problem.owner, problem=problem, lang=0) + # The submit() during create_submission already enqueued one job. + # Clear pending to focus on rejudge: + rds.delete(JOBS_PENDING) + + # Rejudge + sub.rejudge() + + # New job in pending + assert rds.llen(JOBS_PENDING) == 1 + # Submission status reset to -1 + sub.reload() + assert sub.status == -1 + + def test_submit_enqueues_job_to_redis_pending(app): """After submit(), a job hash should appear in Redis pending queue.""" from mongo.utils import RedisCache From 785ecc0b2d0b3a05a1467f2a61474dbc976e4155 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:07:18 +0800 Subject: [PATCH 15/22] refactor(submission): remove push-based sandbox dispatch methods Delete target_sandbox, send, sandbox_resp_handler, assign_token, and verify_token from Submission class; also drop unused `import requests as rq`. 
--- mongo/submission.py | 108 -------------------------------------------- 1 file changed, 108 deletions(-) diff --git a/mongo/submission.py b/mongo/submission.py index eec2e0b7..755a5525 100644 --- a/mongo/submission.py +++ b/mongo/submission.py @@ -14,7 +14,6 @@ ) import enum import tempfile -import requests as rq from hashlib import md5 from bson.son import SON from flask import current_app @@ -260,52 +259,6 @@ def default_del_func(d): del_funcs.get(d, default_del_func)(d) self.obj.delete() - def sandbox_resp_handler(self, resp): - # judge queue is currently full - def on_500(resp): - raise JudgeQueueFullError - - # backend send some invalid data - def on_400(resp): - raise ValueError(resp.text) - - # send a invalid token - def on_403(resp): - raise ValueError('invalid token') - - h = { - 500: on_500, - 403: on_403, - 400: on_400, - 200: lambda r: True, - } - try: - return h[resp.status_code](resp) - except KeyError: - self.logger.error('can not handle response from sandbox') - self.logger.error( - f'status code: {resp.status_code}\n' - f'headers: {resp.headers}\n' - f'body: {resp.text}', ) - return False - - def target_sandbox(self): - load = 10**3 # current min load - tar = None # target - for sb in self.config().sandbox_instances: - resp = rq.get(f'{sb.url}/status') - if not resp.ok: - self.logger.warning(f'sandbox {sb.name} status exception') - self.logger.warning( - f'status code: {resp.status_code}\n ' - f'body: {resp.text}', ) - continue - resp = resp.json() - if resp['load'] < load: - load = resp['load'] - tar = sb - return tar - def get_comment(self) -> bytes: ''' if comment not exist @@ -420,44 +373,6 @@ def submit(self, code_file) -> bool: enqueue_job(self) return True - def send(self) -> bool: - ''' - send code to sandbox - ''' - if self.handwritten: - logging.warning(f'try to send a handwritten {self}') - return False - # TODO: Ensure problem is ready to submitted - # if not Problem(self.problem).is_test_case_ready(): - # raise 
TestCaseNotFound(self.problem.problem_id) - # setup post body - files = { - 'src': io.BytesIO(b"".join(self._get_code_raw())), - } - # look for the target sandbox - tar = self.target_sandbox() - if tar is None: - self.logger.error(f'can not target a sandbox for {repr(self)}') - return False - # save token for validation - Submission.assign_token(self.id, tar.token) - post_data = { - 'token': tar.token, - 'checker': 'print("not implement yet. qaq")', - 'problem_id': self.problem_id, - 'language': self.language, - } - judge_url = f'{tar.url}/submit/{self.id}' - # send submission to snadbox for judgement - self.logger.info(f'send {self} to {tar.name}') - resp = rq.post( - judge_url, - data=post_data, - files=files, - ) - self.logger.info(f'recieve {self} resp from sandbox') - return self.sandbox_resp_handler(resp) - def process_result(self, tasks: list): ''' process results from sandbox @@ -727,29 +642,6 @@ def add( submission.save() return cls(submission.id) - @classmethod - def assign_token(cls, submission_id, token=None): - ''' - generate a token for the submission - ''' - if token is None: - token = gen_token() - RedisCache().set(gen_key(submission_id), token) - return token - - @classmethod - def verify_token(cls, submission_id, token): - cache = RedisCache() - key = gen_key(submission_id) - s_token = cache.get(key) - if s_token is None: - return False - s_token = s_token.decode('ascii') - valid = secrets.compare_digest(s_token, token) - if valid: - cache.delete(key) - return valid - def to_dict(self) -> Dict[str, Any]: ret = self._to_dict() # Convert Bson object to python dictionary From baf8a660ddd1b5ce9a721f97fd04f65dea82aac1 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:09:05 +0800 Subject: [PATCH 16/22] refactor(api): remove old PUT /<submission>/complete callback endpoint Drop on_submission_complete handler and OnSubmissionCompleteBody schema; these were part of the push-based dispatch that is now replaced by the pull-based runner API.
--- model/schemas/__init__.py | 1 - model/schemas/submission.py | 5 ----- model/submission.py | 21 --------------------- 3 files changed, 27 deletions(-) diff --git a/model/schemas/__init__.py b/model/schemas/__init__.py index 2980fe44..e3071033 100644 --- a/model/schemas/__init__.py +++ b/model/schemas/__init__.py @@ -23,7 +23,6 @@ from .submission import ( CreateSubmissionBody, GetSubmissionListQuery, - OnSubmissionCompleteBody, GradeSubmissionBody, UpdateConfigBody, ) diff --git a/model/schemas/submission.py b/model/schemas/submission.py index d6e27d35..6d00933a 100644 --- a/model/schemas/submission.py +++ b/model/schemas/submission.py @@ -20,11 +20,6 @@ class GetSubmissionListQuery(BaseSchema): ip_addr: Optional[str] = None -class OnSubmissionCompleteBody(BaseSchema): - tasks: List[Any] - token: str - - class GradeSubmissionBody(BaseSchema): score: int diff --git a/model/submission.py b/model/submission.py index 2cdd8b2d..6c8a7556 100644 --- a/model/submission.py +++ b/model/submission.py @@ -22,7 +22,6 @@ from .schemas import ( CreateSubmissionBody, GetSubmissionListQuery, - OnSubmissionCompleteBody, GradeSubmissionBody, UpdateConfigBody, ) @@ -342,26 +341,6 @@ def get_submission_pdf(user, submission: Submission, item): ) -@submission_api.put('//complete') -@parse_body(OnSubmissionCompleteBody) -@Request.doc('submission', Submission) -def on_submission_complete(submission: Submission, - body: OnSubmissionCompleteBody): - tasks = body.tasks - token = body.token - if not Submission.verify_token(submission.id, token): - return HTTPError('i don\'t know you', 403) - try: - submission.process_result(tasks) - except (ValidationError, KeyError) as e: - return HTTPError( - 'invalid data!\n' - f'{type(e).__name__}: {e}', - 400, - ) - return HTTPResponse(f'{submission} result recieved.') - - @submission_api.route('/', methods=['PUT']) @login_required @Request.doc('submission', Submission) From 2ad54bef647c819fae8d510801096edd6648cc44 Mon Sep 17 00:00:00 2001 From: 
as535364 Date: Tue, 28 Apr 2026 05:10:41 +0800 Subject: [PATCH 17/22] refactor(api): drop sandbox_instances handling from PUT /submission/config Remove sandbox_instances field from UpdateConfigBody and simplify update_config to only update rate_limit; runners now self-register via the pull-based dispatch. Also drop unused imports (requests, current_app). Update test_edit_config to match new API shape. --- model/schemas/submission.py | 3 +-- model/submission.py | 44 ++----------------------------------- tests/test_submission.py | 20 ++--------------- 3 files changed, 5 insertions(+), 62 deletions(-) diff --git a/model/schemas/submission.py b/model/schemas/submission.py index 6d00933a..bf89c29b 100644 --- a/model/schemas/submission.py +++ b/model/schemas/submission.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Optional from .base import BaseSchema @@ -26,4 +26,3 @@ class GradeSubmissionBody(BaseSchema): class UpdateConfigBody(BaseSchema): rate_limit: int - sandbox_instances: List[Any] diff --git a/model/submission.py b/model/submission.py index 6c8a7556..e733f2b6 100644 --- a/model/submission.py +++ b/model/submission.py @@ -1,6 +1,5 @@ import io from typing import Optional -import requests as rq import random import secrets import json @@ -8,7 +7,6 @@ Blueprint, send_file, request, - current_app, ) from datetime import datetime, timedelta from mongo import * @@ -470,50 +468,12 @@ def get_config(user): @identity_verify(0) @parse_body(UpdateConfigBody) def update_config(user, body: UpdateConfigBody): - rate_limit = body.rate_limit - sandbox_instances = body.sandbox_instances config = Submission.config() - # try to convert json object to Sandbox instance try: - sandbox_instances = [ - *map( - lambda s: engine.Sandbox(**s), - sandbox_instances, - ) - ] - except engine.ValidationError as e: - return HTTPError( - 'wrong Sandbox schema', - 400, - data=e.to_dict(), - ) - # skip if during testing - if not current_app.config['TESTING']: - 
resps = [] - # check sandbox status - for sb in sandbox_instances: - resp = rq.get(f'{sb.url}/status') - if not resp.ok: - resps.append((sb.name, resp)) - # some exception occurred - if len(resps) != 0: - return HTTPError( - 'some error occurred when check sandbox status', - 400, - data=[{ - 'name': name, - 'statusCode': resp.status_code, - 'response': resp.text, - } for name, resp in resps], - ) - try: - config.update( - rate_limit=rate_limit, - sandbox_instances=sandbox_instances, - ) + config.update(rate_limit=body.rate_limit) except ValidationError as e: return HTTPError(str(e), 400) - return HTTPResponse('success.') + return HTTPResponse('config updated') @submission_api.post('//migrate-code') diff --git a/tests/test_submission.py b/tests/test_submission.py index 8e3fdb7a..ef75f1c2 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -1003,30 +1003,14 @@ def test_get_config(self, client_admin): def test_edit_config(self, client_admin): rv = client_admin.put( f'/submission/config', - json={ - 'rateLimit': - 10, - 'sandboxInstances': [{ - 'name': 'Test', - 'url': 'http://sandbox:6666', - 'token': 'AAAAA', - }] - }, + json={'rateLimit': 10}, ) json = rv.get_json() assert rv.status_code == 200, json rv = client_admin.get(f'/submission/config') json = rv.get_json() assert rv.status_code == 200, json - assert json['data'] == { - 'rateLimit': - 10, - 'sandboxInstances': [{ - 'name': 'Test', - 'url': 'http://sandbox:6666', - 'token': 'AAAAA', - }] - } + assert json['data']['rateLimit'] == 10 def test_student_cannot_view_WA_submission_output(forge_client, app): From 84e72024ff5f9d421bab12481acebc6982fab407 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:12:06 +0800 Subject: [PATCH 18/22] refactor(engine): deprecate Sandbox EmbeddedDocument; keep field as untyped ListField Delete class Sandbox(EmbeddedDocument) which is no longer referenced. 
Convert SubmissionConfig.sandbox_instances to a plain ListField with no schema to preserve DB compatibility on production until the PG migration. --- mongo/engine.py | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/mongo/engine.py b/mongo/engine.py index 09ca7f1e..3ba691ca 100644 --- a/mongo/engine.py +++ b/mongo/engine.py @@ -431,25 +431,12 @@ class Config(Document): name = StringField(required=True, max_length=64, primary_key=True) -class Sandbox(EmbeddedDocument): - name = StringField(required=True) - url = StringField(required=True) - token = StringField(required=True) - - class SubmissionConfig(Config): rate_limit = IntField(default=0, db_field='rateLimit') - sandbox_instances = EmbeddedDocumentListField( - Sandbox, - default=[ - Sandbox( - name='Sandbox-0', - url='http://sandbox:1450', - token='KoNoSandboxDa', - ), - ], - db_field='sandboxInstances', - ) + # DEPRECATED: this field is no longer used since the pull-based dispatch + # refactor (2026-04-28). Kept here as ListField (no schema) only to avoid + # breaking existing production documents. Remove during PG migration. 
+ sandbox_instances = ListField(default=list, db_field='sandboxInstances') class LoginRecords(Document): From 8213928be8c812c61860986edbebbe0bd48b0276 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:13:09 +0800 Subject: [PATCH 19/22] style: fix pre-existing yapf formatting in dispatch and tests --- dispatch/config.py | 6 +++--- dispatch/redis_keys.py | 5 +++++ dispatch/runner.py | 6 +++--- .../unittest/dispatch/test_reclaim_script.py | 20 ++++++++++--------- tests/unittest/dispatch/test_runner.py | 9 ++++++--- 5 files changed, 28 insertions(+), 18 deletions(-) diff --git a/dispatch/config.py b/dispatch/config.py index 8362a700..ef6d660c 100644 --- a/dispatch/config.py +++ b/dispatch/config.py @@ -11,12 +11,12 @@ # Heartbeat / lease parameters HEARTBEAT_INTERVAL_SEC: int = 15 -RUNNER_ALIVE_TTL_SEC: int = 30 # 2x heartbeat +RUNNER_ALIVE_TTL_SEC: int = 30 # 2x heartbeat POLL_INTERVAL_SEC: int = 3 MAX_CONCURRENT_JOBS_PER_RUNNER: int = 8 # Job retry policy -MAX_ATTEMPTS: int = 3 # 1 initial + 2 reclaims, then mark JE +MAX_ATTEMPTS: int = 3 # 1 initial + 2 reclaims, then mark JE # Presigned URL TTL for code download (seconds) -CODE_PRESIGNED_URL_TTL_SEC: int = 3600 # 1 hour +CODE_PRESIGNED_URL_TTL_SEC: int = 3600 # 1 hour diff --git a/dispatch/redis_keys.py b/dispatch/redis_keys.py index e35f0209..b9a26e38 100644 --- a/dispatch/redis_keys.py +++ b/dispatch/redis_keys.py @@ -3,19 +3,24 @@ # Runner namespace RUNNERS_REGISTERED = "runners:registered" + def runner_meta_key(rn_id: str) -> str: return f"runner:{rn_id}:meta" + def runner_token_key(rn_id: str) -> str: """STRING key holding SHA-256 digest of the runner's bearer token.""" return f"runner:{rn_id}:token_hash" + def runner_alive_key(rn_id: str) -> str: return f"runner:{rn_id}:alive" + # Job namespace JOBS_PENDING = "jobs:pending" JOBS_LEASED = "jobs:leased" + def job_key(jb_id: str) -> str: return f"job:{jb_id}" diff --git a/dispatch/runner.py b/dispatch/runner.py index 7a0423a8..d292667c 100644 --- 
a/dispatch/runner.py +++ b/dispatch/runner.py @@ -63,9 +63,9 @@ def renew_alive(rn_id: str) -> None: Caller MUST verify the runner's token before calling this — does not check existence and will create the key for any rn_id string. """ - RedisCache().client.set( - runner_alive_key(rn_id), "1", ex=RUNNER_ALIVE_TTL_SEC - ) + RedisCache().client.set(runner_alive_key(rn_id), + "1", + ex=RUNNER_ALIVE_TTL_SEC) def is_alive(rn_id: str) -> bool: diff --git a/tests/unittest/dispatch/test_reclaim_script.py b/tests/unittest/dispatch/test_reclaim_script.py index ed9e3fa3..1883927b 100644 --- a/tests/unittest/dispatch/test_reclaim_script.py +++ b/tests/unittest/dispatch/test_reclaim_script.py @@ -19,11 +19,12 @@ def clear_redis(): def _seed_leased_job(jb_id: str, owner: str, attempts: int = 1): rds = RedisCache().client - rds.hset(job_key(jb_id), mapping={ - "leased_by": owner, - "leased_at": datetime.now(timezone.utc).isoformat(), - "attempts": attempts, - }) + rds.hset(job_key(jb_id), + mapping={ + "leased_by": owner, + "leased_at": datetime.now(timezone.utc).isoformat(), + "attempts": attempts, + }) rds.sadd(JOBS_LEASED, jb_id) @@ -49,13 +50,14 @@ def test_reclaim_fails_when_owner_changed(): result = reclaim_orphan_atomic( jb_id="jb_1", - expected_owner="rn_old", # we expected rn_old + expected_owner="rn_old", # we expected rn_old new_owner="rn_new", max_attempts=3, ) assert result == 0 # not reclaimed - assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_someone_else" + assert RedisCache().client.hget(job_key("jb_1"), + "leased_by") == b"rn_someone_else" def test_reclaim_returns_negative_when_max_attempts_reached(): @@ -83,6 +85,6 @@ def test_reclaim_is_atomic_under_concurrent_calls(): r1 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new1", max_attempts=3) r2 = reclaim_orphan_atomic("jb_1", "rn_old", "rn_new2", max_attempts=3) - assert r1 == 1 # first wins - assert r2 == 0 # second sees owner already changed + assert r1 == 1 # first wins + assert r2 == 0 # 
second sees owner already changed assert RedisCache().client.hget(job_key("jb_1"), "leased_by") == b"rn_new1" diff --git a/tests/unittest/dispatch/test_runner.py b/tests/unittest/dispatch/test_runner.py index 498e6b35..5b58865e 100644 --- a/tests/unittest/dispatch/test_runner.py +++ b/tests/unittest/dispatch/test_runner.py @@ -19,14 +19,16 @@ def clear_redis(): def test_register_returns_id_and_token(): - rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4") + rn_id, rk_token = runner_mod.register(name="my-runner", + registration_ip="1.2.3.4") assert rn_id.startswith("rn_") assert rk_token.startswith("rk_") assert len(rk_token) > 30 # actually random def test_register_persists_to_redis(): - rn_id, rk_token = runner_mod.register(name="my-runner", registration_ip="1.2.3.4") + rn_id, rk_token = runner_mod.register(name="my-runner", + registration_ip="1.2.3.4") rds = RedisCache().client # Meta hash exists with name + ip @@ -76,7 +78,8 @@ def test_renew_alive_resets_ttl(): assert rds.ttl(runner_alive_key(rn_id)) <= 5 runner_mod.renew_alive(rn_id) - assert (RUNNER_ALIVE_TTL_SEC - 5) < rds.ttl(runner_alive_key(rn_id)) <= RUNNER_ALIVE_TTL_SEC + assert (RUNNER_ALIVE_TTL_SEC - 5) < rds.ttl( + runner_alive_key(rn_id)) <= RUNNER_ALIVE_TTL_SEC def test_is_alive_returns_true_when_key_exists(): From 3a6de2fcdc83f0b0adede638bffc142777756396 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:18:16 +0800 Subject: [PATCH 20/22] refactor: migrate problem endpoints from SANDBOX_TOKEN to per-runner token MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dispatch/runner.py: add verify_any_token() — scans registered runners - model/problem.py: get_testdata/get_checksum/get_meta now accept any runner's token - mongo/sandbox.py: deleted (find_by_token replaced) Resolves the gap where Plan A Task 14 was blocked by these legacy endpoints. 
Runners (Sandbox) now authenticate to the testdata fetch path with their per-runner rk_token instead of the shared SANDBOX_TOKEN. --- dispatch/runner.py | 16 +++++++++++++ model/problem.py | 14 +++++------ mongo/sandbox.py | 13 ---------- tests/test_problem.py | 33 ++++++++++---------------- tests/unittest/dispatch/test_runner.py | 14 +++++++++++ 5 files changed, 50 insertions(+), 40 deletions(-) delete mode 100644 mongo/sandbox.py diff --git a/dispatch/runner.py b/dispatch/runner.py index d292667c..274371ee 100644 --- a/dispatch/runner.py +++ b/dispatch/runner.py @@ -71,3 +71,19 @@ def renew_alive(rn_id: str) -> None: def is_alive(rn_id: str) -> bool: """True if this runner has a non-expired alive key.""" return bool(RedisCache().client.exists(runner_alive_key(rn_id))) + + +def verify_any_token(rk_token: str) -> bool: + """Check if rk_token belongs to any registered runner. + + Used by endpoints that accept rk_token via query string (legacy auth shape + inherited from old SANDBOX_TOKEN endpoints — testdata fetching). + Linear scan over registered runners; fine for small N. If runner count + grows, add a reverse-index of token_hash -> rn_id. 
+ """ + rds = RedisCache().client + for rn_id_bytes in rds.smembers(RUNNERS_REGISTERED): + rn_id = rn_id_bytes.decode() + if verify_token(rn_id, rk_token): + return True + return False diff --git a/model/problem.py b/model/problem.py index 0d873da5..6be1f486 100644 --- a/model/problem.py +++ b/model/problem.py @@ -7,7 +7,7 @@ from zipfile import BadZipFile from mongo import * from mongo import engine -from mongo import sandbox +from dispatch import runner as runner_mod from mongo.utils import drop_none from mongo.problem import * from .auth import * @@ -304,8 +304,8 @@ def get_test_case(user: User, problem: Problem): @Request.doc('problem_id', 'problem', Problem) def get_testdata(query: GetTestdataQuery, problem: Problem): token = query.token - if sandbox.find_by_token(token) is None: - return HTTPError('Invalid sandbox token', 401) + if not runner_mod.verify_any_token(token): + return HTTPError('Invalid runner token', 401) return send_file( problem.get_test_case(), mimetype='application/zip', @@ -318,8 +318,8 @@ def get_testdata(query: GetTestdataQuery, problem: Problem): @parse_query(GetTestdataQuery) def get_checksum(query: GetTestdataQuery, problem_id: int): token = query.token - if sandbox.find_by_token(token) is None: - return HTTPError('Invalid sandbox token', 401) + if not runner_mod.verify_any_token(token): + return HTTPError('Invalid runner token', 401) problem = Problem(problem_id) if not problem: return HTTPError(f'{problem} not found', 404) @@ -337,8 +337,8 @@ def get_checksum(query: GetTestdataQuery, problem_id: int): @parse_query(GetTestdataQuery) def get_meta(query: GetTestdataQuery, problem_id: int): token = query.token - if sandbox.find_by_token(token) is None: - return HTTPError('Invalid sandbox token', 401) + if not runner_mod.verify_any_token(token): + return HTTPError('Invalid runner token', 401) problem = Problem(problem_id) if not problem: return HTTPError(f'{problem} not found', 404) diff --git a/mongo/sandbox.py b/mongo/sandbox.py deleted 
file mode 100644 index b5483571..00000000 --- a/mongo/sandbox.py +++ /dev/null @@ -1,13 +0,0 @@ -import secrets -from .submission import Submission - - -def find_by_token(token: str): - ''' - Find sandbox by token. return None if cannot find a sandbox with that token. - ''' - sandboxes = Submission.config().sandbox_instances - for sandbox in sandboxes: - if secrets.compare_digest(token, sandbox.token): - return sandbox - return None diff --git a/tests/test_problem.py b/tests/test_problem.py index 984d1507..862e975e 100644 --- a/tests/test_problem.py +++ b/tests/test_problem.py @@ -683,15 +683,15 @@ def test_admin_update_problem_test_case(self, client_admin, monkeypatch): def test_get_testdata_with_invalid_token(self, client): rv = client.get('/problem/3/testdata?token=InvalidToken8787') assert rv.status_code == 401, rv.get_json() - assert rv.get_json()['message'] == 'Invalid sandbox token' + assert rv.get_json()['message'] == 'Invalid runner token' def test_get_testdata(self, client, monkeypatch): # FIXME: it should be impl in mock monkeypatch.setattr( Problem, 'get_test_case', lambda *_: get_file('bogay/test_case.zip')['case'][0]) - from model.problem import sandbox - monkeypatch.setattr(sandbox, 'find_by_token', lambda *_: True) + from dispatch import runner as runner_mod + monkeypatch.setattr(runner_mod, 'verify_any_token', lambda *_: True) rv = client.get('/problem/3/testdata?token=ValidToken') assert rv.status_code == 200 with ZipFile(io.BytesIO(rv.data)) as zf: @@ -708,12 +708,12 @@ def test_get_testdata(self, client, monkeypatch): def test_get_checksum_with_invalid_token(self, client): rv = client.get('/problem/3/checksum?token=InvalidToken8787') assert rv.status_code == 401, rv.get_json() - assert rv.get_json()['message'] == 'Invalid sandbox token' + assert rv.get_json()['message'] == 'Invalid runner token' def test_get_checksum_with_problem_does_not_exist(self, client, monkeypatch): - from model.problem import sandbox - monkeypatch.setattr(sandbox, 
'find_by_token', lambda *_: True) + from dispatch import runner as runner_mod + monkeypatch.setattr(runner_mod, 'verify_any_token', lambda *_: True) rv = client.get('/problem/878787/checksum?token=SandboxToken') assert rv.status_code == 404, rv.get_json() assert rv.get_json()['message'] == 'problem [878787] not found' @@ -723,8 +723,8 @@ def test_get_checksum(self, client, monkeypatch): monkeypatch.setattr( Problem, 'get_test_case', lambda *_: get_file('bogay/test_case.zip')['case'][0]) - from model.problem import sandbox - monkeypatch.setattr(sandbox, 'find_by_token', lambda *_: True) + from dispatch import runner as runner_mod + monkeypatch.setattr(runner_mod, 'verify_any_token', lambda *_: True) rv = client.get('/problem/3/checksum?token=SandboxToken') assert rv.status_code == 200, rv.get_json() assert rv.get_json()['data'] == 'b80aa4fad6b5dea9a5bca3237ac3ba89' @@ -732,25 +732,18 @@ def test_get_checksum(self, client, monkeypatch): def test_get_meta_with_invalid_token(self, client): rv = client.get('/problem/3/meta?token=InvalidToken8787') assert rv.status_code == 401, rv.get_json() - assert rv.get_json()['message'] == 'Invalid sandbox token' + assert rv.get_json()['message'] == 'Invalid runner token' def test_get_meta_with_problem_does_not_exist(self, client, monkeypatch): - from model.problem import sandbox - monkeypatch.setattr(sandbox, 'find_by_token', lambda *_: True) + from dispatch import runner as runner_mod + monkeypatch.setattr(runner_mod, 'verify_any_token', lambda *_: True) rv = client.get('/problem/878787/meta?token=SandboxToken') assert rv.status_code == 404, rv.get_json() assert rv.get_json()['message'] == 'problem [878787] not found' def test_get_meta(self, client, monkeypatch): - - class MockSandbox: - token = 'SandboxToken' - - class MockConfig: - sandbox_instances = [MockSandbox()] - - from mongo.sandbox import Submission - monkeypatch.setattr(Submission, 'config', MockConfig) + from dispatch import runner as runner_mod + 
monkeypatch.setattr(runner_mod, 'verify_any_token', lambda *_: True) rv = client.get('/problem/3/meta?token=SandboxToken') assert rv.status_code == 200, rv.get_json() assert rv.get_json()['data'] == { diff --git a/tests/unittest/dispatch/test_runner.py b/tests/unittest/dispatch/test_runner.py index 5b58865e..22036f57 100644 --- a/tests/unittest/dispatch/test_runner.py +++ b/tests/unittest/dispatch/test_runner.py @@ -91,3 +91,17 @@ def test_is_alive_returns_false_when_key_expired(): rn_id, _ = runner_mod.register(name="x", registration_ip="1.1.1.1") RedisCache().client.delete(runner_alive_key(rn_id)) assert runner_mod.is_alive(rn_id) is False + + +def test_verify_any_token_returns_true_for_registered(): + _, rk = runner_mod.register(name="r1", registration_ip="1.1.1.1") + assert runner_mod.verify_any_token(rk) is True + + +def test_verify_any_token_returns_false_for_unregistered(): + runner_mod.register(name="r1", registration_ip="1.1.1.1") + assert runner_mod.verify_any_token("rk_unknown") is False + + +def test_verify_any_token_returns_false_for_empty_runners_set(): + assert runner_mod.verify_any_token("rk_anything") is False From bf62532c9a5637aeb57725ac1a2d1e715c1764ae Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:22:51 +0800 Subject: [PATCH 21/22] test(integration): add end-to-end mock-runner integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Covers Plan A spec Section 13 failure modes: - Happy path: submit → pull → complete → status=AC - Orphan reclaim: rn1 dies mid-job, rn2 picks up, rn1 zombie complete rejected - Max attempts: 3 reclaims exhausted → Submission marked JE These complete Plan A's verification — Backend dispatch path is end-to-end exercised without needing the real Sandbox. 
--- tests/integration/__init__.py | 0 tests/integration/test_runner_flow.py | 296 ++++++++++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/test_runner_flow.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/test_runner_flow.py b/tests/integration/test_runner_flow.py new file mode 100644 index 00000000..a1409391 --- /dev/null +++ b/tests/integration/test_runner_flow.py @@ -0,0 +1,296 @@ +"""End-to-end mock-runner integration tests. + +These tests use the Flask test client to simulate a complete runner lifecycle: +register → poll → claim → complete. Verifies the entire Backend pipeline works +without needing a real Sandbox container. +""" +import io +import tempfile +import zipfile + +import pytest +from mongo.utils import RedisCache +from dispatch.config import RUNNER_REGISTRATION_TOKEN, MAX_ATTEMPTS + +# Realistic 1-task / 1-case AC result that process_result accepts. +# Outer list = tasks, inner list = cases per task. +_AC_TASKS = [ + [ + { + "exitCode": 0, + "status": "AC", + "execTime": 10, + "memoryUsage": 1024, + "stdout": "", + "stderr": "", + }, + ], +] + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_zip_source(code: bytes = b"int main(){}") -> io.BytesIO: + """Return a BytesIO holding a zip with main.c containing *code*.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("main.c", code) + buf.seek(0) + return buf + + +def _create_pending_submission(app, owner="first_admin", lang=0): + """Create a problem + submission ready for judging (status=-1, job enqueued). + + Returns a *detached* Submission object (must call .reload() inside app + context to refresh). 
+ """ + from tests.utils.problem import create_problem + from mongo import Problem, Submission + from tests.test_problem import get_file + + with app.app_context(): + prob = create_problem( + owner=owner, + status=0, + test_case_info={ + "language": + 0, + "fill_in_template": + "", + "tasks": [ + { + "caseCount": 1, + "taskScore": 100, + "memoryLimit": 32768, + "timeLimit": 1000, + }, + ], + }, + ) + # Attach test case data so process_result can look up tasks + test_case = get_file("default/test_case.zip")["case"][0] + prob.update_test_case(test_case) + + sub = Submission.add( + problem_id=prob.id, + username=owner, + lang=lang, + ip_addr="127.0.0.1", + ) + # submit() uploads source to MinIO and enqueues job → status=-1 + sub.submit(_make_zip_source()) + sub.reload() + return sub + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_runner(client): + """Register a runner and return a helper object for runner-side actions.""" + rv = client.post( + "/runners/register", + json={ + "registration_token": RUNNER_REGISTRATION_TOKEN, + "name": "mock-runner-1", + }, + ) + assert rv.status_code == 201 + body = rv.get_json()["data"] + rn_id = body["runner_id"] + rk = body["token"] + + headers = {"Authorization": f"Bearer {rk}"} + + class Runner: + runner_id = rn_id + token = rk + + def heartbeat(self): + return client.post(f"/runners/{rn_id}/heartbeat", headers=headers) + + def next_job(self): + return client.get(f"/runners/{rn_id}/next-job", headers=headers) + + def complete(self, jb_id, tasks): + return client.put( + f"/runners/{rn_id}/jobs/{jb_id}/complete", + headers=headers, + json={"tasks": tasks}, + ) + + return Runner() + + +# --------------------------------------------------------------------------- +# Task 19: Happy path +# --------------------------------------------------------------------------- + + +def 
test_full_flow_submit_pull_complete(app, mock_runner): + """Submit code → runner pulls → runner submits result → submission shows AC.""" + sub = _create_pending_submission(app) + assert sub.status == -1 # Pending after submit + + # Runner pulls + rv = mock_runner.next_job() + assert rv.status_code == 200, rv.get_data() + payload = rv.get_json()["data"] + assert payload["submission_id"] == str(sub.id) + jb_id = payload["job_id"] + + # Runner sends back AC result + rv = mock_runner.complete(jb_id, _AC_TASKS) + assert rv.status_code == 204, rv.get_data() + + with app.app_context(): + sub.reload() + assert sub.status == 0 # AC + + +def test_no_pending_returns_204(mock_runner): + rv = mock_runner.next_job() + assert rv.status_code == 204 + + +def test_heartbeat_keeps_runner_alive(mock_runner): + for _ in range(3): + rv = mock_runner.heartbeat() + assert rv.status_code == 204 + + +# --------------------------------------------------------------------------- +# Task 20: Orphan reclaim +# --------------------------------------------------------------------------- + + +def test_orphan_reclaim_when_runner_dies(app, client): + """Runner1 takes a job and dies; runner2 should reclaim and complete it.""" + from dispatch.redis_keys import runner_alive_key + + rv = client.post( + "/runners/register", + json={ + "registration_token": RUNNER_REGISTRATION_TOKEN, + "name": "rn1", + }, + ) + assert rv.status_code == 201 + rn1 = rv.get_json()["data"] + rv = client.post( + "/runners/register", + json={ + "registration_token": RUNNER_REGISTRATION_TOKEN, + "name": "rn2", + }, + ) + assert rv.status_code == 201 + rn2 = rv.get_json()["data"] + + h1 = {"Authorization": f"Bearer {rn1['token']}"} + h2 = {"Authorization": f"Bearer {rn2['token']}"} + + sub = _create_pending_submission(app) + + # rn1 takes the job + rv = client.get(f"/runners/{rn1['runner_id']}/next-job", headers=h1) + assert rv.status_code == 200 + jb_id = rv.get_json()["data"]["job_id"] + + # Simulate rn1 death + 
RedisCache().client.delete(runner_alive_key(rn1["runner_id"])) + + # rn2 polls — should reclaim + rv = client.get(f"/runners/{rn2['runner_id']}/next-job", headers=h2) + assert rv.status_code == 200, rv.get_data() + reclaimed = rv.get_json()["data"] + assert reclaimed["job_id"] == jb_id # same job! + + # rn2 completes it + rv = client.put( + f"/runners/{rn2['runner_id']}/jobs/{jb_id}/complete", + headers=h2, + json={"tasks": _AC_TASKS}, + ) + assert rv.status_code == 204, rv.get_data() + + # If rn1 zombie comes back and tries to complete, should be rejected + # (404 because the job was deleted after rn2 completed it) + rv = client.put( + f"/runners/{rn1['runner_id']}/jobs/{jb_id}/complete", + headers=h1, + json={"tasks": _AC_TASKS}, + ) + assert rv.status_code == 404 + + +# --------------------------------------------------------------------------- +# Task 21: Max attempts → JE +# --------------------------------------------------------------------------- + + +def test_max_attempts_marks_submission_je(app, client): + """When a job is reclaimed too many times, Submission must be marked JE (status=6). + + Exhaustion logic in the Lua script: + attempts >= max_attempts → remove from leased set, return -1 + + Sequence for MAX_ATTEMPTS=3: + - Runner 0 claims from pending → attempts=1, then dies + - Runner 1 reclaims → attempts=2, then dies + - Runner 2 reclaims → attempts=3, then dies + - Runner 3 polls → Lua sees attempts=3 >= 3 → exhausted, JE marked + Total runners needed: MAX_ATTEMPTS + 1. 
+ """ + from dispatch.redis_keys import runner_alive_key + + sub = _create_pending_submission(app) + assert sub.status == -1 + + # Register MAX_ATTEMPTS + 1 runners (one extra to trigger exhaustion) + total_runners = MAX_ATTEMPTS + 1 + runners = [] + for i in range(total_runners): + rv = client.post( + "/runners/register", + json={ + "registration_token": RUNNER_REGISTRATION_TOKEN, + "name": f"rn{i}", + }, + ) + assert rv.status_code == 201 + runners.append(rv.get_json()["data"]) + + # Round 0: runner 0 claims from pending (attempts → 1) + rn = runners[0] + h = {"Authorization": f"Bearer {rn['token']}"} + rv = client.get(f"/runners/{rn['runner_id']}/next-job", headers=h) + assert rv.status_code == 200 + jb_id = rv.get_json()["data"]["job_id"] + # Kill runner 0 + RedisCache().client.delete(runner_alive_key(rn["runner_id"])) + + # Rounds 1..MAX_ATTEMPTS-1: each runner reclaims then dies (attempts fills up to MAX_ATTEMPTS) + for i in range(1, MAX_ATTEMPTS): + rn = runners[i] + h = {"Authorization": f"Bearer {rn['token']}"} + rv = client.get(f"/runners/{rn['runner_id']}/next-job", headers=h) + assert rv.status_code == 200, f"reclaim round {i} failed: {rv.get_data()}" + RedisCache().client.delete(runner_alive_key(rn["runner_id"])) + + # The (MAX_ATTEMPTS+1)-th poll observes exhaustion and returns 204 + last_rn = runners[MAX_ATTEMPTS] + h = {"Authorization": f"Bearer {last_rn['token']}"} + rv = client.get(f"/runners/{last_rn['runner_id']}/next-job", headers=h) + assert rv.status_code == 204 # exhausted; no job available + + # And the Submission is marked JE + with app.app_context(): + sub.reload() + assert sub.status == 6 # JE From 4ff3474065f5678c07e1760891e67c66a5e5a956 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 14:04:34 +0800 Subject: [PATCH 22/22] fix(submission): get_detailed_result uses task['cases'] not task.cases After task.to_mongo() the task is a SON dict (no .cases attribute). 
The same fix is already applied in get_result() above; this method was missed. Triggered 500 on GET /api/submission/ for any submission with at least one task result. --- mongo/submission.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongo/submission.py b/mongo/submission.py index 755a5525..19a8d2b4 100644 --- a/mongo/submission.py +++ b/mongo/submission.py @@ -689,7 +689,7 @@ def get_detailed_result(self) -> List[Dict[str, Any]]: ''' tasks = [task.to_mongo() for task in self.tasks] for i, task in enumerate(tasks): - for j, case in enumerate(task.cases): + for j, case in enumerate(task['cases']): output = self.get_single_output(i, j) case['stdout'] = output['stdout'] case['stderr'] = output['stderr']