Skip to content

Commit 9b615b8

Browse files
author
Project Team
committed
Add async queue-based verify to eliminate CloudFront timeout
- New SQLite-backed job queue (queue_manager.py) shared between API and worker containers via a named Docker volume
- New worker container (worker.py) processes one Ollama inference at a time, retrying up to 3 times on failure — eliminates all GPU-level concurrency issues
- POST /verify/async + GET /verify/status/{job_id} API endpoints replace the blocking /verify call for UI and test_verifier.py remote mode
- UI POST /ui/verify now enqueues and redirects to a polling spinner page (verify_pending.html) instead of blocking on Ollama
- test_verifier.py --remote-host updated to use async endpoint with poll_for_result() helper — fixes the sequential timeout failures
- Dockerfile gains a worker stage; docker-compose.dev.yml and production docker-compose.yml (instance_init.sh) gain the worker service
- deploy.yml builds and pushes ttb-verifier-worker:latest alongside app
1 parent 9d8b7bd commit 9b615b8

13 files changed

Lines changed: 1602 additions & 165 deletions

File tree

.github/workflows/deploy.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ on:
77
paths:
88
- 'app/**'
99
- 'Dockerfile'
10+
- 'docker-compose.dev.yml'
1011
- 'scripts/deploy.sh'
1112
- '.github/workflows/deploy.yml'
1213

@@ -79,6 +80,18 @@ jobs:
7980
cache-from: type=gha
8081
cache-to: type=gha,mode=max
8182

83+
- name: Build and push worker Docker image
84+
uses: docker/build-push-action@v5
85+
with:
86+
context: .
87+
target: worker
88+
push: true
89+
tags: |
90+
${{ env.REGISTRY }}/${{ steps.image.outputs.name }}-worker:latest
91+
${{ env.REGISTRY }}/${{ steps.image.outputs.name }}-worker:${{ github.ref_name }}-${{ github.sha }}
92+
cache-from: type=gha
93+
cache-to: type=gha,mode=max
94+
8295
deploy:
8396
needs: build-and-push
8497
runs-on: ubuntu-latest

Dockerfile

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ RUN pytest tests/ \
3131
--cov-fail-under=50 \
3232
-v
3333

34-
# Stage 4: Production image
34+
# Stage 4: Production image (FastAPI app — uvicorn 4 workers)
3535
FROM base AS production
3636

3737
WORKDIR /app
@@ -48,8 +48,8 @@ COPY app/templates ./templates
4848
# Create samples directory (optional - for golden samples)
4949
RUN mkdir -p ./samples
5050

51-
# Create jobs directory for async batch processing
52-
RUN mkdir -p /app/tmp/jobs
51+
# Create jobs directory for async batch processing and queue DB
52+
RUN mkdir -p /app/tmp/jobs /app/tmp/async
5353

5454
# Set environment
5555
ENV PATH=/root/.local/bin:$PATH \
@@ -65,3 +65,29 @@ EXPOSE 8000
6565

6666
# Run FastAPI with uvicorn (4 workers for concurrent request handling)
6767
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
68+
69+
# Stage 5: Worker image (single-process queue consumer)
70+
FROM base AS worker
71+
72+
WORKDIR /app
73+
74+
# Copy Python packages from builder
75+
COPY --from=builder /root/.local /root/.local
76+
77+
# Copy application code (worker only needs queue_manager, worker, label_validator, ocr_backends)
78+
COPY app/*.py ./
79+
80+
# Create shared volume directories
81+
RUN mkdir -p /app/tmp/jobs /app/tmp/async
82+
83+
# Set environment
84+
ENV PATH=/root/.local/bin:$PATH \
85+
PYTHONUNBUFFERED=1 \
86+
PYTHONDONTWRITEBYTECODE=1
87+
88+
# No health check port — worker has no HTTP server.
89+
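# Docker marks it healthy when the QueueManager().queue_depth() probe below
# succeeds; the probe runs in its own Python process and does not inspect the worker loop.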
90+
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
91+
CMD python -c "from queue_manager import QueueManager; QueueManager().queue_depth()" || exit 1
92+
93+
CMD ["python", "worker.py"]

app/api.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from auth import get_current_user
3131
from middleware import HostCheckMiddleware
3232
from job_manager import JobManager, JobStatus
33+
from queue_manager import QueueManager
3334

3435

3536
# ============================================================================
@@ -48,6 +49,12 @@
4849
# Initialize job manager for async batch processing
4950
job_manager = JobManager()
5051

52+
# Initialize queue manager for async single-image processing
53+
verify_queue = QueueManager(
54+
db_path=Path(settings.queue_db_path),
55+
max_attempts=settings.queue_max_attempts,
56+
)
57+
5158

5259
# ============================================================================
5360
# Pydantic Models
@@ -154,6 +161,24 @@ class ErrorResponse(BaseModel):
154161
correlation_id: str
155162

156163

164+
class AsyncVerifySubmitResponse(BaseModel):
    """Payload returned by ``POST /verify/async`` once a job has been queued."""

    # Identifier the client passes to GET /verify/status/{job_id}.
    job_id: str
    # Job state at submission time; the submit endpoint always sends 'pending'.
    status: str
    # Human-readable hint telling the client how to poll for the result.
    message: str
169+
170+
171+
class AsyncVerifyStatusResponse(BaseModel):
    """Payload returned by ``GET /verify/status/{job_id}`` on each poll."""

    # Identifier of the job being reported on.
    job_id: str
    # One of: pending | processing | completed | failed | cancelled.
    status: str
    # Attempt counter and its ceiling, copied from the queued job record.
    attempts: int
    max_attempts: int
    # Verification result; populated only when the job has completed and the
    # stored result could be parsed into a VerifyResponse.
    result: Optional[VerifyResponse] = None
    # Error text recorded for the job, if any.
    error: Optional[str] = None
    # Queue depth as reported by the queue manager at poll time (a queue-wide
    # figure, not a per-job position); set only while the job is pending.
    queue_depth: Optional[int] = None
180+
181+
157182
# ============================================================================
158183
# FastAPI Application
159184
# ============================================================================
@@ -910,6 +935,131 @@ async def delete_batch_job(
910935
return {"message": f"Job {job_id} deleted successfully"}
911936

912937

938+
# ============================================================================
939+
# Async Single-Image Verify Endpoints (queue-based, CloudFront-safe)
940+
# ============================================================================
941+
942+
@app.post("/verify/async", response_model=AsyncVerifySubmitResponse)
async def submit_async_verify(
    image: UploadFile = File(..., description="Label image file (max 10MB)"),
    ground_truth: Optional[str] = Form(None, description="Ground truth JSON string"),
    username: str = Depends(get_current_user)
) -> AsyncVerifySubmitResponse:
    """
    Submit a single label image for asynchronous verification via the worker queue.

    Returns immediately with a ``job_id``. Poll ``GET /verify/status/{job_id}``
    every 2–3 seconds until ``status`` is ``completed`` or ``failed``.

    This endpoint is designed to work within CloudFront's 60-second origin
    read timeout: the HTTP response is sent instantly, and Ollama processing
    (~10s) happens in the separate worker container.

    If the upload cannot be saved or the job cannot be enqueued, the per-job
    directory created for the image is removed before the error propagates,
    so failed submissions do not leave orphaned files on the shared volume.

    **Request:**
    - ``image``: Label image (JPEG or PNG, max 10MB)
    - ``ground_truth``: Optional JSON with expected values

    **Response:**
    - ``job_id``: Use to poll ``GET /verify/status/{job_id}``
    - ``status``: ``pending``

    **Example:**
    ```bash
    JOB=$(curl -s -X POST https://example.com/verify/async \\
    -F "image=@label.jpg" | jq -r .job_id)
    # Poll until done:
    curl https://example.com/verify/status/$JOB
    ```
    """
    # Local import: only needed for the failure-cleanup path below.
    import shutil

    correlation_id = get_correlation_id()
    logger.info(f"[{correlation_id}] POST /verify/async - {image.filename}")

    # Validate image
    validate_image_file(image, correlation_id)

    # Parse optional ground truth
    ground_truth_data = parse_ground_truth(ground_truth, correlation_id)

    # Persist image to shared volume so the worker container can read it.
    # Each job gets its own subdirectory to avoid filename collisions.
    job_dir = Path(settings.queue_db_path).parent / "async" / str(uuid.uuid4())
    job_dir.mkdir(parents=True, exist_ok=True)

    # Sanitise filename (keep extension only)
    suffix = Path(image.filename).suffix.lower() if image.filename else ".jpg"
    image_dest = job_dir / f"image{suffix}"

    try:
        await save_upload_file(image, image_dest)
        job_id = verify_queue.enqueue(
            image_path=str(image_dest),
            ground_truth=ground_truth_data,
        )
    except BaseException:
        # Nothing references this directory unless enqueue succeeded, so drop
        # it (best effort) and let the original error reach the caller.
        # BaseException also covers request cancellation mid-upload.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise

    logger.info(f"[{correlation_id}] Enqueued async verify job {job_id}")

    return AsyncVerifySubmitResponse(
        job_id=job_id,
        status="pending",
        message=f"Job submitted. Poll GET /verify/status/{job_id} for results.",
    )
1005+
1006+
1007+
@app.get("/verify/status/{job_id}", response_model=AsyncVerifyStatusResponse)
async def get_async_verify_status(
    job_id: str,
    username: str = Depends(get_current_user)
) -> AsyncVerifyStatusResponse:
    """
    Poll the status of a queued single-image verify job.

    Call this endpoint every 2–3 seconds after submitting via
    ``POST /verify/async``.

    **Status values:**
    - ``pending`` — job is waiting in the queue
    - ``processing`` — worker is currently running Ollama inference
    - ``completed`` — result is ready (see ``result`` field)
    - ``failed`` — all retry attempts exhausted (see ``error`` field)
    - ``cancelled`` — job was cancelled

    If a completed job's stored result cannot be parsed, ``result`` is
    ``null`` and ``error`` carries a short explanation instead of the
    response silently looking like an empty success.

    Returns HTTP 404 when no job with the given ``job_id`` exists.

    **Example:**
    ```bash
    curl https://example.com/verify/status/abc123
    ```
    """
    correlation_id = get_correlation_id()
    job = verify_queue.get(job_id)

    if job is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Verify job {job_id} not found",
        )

    result_obj: Optional[VerifyResponse] = None
    error_msg: Optional[str] = job.get("error")
    if job["status"] == "completed" and job.get("result"):
        try:
            result_obj = VerifyResponse(**job["result"])
        except Exception as exc:
            logger.error(
                f"[{correlation_id}] Failed to deserialise result for job {job_id}: {exc}"
            )
            # Tell the client why ``result`` is missing; keep any error text
            # already recorded on the job, and keep exception details in the
            # server log only.
            if not error_msg:
                error_msg = "Stored result could not be deserialised"

    # Queue depth is only reported while the job is still waiting.
    queue_depth = verify_queue.queue_depth() if job["status"] == "pending" else None

    return AsyncVerifyStatusResponse(
        job_id=job_id,
        status=job["status"],
        attempts=job["attempts"],
        max_attempts=job["max_attempts"],
        result=result_obj,
        error=error_msg,
        queue_depth=queue_depth,
    )
1061+
1062+
9131063
# ============================================================================
9141064
# Exception Handlers
9151065
# ============================================================================

app/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,20 @@ class Settings(BaseSettings):
5353
default=3600,
5454
description="Interval between job cleanup runs in seconds (default: 1 hour)"
5555
)
56+
57+
# Async single-image queue configuration
58+
queue_db_path: str = Field(
59+
default="/app/tmp/queue.db",
60+
description="Path to the SQLite queue database (shared volume)"
61+
)
62+
queue_max_attempts: int = Field(
63+
default=3,
64+
description="Maximum processing attempts per queued verify job before permanent failure"
65+
)
66+
worker_ollama_timeout_seconds: int = Field(
67+
default=12,
68+
description="Per-attempt Ollama timeout used by the worker process (seconds)"
69+
)
5670

5771
# CORS Configuration
5872
cors_origins: str = Field(

0 commit comments

Comments
 (0)