Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pusher
agent-proxy (agent-proxy/main.py)
└── ws ──► user agent VM (private IP, port 8080)

backend-sync (main.py, Cloud Run)
└── ──────► Cloud Tasks queue `sync-jobs` ──► POST /v2/sync-jobs/run (OIDC, same service)

notifications-job (modal/job.py) [cron]
```

Expand All @@ -83,6 +86,7 @@ Helm charts: `backend/charts/{backend-listen,pusher,diarizer,vad,deepgram-self-h
- **diarizer** (`diarizer/main.py`) — GPU. Speaker embeddings at `/v2/embedding`. Called by backend and pusher (`HOSTED_SPEAKER_EMBEDDING_API_URL`).
- **vad** (`modal/main.py`) — GPU. `/v1/vad` and `/v1/speaker-identification`. Called by backend only.
- **deepgram** — STT. Streaming uses self-hosted (`DEEPGRAM_SELF_HOSTED_URL`) or cloud based on `DEEPGRAM_SELF_HOSTED_ENABLED`. Pre-recorded always uses Deepgram cloud. Called by backend and pusher.
- **backend-sync** (`main.py`, same image as backend) — Cloud Run service for `/v2/sync-local-files`. When `SYNC_DISPATCH_MODE=cloud_tasks`, it stages raw audio in GCS and enqueues a task on the Cloud Tasks queue `sync-jobs`; Cloud Tasks then POSTs `/v2/sync-jobs/run` (OIDC-verified, `utils/cloud_tasks.py`), which runs decode→VAD→STT inside that request. Falls back to inline processing when the flag is off, the env is incomplete, BYOK headers are present, or the enqueue fails.
- **notifications-job** (`modal/job.py`) — Cron job, reads Firestore/Redis, sends push notifications.

Keep this map up to date. When adding, removing, or changing inter-service calls, update this section. If a PR changes audio streaming, transcription, conversation lifecycle, speaker identification, or the listen/pusher WebSocket protocol — update `docs/doc/developer/backend/listen_pusher_pipeline.mdx` in the same PR.
Expand Down
98 changes: 98 additions & 0 deletions backend/database/sync_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
JOB_TTL_SECONDS = 86400  # 24 hours — reconcile window (see module docstring)
STALE_THRESHOLD_SECONDS = 600  # 10 minutes — if processing exceeds this, treat as failed

# Job status values grouped as terminal.
# NOTE(review): presumably a job in one of these states is never updated
# again — confirm against the callers that read this tuple.
TERMINAL_STATUSES = ('completed', 'partial_failure', 'failed')

# Redis key prefix for the per-job run lock; the job id is appended.
RUN_LOCK_KEY_PREFIX = 'sync_job_lock:'
# Must stay above the handler's request timeout (HTTP_SYNC_JOBS_RUN_TIMEOUT,
# 1500s) so the lock can never expire while a run is still executing.
RUN_LOCK_TTL_SECONDS = 1800

# Redis key prefix for the per-job set of already-processed segment paths.
PROCESSED_SEGMENTS_KEY_PREFIX = 'sync_job_segments:'
# Redis key prefix for the per-job, per-tag "run at most once" markers.
ONCE_KEY_PREFIX = 'sync_job_once:'


def create_sync_job(uid: str, total_files: int, total_segments: int, job_id: str | None = None) -> dict:
"""Create a new sync job and store in Redis. Returns the job dict."""
Expand Down Expand Up @@ -163,3 +173,91 @@ def mark_job_failed(job_id: str, error: str) -> Optional[dict]:
'error': error,
},
)


def mark_job_queued_for_retry(job_id: str, attempt: int, error: str) -> Optional[dict]:
    """Put a job back into the 'queued' state ahead of a Cloud Tasks retry.

    The stale detector in get_sync_job() ignores 'queued' jobs, so an app
    polling during the retry backoff window cannot push the job into a
    terminal 'failed' state while another attempt is still pending.

    The attempt number is stored under 'attempt' and the triggering error
    under 'last_error'. Returns whatever update_sync_job() returns.
    """
    retry_fields = {
        'status': 'queued',
        'attempt': attempt,
        'last_error': error,
    }
    return update_sync_job(job_id, retry_fields)


def try_acquire_job_run_lock(job_id: str) -> Optional[str]:
    """Try to take the per-job run lock.

    Returns a freshly generated release token when the lock was taken, or
    None when the lock key already exists (someone else holds it).

    Fails CLOSED: Redis errors are not caught here and propagate to the
    caller. A lock that cannot be obtained must block execution (the Cloud
    Tasks retry will come back later) rather than ever permit two concurrent
    runs of the same job.
    """
    token = str(uuid.uuid4())
    lock_key = f'{RUN_LOCK_KEY_PREFIX}{job_id}'
    # nx=True: only set when the key is absent; ex=...: attach the lock TTL.
    if r.set(lock_key, token, nx=True, ex=RUN_LOCK_TTL_SECONDS):
        return token
    return None


