From da0824808b40d5da98c01464373a4585ba589048 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:17:01 +0800 Subject: [PATCH 01/15] feat(agent): scaffold agent package with config --- agent/__init__.py | 4 ++++ agent/config.py | 31 +++++++++++++++++++++++++++++++ tests/agent/__init__.py | 0 3 files changed, 35 insertions(+) create mode 100644 agent/__init__.py create mode 100644 agent/config.py create mode 100644 tests/agent/__init__.py diff --git a/agent/__init__.py b/agent/__init__.py new file mode 100644 index 0000000..194022a --- /dev/null +++ b/agent/__init__.py @@ -0,0 +1,4 @@ +"""GH-style runner agent: polls Backend, dispatches jobs, sends results. + +Distinct from Sandbox/runner/ which contains the Docker execution code. +""" diff --git a/agent/config.py b/agent/config.py new file mode 100644 index 0000000..5f516aa --- /dev/null +++ b/agent/config.py @@ -0,0 +1,31 @@ +"""Environment-driven configuration for the runner agent.""" +import os +from pathlib import Path + +# Backend URL — where to register, poll, send results +BACKEND_URL: str = os.getenv("BACKEND_URL", "http://web:8080") + +# Shared registration secret (must match backend's RUNNER_REGISTRATION_TOKEN). +RUNNER_REGISTRATION_TOKEN: str = os.getenv( + "RUNNER_REGISTRATION_TOKEN", + "dev-only-registration-token-change-me", +) + +# Display name shown in admin/listing. Defaults to hostname. +RUNNER_NAME: str = os.getenv("RUNNER_NAME", os.uname().nodename) + +# Tunings (defaults match what backend returns from register; override here is rarely needed) +HEARTBEAT_INTERVAL_SEC: int = int(os.getenv("HEARTBEAT_INTERVAL_SEC", "15")) +POLL_INTERVAL_SEC: int = int(os.getenv("POLL_INTERVAL_SEC", "3")) + +# Result delivery retry policy +RESULT_RETRY_MAX_ATTEMPTS: int = 5 +RESULT_RETRY_INITIAL_BACKOFF_SEC: float = 1.0 +RESULT_RETRY_MAX_BACKOFF_SEC: float = 16.0 + +# HTTP timeouts +HTTP_REQUEST_TIMEOUT_SEC: int = 10 + +# Where to download code zip to (per-job temp dir) +CODE_DOWNLOAD_DIR: Path = Path(os.getenv("CODE_DOWNLOAD_DIR", "/tmp/runner-code")) +CODE_DOWNLOAD_DIR.mkdir(exist_ok=True) diff --git a/tests/agent/__init__.py b/tests/agent/__init__.py new file mode 100644 index 0000000..e69de29 From aab040f332ec36260147b71344a0728f05b8a3b2 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:23:14 +0800 Subject: [PATCH 02/15] feat(agent): add BackendClient with auth, error mapping, retry hints --- agent/client.py | 104 +++++++++++++++++++++++++++++++++ tests/agent/test_client.py | 115 +++++++++++++++++++++++++++++++++++++ tests/requirements.txt | 1 + 3 files changed, 220 insertions(+) create mode 100644 agent/client.py create mode 100644 tests/agent/test_client.py diff --git a/agent/client.py b/agent/client.py new file mode 100644 index 0000000..3043281 --- /dev/null +++ b/agent/client.py @@ -0,0 +1,104 @@ +"""HTTP client for the Backend runner API.""" +import requests + +from . import config + + +class BackendClient: + """Thin wrapper around requests, adding auth, base URL, error mapping.""" + + class AuthError(Exception): + """Raised when backend rejects auth (401).""" + + class TransientError(Exception): + """Raised on 5xx or network errors — caller should retry.""" + + def __init__(self, base_url: str = None, rk_token: str = None): + self.base_url = base_url or config.BACKEND_URL + self.rk_token = rk_token + + # ------- Public API ------- + + def register(self, name: str, registration_token: str) -> dict: + """Register this runner. Returns the `data` payload from backend.""" + rv = self._request( + "POST", "/runners/register", + json_body={"registration_token": registration_token, "name": name}, + need_auth=False, + expected_statuses=(201,), + ) + return rv.json()["data"] + + def heartbeat(self, runner_id: str) -> None: + """Send a heartbeat. Raises AuthError on 401.""" + self._request( + "POST", f"/runners/{runner_id}/heartbeat", + expected_statuses=(204,), + ) + + def next_job(self, runner_id: str) -> dict | None: + """Poll for next job. Returns None if no job available (204).""" + rv = self._request( + "GET", f"/runners/{runner_id}/next-job", + expected_statuses=(200, 204), + ) + if rv.status_code == 204: + return None + return rv.json()["data"] + + def complete_job(self, runner_id: str, job_id: str, tasks: list) -> str: + """Send result. Returns 'ok' / 'reclaimed' / 'not_found'. + + Raises TransientError on 5xx or network — caller should retry. + """ + rv = self._request( + "PUT", f"/runners/{runner_id}/jobs/{job_id}/complete", + json_body={"tasks": tasks}, + expected_statuses=(204, 409, 404), + ) + return {204: "ok", 409: "reclaimed", 404: "not_found"}[rv.status_code] + + def download_code(self, code_url: str, dest_path: str) -> None: + """Download code zip from a presigned URL.""" + try: + with requests.get(code_url, stream=True, + timeout=config.HTTP_REQUEST_TIMEOUT_SEC) as r: + r.raise_for_status() + with open(dest_path, "wb") as f: + for chunk in r.iter_content(chunk_size=64 * 1024): + f.write(chunk) + except requests.RequestException as e: + raise self.TransientError(f"code download failed: {e}") from e + + # ------- Internals ------- + + def _request(self, method: str, path: str, *, + json_body=None, need_auth=True, + expected_statuses=(200,)) -> requests.Response: + headers = {} + if need_auth: + if not self.rk_token: + raise self.AuthError("rk_token not set") + headers["Authorization"] = f"Bearer {self.rk_token}" + if json_body is not None: + headers["Content-Type"] = "application/json" + try: + rv = requests.request( + method=method, + url=f"{self.base_url}{path}", + json=json_body, + headers=headers, + timeout=config.HTTP_REQUEST_TIMEOUT_SEC, + ) + except requests.RequestException as e: + raise self.TransientError(f"network error: {e}") from e + + if rv.status_code == 401: + raise self.AuthError(rv.text) + if rv.status_code >= 500: + raise self.TransientError(f"backend {rv.status_code}: {rv.text}") + if rv.status_code not in expected_statuses: + raise self.TransientError( + f"unexpected status {rv.status_code}: {rv.text}" + ) + return rv diff --git a/tests/agent/test_client.py b/tests/agent/test_client.py new file mode 100644 index 0000000..2360b13 --- /dev/null +++ b/tests/agent/test_client.py @@ -0,0 +1,115 @@ +"""Tests for the BackendClient HTTP wrapper.""" +import pytest +import responses + +from agent.client import BackendClient + + +@pytest.fixture +def client(): + return BackendClient(base_url="http://test-backend", rk_token="rk_test") + + +@responses.activate +def test_register_posts_to_correct_url(client): + responses.add( + responses.POST, + "http://test-backend/runners/register", + json={"data": {"runner_id": "rn_1", "token": "rk_xyz", "config": {}}}, + status=201, + ) + rv = client.register(name="r1", registration_token="dev-token") + assert rv["runner_id"] == "rn_1" + assert rv["token"] == "rk_xyz" + req = responses.calls[0].request + assert req.headers["Content-Type"] == "application/json" + + +@responses.activate +def test_register_raises_on_401(): + c = BackendClient(base_url="http://test-backend", rk_token=None) + responses.add( + responses.POST, + "http://test-backend/runners/register", + json={"message": "invalid"}, + status=401, + ) + with pytest.raises(BackendClient.AuthError): + c.register(name="r1", registration_token="wrong") + + +@responses.activate +def test_heartbeat_sends_bearer_token(client): + responses.add( + responses.POST, + "http://test-backend/runners/rn_1/heartbeat", + status=204, + ) + client.heartbeat(runner_id="rn_1") + req = responses.calls[0].request + assert req.headers["Authorization"] == "Bearer rk_test" + + +@responses.activate +def test_next_job_returns_payload_when_200(client): + responses.add( + responses.GET, + "http://test-backend/runners/rn_1/next-job", + json={"data": {"job_id": "jb_1", "submission_id": "sub_1", + "problem_id": 42, "language": 0, "code_url": "http://...", + "checker": "", "tasks": []}}, + status=200, + ) + job = client.next_job(runner_id="rn_1") + assert job["job_id"] == "jb_1" + + +@responses.activate +def test_next_job_returns_none_when_204(client): + responses.add( + responses.GET, + "http://test-backend/runners/rn_1/next-job", + status=204, + ) + assert client.next_job(runner_id="rn_1") is None + + +@responses.activate +def test_complete_job_returns_status_string(client): + responses.add( + responses.PUT, + "http://test-backend/runners/rn_1/jobs/jb_1/complete", + status=204, + ) + assert client.complete_job("rn_1", "jb_1", tasks=[]) == "ok" + + +@responses.activate +def test_complete_job_returns_reclaimed_on_409(client): + responses.add( + responses.PUT, + "http://test-backend/runners/rn_1/jobs/jb_1/complete", + status=409, + ) + assert client.complete_job("rn_1", "jb_1", tasks=[]) == "reclaimed" + + +@responses.activate +def test_complete_job_returns_not_found_on_404(client): + responses.add( + responses.PUT, + "http://test-backend/runners/rn_1/jobs/jb_1/complete", + status=404, + ) + assert client.complete_job("rn_1", "jb_1", tasks=[]) == "not_found" + + +@responses.activate +def test_complete_job_raises_on_5xx(client): + responses.add( + responses.PUT, + "http://test-backend/runners/rn_1/jobs/jb_1/complete", + status=503, + ) + with pytest.raises(BackendClient.TransientError): + client.complete_job("rn_1", "jb_1", tasks=[]) diff --git a/tests/requirements.txt b/tests/requirements.txt index 82b7742..a916b54 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,3 @@ pytest fakeredis~=1.7 +responses~=0.25 From fa3360d07228cbb838eced491e249fbdd9219dd5 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:25:03 +0800 Subject: [PATCH 03/15] feat(agent): add startup registration with backend --- agent/registration.py | 33 ++++++++++++++++++++++++ tests/agent/test_registration.py | 44 ++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 agent/registration.py create mode 100644 tests/agent/test_registration.py diff --git a/agent/registration.py b/agent/registration.py new file mode 100644 index 0000000..ec4e5d6 --- /dev/null +++ b/agent/registration.py @@ -0,0 +1,33 @@ +"""Self-registration on startup.""" +from dataclasses import dataclass + +from .client import BackendClient + + +@dataclass(frozen=True) +class RunnerCredentials: + runner_id: str + token: str + heartbeat_interval_sec: int + poll_interval_sec: int + max_concurrent_jobs: int + + +def register_runner( + client: BackendClient, + name: str, + registration_token: str, +) -> RunnerCredentials: + """Call backend's register endpoint and return RunnerCredentials. + + Raises BackendClient.AuthError or TransientError on failure. + """ + rv = client.register(name=name, registration_token=registration_token) + cfg = rv.get("config", {}) + return RunnerCredentials( + runner_id=rv["runner_id"], + token=rv["token"], + heartbeat_interval_sec=cfg.get("heartbeat_interval_sec", 15), + poll_interval_sec=cfg.get("poll_interval_sec", 3), + max_concurrent_jobs=cfg.get("max_concurrent_jobs", 8), + ) diff --git a/tests/agent/test_registration.py b/tests/agent/test_registration.py new file mode 100644 index 0000000..ed90b9e --- /dev/null +++ b/tests/agent/test_registration.py @@ -0,0 +1,44 @@ +from unittest.mock import MagicMock +from agent.registration import register_runner +from agent.client import BackendClient + + +def test_register_runner_returns_credentials_and_config(): + fake_client = MagicMock(spec=BackendClient) + fake_client.register.return_value = { + "runner_id": "rn_xyz", + "token": "rk_xyz", + "config": { + "heartbeat_interval_sec": 10, + "poll_interval_sec": 5, + "max_concurrent_jobs": 4, + }, + } + + result = register_runner(fake_client, name="r1", + registration_token="dev-token") + + assert result.runner_id == "rn_xyz" + assert result.token == "rk_xyz" + assert result.heartbeat_interval_sec == 10 + assert result.poll_interval_sec == 5 + assert result.max_concurrent_jobs == 4 + + fake_client.register.assert_called_once_with( + name="r1", registration_token="dev-token" + ) + + +def test_register_runner_uses_defaults_for_missing_config_fields(): + fake_client = MagicMock(spec=BackendClient) + fake_client.register.return_value = { + "runner_id": "rn_a", "token": "rk_a", "config": {}, + } + + result = register_runner(fake_client, name="r1", + registration_token="t") + + # Backend should always send config, but defensively use sensible defaults + assert result.heartbeat_interval_sec == 15 + assert result.poll_interval_sec == 3 + assert result.max_concurrent_jobs == 8 From e1d0584390eb0b5ec25d8a3b2738dbd96889fa0b Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:26:58 +0800 Subject: [PATCH 04/15] feat(agent): add heartbeat daemon thread --- agent/heartbeat.py | 39 ++++++++++++++++++++++ tests/agent/test_heartbeat.py | 63 +++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 agent/heartbeat.py create mode 100644 tests/agent/test_heartbeat.py diff --git a/agent/heartbeat.py b/agent/heartbeat.py new file mode 100644 index 0000000..3e30c7a --- /dev/null +++ b/agent/heartbeat.py @@ -0,0 +1,39 @@ +"""Heartbeat daemon thread: refreshes runner alive TTL on backend.""" +import logging +import threading + +from .client import BackendClient + +log = logging.getLogger(__name__) + + +class HeartbeatThread(threading.Thread): + """Periodically POSTs heartbeat. Tolerates transient errors silently.""" + + def __init__( + self, + client: BackendClient, + runner_id: str, + interval_sec: float, + shutdown_event: threading.Event, + ): + super().__init__(daemon=True, name="heartbeat") + self.client = client + self.runner_id = runner_id + self.interval_sec = interval_sec + self.shutdown_event = shutdown_event + + def run(self) -> None: + while not self.shutdown_event.is_set(): + try: + self.client.heartbeat(runner_id=self.runner_id) + except BackendClient.TransientError as e: + log.warning(f"heartbeat failed (transient): {e}") + except BackendClient.AuthError as e: + # Auth fail means the backend forgot us (e.g., Redis loss). + # Caller will need to re-register; for now just log. + log.error(f"heartbeat auth failed: {e}") + except Exception as e: # defensive — never let thread die + log.exception(f"heartbeat unexpected error: {e}") + # Wait, but break early on shutdown + self.shutdown_event.wait(timeout=self.interval_sec) diff --git a/tests/agent/test_heartbeat.py b/tests/agent/test_heartbeat.py new file mode 100644 index 0000000..2ae7a06 --- /dev/null +++ b/tests/agent/test_heartbeat.py @@ -0,0 +1,63 @@ +import threading +import time +from unittest.mock import MagicMock + +import pytest + +from agent.client import BackendClient +from agent.heartbeat import HeartbeatThread + + +def test_heartbeat_calls_client_at_interval(): + client = MagicMock(spec=BackendClient) + shutdown = threading.Event() + + hb = HeartbeatThread( + client=client, runner_id="rn_1", + interval_sec=0.05, shutdown_event=shutdown, + ) + hb.start() + time.sleep(0.18) # ~3 intervals + shutdown.set() + hb.join(timeout=1) + + # Should have been called 3-4 times + assert 2 <= client.heartbeat.call_count <= 5 + client.heartbeat.assert_called_with(runner_id="rn_1") + + +def test_heartbeat_swallows_transient_errors_and_keeps_going(): + client = MagicMock(spec=BackendClient) + client.heartbeat.side_effect = [ + BackendClient.TransientError("boom"), + None, + None, + ] + shutdown = threading.Event() + + hb = HeartbeatThread( + client=client, runner_id="rn_1", + interval_sec=0.05, shutdown_event=shutdown, + ) + hb.start() + time.sleep(0.2) + shutdown.set() + hb.join(timeout=1) + + # Despite first call raising, subsequent calls happened + assert client.heartbeat.call_count >= 3 + + +def test_heartbeat_stops_promptly_on_shutdown(): + client = MagicMock(spec=BackendClient) + shutdown = threading.Event() + hb = HeartbeatThread( + client=client, runner_id="rn_1", + interval_sec=10.0, # long interval + shutdown_event=shutdown, + ) + hb.start() + time.sleep(0.05) + shutdown.set() + hb.join(timeout=0.5) + assert not hb.is_alive(), "heartbeat thread should exit promptly" From 27ba4e8600fd788bd2ee4dd15a5f0cf60d02b036 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:27:55 +0800 Subject: [PATCH 05/15] fix(agent): apply yapf formatting; add download_code test coverage Addresses code review findings: - yapf -ir on client.py and test_client.py to satisfy CI - 3 new tests for download_code (happy path, 4xx, network error) --- agent/client.py | 38 ++++++++++++++++--------- tests/agent/test_client.py | 57 +++++++++++++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 17 deletions(-) diff --git a/agent/client.py b/agent/client.py index 3043281..37bea13 100644 --- a/agent/client.py +++ b/agent/client.py @@ -22,24 +22,30 @@ def __init__(self, base_url: str = None, rk_token: str = None): def register(self, name: str, registration_token: str) -> dict: """Register this runner. Returns the `data` payload from backend.""" rv = self._request( - "POST", "/runners/register", - json_body={"registration_token": registration_token, "name": name}, + "POST", + "/runners/register", + json_body={ + "registration_token": registration_token, + "name": name + }, need_auth=False, - expected_statuses=(201,), + expected_statuses=(201, ), ) return rv.json()["data"] def heartbeat(self, runner_id: str) -> None: """Send a heartbeat. Raises AuthError on 401.""" self._request( - "POST", f"/runners/{runner_id}/heartbeat", - expected_statuses=(204,), + "POST", + f"/runners/{runner_id}/heartbeat", + expected_statuses=(204, ), ) def next_job(self, runner_id: str) -> dict | None: """Poll for next job. Returns None if no job available (204).""" rv = self._request( - "GET", f"/runners/{runner_id}/next-job", + "GET", + f"/runners/{runner_id}/next-job", expected_statuses=(200, 204), ) if rv.status_code == 204: @@ -52,7 +58,8 @@ def complete_job(self, runner_id: str, job_id: str, tasks: list) -> str: Raises TransientError on 5xx or network — caller should retry. """ rv = self._request( - "PUT", f"/runners/{runner_id}/jobs/{job_id}/complete", + "PUT", + f"/runners/{runner_id}/jobs/{job_id}/complete", json_body={"tasks": tasks}, expected_statuses=(204, 409, 404), ) @@ -61,7 +68,8 @@ def complete_job(self, runner_id: str, job_id: str, tasks: list) -> str: def download_code(self, code_url: str, dest_path: str) -> None: """Download code zip from a presigned URL.""" try: - with requests.get(code_url, stream=True, + with requests.get(code_url, + stream=True, timeout=config.HTTP_REQUEST_TIMEOUT_SEC) as r: r.raise_for_status() with open(dest_path, "wb") as f: @@ -72,9 +80,14 @@ def download_code(self, code_url: str, dest_path: str) -> None: # ------- Internals ------- - def _request(self, method: str, path: str, *, - json_body=None, need_auth=True, - expected_statuses=(200,)) -> requests.Response: + def _request( + self, + method: str, + path: str, + *, + json_body=None, + need_auth=True, + expected_statuses=(200, )) -> requests.Response: headers = {} if need_auth: if not self.rk_token: @@ -99,6 +112,5 @@ def _request(self, method: str, path: str, *, raise self.TransientError(f"backend {rv.status_code}: {rv.text}") if rv.status_code not in expected_statuses: raise self.TransientError( - f"unexpected status {rv.status_code}: {rv.text}" - ) + f"unexpected status {rv.status_code}: {rv.text}") return rv diff --git a/tests/agent/test_client.py b/tests/agent/test_client.py index 2360b13..b8a7a7c 100644 --- a/tests/agent/test_client.py +++ b/tests/agent/test_client.py @@ -1,5 +1,6 @@ """Tests for the BackendClient HTTP wrapper.""" import pytest +import requests import responses from agent.client import BackendClient @@ -15,7 +16,11 @@ def test_register_posts_to_correct_url(client): responses.add( responses.POST, "http://test-backend/runners/register", - json={"data": {"runner_id": "rn_1", "token": "rk_xyz", "config": {}}}, + json={"data": { + "runner_id": "rn_1", + "token": "rk_xyz", + "config": {} + }}, status=201, ) rv = client.register(name="r1", registration_token="dev-token") @@ -55,9 +60,17 @@ def test_next_job_returns_payload_when_200(client): responses.add( responses.GET, "http://test-backend/runners/rn_1/next-job", - json={"data": {"job_id": "jb_1", "submission_id": "sub_1", - "problem_id": 42, "language": 0, "code_url": "http://...", - "checker": "", "tasks": []}}, + json={ + "data": { + "job_id": "jb_1", + "submission_id": "sub_1", + "problem_id": 42, + "language": 0, + "code_url": "http://...", + "checker": "", + "tasks": [] + } + }, status=200, ) job = client.next_job(runner_id="rn_1") @@ -113,3 +126,39 @@ def test_complete_job_raises_on_5xx(client): ) with pytest.raises(BackendClient.TransientError): client.complete_job("rn_1", "jb_1", tasks=[]) + + +@responses.activate +def test_download_code_writes_to_dest_path(tmp_path, client): + responses.add( + responses.GET, + "http://minio/code.zip", + body=b"PK\x03\x04zipcontent", + status=200, + ) + dest = tmp_path / "code.zip" + client.download_code("http://minio/code.zip", str(dest)) + assert dest.read_bytes() == b"PK\x03\x04zipcontent" + + +@responses.activate +def test_download_code_raises_transient_on_404(tmp_path, client): + responses.add( + responses.GET, + "http://minio/missing.zip", + status=404, + ) + with pytest.raises(BackendClient.TransientError): + client.download_code("http://minio/missing.zip", + str(tmp_path / "x.zip")) + + +@responses.activate +def test_download_code_raises_transient_on_network_error(tmp_path, client): + responses.add( + responses.GET, + "http://nowhere/x.zip", + body=requests.exceptions.ConnectionError("refused"), + ) + with pytest.raises(BackendClient.TransientError): + client.download_code("http://nowhere/x.zip", str(tmp_path / "x.zip")) From a10eb1b24b9b8c953ea4260e3a1081a6a08e60f0 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:29:41 +0800 Subject: [PATCH 06/15] feat(agent): add result_sender thread with retry/backoff --- agent/result_sender.py | 88 +++++++++++++++++++ tests/agent/test_result_sender.py | 141 ++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+) create mode 100644 agent/result_sender.py create mode 100644 tests/agent/test_result_sender.py diff --git a/agent/result_sender.py b/agent/result_sender.py new file mode 100644 index 0000000..f884fd7 --- /dev/null +++ b/agent/result_sender.py @@ -0,0 +1,88 @@ +"""Result delivery daemon thread with exponential backoff retry.""" +import logging +import queue +import threading +from dataclasses import dataclass +from typing import List + +from .client import BackendClient + +log = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class JobResult: + """Pending result waiting to be sent to backend.""" + job_id: str + tasks: List[dict] + + +class ResultSenderThread(threading.Thread): + """Drains result_queue, POSTs to backend, retries on transient errors.""" + + def __init__( + self, + client: BackendClient, + runner_id: str, + result_queue: queue.Queue, + shutdown_event: threading.Event, + retry_max_attempts: int, + retry_initial_backoff_sec: float, + retry_max_backoff_sec: float, + ): + super().__init__(daemon=True, name="result_sender") + self.client = client + self.runner_id = runner_id + self.result_queue = result_queue + self.shutdown_event = shutdown_event + self.retry_max_attempts = retry_max_attempts + self.retry_initial_backoff_sec = retry_initial_backoff_sec + self.retry_max_backoff_sec = retry_max_backoff_sec + + def run(self) -> None: + while not (self.shutdown_event.is_set() and self.result_queue.empty()): + try: + job_result = self.result_queue.get(timeout=0.5) + except queue.Empty: + continue + try: + self._deliver_with_retry(job_result) + except Exception as e: # defensive + log.exception( + f"result delivery for {job_result.job_id} crashed: {e}") + finally: + self.result_queue.task_done() + + def _deliver_with_retry(self, jr: JobResult) -> None: + backoff = self.retry_initial_backoff_sec + for attempt in range(1, self.retry_max_attempts + 1): + try: + outcome = self.client.complete_job( + runner_id=self.runner_id, + job_id=jr.job_id, + tasks=jr.tasks, + ) + except BackendClient.TransientError as e: + log.warning( + f"complete_job {jr.job_id} attempt {attempt} failed: {e}") + if attempt == self.retry_max_attempts: + log.error( + f"giving up on {jr.job_id} after {attempt} attempts") + return + self.shutdown_event.wait(timeout=backoff) + backoff = min(backoff * 2, self.retry_max_backoff_sec) + continue + except BackendClient.AuthError as e: + log.error(f"complete_job {jr.job_id} auth failed: {e}") + return # cannot retry without re-register + # Outcome handling + if outcome == "ok": + log.info(f"delivered {jr.job_id}") + return + if outcome == "reclaimed": + log.warning(f"{jr.job_id} was reclaimed; dropping result") + return + if outcome == "not_found": + log.warning( + f"{jr.job_id} not found on backend; dropping result") + return diff --git a/tests/agent/test_result_sender.py b/tests/agent/test_result_sender.py new file mode 100644 index 0000000..9c5063d --- /dev/null +++ b/tests/agent/test_result_sender.py @@ -0,0 +1,141 @@ +import queue +import threading +import time +from unittest.mock import MagicMock + +import pytest + +from agent.client import BackendClient +from agent.result_sender import ResultSenderThread, JobResult + + +def test_result_sender_delivers_one_result(): + client = MagicMock(spec=BackendClient) + client.complete_job.return_value = "ok" + result_queue: queue.Queue = queue.Queue() + shutdown = threading.Event() + + sender = ResultSenderThread( + client=client, + runner_id="rn_1", + result_queue=result_queue, + shutdown_event=shutdown, + retry_max_attempts=3, + retry_initial_backoff_sec=0.01, + retry_max_backoff_sec=0.1, + ) + sender.start() + result_queue.put(JobResult(job_id="jb_1", tasks=[{"status": "AC"}])) + time.sleep(0.05) + shutdown.set() + sender.join(timeout=1) + + client.complete_job.assert_called_once_with(runner_id="rn_1", + job_id="jb_1", + tasks=[{ + "status": "AC" + }]) + + +def test_result_sender_retries_on_transient_error(): + client = MagicMock(spec=BackendClient) + client.complete_job.side_effect = [ + BackendClient.TransientError("first"), + BackendClient.TransientError("second"), + "ok", + ] + rq: queue.Queue = queue.Queue() + shutdown = threading.Event() + + sender = ResultSenderThread( + client=client, + runner_id="rn_1", + result_queue=rq, + shutdown_event=shutdown, + retry_max_attempts=5, + retry_initial_backoff_sec=0.01, + retry_max_backoff_sec=0.1, + ) + sender.start() + rq.put(JobResult(job_id="jb_1", tasks=[])) + time.sleep(0.5) + shutdown.set() + sender.join(timeout=1) + + assert client.complete_job.call_count == 3 + + +def test_result_sender_drops_on_reclaimed(): + """If backend says reclaimed (409), drop the result silently — no retry.""" + client = MagicMock(spec=BackendClient) + client.complete_job.return_value = "reclaimed" + rq: queue.Queue = queue.Queue() + shutdown = threading.Event() + + sender = ResultSenderThread( + client=client, + runner_id="rn_1", + result_queue=rq, + shutdown_event=shutdown, + retry_max_attempts=3, + retry_initial_backoff_sec=0.01, + retry_max_backoff_sec=0.1, + ) + sender.start() + rq.put(JobResult(job_id="jb_1", tasks=[])) + time.sleep(0.05) + shutdown.set() + sender.join(timeout=1) + + assert client.complete_job.call_count == 1 # no retry + + +def test_result_sender_drops_on_not_found(): + """404 also drops — submission may have been deleted.""" + client = MagicMock(spec=BackendClient) + client.complete_job.return_value = "not_found" + rq: queue.Queue = queue.Queue() + shutdown = threading.Event() + sender = ResultSenderThread( + client=client, + runner_id="rn_1", + result_queue=rq, + shutdown_event=shutdown, + retry_max_attempts=3, + retry_initial_backoff_sec=0.01, + retry_max_backoff_sec=0.1, + ) + sender.start() + rq.put(JobResult(job_id="jb_1", tasks=[])) + time.sleep(0.05) + shutdown.set() + sender.join(timeout=1) + assert client.complete_job.call_count == 1 + + +def test_result_sender_gives_up_after_max_attempts(): + """After exhausting retries, give up but keep the thread alive for next job.""" + client = MagicMock(spec=BackendClient) + client.complete_job.side_effect = BackendClient.TransientError("always") + rq: queue.Queue = queue.Queue() + shutdown = threading.Event() + sender = ResultSenderThread( + client=client, + runner_id="rn_1", + result_queue=rq, + shutdown_event=shutdown, + retry_max_attempts=3, + retry_initial_backoff_sec=0.01, + retry_max_backoff_sec=0.1, + ) + sender.start() + rq.put(JobResult(job_id="jb_1", tasks=[])) + time.sleep(0.5) + + # Should have given up + assert client.complete_job.call_count == 3 + + # Thread still alive, ready for next job + assert sender.is_alive() + shutdown.set() + sender.join(timeout=1) From 0b7fb4ae06a439c5bfd2b8a81be7b0e23cf2dfee Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:32:56 +0800 Subject: [PATCH 07/15] feat(agent): add poller thread that pulls jobs and dispatches --- agent/poller.py | 97 ++++++++++++++++++++++++++++ tests/agent/test_poller.py | 128 +++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+) create mode 100644 agent/poller.py create mode 100644 tests/agent/test_poller.py diff --git a/agent/poller.py b/agent/poller.py new file mode 100644 index 0000000..58191b8 --- /dev/null +++ b/agent/poller.py @@ -0,0 +1,97 @@ +"""Poller daemon thread: pulls jobs from backend, hands to dispatcher.""" +import logging +import tempfile +import threading + +from dispatcher.constant import Language +from dispatcher.testdata import ( + ensure_testdata, + get_problem_meta, + get_problem_root, +) +from .client import BackendClient + +log = logging.getLogger(__name__) + + +def prepare_submission_dir_for_job(dispatcher, job: dict, + client: BackendClient): + """Download code + ensure testdata + extract into dispatcher's submission dir. + + Reuses the existing dispatcher.prepare_submission_dir() — same testdata + fetching path as the old POST /submit handler. + """ + submission_id = job["submission_id"] + problem_id = job["problem_id"] + language = Language(job["language"]) + + ensure_testdata(problem_id) + meta = get_problem_meta(problem_id, language) + + with tempfile.NamedTemporaryFile(suffix=".zip") as tmp: + client.download_code(job["code_url"], tmp.name) + with open(tmp.name, "rb") as src: + dispatcher.prepare_submission_dir( + root_dir=dispatcher.SUBMISSION_DIR, + submission_id=submission_id, + meta=meta, + source=src, + testdata=get_problem_root(problem_id), + ) + + +class PollerThread(threading.Thread): + """Polls backend for jobs and dispatches them to the internal dispatcher.""" + + def __init__( + self, + client: BackendClient, + runner_id: str, + dispatcher, # existing Dispatcher instance + poll_interval_sec: float, + shutdown_event: threading.Event, + ): + super().__init__(daemon=True, name="poller") + self.client = client + self.runner_id = runner_id + self.dispatcher = dispatcher + self.poll_interval_sec = poll_interval_sec + self.shutdown_event = shutdown_event + + def run(self) -> None: + while not self.shutdown_event.is_set(): + if not self.dispatcher.has_capacity(): + self.shutdown_event.wait(timeout=0.5) + continue + try: + job = self.client.next_job(runner_id=self.runner_id) + except BackendClient.TransientError as e: + log.warning(f"next_job failed: {e}") + self.shutdown_event.wait(timeout=self.poll_interval_sec) + continue + except BackendClient.AuthError as e: + log.error(f"next_job auth failed: {e}") + self.shutdown_event.wait(timeout=self.poll_interval_sec) + continue + except Exception as e: + log.warning(f"next_job unexpected error: {e}") + self.shutdown_event.wait(timeout=self.poll_interval_sec) + continue + + if job is None: + self.shutdown_event.wait(timeout=self.poll_interval_sec) + continue + + try: + prepare_submission_dir_for_job(self.dispatcher, job, + self.client) + self.dispatcher.handle( + submission_id=job["submission_id"], + job_id=job["job_id"], + ) + log.info(f"dispatched submission={job['submission_id']} " + f"job={job['job_id']}") + except Exception as e: + log.exception( + f"failed to dispatch job {job.get('job_id')}: {e}") + # Don't retry — backend will reclaim after lease expiry. diff --git a/tests/agent/test_poller.py b/tests/agent/test_poller.py new file mode 100644 index 0000000..f4aafab --- /dev/null +++ b/tests/agent/test_poller.py @@ -0,0 +1,128 @@ +import threading +import time +from unittest.mock import MagicMock, patch + +from agent.client import BackendClient +from agent.poller import PollerThread + + +def _make_dispatcher(can_accept=True): + d = MagicMock() + d.has_capacity.return_value = can_accept + return d + + +def test_poller_does_nothing_when_no_jobs(): + client = MagicMock(spec=BackendClient) + client.next_job.return_value = None + dispatcher = _make_dispatcher() + shutdown = threading.Event() + + poller = PollerThread( + client=client, + runner_id="rn_1", + dispatcher=dispatcher, + poll_interval_sec=0.05, + shutdown_event=shutdown, + ) + poller.start() + time.sleep(0.15) + shutdown.set() + poller.join(timeout=1) + + assert client.next_job.call_count >= 2 + dispatcher.handle.assert_not_called() + + +def test_poller_dispatches_job_when_received(tmp_path): + """When a job comes back, poller downloads code and calls dispatcher.""" + client = MagicMock(spec=BackendClient) + job_payload = { + "job_id": + "jb_1", + "submission_id": + "sub_1", + "problem_id": + 42, + "language": + 0, + "code_url": + "http://minio/code.zip", + "checker": + "", + "tasks": [{ + "task_id": 0, + "case_count": 1, + "memory_limit": 1024, + "time_limit": 1000 + }], + } + client.next_job.side_effect = [job_payload, None, None] + dispatcher = _make_dispatcher() + shutdown = threading.Event() + + with patch("agent.poller.prepare_submission_dir_for_job") as prepare: + poller = PollerThread( + client=client, + runner_id="rn_1", + dispatcher=dispatcher, + poll_interval_sec=0.05, + shutdown_event=shutdown, + ) + poller.start() + time.sleep(0.15) + shutdown.set() + poller.join(timeout=1) + + prepare.assert_called_once() + dispatcher.handle.assert_called_once_with( + submission_id="sub_1", + job_id="jb_1", + ) + + +def test_poller_skips_when_dispatcher_full(): + client = MagicMock(spec=BackendClient) + dispatcher = _make_dispatcher(can_accept=False) + shutdown = threading.Event() + + poller = PollerThread( + client=client, + runner_id="rn_1", + dispatcher=dispatcher, + poll_interval_sec=0.05, + shutdown_event=shutdown, + ) + poller.start() + time.sleep(0.15) + shutdown.set() + poller.join(timeout=1) + + # When at capacity, poller should NOT call next_job + client.next_job.assert_not_called() + + +def test_poller_swallows_transient_errors(): + client = MagicMock(spec=BackendClient) + client.next_job.side_effect = [ + BackendClient.TransientError("boom"), + None, + None, + ] + dispatcher = _make_dispatcher() + shutdown = threading.Event() + + poller = PollerThread( + client=client, + runner_id="rn_1", + dispatcher=dispatcher, + poll_interval_sec=0.05, + shutdown_event=shutdown, + ) + poller.start() + time.sleep(0.2) + shutdown.set() + poller.join(timeout=1) + + # Despite first call failing, subsequent polls happen + assert client.next_job.call_count >= 2 From f53937e4d1ad9a854b6c715c1889143c7b967ee4 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:34:36 +0800 Subject: [PATCH 08/15] feat(dispatcher): add has_capacity() and result_queue --- dispatcher/dispatcher.py | 14 ++++++++++++++ tests/test_dispatcher.py | 19 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 3590b86..b767113 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -57,6 +57,20 @@ def __init__( self.timeout = 300 self.created_at = {} + # Result queue: completed submissions are pushed here for the agent's + # result_sender thread to deliver to backend. + self.result_queue: queue.Queue = queue.Queue() + + # Map submission_id -> job_id (set when handle() is called by poller) + self.job_ids: dict = {} + + def has_capacity(self) -> bool: + """Whether dispatcher can accept a new submission right now. + + Leaves 30% headroom — when queue gets near full, stop pulling new work. + """ + return self.queue.qsize() < int(self.MAX_TASK_COUNT * 0.7) + def compile_need(self, lang: Language): return lang in {Language.C, Language.CPP} diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index 590085a..cc58ba3 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -48,3 +48,22 @@ def test_duplicated_submission( except DuplicatedSubmissionIdError: return assert False + + +import queue + + +def test_dispatcher_has_capacity_returns_true_when_queue_empty( + docker_dispatcher): + assert docker_dispatcher.has_capacity() is True + + +def test_dispatcher_exposes_result_queue(docker_dispatcher): + """Dispatcher should expose a result_queue that result_sender drains.""" + assert isinstance(docker_dispatcher.result_queue, queue.Queue) + + +def test_dispatcher_exposes_job_ids_mapping(docker_dispatcher): + """Dispatcher should track submission_id -> job_id mapping.""" + assert isinstance(docker_dispatcher.job_ids, dict) + assert docker_dispatcher.job_ids == {} From 00f32a6fe3d662fbbeaef7c20ef30abb5a426bbb Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:36:20 +0800 Subject: [PATCH 09/15] feat(dispatcher): accept job_id in handle() and track per submission --- dispatcher/dispatcher.py | 4 +++- tests/test_dispatcher.py | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index b767113..191724d 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -118,7 +118,7 @@ def prepare_submission_dir( else: raise - def handle(self, submission_id: str): + def handle(self, submission_id: str, job_id: str = None): ''' handle a submission, save its config and push into task queue ''' @@ -144,6 +144,8 @@ def handle(self, submission_id: str): self.locks[submission_id] = threading.Lock() self.compile_locks[submission_id] = threading.Lock() self.created_at[submission_id] = datetime.now() + if job_id is not None: + self.job_ids[submission_id] = job_id logger().debug(f'current submissions: {[*self.result.keys()]}') try: diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index cc58ba3..8ab1bb0 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -67,3 +67,29 @@ def test_dispatcher_exposes_job_ids_mapping(docker_dispatcher): """Dispatcher should track submission_id -> job_id mapping.""" assert isinstance(docker_dispatcher.job_ids, dict) assert docker_dispatcher.job_ids == {} + + +def test_handle_records_job_id(docker_dispatcher, submission_generator): + """When poller calls handle() with job_id, dispatcher records the mapping.""" + docker_dispatcher.start() + submission_ids = list(submission_generator.submission_ids.keys()) + assert submission_ids, "submission_generator should have created at least one submission" + sub_id = submission_ids[0] + + docker_dispatcher.handle(submission_id=sub_id, job_id="jb_xyz") + + assert docker_dispatcher.job_ids.get(sub_id) == "jb_xyz" + + +def test_handle_works_without_job_id_for_backwards_compat( + docker_dispatcher, submission_generator): + """handle() should work without job_id — for backwards compat with existing tests.""" + docker_dispatcher.start() + submission_ids = list(submission_generator.submission_ids.keys()) + assert submission_ids + sub_id = submission_ids[0] + + docker_dispatcher.handle(submission_id=sub_id) # no job_id + + # Should not raise, and submission should be tracked + assert docker_dispatcher.contains(sub_id) From 257812b1fb218989f119f2bc9ce88888f791bfc0 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:39:51 +0800 Subject: [PATCH 10/15] refactor(dispatcher): push completed submissions to result_queue (was inline PUT) Replace on_submission_complete's inline HTTP PUT with a push to result_queue so the agent's result_sender thread handles delivery. Remove now-unused imports (requests, tempfile). Add two TDD tests verifying queue push and state cleanup behaviour. --- dispatcher/dispatcher.py | 54 +++++++++++++++---------------------- tests/test_dispatcher.py | 57 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 33 deletions(-) diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index 191724d..b345742 100644 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -2,11 +2,9 @@ import os import threading import time -import requests import pathlib import queue import shutil -import tempfile from datetime import datetime from runner.submission import SubmissionRunner @@ -373,46 +371,36 @@ def on_submission_complete(self, submission_id: str): f'skip submission post processing in testing [submission_id={submission_id}]' ) return True + _, results = self.result[submission_id] - # parse results + # parse results into nested list-of-tasks shape submission_result = {} for no, r in results.items(): task_no = int(no[:2]) case_no = int(no[2:]) - if task_no not in submission_result: - submission_result[task_no] = {} - submission_result[task_no][case_no] = r + submission_result.setdefault(task_no, {})[case_no] = r # convert to list and check for task_no, cases in submission_result.items(): assert [*cases.keys()] == [*range(len(cases))] submission_result[task_no] = [*cases.values()] assert [*submission_result.keys()] == [*range(len(submission_result))] submission_result = [*submission_result.values()] - # post data - with tempfile.NamedTemporaryFile("w") as tmpf: - submission_data = { - 'tasks': submission_result, - 'token': config.SANDBOX_TOKEN - } - # write payload to file - json.dump(submission_data, tmpf) - tmpf.flush() - # release resources - del submission_data - self.release(submission_id) - logger().info(f'send to BE [submission_id={submission_id}]') - # open in binary mode as requests needs a binary stream - with open(tmpf.name, "rb") as payload: - resp = requests.put( - f'{config.BACKEND_API}/submission/{submission_id}/complete', - data=payload, - headers={'Content-Type': 'application/json'}, - ) - logger().debug(f'get BE response: [{resp.status_code}] {resp.text}', ) - # clear - if resp.ok: - file_manager.clean_data(submission_id) - # copy to another place - else: - file_manager.backup_data(submission_id) + # Push to result_queue for agent's result_sender to deliver + from agent.result_sender import JobResult + job_id = self.job_ids.get(submission_id) + if job_id is None: + logger().error( + f"submission_complete with no job_id mapping [submission_id={submission_id}]" + ) + # Don't release — let lease expire and reclaim happen on backend + return + + self.result_queue.put(JobResult(job_id=job_id, + tasks=submission_result)) + + # Clean up local state + file_manager.clean_data(submission_id) + self.release(submission_id) + if submission_id in self.job_ids: + del self.job_ids[submission_id] diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index 8ab1bb0..b5b669b 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -93,3 +93,60 @@ def test_handle_works_without_job_id_for_backwards_compat( # Should not raise, and submission should be tracked assert docker_dispatcher.contains(sub_id) + + +import threading +from unittest.mock import MagicMock + + +def _setup_fake_submission(docker_dispatcher, submission_id, job_id): + """Helper: set up fake in-memory state + create the submission dir on disk.""" + from dispatcher import file_manager + docker_dispatcher.testing = False + docker_dispatcher.job_ids[submission_id] = job_id + docker_dispatcher.result[submission_id] = ( + MagicMock(language=2), # python doesn't compile + { + "0000": { + "stdout": "", + "stderr": "", + "exitCode": 0, + "execTime": 1, + "memoryUsage": 1, + "status": "AC" + }, + }, + ) + docker_dispatcher.locks[submission_id] = threading.Lock() + docker_dispatcher.compile_locks[submission_id] = threading.Lock() + docker_dispatcher.created_at[submission_id] = __import__( + "datetime").datetime.now() + # create submission dir so file_manager.clean_data doesn't fail + sub_dir = file_manager.config.SUBMISSION_DIR / submission_id + sub_dir.mkdir(parents=True, exist_ok=True) + + +def test_on_submission_complete_pushes_to_result_queue(docker_dispatcher): + """When a submission is done, result lands in result_queue (no HTTP).""" + from agent.result_sender import JobResult + + _setup_fake_submission(docker_dispatcher, "sub_1", "jb_1") + docker_dispatcher.on_submission_complete("sub_1") + + pushed = docker_dispatcher.result_queue.get_nowait() + assert isinstance(pushed, JobResult) + assert pushed.job_id == "jb_1" + assert isinstance(pushed.tasks, list) + assert len(pushed.tasks) == 1 + + +def test_on_submission_complete_releases_state_after_push(docker_dispatcher): + """After completion, submission state should be cleared.""" + from agent.result_sender import JobResult + + _setup_fake_submission(docker_dispatcher, "sub_2", "jb_2") + docker_dispatcher.on_submission_complete("sub_2") + + # State cleaned up + assert "sub_2" not in docker_dispatcher.result + assert "sub_2" not in docker_dispatcher.job_ids From 762e5dd3284708ba52cbbde8487bf65273e52ebc Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:41:44 +0800 Subject: [PATCH 11/15] feat(agent): add main.py entrypoint wiring all threads --- main.py | 111 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_main.py | 11 +++++ 2 files changed, 122 insertions(+) create mode 100644 main.py create mode 100644 tests/test_main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..cb1f533 --- /dev/null +++ b/main.py @@ -0,0 +1,111 @@ +"""Runner agent entrypoint. + +Replaces the old Flask app — this process actively polls Backend instead of +listening for incoming HTTP. Spawns 4 daemon threads: + - dispatcher (existing) + - heartbeat + - poller + - result_sender + +Graceful shutdown on SIGTERM/SIGINT: stop polling, drain result queue, exit. +""" +import logging +import os +import signal +import threading +import time + +from agent import config as agent_config +from agent.client import BackendClient +from agent.heartbeat import HeartbeatThread +from agent.poller import PollerThread +from agent.registration import register_runner +from agent.result_sender import ResultSenderThread +from dispatcher.dispatcher import Dispatcher + +# Ensure logs/ directory exists before configuring file handler +os.makedirs("logs", exist_ok=True) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s/%(threadName)s] %(levelname)s: %(message)s", + handlers=[ + logging.FileHandler("logs/runner.log"), + logging.StreamHandler(), + ], +) +log = logging.getLogger("main") + + +def main(): + log.info("runner agent starting") + shutdown_event = threading.Event() + + # 1. Register + bootstrap_client = BackendClient() # no token yet + creds = register_runner( + client=bootstrap_client, + name=agent_config.RUNNER_NAME, + registration_token=agent_config.RUNNER_REGISTRATION_TOKEN, + ) + log.info(f"registered as {creds.runner_id}") + + # 2. Authenticated client used by all daemon threads + client = BackendClient(rk_token=creds.token) + + # 3. Start dispatcher (existing) + dispatcher_config_path = os.getenv("DISPATCHER_CONFIG", + ".config/dispatcher.json.example") + dispatcher = Dispatcher(dispatcher_config_path) + dispatcher.start() + log.info("dispatcher started") + + # 4. Start daemon threads + heartbeat = HeartbeatThread( + client=client, + runner_id=creds.runner_id, + interval_sec=creds.heartbeat_interval_sec, + shutdown_event=shutdown_event, + ) + poller = PollerThread( + client=client, + runner_id=creds.runner_id, + dispatcher=dispatcher, + poll_interval_sec=creds.poll_interval_sec, + shutdown_event=shutdown_event, + ) + sender = ResultSenderThread( + client=client, + runner_id=creds.runner_id, + result_queue=dispatcher.result_queue, + shutdown_event=shutdown_event, + retry_max_attempts=agent_config.RESULT_RETRY_MAX_ATTEMPTS, + retry_initial_backoff_sec=agent_config. + RESULT_RETRY_INITIAL_BACKOFF_SEC, + retry_max_backoff_sec=agent_config.RESULT_RETRY_MAX_BACKOFF_SEC, + ) + heartbeat.start() + poller.start() + sender.start() + log.info("all threads started") + + # 5. Wait for shutdown signal + def handle_sig(signum, frame): + log.info(f"received signal {signum}, shutting down") + shutdown_event.set() + dispatcher.stop() + + signal.signal(signal.SIGTERM, handle_sig) + signal.signal(signal.SIGINT, handle_sig) + + while not shutdown_event.is_set(): + time.sleep(1) + + # 6. Graceful drain — give result_sender up to 60s to flush + log.info("waiting for in-flight work to complete (max 60s)") + sender.join(timeout=60) + log.info("runner agent exiting") + + +if __name__ == "__main__": + main() diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..f07f2f5 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,11 @@ +"""Smoke test for main.py — verify imports work and signal handler attaches.""" + + +def test_main_module_imports_cleanly(): + """Import test — main.py shouldn't crash on import.""" + import main # noqa: F401 + + +def test_main_function_exists_and_is_callable(): + import main + assert callable(main.main) From bf9453d3b6fa7f634959696d6c791c36e2abae26 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 04:43:22 +0800 Subject: [PATCH 12/15] build(sandbox): switch entrypoint from flask app to main.py - delete app.py and gunicorn.conf.py - Dockerfile CMD: python main.py (+ mkdir -p logs) - requirements.txt: drop flask + gunicorn (no longer needed) - dispatcher/utils.py: update logger fallback from 'gunicorn.error' to 'dispatcher' Sandbox is now an active runner agent that polls Backend, not a passive HTTP server. The pull-loop modules in agent/ replace the old Flask routes. --- Dockerfile | 5 ++- app.py | 88 --------------------------------------------- dispatcher/utils.py | 2 +- gunicorn.conf.py | 10 ------ requirements.txt | 2 -- 5 files changed, 5 insertions(+), 102 deletions(-) delete mode 100644 app.py delete mode 100644 gunicorn.conf.py diff --git a/Dockerfile b/Dockerfile index 3377783..bd69b33 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,4 +6,7 @@ WORKDIR /app COPY requirements.txt requirements.txt RUN pip install -r requirements.txt -CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:app"] +# Make logs directory exist +RUN mkdir -p logs + +CMD ["python", "main.py"] diff --git a/app.py b/app.py deleted file mode 100644 index 7eb7b4d..0000000 --- a/app.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import logging -import queue -import secrets -from flask import Flask, request, jsonify -from dispatcher.constant import Language -from dispatcher.dispatcher import Dispatcher -from dispatcher.testdata import ( - ensure_testdata, - get_problem_meta, - get_problem_root, -) -from dispatcher.config import (SANDBOX_TOKEN, SUBMISSION_DIR) - -logging.basicConfig(filename='logs/sandbox.log') -app = Flask(__name__) -if __name__ != '__main__': - # let flask app use gunicorn's logger - gunicorn_logger = logging.getLogger('gunicorn.error') - app.logger.handlers = gunicorn_logger.handlers - app.logger.setLevel(gunicorn_logger.level) - logging.getLogger().setLevel(gunicorn_logger.level) -logger = app.logger - -# setup dispatcher -DISPATCHER_CONFIG = os.getenv( - 'DISPATCHER_CONFIG', - '.config/dispatcher.json.example', -) -DISPATCHER = Dispatcher(DISPATCHER_CONFIG) -DISPATCHER.start() - - -@app.post('/submit/') -def submit(submission_id: str): - token = request.values.get('token', '') - if not secrets.compare_digest(token, SANDBOX_TOKEN): - logger.debug(f'get invalid token: {token}') - return 'invalid token', 403 - # Ensure the testdata is up to data - problem_id = request.form.get('problem_id', type=int) - if problem_id is None: - return 'missing problen id', 400 - ensure_testdata(problem_id) - language = Language(request.form.get('language', type=int)) - try: - DISPATCHER.prepare_submission_dir( - root_dir=SUBMISSION_DIR, - submission_id=submission_id, - meta=get_problem_meta(problem_id, language), - source=request.files['src'], - testdata=get_problem_root(problem_id), - ) - except ValueError as e: - return str(e), 400 - logger.debug(f'send submission {submission_id} to dispatcher') - try: - DISPATCHER.handle(submission_id) - except queue.Full: - return jsonify({ - 'status': 'err', - 'msg': 'task queue is full now.\n' - 'please wait a moment and re-send the submission.', - 'data': None, - }), 500 - return jsonify({ - 'status': 'ok', - 'msg': 'ok', - 'data': 'ok', - }) - - -@app.get('/status') -def status(): - ret = { - 'load': DISPATCHER.queue.qsize() / DISPATCHER.MAX_TASK_COUNT, - } - # if token is provided - if secrets.compare_digest(SANDBOX_TOKEN, request.args.get('token', '')): - ret.update({ - 'queueSize': DISPATCHER.queue.qsize(), - 'maxTaskCount': DISPATCHER.MAX_TASK_COUNT, - 'containerCount': DISPATCHER.container_count, - 'maxContainerCount': DISPATCHER.MAX_TASK_COUNT, - 'submissions': [*DISPATCHER.result.keys()], - 'running': DISPATCHER.do_run, - }) - return jsonify(ret), 200 diff --git a/dispatcher/utils.py b/dispatcher/utils.py index dea4bbc..2d6cd86 100644 --- a/dispatcher/utils.py +++ b/dispatcher/utils.py @@ -8,7 +8,7 @@ def logger() -> logging.Logger: try: return current_app.logger except RuntimeError: - return logging.getLogger('gunicorn.error') + return logging.getLogger('dispatcher') # Fake redis server diff --git a/gunicorn.conf.py b/gunicorn.conf.py deleted file mode 100644 index a18dd9d..0000000 --- a/gunicorn.conf.py +++ /dev/null @@ -1,10 +0,0 @@ -port = 1450 -bind = f'0.0.0.0:{port}' -timeout = 60 - -# loglevel = 'debug' -accesslog = 'logs/access.log' -errorlog = 'logs/error.log' - -worker_class = 'gthread' -threads = 5 diff --git a/requirements.txt b/requirements.txt index 42fd0ab..bd96e3c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,5 @@ docker==7.1.0 requests~=2.27 -gunicorn~=20.1 -flask~=2.0 yapf~=0.32 pydantic~=1.9 redis~=4.1.4 From 9e3a9e17d81efe01124d225da2e04920b12ba110 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 05:16:55 +0800 Subject: [PATCH 13/15] refactor(testdata): use per-runner rk_token instead of SANDBOX_TOKEN After Plan B's removal of SANDBOX_TOKEN, dispatcher/testdata.py needs to authenticate to backend's testdata endpoints with the runner's own rk_token (set after registration). - Module-level _RK_TOKEN with set_runner_token() setter - fetch_problem_meta, fetch_testdata, get_checksum use it - main.py calls dispatcher_testdata.set_runner_token(creds.token) after registration --- dispatcher/testdata.py | 23 +++++++++++++++++++---- main.py | 2 ++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/dispatcher/testdata.py b/dispatcher/testdata.py index 4b748ac..75d5182 100644 --- a/dispatcher/testdata.py +++ b/dispatcher/testdata.py @@ -15,10 +15,25 @@ ) from .config import ( BACKEND_API, - SANDBOX_TOKEN, TESTDATA_ROOT, ) +_RK_TOKEN: str | None = None + + +def set_runner_token(rk_token: str) -> None: + """Called once after registration to give testdata module the rk_token.""" + global _RK_TOKEN + _RK_TOKEN = rk_token + + +def _get_token() -> str: + if _RK_TOKEN is None: + raise RuntimeError( + "runner token not set; call set_runner_token() first") + return _RK_TOKEN + + META_DIR = TESTDATA_ROOT / 'meta' META_DIR.mkdir(exist_ok=True) @@ -43,7 +58,7 @@ def fetch_problem_meta(problem_id: int) -> str: resp = rq.get( f'{BACKEND_API}/problem/{problem_id}/meta', params={ - 'token': SANDBOX_TOKEN, + 'token': _get_token(), }, ) handle_problem_response(resp) @@ -73,7 +88,7 @@ def fetch_testdata(problem_id: int): resp = rq.get( f'{BACKEND_API}/problem/{problem_id}/testdata', params={ - 'token': SANDBOX_TOKEN, + 'token': _get_token(), }, ) handle_problem_response(resp) @@ -84,7 +99,7 @@ def get_checksum(problem_id: int) -> str: resp = rq.get( f'{BACKEND_API}/problem/{problem_id}/checksum', params={ - 'token': SANDBOX_TOKEN, + 'token': _get_token(), }, ) handle_problem_response(resp) diff --git a/main.py b/main.py index cb1f533..9356865 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ from agent.poller import PollerThread from agent.registration import register_runner from agent.result_sender import ResultSenderThread +from dispatcher import testdata as dispatcher_testdata from dispatcher.dispatcher import Dispatcher # Ensure logs/ directory exists before configuring file handler @@ -49,6 +50,7 @@ def main(): registration_token=agent_config.RUNNER_REGISTRATION_TOKEN, ) log.info(f"registered as {creds.runner_id}") + dispatcher_testdata.set_runner_token(creds.token) # 2. Authenticated client used by all daemon threads client = BackendClient(rk_token=creds.token) From 3c94589fc9eeda53bcbb802f1ddf554277eff4f2 Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 13:46:45 +0800 Subject: [PATCH 14/15] fix(dispatcher): remove leftover flask import in utils.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit main.py is no longer a Flask app — current_app fallback is dead code. Caused ModuleNotFoundError when running the sandbox container. --- dispatcher/utils.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dispatcher/utils.py b/dispatcher/utils.py index 2d6cd86..75323df 100644 --- a/dispatcher/utils.py +++ b/dispatcher/utils.py @@ -1,14 +1,10 @@ import logging import os import redis -from flask import current_app def logger() -> logging.Logger: - try: - return current_app.logger - except RuntimeError: - return logging.getLogger('dispatcher') + return logging.getLogger('dispatcher') # Fake redis server From e44d92aaac53417056ee2a838de795963a6095fe Mon Sep 17 00:00:00 2001 From: as535364 Date: Tue, 28 Apr 2026 14:37:33 +0800 Subject: [PATCH 15/15] style: yapf reformat to satisfy CI --- agent/config.py | 3 ++- tests/agent/test_heartbeat.py | 15 ++++++++++----- tests/agent/test_registration.py | 13 +++++++------ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/agent/config.py b/agent/config.py index 5f516aa..9487c46 100644 --- a/agent/config.py +++ b/agent/config.py @@ -27,5 +27,6 @@ HTTP_REQUEST_TIMEOUT_SEC: int = 10 # Where to download code zip to (per-job temp dir) -CODE_DOWNLOAD_DIR: Path = Path(os.getenv("CODE_DOWNLOAD_DIR", "/tmp/runner-code")) +CODE_DOWNLOAD_DIR: Path = Path( + os.getenv("CODE_DOWNLOAD_DIR", "/tmp/runner-code")) CODE_DOWNLOAD_DIR.mkdir(exist_ok=True) diff --git a/tests/agent/test_heartbeat.py b/tests/agent/test_heartbeat.py index 2ae7a06..32753ff 100644 --- a/tests/agent/test_heartbeat.py +++ b/tests/agent/test_heartbeat.py @@ -13,8 +13,10 @@ def test_heartbeat_calls_client_at_interval(): shutdown = threading.Event() hb = HeartbeatThread( - client=client, runner_id="rn_1", - interval_sec=0.05, shutdown_event=shutdown, + client=client, + runner_id="rn_1", + interval_sec=0.05, + shutdown_event=shutdown, ) hb.start() time.sleep(0.18) # ~3 intervals @@ -36,8 +38,10 @@ def test_heartbeat_swallows_transient_errors_and_keeps_going(): shutdown = threading.Event() hb = HeartbeatThread( - client=client, runner_id="rn_1", - interval_sec=0.05, shutdown_event=shutdown, + client=client, + runner_id="rn_1", + interval_sec=0.05, + shutdown_event=shutdown, ) hb.start() time.sleep(0.2) @@ -52,7 +56,8 @@ def test_heartbeat_stops_promptly_on_shutdown(): client = MagicMock(spec=BackendClient) shutdown = threading.Event() hb = HeartbeatThread( - client=client, runner_id="rn_1", + client=client, + runner_id="rn_1", interval_sec=10.0, # long interval shutdown_event=shutdown, ) diff --git a/tests/agent/test_registration.py b/tests/agent/test_registration.py index ed90b9e..99e92f6 100644 --- a/tests/agent/test_registration.py +++ b/tests/agent/test_registration.py @@ -15,7 +15,8 @@ def test_register_runner_returns_credentials_and_config(): }, } - result = register_runner(fake_client, name="r1", + result = register_runner(fake_client, + name="r1", registration_token="dev-token") assert result.runner_id == "rn_xyz" @@ -25,18 +26,18 @@ def test_register_runner_returns_credentials_and_config(): assert result.max_concurrent_jobs == 4 fake_client.register.assert_called_once_with( - name="r1", registration_token="dev-token" - ) + name="r1", registration_token="dev-token") def test_register_runner_uses_defaults_for_missing_config_fields(): fake_client = MagicMock(spec=BackendClient) fake_client.register.return_value = { - "runner_id": "rn_a", "token": "rk_a", "config": {}, + "runner_id": "rn_a", + "token": "rk_a", + "config": {}, } - result = register_runner(fake_client, name="r1", - registration_token="t") + result = register_runner(fake_client, name="r1", registration_token="t") # Backend should always send config, but defensively use sensible defaults assert result.heartbeat_interval_sec == 15