feat: add pull-based runner client (GitHub Actions runner style) #50

as535364 wants to merge 5 commits into Normal-OJ:main
Conversation
Add runner_client.py that operates as a pull-based runner instead of the traditional push-based Flask server. The runner polls the backend for pending judge jobs, claims them, processes them in Docker containers, and reports results back, similar to GitHub Actions self-hosted runners.

Changes:
- runner_client.py: Polling-based runner with concurrent job processing, heartbeat support, and graceful shutdown
- Dockerfile: Support SANDBOX_MODE=pull env var to start runner client

https://claude.ai/code/session_01CD3q84rDaDXMnAmQVh58qR
Boss, remember to run the formatter 😭😭
Cover poll_for_jobs, claim_job, download_code, download_testdata, send_heartbeat, report_result, and the main run loop with 18 tests using mocked HTTP responses.

https://claude.ai/code/session_01CD3q84rDaDXMnAmQVh58qR
It was Claude misbehaving, not me!
- Patch runner_client.time.sleep instead of global time.sleep
- Reset running_jobs in mock_sleep so the drain loop exits
- Simplify test_claims_and_processes_jobs mock setup

https://claude.ai/code/session_01CD3q84rDaDXMnAmQVh58qR
Pull request overview

This PR introduces a new pull-based sandbox runner client that polls a backend for pending judge jobs, claims them, executes them via the existing SubmissionRunner Docker execution, and reports results, enabling a GitHub Actions runner-style workflow alongside the existing push-based Flask server.

Changes:
- Added runner_client.py implementing polling, claiming, downloading artifacts, executing jobs concurrently, heartbeat, and shutdown handling.
- Added tests/test_runner_client.py with unit tests for the runner client's networking helpers and main-loop behavior.
- Updated Dockerfile to support SANDBOX_MODE=pull to start the new runner client instead of gunicorn.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| runner_client.py | Implements the new pull-based runner client, including job lifecycle handling and Docker-based execution. |
| tests/test_runner_client.py | Adds tests for polling/claiming/download/report helpers and basic run-loop behavior. |
| Dockerfile | Adds an environment switch to run either the legacy push server or the new pull runner. |
```python
# Prepare submission directory
if submission_dir.exists():
    shutil.rmtree(submission_dir)
submission_dir.mkdir(parents=True)
```
submission_dir is derived from submission_id (backend-controlled) and then passed to shutil.rmtree(). If submission_id contains path separators/.., this can delete outside SUBMISSION_DIR. Validate submission_id (e.g., strict allowed charset) and/or assert submission_dir.resolve() is within SUBMISSION_DIR.resolve() before deleting/creating.
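A minimal sketch of the containment check suggested above. The names `SUBMISSION_DIR` and `resolve_submission_dir` are illustrative stand-ins, not the PR's actual identifiers:

```python
import pathlib

# Stand-in for the runner's base directory (the real value comes from config)
SUBMISSION_DIR = pathlib.Path('/tmp/submissions')


def resolve_submission_dir(submission_id: str) -> pathlib.Path:
    """Reject IDs that would escape SUBMISSION_DIR via '..' or separators."""
    base = SUBMISSION_DIR.resolve()
    candidate = (base / submission_id).resolve()
    # The resolved path must be a strict child of the base directory;
    # this catches '..' traversal, absolute paths, and empty IDs.
    if candidate == base or base not in candidate.parents:
        raise ValueError(f'unsafe submission_id: {submission_id!r}')
    return candidate
```

With this in place, `shutil.rmtree()` only ever sees paths under the base directory, regardless of what the backend sends.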
runner_client.py (Outdated)
```python
# Read config to get working_dir for host paths
with open('.config/submission.json') as f:
    s_config = json.load(f)
host_base = pathlib.Path(
    s_config['working_dir']) / submission_id / 'testcase'
```
.config/submission.json is opened and parsed inside the innermost testcase loop, so it will be re-read for every case. Load this config (and compute host_base) once per job (or once per process) before iterating cases to reduce IO and simplify the control flow.
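One way to hoist the read, sketched with a hypothetical helper name; the real fix would simply move the existing `open()` call above the testcase loop:

```python
import json
import pathlib


def load_submission_config(path='.config/submission.json'):
    """Read the runner config once; callers reuse the parsed dict."""
    with open(path) as f:
        return json.load(f)


# Per job, before iterating testcases:
#     s_config = load_submission_config()
#     host_base = pathlib.Path(s_config['working_dir']) / submission_id / 'testcase'
#     for case in cases:
#         ...  # no per-case config IO
```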
runner_client.py (Outdated)
```python
import sys
import tempfile
```
sys and tempfile are imported but not used in this module. Removing unused imports will avoid lint noise and keep the dependency surface minimal.
Suggested change:
```diff
-import sys
-import tempfile
```
```dockerfile
CMD if [ "$SANDBOX_MODE" = "pull" ]; then \
        python runner_client.py; \
    else \
        gunicorn -c gunicorn.conf.py app:app; \
    fi
```
This shell-form CMD runs under /bin/sh -c, so the actual process (python/gunicorn) won't be PID 1 and may not receive SIGTERM/SIGINT properly (can break graceful shutdown and health signaling). Prefer an ENTRYPOINT script or sh -c with exec in both branches so the selected process becomes PID 1.
Suggested change:
```diff
-CMD if [ "$SANDBOX_MODE" = "pull" ]; then \
-        python runner_client.py; \
-    else \
-        gunicorn -c gunicorn.conf.py app:app; \
-    fi
+CMD ["/bin/sh", "-c", "if [ \"$SANDBOX_MODE\" = \"pull\" ]; then exec python runner_client.py; else exec gunicorn -c gunicorn.conf.py app:app; fi"]
```
```python
def process_job(self, job: JobInfo):
    """
    Process a single job: download data, compile, execute, report.
    Runs in a separate thread.
    """
```
process_job() contains most of the new runner behavior (meta parsing, path construction, compile/run execution, result shaping, cleanup), but the current tests only cover the networking helpers and mock out process_job in the run-loop test. Add tests that exercise process_job with SubmissionRunner/network calls mocked so regressions in result formatting and cleanup are caught.
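A possible shape for such a test, shown against a small stand-in class rather than the real module (the actual import paths, method names, and result dict of the PR's runner would differ):

```python
from unittest.mock import MagicMock


class FakeRunner:
    """Stand-in mirroring process_job's collaborators: download, execute, report."""

    def __init__(self):
        self.running_jobs = 1

    def download_code(self, job): ...

    def report_result(self, submission_id, result): ...

    def execute(self, job):
        return {'status': 'AC', 'stdout': 'ok'}

    def process_job(self, job):
        # Download artifacts, run the submission, report, then release the slot
        self.download_code(job)
        result = self.execute(job)
        self.report_result(job['id'], result)
        self.running_jobs -= 1


runner = FakeRunner()
runner.download_code = MagicMock()
runner.report_result = MagicMock()
runner.process_job({'id': 's1'})
# The reported result and the slot counter are the observable outcomes
runner.report_result.assert_called_once_with('s1', {'status': 'AC', 'stdout': 'ok'})
assert runner.running_jobs == 0
```

The same pattern applied to the real `process_job` (with `SubmissionRunner` and the HTTP helpers mocked) would catch regressions in result formatting and slot cleanup.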
runner_client.py (Outdated)
```python
src_dir = dest_dir / 'src'
src_dir.mkdir(exist_ok=True)
with ZipFile(io.BytesIO(resp.content)) as zf:
    zf.extractall(src_dir)
```
ZipFile.extractall() is used on backend-provided zip content without validating member paths. A zip with .. or absolute paths can write outside dest_dir/src (Zip Slip). Consider validating each ZipInfo.filename stays within src_dir before extracting (or extract to a temp dir and move safe paths).
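A hedged sketch of such a check. The PR later adds a `_safe_extractall()` helper; this version is an illustration of the idea, not the PR's code:

```python
import pathlib
from zipfile import ZipFile


def safe_extractall(zf: ZipFile, dest: pathlib.Path) -> None:
    """Extract only members whose resolved paths stay inside dest (blocks Zip Slip)."""
    dest = dest.resolve()
    for member in zf.infolist():
        target = (dest / member.filename).resolve()
        # Reject members that resolve outside dest via '..' or absolute paths
        if target != dest and dest not in target.parents:
            raise ValueError(f'unsafe zip member: {member.filename!r}')
    zf.extractall(dest)
```

Rejecting the whole archive on the first unsafe member (rather than skipping it) keeps partially-extracted submissions from being judged.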
runner_client.py (Outdated)
```python
testcase_dir = dest_dir / 'testcase'
testcase_dir.mkdir(exist_ok=True)
with ZipFile(io.BytesIO(resp.content)) as zf:
    zf.extractall(testcase_dir)
```
ZipFile.extractall() is used on backend-provided zip content without validating member paths. A malicious zip can write outside dest_dir/testcase via .. or absolute paths. Add a safe-extraction check (reject/skip unsafe members) before extracting.
…PID 1

- Remove unused imports (sys, tempfile)
- Add _safe_extractall() to prevent Zip Slip attacks on downloaded zips
- Move .config/submission.json read outside the innermost test case loop
- Use exec in Dockerfile CMD so the process becomes PID 1

https://claude.ai/code/session_01CD3q84rDaDXMnAmQVh58qR
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
```python
with open('.config/submission.json') as f:
    s_config = json.load(f)
host_base = pathlib.Path(
    s_config['working_dir']) / submission_id / 'testcase'
```
process_job() writes files under SUBMISSION_DIR, but SubmissionRunner uses .config/submission.json's working_dir to locate src (and Docker bind mounts use the host path). If SUBMISSION_DIR and working_dir diverge, compile/run will fail because the expected working_dir/<id>/src and working_dir/<id>/testcase paths won’t exist. Consider deriving the submission directory from working_dir (or validating they match early and failing fast with a clear error).
Suggested change:
```diff
 s_config['working_dir']) / submission_id / 'testcase'
+# Ensure submission_dir root matches config working_dir to avoid
+# mismatched paths between where files are written and where
+# SubmissionRunner/Docker expect them.
+config_root = pathlib.Path(s_config['working_dir'])
+submission_root = submission_dir.parent
+if submission_root != config_root:
+    logger.error(
+        'Configuration error: submission_dir root (%s) does not '
+        'match working_dir from .config/submission.json (%s). '
+        'Ensure SUBMISSION_DIR and working_dir are consistent.',
+        submission_root,
+        config_root,
+    )
+    raise RuntimeError(
+        'Runner misconfiguration: SUBMISSION_DIR and '
+        'working_dir mismatch'
+    )
```
SUBMISSION_DIR vs working_dir mismatch (runner_client.py L254)
This is by design. SUBMISSION_DIR is where the runner client writes files inside its own container. working_dir from submission.json is the Docker volume host mount path used by SubmissionRunner to map into the sandbox container. They are intentionally different paths serving different purposes.
from claude
```python
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)

while not self.shutdown:
    # Check if we can take more jobs
    with self.running_lock:
        available_slots = MAX_CONCURRENT - self.running_jobs

    if available_slots <= 0:
        time.sleep(POLL_INTERVAL)
        continue

    # Poll for jobs
    jobs = self.poll_for_jobs()
    if not jobs:
        time.sleep(POLL_INTERVAL)
        continue

    # Try to claim and process jobs
    for job_info in jobs[:available_slots]:
        job = self.claim_job(job_info['submissionId'])
        if job is None:
            continue

        with self.running_lock:
            self.running_jobs += 1

        thread = threading.Thread(
            target=self.process_job,
            args=(job, ),
            daemon=True,
        )
        thread.start()

    time.sleep(POLL_INTERVAL)

# Wait for running jobs to finish
logger.info('Waiting for running jobs to complete...')
while True:
    with self.running_lock:
        if self.running_jobs == 0:
            break
    time.sleep(1)
logger.info('Runner stopped')
```
run() unconditionally installs SIGINT/SIGTERM handlers. signal.signal() raises ValueError if called outside the main thread, and it also mutates global process state (which can interfere with pytest’s own SIGINT handling when tests call Runner.run()). Consider guarding this with a main-thread check (or a flag to disable signal handler installation for embedding/tests) and/or restoring previous handlers on exit.
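The ValueError behavior is easy to demonstrate with a minimal repro, independent of the PR's code:

```python
import signal
import threading

errors = []


def install_handler():
    # signal.signal() is only permitted in the main thread of the main
    # interpreter; calling it from any other thread raises ValueError.
    try:
        signal.signal(signal.SIGTERM, lambda signum, frame: None)
    except ValueError as exc:
        errors.append(type(exc).__name__)


t = threading.Thread(target=install_handler)
t.start()
t.join()
assert errors == ['ValueError']
```

This is why calling run() from a non-main thread (as an embedding application or a test harness might) would fail before the loop even starts.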
Suggested change:
```diff
-signal.signal(signal.SIGINT, on_signal)
-signal.signal(signal.SIGTERM, on_signal)
-while not self.shutdown:
-    # Check if we can take more jobs
-    with self.running_lock:
-        available_slots = MAX_CONCURRENT - self.running_jobs
-    if available_slots <= 0:
-        time.sleep(POLL_INTERVAL)
-        continue
-    # Poll for jobs
-    jobs = self.poll_for_jobs()
-    if not jobs:
-        time.sleep(POLL_INTERVAL)
-        continue
-    # Try to claim and process jobs
-    for job_info in jobs[:available_slots]:
-        job = self.claim_job(job_info['submissionId'])
-        if job is None:
-            continue
-        with self.running_lock:
-            self.running_jobs += 1
-        thread = threading.Thread(
-            target=self.process_job,
-            args=(job, ),
-            daemon=True,
-        )
-        thread.start()
-    time.sleep(POLL_INTERVAL)
-# Wait for running jobs to finish
-logger.info('Waiting for running jobs to complete...')
-while True:
-    with self.running_lock:
-        if self.running_jobs == 0:
-            break
-    time.sleep(1)
-logger.info('Runner stopped')
+is_main_thread = threading.current_thread() is threading.main_thread()
+old_sigint_handler = None
+old_sigterm_handler = None
+if is_main_thread:
+    # Save existing handlers and install our own for graceful shutdown
+    old_sigint_handler = signal.getsignal(signal.SIGINT)
+    old_sigterm_handler = signal.getsignal(signal.SIGTERM)
+    signal.signal(signal.SIGINT, on_signal)
+    signal.signal(signal.SIGTERM, on_signal)
+try:
+    while not self.shutdown:
+        # Check if we can take more jobs
+        with self.running_lock:
+            available_slots = MAX_CONCURRENT - self.running_jobs
+        if available_slots <= 0:
+            time.sleep(POLL_INTERVAL)
+            continue
+        # Poll for jobs
+        jobs = self.poll_for_jobs()
+        if not jobs:
+            time.sleep(POLL_INTERVAL)
+            continue
+        # Try to claim and process jobs
+        for job_info in jobs[:available_slots]:
+            job = self.claim_job(job_info['submissionId'])
+            if job is None:
+                continue
+            with self.running_lock:
+                self.running_jobs += 1
+            thread = threading.Thread(
+                target=self.process_job,
+                args=(job, ),
+                daemon=True,
+            )
+            thread.start()
+        time.sleep(POLL_INTERVAL)
+    # Wait for running jobs to finish
+    logger.info('Waiting for running jobs to complete...')
+    while True:
+        with self.running_lock:
+            if self.running_jobs == 0:
+                break
+        time.sleep(1)
+    logger.info('Runner stopped')
+finally:
+    # Restore previous signal handlers to avoid leaking global state
+    if is_main_thread and old_sigint_handler is not None:
+        signal.signal(signal.SIGINT, old_sigint_handler)
+    if is_main_thread and old_sigterm_handler is not None:
+        signal.signal(signal.SIGTERM, old_sigterm_handler)
```
Signal handler outside main thread (runner_client.py L386)
run() is only called from the if __name__ == '__main__' block, which always executes on the main thread. In tests, the loop is controlled via mocked poll_for_jobs, so the signal handlers are never exercised. Adding a main-thread guard is unnecessary complexity.
from claude
```python
runner.poll_for_jobs = MagicMock(return_value=[])


def set_shutdown(*_args):
    runner.shutdown = True


runner.poll_for_jobs.side_effect = set_shutdown
runner.run()
```
In test_shutdown_stops_loop, poll_for_jobs.side_effect sets runner.shutdown but does not return a list. That makes jobs become None, so the loop still hits time.sleep(POLL_INTERVAL) once before exiting, adding an avoidable real-time delay to the test. Consider returning [] from the side effect and/or patching runner_client.time.sleep to a no-op in this test to keep the suite fast and deterministic.
Suggested change:
```diff
-runner.poll_for_jobs = MagicMock(return_value=[])
-def set_shutdown(*_args):
-    runner.shutdown = True
-runner.poll_for_jobs.side_effect = set_shutdown
-runner.run()
+runner.poll_for_jobs = MagicMock()
+def set_shutdown(*_args):
+    runner.shutdown = True
+    return []
+runner.poll_for_jobs.side_effect = set_shutdown
+with patch.object(runner_client.time, 'sleep', return_value=None):
+    runner.run()
```
```python
submission_dir.mkdir(parents=True)

# Write meta.json
meta = Meta.parse_obj(job.meta)
```
process_job() parses job.meta as a dispatcher.meta.Meta, but Meta requires a language field while claim_job() (and the unit test) only populate meta with tasks. This will raise a validation error at runtime and prevent writing meta.json / executing the job. Consider constructing a full Meta payload (e.g., merge job.meta with {'language': job.language}) or avoid Meta.parse_obj here and write a JSON that matches what the rest of the system expects.
Suggested change:
```diff
-meta = Meta.parse_obj(job.meta)
+# Ensure Meta receives required fields (e.g., language) in addition to job.meta
+meta_payload = {}
+if isinstance(job.meta, dict):
+    meta_payload.update(job.meta)
+elif job.meta is not None:
+    # Fallback: try to parse non-dict meta as JSON string
+    try:
+        meta_payload.update(json.loads(job.meta))
+    except (TypeError, ValueError):
+        logger.warning('Unexpected meta format for job %s; using empty meta', submission_id)
+# Meta model requires language; use language from the job info if not present
+if 'language' not in meta_payload:
+    meta_payload['language'] = job.language
+meta = Meta.parse_obj(meta_payload)
```
Meta validation error (runner_client.py L207)
Not an issue. The backend's claim_job response already includes language in the meta dict (see model/runner.py L136: 'language': sub.language). Meta.parse_obj(job.meta) will parse correctly.
from claude