# Lua compare-and-delete: remove the lock key (KEYS[1]) only when its current
# value equals the caller's token (ARGV[1]). Returns the DEL result on a
# match, 0 otherwise.
_RELEASE_LOCK_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""


def release_job_run_lock(job_id: str, token: str) -> None:
    """Drop the run lock, but only if `token` still matches the stored value.

    The compare-and-delete is done by _RELEASE_LOCK_SCRIPT, which receives
    the lock key and the token.

    Best-effort: any failure is logged as a warning and swallowed. In that
    case the lock simply expires via its TTL, and a duplicate delivery in the
    meantime gets 409-retried.
    """
    try:
        lock_key = f'{RUN_LOCK_KEY_PREFIX}{job_id}'
        r.eval(_RELEASE_LOCK_SCRIPT, 1, lock_key, token)
    except Exception as exc:
        logger.warning('release_job_run_lock failed for %s: %s', job_id, exc)


def add_processed_segment(job_id: str, segment_path: str) -> None:
    """Record a segment as fully processed (conversation written) for this job.

    Lets a Cloud Tasks retry skip segments that already landed. The segment
    path is added to the job's Redis set and the set's TTL is refreshed to
    JOB_TTL_SECONDS.

    Both commands are sent in a single MULTI/EXEC pipeline so the set can
    never be left behind without a TTL (which would happen if SADD succeeded
    and a separate EXPIRE call then failed), and so only one round trip is
    needed.

    Best-effort: any failure is logged and swallowed; the retry then falls
    back to the timestamp-based segment dedup.
    """
    try:
        key = f'{PROCESSED_SEGMENTS_KEY_PREFIX}{job_id}'
        # redis-py pipelines are transactional by default (MULTI/EXEC).
        with r.pipeline() as pipe:
            pipe.sadd(key, segment_path)
            pipe.expire(key, JOB_TTL_SECONDS)
            pipe.execute()
    except Exception as e:
        logger.warning('add_processed_segment failed for %s: %s', job_id, e)


def get_processed_segments(job_id: str) -> set:
    """Return the segment paths already recorded as processed for this job.

    Members that come back as bytes are decoded to str; anything else is
    passed through unchanged. On any failure a warning is logged and an empty
    set is returned.
    """
    try:
        raw_members = r.smembers(f'{PROCESSED_SEGMENTS_KEY_PREFIX}{job_id}')
        segments = set()
        for member in raw_members:
            if isinstance(member, bytes):
                member = member.decode()
            segments.add(member)
        return segments
    except Exception as exc:
        logger.warning('get_processed_segments failed for %s: %s', job_id, exc)
        return set()


def try_mark_once(job_id: str, tag: str) -> bool:
    """SETNX guard for per-job side effects (fair-use metering, usage
    recording) so they run at most once across Cloud Tasks retries.

    Returns True when this call created the marker for (job_id, tag), and
    False when the marker already existed. The marker expires after
    JOB_TTL_SECONDS.

    Fails OPEN: on a Redis error a warning is logged and True is returned,
    matching the metering functions' own fail-open posture — better to
    occasionally double-count than to silently never count.
    """
    try:
        marker_key = f'{ONCE_KEY_PREFIX}{job_id}:{tag}'
        was_set = r.set(marker_key, '1', nx=True, ex=JOB_TTL_SECONDS)
        return bool(was_set)
    except Exception as exc:
        logger.warning('try_mark_once failed for %s:%s: %s', job_id, tag, exc)
        return True
9 changes: 8 additions & 1 deletion backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,14 @@
"DELETE": os.environ.get('HTTP_DELETE_TIMEOUT'),
}

app.add_middleware(TimeoutMiddleware, methods_timeout=methods_timeout)
# The Cloud Tasks sync-job handler runs the whole pipeline inside the request,
# so it needs a much higher cap than the default. Must stay below the job run
# lock TTL (1800s) so a lock can never expire under a live run.
# NOTE(review): when HTTP_SYNC_JOBS_RUN_TIMEOUT is set the value is a str,
# while the fallback is the int 1500 — confirm TimeoutMiddleware accepts both.
paths_timeout = {
"/v2/sync-jobs/run": os.environ.get('HTTP_SYNC_JOBS_RUN_TIMEOUT', 1500),
}

app.add_middleware(TimeoutMiddleware, methods_timeout=methods_timeout, paths_timeout=paths_timeout)

from utils.byok import BYOKMiddleware

Expand Down
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ google-auth-httplib2==0.2.0
google-cloud-core==2.4.1
google-cloud-firestore==2.20.0
google-cloud-storage==2.18.0
google-cloud-tasks==2.16.4
google-crc32c==1.5.0
google-resumable-media==2.7.1
googleapis-common-protos==1.63.2
Expand Down
Loading
Loading