import asyncio
import hashlib
import hmac
import json
import logging
import os
import re
import tempfile
import time
from typing import Any
from urllib.parse import urlparse
import httpx
import modal
from fastapi import HTTPException, Request
# Configure logging
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(record.created)),
"level": record.levelname,
"message": record.getMessage(),
"logger": record.name,
}
for field in (
"job_id",
"repo",
"duration",
"status",
"error_code",
"action",
"attempt",
"delay",
):
value = getattr(record, field, None)
if value is not None:
payload[field] = value
return json.dumps(payload, default=str)
_handler = logging.StreamHandler()
_handler.setFormatter(JsonFormatter())
_root_logger = logging.getLogger()
_root_logger.handlers.clear()
_root_logger.addHandler(_handler)
_root_logger.setLevel(logging.INFO)
logger = logging.getLogger("modal-github-runner")
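# A call such as logger.info("Spawning sandbox", extra={"job_id": 123,
# "repo": "acme/web"}) emits one JSON line per record, e.g. (illustrative
# values):
#   {"timestamp": "2025-01-01T00:00:00Z", "level": "INFO",
#    "message": "Spawning sandbox", "logger": "modal-github-runner",
#    "job_id": 123, "repo": "acme/web"}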
# =============================================================================
# CONFIGURATION
# =============================================================================
# CRITICAL: Use 2025.06+ Image Builder for Docker-in-Sandbox support
# https://modal.com/docs/guide/docker-in-sandboxes
os.environ.setdefault("MODAL_IMAGE_BUILDER_VERSION", "2025.06")
# Runner version - configurable via environment for security updates
RUNNER_VERSION = os.environ.get("RUNNER_VERSION", "2.333.1")
# Sandbox timeout (3 hours to accommodate long-running crawlers)
TIMEOUT_SECONDS = 10800
# Request body size limit (1MB)
MAX_BODY_SIZE = 1_000_000
# Rate limiting - in-memory deduplication
_processed_jobs: dict[str, float] = {}
_processed_deliveries: set[str] = set()
JOB_DEDUP_WINDOW_SECONDS = 300 # 5 minutes
MAX_PROCESSED_CACHE_SIZE = 10000
# Replay protection - delivery ID cache
DELIVERY_CACHE_MAX_SIZE = 10000
# Repository allowlist (comma-separated, empty = allow all)
# Set via environment: ALLOWED_REPOS="owner/repo1,owner/repo2"
ALLOWED_REPOS_STR = os.environ.get("ALLOWED_REPOS", "")
ALLOWED_REPOS = [r.strip() for r in ALLOWED_REPOS_STR.split(",") if r.strip()]
# HTTP client timeout
HTTP_TIMEOUT_SECONDS = 30.0
# Per-repo concurrency limiting (unset or <= 0 = unlimited).
# NOTE: the counter is decremented when the webhook handler returns, so this
# bounds concurrent provisioning requests per repo, not running jobs.
_concurrent_jobs: dict[str, int] = {}
_max_concurrent_raw = os.environ.get("MAX_CONCURRENT_PER_REPO")
MAX_CONCURRENT_PER_REPO: int | None = None
if _max_concurrent_raw:
    try:
        MAX_CONCURRENT_PER_REPO = int(_max_concurrent_raw)
    except ValueError:
        logger.warning("MAX_CONCURRENT_PER_REPO is not an integer, ignoring")
    if MAX_CONCURRENT_PER_REPO is not None and MAX_CONCURRENT_PER_REPO <= 0:
        MAX_CONCURRENT_PER_REPO = None
GPU_LABEL_TO_ATTR = {
"t4": "T4",
"l4": "L4",
"a100": "A100",
"a100-80gb": "A100_80GB",
"h100": "H100",
}
ALLOWED_CIDRS_STR = os.environ.get("ALLOWED_CIDRS", "")
ALLOWED_CIDRS = [c.strip() for c in ALLOWED_CIDRS_STR.split(",") if c.strip()] if ALLOWED_CIDRS_STR else None
BLOCK_NETWORK = os.environ.get("BLOCK_NETWORK", "").lower() in ("true", "1", "yes")
CACHE_VOLUME_NAME = os.environ.get("CACHE_VOLUME_NAME", "") or None
MODAL_REGION = os.environ.get("MODAL_REGION", "") or None
SANDBOX_EXTRA_ENV: dict = {}
_extra_env_raw = os.environ.get("SANDBOX_EXTRA_ENV", "")
if _extra_env_raw:
try:
parsed = json.loads(_extra_env_raw)
if isinstance(parsed, dict):
SANDBOX_EXTRA_ENV = {str(k): str(v) for k, v in parsed.items()}
else:
logger.warning("SANDBOX_EXTRA_ENV is not a JSON object, ignoring")
except json.JSONDecodeError:
logger.warning("SANDBOX_EXTRA_ENV is not valid JSON, ignoring")
def _get_gpu_config(gpu_key: str):
    """Map a gpu:<key> job label to a Modal GPU config object, or None if unknown."""
    attr_name = GPU_LABEL_TO_ATTR.get(gpu_key)
if attr_name is None:
return None
gpu_cls = getattr(modal.gpu, attr_name, None)
if gpu_cls is None:
return None
return gpu_cls()
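# Illustrative mapping (assumes the installed Modal SDK exposes these
# modal.gpu classes, as the lookup above expects):
#   _get_gpu_config("t4")        -> modal.gpu.T4()
#   _get_gpu_config("a100-80gb") -> modal.gpu.A100_80GB()
#   _get_gpu_config("v100")      -> None (not in GPU_LABEL_TO_ATTR)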
# =============================================================================
# TRUST MODEL
# =============================================================================
# SECURITY NOTE: This runner executes with RUNNER_ALLOW_RUNASROOT=1
#
# Trust Model:
# - Only repositories in ALLOWED_REPOS can trigger runner creation
# - Each job runs in an ephemeral, isolated Modal sandbox
# - JIT tokens are single-use and job-specific
# - Sandbox is destroyed after job completion
#
# Risks:
# - A malicious workflow in an allowed repo could access secrets during execution
# - Root access allows full control within the sandbox during job lifetime
#
# Mitigations:
# - Use ALLOWED_REPOS to restrict to trusted repositories only
# - Modal sandbox isolation limits blast radius
# - JIT tokens cannot be reused after job completion
# - Consider using fine-grained PATs with minimal repository access
# =============================================================================
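# Example environment for a locked-down deployment (hypothetical values;
# consult GitHub's published IP ranges before pinning CIDRs):
#   ALLOWED_REPOS="acme/web,acme/api"
#   MAX_CONCURRENT_PER_REPO="4"
#   ALLOWED_CIDRS="140.82.112.0/20"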
# start-dockerd.sh content for proper Docker networking inside Modal Sandbox
# Sets up NAT/SNAT rules, forces iptables-legacy (gVisor lacks nftables support),
# and starts dockerd with --iptables=false since we manage rules manually.
# Ref: https://modal.com/docs/guide/docker-in-sandboxes
START_DOCKERD_SH = r"""#!/bin/bash
set -e -o pipefail
dev=$(ip route show default 2>/dev/null | awk '/default/ {print $5}')
if [ -n "$dev" ]; then
addr=$(ip addr show dev "$dev" | grep -w inet | awk '{print $2}' | cut -d/ -f1)
if [ -n "$addr" ]; then
echo 1 > /proc/sys/net/ipv4/ip_forward
iptables-legacy -t nat -A POSTROUTING -o "$dev" -j SNAT --to-source "$addr" -p tcp 2>/dev/null || true
iptables-legacy -t nat -A POSTROUTING -o "$dev" -j SNAT --to-source "$addr" -p udp 2>/dev/null || true
fi
fi
update-alternatives --set iptables /usr/sbin/iptables-legacy 2>/dev/null || true
update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy 2>/dev/null || true
# gVisor requires tmpfs for Docker overlay2 storage driver.
# Without this, Docker falls back to the vfs driver (10-50x slower).
mkdir -p /var/lib/docker
mount -t tmpfs -o size=16G tmpfs /var/lib/docker
exec /usr/bin/dockerd --iptables=false --ip6tables=false > /var/log/dockerd.log 2>&1
"""
_temp_dockerd_script = tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False)
_temp_dockerd_script.write(START_DOCKERD_SH)
_temp_dockerd_script.flush()
os.chmod(_temp_dockerd_script.name, 0o755)
_TEMP_DOCKERD_PATH = _temp_dockerd_script.name
runner_image = (
modal.Image.from_registry("ubuntu:22.04")
.env({"DEBIAN_FRONTEND": "noninteractive"})
# Layer 1: System deps + Docker setup (stable, rarely changes)
.apt_install(
"wget",
"ca-certificates",
"curl",
"iproute2",
"git",
)
.run_commands(
"install -m 0755 -d /etc/apt/keyrings",
"curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc",
"chmod a+r /etc/apt/keyrings/docker.asc",
'echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu $(. /etc/os-release && echo \\"${UBUNTU_CODENAME:-$VERSION_CODENAME}\\") stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null', # noqa: E501
)
.apt_install(
"docker-ce=5:27.5.0-1~ubuntu.22.04~jammy",
"docker-ce-cli=5:27.5.0-1~ubuntu.22.04~jammy",
"containerd.io",
"docker-buildx-plugin",
"docker-compose-plugin",
)
.run_commands(
"rm -f $(which runc) || true",
"wget -q https://github.com/opencontainers/runc/releases/download/v1.3.0/runc.amd64",
"chmod +x runc.amd64",
"mv runc.amd64 /usr/local/bin/runc",
"update-alternatives --set iptables /usr/sbin/iptables-legacy",
"update-alternatives --set ip6tables /usr/sbin/ip6tables-legacy",
)
.add_local_file(_TEMP_DOCKERD_PATH, "/start-dockerd.sh", copy=True)
.run_commands("chmod +x /start-dockerd.sh")
# Layer 1c: Pre-download base images with crane (uses native networking, bypasses gVisor)
.run_commands(
"curl -sL https://github.com/google/go-containerregistry/releases/download/v0.21.5/go-containerregistry_Linux_x86_64.tar.gz | tar -xzf - -C /usr/local/bin crane", # noqa: E501
"mkdir -p /images",
"crane pull node:22-alpine /images/node-22-alpine.tar",
)
# Layer 2: Python deps (semi-stable)
.apt_install("python3", "python3-pip", "python-is-python3")
.pip_install("fastapi==0.115.0", "httpx==0.27.0")
# Layer 3: Runner binary (changes with RUNNER_VERSION)
.run_commands(
"mkdir -p /actions-runner",
f"curl -L https://github.com/actions/runner/releases/download/v{RUNNER_VERSION}/actions-runner-linux-x64-{RUNNER_VERSION}.tar.gz | tar -xz -C /actions-runner", # noqa: E501
"/actions-runner/bin/installdependencies.sh",
)
)
app = modal.App("modal-github-runner")
# Secrets should contain GITHUB_TOKEN, WEBHOOK_SECRET, and optionally ALLOWED_REPOS
github_secret = modal.Secret.from_name("github-full-secret")
# Distributed job queue for controlling concurrent sandbox creation.
# NOTE: declared so the deployment owns the queue, but it is not consumed in
# this module -- the webhook handler below spawns sandboxes directly.
job_queue = modal.Queue.from_name("github-runner-job-queue", create_if_missing=True)
# Default max concurrent sandboxes (configurable via environment)
DEFAULT_MAX_PARALLEL = int(os.environ.get("DEFAULT_MAX_PARALLEL", "2"))
def _cleanup_job_cache():
"""Remove expired entries from job deduplication cache."""
global _processed_jobs
if len(_processed_jobs) > MAX_PROCESSED_CACHE_SIZE:
current_time = time.time()
_processed_jobs = {
job_id: timestamp
for job_id, timestamp in _processed_jobs.items()
if current_time - timestamp < JOB_DEDUP_WINDOW_SECONDS
}
def _cleanup_delivery_cache():
"""Limit delivery cache size to prevent memory issues."""
global _processed_deliveries
if len(_processed_deliveries) > DELIVERY_CACHE_MAX_SIZE:
_processed_deliveries = set(
list(_processed_deliveries)[DELIVERY_CACHE_MAX_SIZE // 2 :]
)
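# Design note: a deterministic alternative would keep delivery IDs in a
# collections.OrderedDict and evict oldest-first via popitem(last=False);
# the set-based approach above trades eviction order for simplicity.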
def _validate_github_url(url: str) -> bool:
"""Validate URL is a legitimate GitHub API URL."""
if not url:
return False
try:
parsed = urlparse(url)
# Only allow github.com and api.github.com
# Also support GitHub Enterprise with custom domains if needed
allowed_domains = {"github.com", "api.github.com"}
github_enterprise_domain = os.environ.get("GITHUB_ENTERPRISE_DOMAIN", "")
if github_enterprise_domain:
allowed_domains.add(github_enterprise_domain)
allowed_domains.add(f"api.{github_enterprise_domain}")
return parsed.netloc in allowed_domains and parsed.scheme == "https"
except Exception:
return False
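# Illustrative behavior (hypothetical URLs, no GITHUB_ENTERPRISE_DOMAIN set):
#   _validate_github_url("https://api.github.com/repos/acme/web")  -> True
#   _validate_github_url("http://api.github.com/repos/acme/web")   -> False (not https)
#   _validate_github_url("https://api.github.com.evil.example/x")  -> False (wrong host)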
def _sanitize_error_message(error_text: str, max_length: int = 200) -> str:
"""Sanitize error messages to prevent information disclosure."""
if not error_text:
return "[empty response]"
# Remove potential sensitive patterns
sanitized = re.sub(
r'(token|key|secret|password|auth)["\']?\s*[:=]\s*["\']?[^"\'\s]+',
r"\1=[REDACTED]",
error_text,
flags=re.IGNORECASE,
)
# Truncate to prevent log flooding
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "...[truncated]"
return sanitized
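# Illustrative behavior (hypothetical input):
#   _sanitize_error_message('{"message": "Bad credentials", "token": "ghp_abc123"}')
#   -> '{"message": "Bad credentials", "token=[REDACTED]"}'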
async def _call_github_api(
client: Any, method: str, url: str, max_attempts: int = 3, **kwargs
) -> Any:
"""Call GitHub API with exponential backoff retry on transient failures."""
    backoffs = [1, 2, 4]  # seconds; the last value is reused if max_attempts > 3
for attempt in range(1, max_attempts + 1):
try:
response = await client.request(method, url, **kwargs)
response.raise_for_status()
return response
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
should_retry = status_code == 429 or 500 <= status_code < 600
if not should_retry or attempt == max_attempts:
raise
            delay = backoffs[min(attempt, len(backoffs)) - 1]
logger.warning(
"Retrying GitHub API request",
extra={"attempt": attempt, "status": status_code, "delay": delay},
)
await asyncio.sleep(delay)
except httpx.TimeoutException:
if attempt == max_attempts:
raise
            delay = backoffs[min(attempt, len(backoffs)) - 1]
logger.warning(
"Retrying GitHub API request after timeout",
extra={"attempt": attempt, "delay": delay, "error_code": "timeout"},
)
await asyncio.sleep(delay)
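# Usage sketch: retries 429 and 5xx responses plus timeouts with 1s/2s/4s
# backoff; any other HTTP error propagates immediately. Illustrative call
# (headers is hypothetical here):
#   async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client:
#       resp = await _call_github_api(client, "GET",
#                                     "https://api.github.com/rate_limit",
#                                     headers=headers)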
async def verify_signature(request: Request, body: bytes) -> str | None:
"""
Verify GitHub webhook signature using HMAC-SHA256.
Returns the X-GitHub-Delivery header value if valid, for replay protection.
"""
webhook_secret = os.environ.get("WEBHOOK_SECRET")
if not webhook_secret:
logger.error(
"Webhook secret not configured",
extra={"error_code": "missing_webhook_secret"},
)
raise HTTPException(status_code=500, detail="Internal server error")
# Validate Content-Type
content_type = request.headers.get("Content-Type", "")
if "application/json" not in content_type:
logger.warning(
"Invalid Content-Type",
extra={"error_code": "invalid_content_type"},
)
raise HTTPException(status_code=400, detail="Invalid Content-Type")
signature = request.headers.get("X-Hub-Signature-256")
if not signature:
logger.error(
"Missing X-Hub-Signature-256 header",
extra={"error_code": "missing_signature"},
)
raise HTTPException(status_code=403, detail="Signature missing")
# Get delivery ID for replay protection
delivery_id = request.headers.get("X-GitHub-Delivery")
if not delivery_id:
logger.warning(
"Missing X-GitHub-Delivery header",
extra={"error_code": "missing_delivery_id"},
)
raise HTTPException(status_code=400, detail="Missing delivery ID")
def _check_secret(secret: str) -> bool:
computed = hmac.new(secret.encode(), msg=body, digestmod=hashlib.sha256)
return hmac.compare_digest("sha256=" + computed.hexdigest(), signature)
if not _check_secret(webhook_secret):
old_secret = os.environ.get("WEBHOOK_SECRET_OLD")
if old_secret and _check_secret(old_secret):
logger.info(
"Signature verified with old webhook secret (rotation in progress)",
extra={"delivery_id": delivery_id},
)
else:
logger.error("Invalid signature", extra={"error_code": "invalid_signature"})
raise HTTPException(status_code=403, detail="Invalid signature")
return delivery_id
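# For local testing, a valid signature for a made-up payload can be computed
# with the same HMAC-SHA256 scheme _check_secret expects (sketch; "testsecret"
# and the body are hypothetical values):
#   body = b'{"action": "queued"}'
#   sig = "sha256=" + hmac.new(b"testsecret", body, hashlib.sha256).hexdigest()
# Send sig as X-Hub-Signature-256 with Content-Type: application/json and a
# unique X-GitHub-Delivery header.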
@app.function(image=runner_image, secrets=[github_secret])
@modal.fastapi_endpoint(method="GET")
async def health(request: Request):
return {"status": "ok"}
@app.function(image=runner_image, secrets=[github_secret])
@modal.fastapi_endpoint(method="POST")
async def github_webhook(request: Request):
# Check body size before reading
content_length = request.headers.get("Content-Length")
if content_length:
try:
if int(content_length) > MAX_BODY_SIZE:
logger.warning(
"Request body too large",
extra={"error_code": "payload_too_large"},
)
raise HTTPException(status_code=413, detail="Payload too large")
except ValueError:
pass # Invalid Content-Length, let it fail later
body = await request.body()
# Verify actual body size
if len(body) > MAX_BODY_SIZE:
logger.warning(
"Request body too large",
extra={"error_code": "payload_too_large"},
)
raise HTTPException(status_code=413, detail="Payload too large")
# Verify signature and get delivery ID
delivery_id = await verify_signature(request, body)
# Replay protection
if not delivery_id:
raise HTTPException(status_code=400, detail="Missing delivery ID")
if delivery_id in _processed_deliveries:
logger.warning(
"Duplicate delivery ID detected",
extra={"error_code": "duplicate_delivery"},
)
return {"status": "duplicate", "message": "Request already processed"}
_cleanup_delivery_cache()
_processed_deliveries.add(delivery_id)
start_time = time.monotonic()
try:
payload = json.loads(body)
except Exception as e:
logger.error(
"Failed to parse JSON payload",
extra={"error_code": type(e).__name__},
)
raise HTTPException(status_code=400, detail="Invalid JSON")
    # Only spawn sandboxes for jobs that enter the "queued" state; every other
    # action is either cancellation handling or ignored below.
if payload.get("action") != "queued":
# Handle job cancellation - terminate sandbox by tag lookup
# This works correctly because we use job-specific labels (job-{job_id})
# ensuring 1:1 binding between sandbox and job
if payload.get("action") == "completed":
workflow_job = payload.get("workflow_job", {})
job_id = str(workflow_job.get("id", "unknown"))
conclusion = workflow_job.get("conclusion", "")
# Only terminate on cancellation - normal completions exit naturally
if conclusion == "cancelled":
for sb in modal.Sandbox.list(
app_id=app.app_id, tags={"job_id": job_id}
):
if sb.poll() is None: # Still running
logger.info(
"Terminating sandbox for cancelled job",
extra={"job_id": job_id},
)
try:
sb.terminate()
except Exception as e:
logger.error(
"Failed to terminate sandbox for job",
extra={"job_id": job_id, "error_code": type(e).__name__},
)
return {"status": "terminated", "job_id": job_id}
logger.debug(
"Ignoring non-queued action",
extra={"action": payload.get("action")},
)
logger.info(
"Job metrics",
extra={
"metric": "job_complete",
"status": "ignored",
"duration_seconds": round(time.monotonic() - start_time, 3),
},
)
return {"status": "ignored"}
workflow_job = payload.get("workflow_job")
if not isinstance(workflow_job, dict):
raise HTTPException(status_code=400, detail="Missing workflow_job in payload")
if workflow_job.get("id") is None:
raise HTTPException(status_code=400, detail="Missing workflow_job.id")
job_labels = workflow_job.get("labels")
if not isinstance(job_labels, list):
raise HTTPException(status_code=400, detail="Missing workflow_job.labels")
repository = payload.get("repository")
if not isinstance(repository, dict):
raise HTTPException(status_code=400, detail="Missing repository in payload")
repo_url = repository.get("url")
if not repo_url:
raise HTTPException(status_code=400, detail="Missing repository.url")
repo_full_name = repository.get("full_name")
if not repo_full_name:
raise HTTPException(status_code=400, detail="Missing repository.full_name")
job_id = workflow_job["id"]
# CHECK FOR MODAL LABEL
# Ignore jobs that don't explicitly request 'modal' runner
if "modal" not in job_labels:
logger.info(
"Ignoring job without modal label",
extra={"job_id": job_id, "repo": repo_full_name},
)
logger.info(
"Job metrics",
extra={
"metric": "job_complete",
"job_id": job_id,
"repo": repo_full_name,
"status": "ignored",
"duration_seconds": round(time.monotonic() - start_time, 3),
},
)
return {"status": "ignored", "reason": "no modal label"}
gpu_config = None
gpu_requested = None
for label in job_labels:
if label.startswith("gpu:"):
gpu_key = label.split(":", 1)[1].lower()
gpu_requested = gpu_key
gpu_config = _get_gpu_config(gpu_key)
if not gpu_config:
logger.warning(
"Unknown GPU type requested",
extra={"job_id": job_id, "gpu": gpu_key},
)
else:
logger.info(
"GPU requested for job",
extra={"job_id": job_id, "gpu": gpu_key},
)
break
# Repository allowlist validation
if ALLOWED_REPOS and repo_full_name not in ALLOWED_REPOS:
logger.warning(
"Rejected webhook from unauthorized repo",
extra={"repo": repo_full_name, "error_code": "repo_not_allowed"},
)
raise HTTPException(status_code=403, detail="Repository not authorized")
# Validate repo URL domain
if not _validate_github_url(repo_url):
logger.error(
"Invalid repository URL domain",
extra={"repo": repo_full_name, "error_code": "invalid_repo_url"},
)
raise HTTPException(status_code=400, detail="Invalid repository URL")
# Rate limiting - deduplicate job IDs
current_time = time.time()
if str(job_id) in _processed_jobs:
last_processed = _processed_jobs[str(job_id)]
if current_time - last_processed < JOB_DEDUP_WINDOW_SECONDS:
logger.warning(
"Duplicate job ID detected",
extra={"job_id": job_id, "repo": repo_full_name, "error_code": "duplicate_job"},
)
logger.info(
"Job metrics",
extra={
"metric": "job_complete",
"job_id": job_id,
"repo": repo_full_name,
"status": "duplicate",
"duration_seconds": round(time.monotonic() - start_time, 3),
},
)
return {"status": "duplicate", "job_id": job_id}
_cleanup_job_cache()
_processed_jobs[str(job_id)] = current_time
if MAX_CONCURRENT_PER_REPO is not None:
current_concurrent = _concurrent_jobs.get(repo_full_name, 0)
if current_concurrent >= MAX_CONCURRENT_PER_REPO:
logger.warning(
"Per-repo concurrency limit reached",
extra={
"job_id": job_id,
"repo": repo_full_name,
"current": current_concurrent,
"limit": MAX_CONCURRENT_PER_REPO,
"error_code": "concurrency_limit",
},
)
raise HTTPException(
status_code=429,
detail="Too many concurrent jobs for this repository",
)
_concurrent_jobs[repo_full_name] = current_concurrent + 1
logger.info(
"Incremented per-repo concurrency counter",
extra={
"job_id": job_id,
"repo": repo_full_name,
"current": current_concurrent + 1,
"limit": MAX_CONCURRENT_PER_REPO,
},
)
# Fetch configuration from environment with defaults
runner_group_id = int(os.environ.get("RUNNER_GROUP_ID", 1))
# Use labels from the webhook directly - workflow defines unique labels
# Workflow should use: runs-on: [self-hosted, modal, job-${{ github.run_id }}-${{ strategy.job-index }}]
# This ensures 1:1 binding between runner and job
runner_labels = job_labels if job_labels else ["self-hosted", "modal"]
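    # Illustrative workflow-side configuration (YAML sketch, hypothetical job;
    # the gpu:<type> label is optional and maps through GPU_LABEL_TO_ATTR):
    #   jobs:
    #     build:
    #       runs-on: [self-hosted, modal, "job-${{ github.run_id }}-0", "gpu:t4"]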
headers = {
"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
"Accept": "application/vnd.github+json",
}
data = {
"name": f"modal-runner-{job_id}",
"runner_group_id": runner_group_id,
"labels": runner_labels,
"work_directory": "_work",
}
try:
logger.info(
"Requesting JIT config",
extra={"job_id": job_id, "repo": repo_full_name},
)
api_start = time.monotonic()
jit_duration = 0.0
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client:
try:
response = await _call_github_api(
client,
"POST",
f"{repo_url}/actions/runners/generate-jitconfig",
headers=headers,
json=data,
)
jit_duration = time.monotonic() - api_start
jit_config = response.json()["encoded_jit_config"]
logger.info(
"Received JIT config",
extra={
"job_id": job_id,
"repo": repo_full_name,
"status": response.status_code,
"jit_request_duration": round(jit_duration, 3),
},
)
except httpx.HTTPStatusError as e:
jit_duration = time.monotonic() - api_start
sanitized_error = _sanitize_error_message(e.response.text)
logger.error(
"GitHub API error",
extra={
"job_id": job_id,
"repo": repo_full_name,
"status": e.response.status_code,
"jit_request_duration": round(jit_duration, 3),
"error_code": sanitized_error,
},
)
raise HTTPException(
status_code=e.response.status_code,
detail="Failed to generate JIT config",
)
except httpx.TimeoutException:
jit_duration = time.monotonic() - api_start
logger.error(
"GitHub API timeout",
extra={
"job_id": job_id,
"repo": repo_full_name,
"jit_request_duration": round(jit_duration, 3),
"error_code": "timeout",
},
)
raise HTTPException(status_code=504, detail="GitHub API timeout")
except Exception as e:
jit_duration = time.monotonic() - api_start
logger.error(
"Unexpected error calling GitHub API",
extra={
"job_id": job_id,
"repo": repo_full_name,
"jit_request_duration": round(jit_duration, 3),
"error_code": type(e).__name__,
},
)
raise HTTPException(status_code=500, detail="Internal server error")
cache_volume = None
sandbox_volumes = {}
if CACHE_VOLUME_NAME:
cache_volume = modal.Volume.from_name(CACHE_VOLUME_NAME, create_if_missing=True)
sandbox_volumes["/cache"] = cache_volume
logger.info("Spawning sandbox", extra={"job_id": job_id, "repo": repo_full_name, "gpu": gpu_config is not None})
cmd = (
"bash -c '/start-dockerd.sh &'"
" && sleep 5"
" && for i in $(seq 1 60); do docker info > /dev/null 2>&1 && break; sleep 1; done"
" && docker info > /dev/null 2>&1 "
"|| { echo \"dockerd failed to start\"; cat /var/log/dockerd.log 2>/dev/null; exit 1; }"
" && (docker load -i /images/node-22-alpine.tar || true)"
" && cd /actions-runner"
" && export RUNNER_ALLOW_RUNASROOT=1"
" && export DOCKER_BUILDKIT=1"
" && ./run.sh --jitconfig $GHA_JIT_CONFIG 2>&1 | tee /tmp/runner.log"
)
sandbox_kwargs = dict(
image=runner_image,
app=app,
timeout=TIMEOUT_SECONDS,
env={"GHA_JIT_CONFIG": jit_config, **SANDBOX_EXTRA_ENV},
gpu=gpu_config,
cidr_allowlist=ALLOWED_CIDRS,
block_network=BLOCK_NETWORK,
volumes=sandbox_volumes,
experimental_options={"enable_docker": True},
)
if MODAL_REGION:
sandbox_kwargs["region"] = MODAL_REGION
spawn_start = time.monotonic()
sandbox = await modal.Sandbox.create.aio(
"bash",
"-c",
cmd,
**sandbox_kwargs,
)
await sandbox.set_tags.aio({"job_id": str(job_id)})
spawn_duration = time.monotonic() - spawn_start
logger.info(
"Successfully spawned sandbox",
extra={
"job_id": job_id,
"repo": repo_full_name,
"sandbox_spawn_duration": round(spawn_duration, 3),
},
)
logger.info(
"Job metrics",
extra={
"metric": "job_complete",
"job_id": job_id,
"repo": repo_full_name,
"status": "success",
"duration_seconds": round(time.monotonic() - start_time, 3),
"gpu_requested": gpu_requested,
"region": MODAL_REGION,
"jit_request_duration": round(jit_duration, 3),
"sandbox_spawn_duration": round(spawn_duration, 3),
},
)
return {"status": "provisioned", "job_id": job_id}
except HTTPException:
raise
except Exception as e:
logger.error(
"Failed to spawn sandbox",
extra={"job_id": job_id, "repo": repo_full_name, "error_code": type(e).__name__},
)
logger.info(
"Job metrics",
extra={
"metric": "job_complete",
"job_id": job_id,
"repo": repo_full_name,
"status": "failed",
"duration_seconds": round(time.monotonic() - start_time, 3),
"gpu_requested": gpu_requested,
"region": MODAL_REGION,
"error_code": type(e).__name__,
},
)
raise HTTPException(status_code=500, detail="Failed to spawn runner sandbox")
finally:
if MAX_CONCURRENT_PER_REPO is not None:
_concurrent_jobs[repo_full_name] = max(
0, _concurrent_jobs.get(repo_full_name, 0) - 1
)
logger.info(
"Decremented per-repo concurrency counter",
extra={
"repo": repo_full_name,
"current": _concurrent_jobs.get(repo_full_name, 0),
"limit": MAX_CONCURRENT_PER_REPO,
},
)
@app.function(image=runner_image, secrets=[github_secret])
@modal.fastapi_endpoint(method="POST")
async def debug_runner_log(request: Request):
"""Read /tmp/runner.log from a running sandbox by job_id."""
try:
body = await request.json()
job_id = body.get("job_id")
if not job_id:
raise HTTPException(status_code=400, detail="Missing job_id")
for sb in modal.Sandbox.list(app_id=app.app_id, tags={"job_id": str(job_id)}):
try:
log = sb.filesystem.read_text("/tmp/runner.log")
return {"job_id": job_id, "log": log}
except Exception:
continue
return {"job_id": job_id, "log": "Sandbox not found or log not available"}
except HTTPException:
raise
    except Exception as e:
        logger.error("Failed to read runner log", extra={"error_code": type(e).__name__})
        raise HTTPException(status_code=500, detail="Error reading runner log")
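# Example debug call (sketch; the URL shape depends on your Modal workspace
# and deployment -- copy the real endpoint URL from the deploy output):
#   curl -X POST "https://<workspace>--modal-github-runner-debug-runner-log.modal.run" \
#        -H "Content-Type: application/json" -d '{"job_id": "12345678"}'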