feat(sync): dispatch v2 sync pipeline via Cloud Tasks #7801
The Cloud Tasks sync-job handler runs the whole pipeline inside the request and needs a cap above the 120s default. Path overrides take precedence over method timeouts.
…S_RUN_TIMEOUT, default 1500s)
- per-job run lock (fail-closed, compare-and-delete release) so duplicate task deliveries can never run a job concurrently
- mark_job_queued_for_retry: stale-detector-exempt reset before retries
- processed-segment ledger so retries skip segments that already landed
- try_mark_once guards so fair-use/usage metering counts once per job
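The run lock's fail-closed acquisition and compare-and-delete release can be sketched as follows. This is a minimal illustration, not the PR's code: `InMemoryStore`, `try_acquire`, `release`, and the key format are hypothetical, and the store is an in-memory stand-in for Redis. The 1800s TTL matches the run-lock TTL quoted in the PR description.

```python
import secrets
import time
from typing import Dict, Optional, Tuple


class InMemoryStore:
    """Hypothetical stand-in for Redis: key -> (value, expiry timestamp)."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}

    def set_if_absent(self, key: str, value: str, ttl_s: float) -> bool:
        """Set the key only if it is missing or expired (like SET NX EX)."""
        now = time.monotonic()
        current = self._data.get(key)
        if current is not None and current[1] > now:
            return False  # lock is held and has not expired
        self._data[key] = (value, now + ttl_s)
        return True

    def delete_if_equal(self, key: str, expected: str) -> bool:
        """Delete the key only if it still holds the caller's token."""
        current = self._data.get(key)
        if current is None or current[0] != expected:
            return False  # another holder owns the lock now; leave it alone
        del self._data[key]
        return True


def try_acquire(store, job_id: str, ttl_s: float = 1800.0) -> Optional[str]:
    """Return a lock token, or None if the lock is held or the store errors."""
    token = secrets.token_hex(16)
    try:
        acquired = store.set_if_absent(f"sync_job_lock:{job_id}", token, ttl_s)
    except Exception:
        return None  # fail closed: a store error is treated as "lock not acquired"
    return token if acquired else None


def release(store, job_id: str, token: str) -> bool:
    """Release the lock only if this caller still owns it."""
    return store.delete_if_equal(f"sync_job_lock:{job_id}", token)
```

Comparing the token before deleting means a slow run whose lock already expired cannot remove a lock that a later delivery has since acquired. Against a real Redis, the compare and the delete would need to happen atomically (for example in a server-side script).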
enqueue_sync_job creates one named HTTP task per job (task id = job_id for enqueue-side dedup) with an OIDC token for the invoker SA. verify_cloud_tasks_oidc fails closed when SYNC_TASKS_* env is unset so services sharing the image never accept task traffic.
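A sketch of what a named HTTP task with an OIDC token might look like, built as a plain dict that mirrors the Cloud Tasks v2 `Task` field names. `build_sync_task`, its parameters, the JSON payload shape, and the choice of the handler URL as the OIDC audience are assumptions for illustration; the PR's `enqueue_sync_job` may differ.

```python
import json
from typing import Any, Dict


def build_sync_task(
    project: str,
    location: str,
    queue: str,
    job_id: str,
    handler_url: str,
    invoker_sa: str,
    payload: Dict[str, Any],
) -> Dict[str, Any]:
    """Build a Cloud Tasks HTTP task dict whose task id is the job id."""
    parent = f"projects/{project}/locations/{location}/queues/{queue}"
    return {
        # A fixed name lets the queue reject a second enqueue of the same job.
        "name": f"{parent}/tasks/{job_id}",
        "http_request": {
            "http_method": "POST",
            "url": handler_url,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload).encode("utf-8"),
            # Cloud Tasks mints an OIDC token for this service account.
            "oidc_token": {
                "service_account_email": invoker_sa,
                "audience": handler_url,  # assumption: audience = handler URL
            },
        },
    }
```

A dict of this shape would typically be passed as the `task` argument when creating the task with the Cloud Tasks client. Because the task name is derived from `job_id`, a duplicate enqueue for the same job is rejected by the queue rather than producing a second task.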
Blob name = local relative path (existing pipeline convention). download returns False on NotFound so the handler can consume tasks whose blobs were removed by the bucket's 1-day lifecycle rule.
The pipeline previously ran as a fire-and-forget asyncio task on the instance that received the upload: invisible to the Cloud Run autoscaler, killed on scale-in, no retries, no fleet-wide balancing. Fast path now stages raw .bin files in GCS and enqueues one Cloud Task per job (SYNC_DISPATCH_MODE=cloud_tasks); the task POSTs back to /v2/sync-jobs/run which runs the same pipeline inside the request. Inline path is kept for rollback, BYOK requests (header-scoped keys cannot follow a task), and enqueue failures. Handler semantics: per-job run lock (409 while held), terminal jobs acked without re-running, queued-reset before retryable 500s so the stale detector cannot kill jobs during backoff, final attempt marks failed and consumes. Staged blobs are deleted on every terminal outcome; the bucket's 1-day lifecycle covers hard crashes. Metering once-guards and a processed-segment ledger make retries idempotent.
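How the once-guards and the processed-segment ledger make a retried run idempotent can be illustrated with an in-memory sketch. `RetryState` and `run_attempt` are hypothetical; the method names `try_mark_once` and `add_processed_segment` come from the PR, but their signatures and the Redis-backed storage are not shown there and are assumed here.

```python
from typing import Dict, List, Optional, Set, Tuple


class RetryState:
    """Hypothetical in-memory stand-in for the Redis-backed guards."""

    def __init__(self) -> None:
        self._once: Set[Tuple[str, str]] = set()
        self._segments: Dict[str, Set[str]] = {}

    def try_mark_once(self, job_id: str, marker: str) -> bool:
        """Return True only the first time (job_id, marker) is seen."""
        key = (job_id, marker)
        if key in self._once:
            return False
        self._once.add(key)
        return True

    def add_processed_segment(self, job_id: str, segment_id: str) -> None:
        self._segments.setdefault(job_id, set()).add(segment_id)

    def is_processed(self, job_id: str, segment_id: str) -> bool:
        return segment_id in self._segments.get(job_id, set())


def run_attempt(
    state: RetryState,
    job_id: str,
    segments: List[Tuple[str, int]],
    meter: List[int],
    fail_on: Optional[str] = None,
) -> List[str]:
    """Run one attempt; return the segment ids processed in this attempt."""
    # Metering is guarded, so a retry does not count the same speech twice.
    if state.try_mark_once(job_id, "speech_ms"):
        meter.append(sum(ms for _, ms in segments))
    done: List[str] = []
    for seg_id, _ in segments:
        if state.is_processed(job_id, seg_id):
            continue  # already landed in an earlier attempt
        if seg_id == fail_on:
            raise RuntimeError(f"transient failure on segment {seg_id}")
        state.add_processed_segment(job_id, seg_id)
        done.append(seg_id)
    return done
```

If the first attempt fails partway, the second attempt skips the segments already recorded in the ledger and leaves the metering total unchanged.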
Greptile Summary
This PR replaces the fire-and-forget asyncio background task in
Confidence Score: 5/5
Safe to merge. The default SYNC_DISPATCH_MODE=inline means deploying changes nothing until the flag is flipped; the new Cloud Tasks path is fully gated and falls back to inline on any enqueue failure.
All three concerns from the previous review cycle are addressed. The only new observations are a partial-download cleanup gap that is practically unreachable (all blobs in a job expire simultaneously), and a lazy-singleton init pattern harmless under the GIL. Flag-based rollout and automatic inline fallback make the change low-risk.
No files require special attention - routers/sync.py has the most new logic but retry/idempotency paths are well-tested by 24 new unit tests.
Sequence Diagram
sequenceDiagram
participant App
participant SyncEndpoint as POST /v2/sync-local-files
participant GCS as GCS syncing-local
participant CT as Cloud Tasks queue
participant Handler as POST /v2/sync-jobs/run
participant Redis
participant Pipeline as Pipeline decode-VAD-STT-LLM
App->>SyncEndpoint: multipart upload
SyncEndpoint->>GCS: stage raw bin blobs
SyncEndpoint->>CT: enqueue named task job_id
SyncEndpoint-->>App: 202 job_id
CT->>Handler: POST payload plus OIDC token
Handler->>Handler: verify_cloud_tasks_oidc
Handler->>Redis: try_acquire_job_run_lock
Redis-->>Handler: token or None
Handler->>Redis: get_sync_job
Redis-->>Handler: job status
alt terminal status
Handler->>GCS: delete staged blobs
Handler-->>CT: 200 acked
else normal run
Handler->>GCS: download blobs to local paths
Handler->>Pipeline: run pipeline task_mode True
Pipeline->>Redis: mark_job_processing
Pipeline->>Pipeline: decode VAD STT LLM
Pipeline->>Redis: add_processed_segment
Pipeline->>Redis: try_mark_once metering
alt success
Handler->>GCS: delete staged blobs
Handler-->>CT: 200 done
else retryable error
Handler->>Redis: mark_job_queued_for_retry
Handler-->>CT: 500 retry
else final attempt
Handler->>Redis: mark_job_failed
Handler->>GCS: delete staged blobs
Handler-->>CT: 200 failed_final
end
end
Handler->>Redis: release_job_run_lock
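The branches in the diagram reduce to a small decision table for the handler's HTTP response. The sketch below is a hypothetical summary, not the handler itself: `decide_response`, the terminal status names, and the rule that an attempt is final when `retry_count + 1 >= max_attempts` (with `retry_count` read from `X-CloudTasks-TaskRetryCount`, which starts at 0) are assumptions.

```python
from typing import Optional, Tuple

# Assumption: the names of the terminal job statuses are illustrative only.
TERMINAL_STATUSES = {"completed", "failed"}


def decide_response(
    lock_token: Optional[str],
    job_status: str,
    pipeline_succeeded: bool,
    retry_count: int,
    max_attempts: int,
) -> Tuple[int, str]:
    """Map the handler's state to (HTTP status, outcome label)."""
    if lock_token is None:
        return 409, "lock_held"  # another delivery is running this job
    if job_status in TERMINAL_STATUSES:
        return 200, "acked"  # consume the task without re-running
    if pipeline_succeeded:
        return 200, "done"
    if retry_count + 1 >= max_attempts:
        return 200, "failed_final"  # mark failed and consume the task
    return 500, "retry"  # non-2xx makes Cloud Tasks retry with backoff


print(decide_response("token", "processing", False, 1, 5))
```

Returning 200 on the final attempt consumes the task, so the queue stops retrying once the job has been marked failed.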
Reviews (2): Last reviewed commit: "fix(sync): log OIDC verification failure..."
timeout = self.paths_timeout.get(request.url.path) or self.methods_timeout.get(
    request.method, self.default_timeout
)
Using `or` to select the path timeout means any falsy value (specifically `0.0`) silently falls back to the method timeout instead of applying the configured value. If `HTTP_SYNC_JOBS_RUN_TIMEOUT=0` were ever set, the sync-jobs path would get the POST method timeout (120 s) rather than 0 s, with no error or warning. An explicit `is not None` check matches the intent.
Suggested change:
- timeout = self.paths_timeout.get(request.url.path) or self.methods_timeout.get(
-     request.method, self.default_timeout
- )
+ path_timeout = self.paths_timeout.get(request.url.path)
+ timeout = path_timeout if path_timeout is not None else self.methods_timeout.get(
+     request.method, self.default_timeout
+ )
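The difference between the two lookups is easy to reproduce in isolation. A minimal sketch, assuming plain dicts for the timeout tables; `resolve_timeout`, the `/zero` path, and the 30s default are hypothetical and only serve the illustration.

```python
from typing import Mapping


def resolve_timeout(
    path: str,
    method: str,
    paths_timeout: Mapping[str, float],
    methods_timeout: Mapping[str, float],
    default_timeout: float,
) -> float:
    """Path overrides win over method timeouts, even when the override is 0."""
    path_timeout = paths_timeout.get(path)
    if path_timeout is not None:
        return path_timeout
    return methods_timeout.get(method, default_timeout)


paths = {"/v2/sync-jobs/run": 1500.0, "/zero": 0.0}
methods = {"POST": 120.0}

# Explicit None check: the configured 0.0 is honored.
print(resolve_timeout("/zero", "POST", paths, methods, 30.0))

# `or`-based lookup: the falsy 0.0 falls through to the POST timeout.
print(paths.get("/zero") or methods.get("POST", 30.0))
```

The first call yields the configured `0.0`, while the `or` expression yields the POST method timeout of `120.0`.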
try:
    claims = id_token.verify_oauth2_token(auth_header[len('Bearer ') :], _get_auth_request(), audience=audience)
except Exception:
    raise HTTPException(status_code=403, detail='Invalid OIDC token')
The bare `except Exception:` catches both genuine token-validation failures and transient errors (e.g. a network timeout while fetching Google's JWKS endpoint), then raises the same 403 without logging the underlying cause. When cert-fetch latency spikes, operators would see a flood of "Invalid OIDC token" 403s with nothing in the logs to distinguish them from actual bad tokens, making the failure very hard to triage.
Suggested change:
- try:
-     claims = id_token.verify_oauth2_token(auth_header[len('Bearer ') :], _get_auth_request(), audience=audience)
- except Exception:
-     raise HTTPException(status_code=403, detail='Invalid OIDC token')
+ try:
+     claims = id_token.verify_oauth2_token(auth_header[len('Bearer ') :], _get_auth_request(), audience=audience)
+ except Exception as e:
+     logger.warning('OIDC token verification failed: %s', e)
+     raise HTTPException(status_code=403, detail='Invalid OIDC token')
except Exception as e:
    logger.error(f'sync_v2: Cloud Tasks dispatch failed job={job_id}, falling back inline: {e}')
    start_background_task(_delete_staged_blobs_async(owned_paths), name=f'sync_unstage:{job_id}')
The `except` block deletes staged blobs unconditionally, but `dispatched = True` is set before the local-cleanup steps. If an exception were to propagate after that point (e.g. a future change makes `_cleanup_files` raise), this code would delete the blobs that the already-enqueued task is about to download, silently breaking the job: the task would run, find no blobs, and mark the job `staged_audio_expired`. Both `_cleanup_files` and `shutil.rmtree(_, True)` currently swallow errors, so this path is unreachable today, but the guard is cheap and keeps the invariant explicit.
Suggested change:
- except Exception as e:
-     logger.error(f'sync_v2: Cloud Tasks dispatch failed job={job_id}, falling back inline: {e}')
-     start_background_task(_delete_staged_blobs_async(owned_paths), name=f'sync_unstage:{job_id}')
+ except Exception as e:
+     logger.error(f'sync_v2: Cloud Tasks dispatch failed job={job_id}, falling back inline: {e}')
+     if not dispatched:
+         start_background_task(_delete_staged_blobs_async(owned_paths), name=f'sync_unstage:{job_id}')
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Skipping this one per project convention: we don't add guards for paths that are unreachable by design — _cleanup_files and shutil.rmtree(_, True) both swallow errors deliberately, so nothing after dispatched = True can raise. Please remember this preference for future reviews on this repo.
Got it, makes sense. I've noted this for future reviews.
@greptile-apps re-review
Problem
The `/v2/sync-local-files` pipeline (decode → VAD → fair-use → STT → LLM) runs as a fire-and-forget asyncio task on whichever `backend-sync` instance received the upload. Consequences:
- […] `202`. Instances grind up to 16 pipelines each while Cloud Run sees them as idle, so sync bursts overload the service (the min-10 instance floor exists to compensate).
Change
With `SYNC_DISPATCH_MODE=cloud_tasks`, the fast path stages the raw `.bin` files in the `syncing-local` GCS bucket (blob name = local path, the pipeline's existing convention), enqueues one named Cloud Task per job (task id = job_id, enqueue-side dedup), and returns the same `202 {job_id}`. The task POSTs back to a new OIDC-verified `POST /v2/sync-jobs/run` on the same service, which runs the same pipeline code inside the request: autoscaler-visible, durable, retried with backoff, rate-limitable at the queue.
Zero app changes. Endpoint contract, multipart format, `conversation_id` param, `202`+poll flow, and `/v1/sync-local-files` are untouched.
Inline path is preserved and used when: the flag is off (default), the request carries BYOK headers (keys are request-scoped and cannot follow a task), or staging/enqueue fails (automatic fallback; a Cloud Tasks outage degrades to today's behavior).
Idempotency / failure handling
- […] `409` and re-check terminal status later.
- […] `500`s so the 600s stale detector can't terminally fail a job while the retry backoff elapses.
- […] `X-CloudTasks-TaskRetryCount`) marks the job failed, deletes blobs, returns `200` (consume).
- […] `record_speech_ms` / `record_dg_usage_ms` / `record_usage` fire at most once per job across retries (no fair-use double-counting).
- […] `/v2/sync-jobs/run` gets `HTTP_SYNC_JOBS_RUN_TIMEOUT` (default 1500s) instead of the 120s default that would otherwise kill every attempt.
- […] `backend` / `backend-integration` run the same image but reject all task traffic.
Tests
`tests/unit/test_sync_cloud_tasks.py` (24 tests): lock semantics incl. fail-closed Redis errors, queued-reset, ledger, once-guards, OIDC rejection matrix, structural contract. Registered in `test.sh`.
`scripts/scan_async_blockers.py`: no findings in changed files.
🚀 Deployment (ordered; the code ships inert)
The default is `SYNC_DISPATCH_MODE=inline`, so merging + deploying changes nothing until step 4.
1. Merge + deploy (regular merge, no squash):
2. One-time infra (Cloud Tasks API is currently NOT enabled on `based-hardware`):
3. Env vars on backend-sync only (deploy workflow preserves env vars; one-time update):
Invariants to keep in sync: `SYNC_TASKS_MAX_ATTEMPTS` must mirror the queue's `--max-attempts`; `HTTP_SYNC_JOBS_RUN_TIMEOUT` (1500) must stay below the 1800s run-lock TTL. Do NOT set the `SYNC_TASKS_*` vars on `backend` / `backend-integration`; unset env keeps the handler inert there.
4. Smoke test, then flip:
# smoke: upload a small WAL sync from the app (inline mode still active), confirm unchanged behavior
gcloud run services update backend-sync --region us-central1 --update-env-vars SYNC_DISPATCH_MODE=cloud_tasks
Watch: queue depth (`cloudtasks.googleapis.com/queue/depth`), handler 5xx ratio on `/v2/sync-jobs/run`, job terminal-status mix, and `failed_final` / `staged_audio_expired` log events. Suggested alert: queue depth > 200 for 15 min.
5. Rollback = flip the flag back:
In-flight queued tasks still drain through the handler; harmless.
6. Later (separate change): once autoscaling-on-real-load is proven for a week, lower backend-sync `minScale` 10 → 2 ($1k/month at current always-on pricing).
🤖 Generated with Claude Code