feat: Add Redis-based workspace stream quota for WebRTC sessions#2025
feat: Add Redis-based workspace stream quota for WebRTC sessions#2025rafel-roboflow wants to merge 40 commits intomainfrom
Conversation
- Limit concurrent WebRTC streams per workspace (default: 10) - Return HTTP 429 when quota exceeded - Add heartbeat endpoint for Modal workers to refresh session TTL
…streams-and-update
|
@rafel-roboflow - sorry, will not be added to todays release |
…-set-rate-limit-to-10-concurrent-streams-and-update
…-update' of github.com:roboflow/inference into feat/dg-232-set-rate-limit-to-10-concurrent-streams-and-update
⚡️ Codeflash found optimizations for this PR📄 153% (1.53x) speedup for
|
…etr' into feat/dg-232-set-rate-limit-to-10-concurrent-streams-and-update
…streams-and-update
…-set-rate-limit-to-10-concurrent-streams-and-update
…-update' of github.com:roboflow/inference into feat/dg-232-set-rate-limit-to-10-concurrent-streams-and-update
…streams-and-update
| session_id, | ||
| workspace_id, | ||
| ) | ||
|
|
There was a problem hiding this comment.
Missing session cleanup on Modal spawn failure leaks quota
Medium Severity
register_webrtc_session is called before spawn_rtc_peer_connection_modal, but there's no try/finally to call deregister_webrtc_session if the spawn fails. spawn_rtc_peer_connection_modal has many failure points before a watchdog is ever created (plan validation, Modal client auth, app lookup, workflow spec retrieval, etc.). Each failure leaves a phantom session in Redis that occupies a quota slot until TTL expiry (default 60s). With a low quota (e.g. 3 during testing), a few rapid retries of a failing request can lock the workspace out entirely.
…-set-rate-limit-to-10-concurrent-streams-and-update
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Unconditional blocking HTTP call in async function
start_workernow resolves workspace/session only when quota or heartbeat is enabled and usesawait get_roboflow_workspace_async(...)to avoid blocking the event loop.
Or push these changes by commenting:
@cursor push d569c37645
Preview (d569c37645)
diff --git a/inference/core/interfaces/webrtc_worker/__init__.py b/inference/core/interfaces/webrtc_worker/__init__.py
--- a/inference/core/interfaces/webrtc_worker/__init__.py
+++ b/inference/core/interfaces/webrtc_worker/__init__.py
@@ -6,6 +6,7 @@
WEBRTC_MODAL_TOKEN_ID,
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_USAGE_QUOTA_ENABLED,
+ WEBRTC_SESSION_HEARTBEAT_URL,
WEBRTC_WORKSPACE_STREAM_QUOTA,
WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED,
WEBRTC_WORKSPACE_STREAM_TTL_SECONDS,
@@ -19,7 +20,7 @@
WebRTCWorkerResult,
)
from inference.core.logger import logger
-from inference.core.roboflow_api import get_roboflow_workspace
+from inference.core.roboflow_api import get_roboflow_workspace_async
async def start_worker(
@@ -56,10 +57,14 @@
raise CreditsExceededError("API key over quota")
workspace_id = None
- session_id = str(uuid.uuid4())
- workspace_id = get_roboflow_workspace(api_key=webrtc_request.api_key)
- webrtc_request.workspace_id = workspace_id
- webrtc_request.session_id = session_id
+ session_id = None
+ if WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED or WEBRTC_SESSION_HEARTBEAT_URL:
+ session_id = str(uuid.uuid4())
+ workspace_id = await get_roboflow_workspace_async(
+ api_key=webrtc_request.api_key
+ )
+ webrtc_request.workspace_id = workspace_id
+ webrtc_request.session_id = session_id
if WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED:
if workspace_id and is_over_workspace_session_quota(
@@ -77,7 +82,7 @@
f"concurrent streams."
)
- if workspace_id:
+ if workspace_id and session_id:
register_webrtc_session(
workspace_id=workspace_id,
session_id=session_id,| session_id = str(uuid.uuid4()) | ||
| workspace_id = get_roboflow_workspace(api_key=webrtc_request.api_key) | ||
| webrtc_request.workspace_id = workspace_id | ||
| webrtc_request.session_id = session_id |
There was a problem hiding this comment.
Unconditional blocking HTTP call in async function
Medium Severity
get_roboflow_workspace is a synchronous function that makes an HTTP request to the Roboflow API, but it's called directly inside async def start_worker, blocking the event loop. Additionally, this call runs unconditionally for every WebRTC session, even when both WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED (default: False) and WEBRTC_SESSION_HEARTBEAT_URL (default: None) are disabled — meaning the resulting workspace_id is never actually used. The async counterpart get_roboflow_workspace_async exists and is already used in the heartbeat endpoints.
…streams-and-update
…-set-rate-limit-to-10-concurrent-streams-and-update
…endpoints The webrtc_session_heartbeat and webrtc_session_end endpoints were missing the error handler decorator that other endpoints use. This ensures unhandled exceptions are properly logged and return appropriate error responses instead of generic 500 errors.
…dpoints - Add missing @with_route_exceptions_async decorator to webrtc_session_heartbeat and webrtc_session_end endpoints for consistent exception handling - Create WebRTCSessionHeartbeatRequest Pydantic model for request body validation
…-update' of github.com:roboflow/inference into feat/dg-232-set-rate-limit-to-10-concurrent-streams-and-update
| count = get_concurrent_session_count(workspace_id, ttl_seconds) | ||
| logger.info( | ||
| "Workspace %s has %d concurrent sessions (quota: %d)", | ||
| workspace_id, |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 days ago
In general, the fix is to ensure that data derived from the API key (or otherwise considered sensitive) is not written directly to logs. Here that means changing log messages which include workspace_id so they no longer output the raw identifier, while preserving the usefulness of the logs.
Concretely, we will adjust logging in inference/core/interfaces/webrtc_worker/utils.py and inference/core/interfaces/webrtc_worker/__init__.py:
- In
is_over_workspace_session_quota(utils.py), change thelogger.infocall to remove theworkspace_idfrom the message and arguments. We will keep the count and quota so operators can still see quota usage, but not which specific workspace hit that count. - In
start_worker(init.py), change the warning when quota is exceeded and the final info log about starting a session so they no longer includeworkspace_id. They will still report the quota value and the session id, which is sufficient for debugging without exposing workspace identifiers.
No new methods or imports are needed; we only modify existing log message strings and their parameters. Functionality (quotas, control flow, return values) remains unchanged.
| @@ -410,8 +410,7 @@ | ||
| """ | ||
| count = get_concurrent_session_count(workspace_id, ttl_seconds) | ||
| logger.info( | ||
| "Workspace %s has %d concurrent sessions (quota: %d)", | ||
| workspace_id, | ||
| "Workspace has %d concurrent sessions (quota: %d)", | ||
| count, | ||
| quota, | ||
| ) |
| @@ -68,8 +68,7 @@ | ||
| ttl_seconds=WEBRTC_WORKSPACE_STREAM_TTL_SECONDS, | ||
| ): | ||
| logger.warning( | ||
| "Workspace %s has exceeded the stream quota of %d", | ||
| workspace_id, | ||
| "A workspace has exceeded the stream quota of %d", | ||
| WEBRTC_WORKSPACE_STREAM_QUOTA, | ||
| ) | ||
| raise WorkspaceStreamQuotaError( | ||
| @@ -92,9 +91,8 @@ | ||
| ) | ||
|
|
||
| logger.info( | ||
| "Started WebRTC session %s for workspace %s", | ||
| "Started WebRTC session %s", | ||
| session_id, | ||
| workspace_id, | ||
| ) | ||
|
|
||
| loop = asyncio.get_event_loop() |
| session_refreshed = refresh_webrtc_session( | ||
| workspace_id=workspace_id, | ||
| session_id=request.session_id, | ||
| ) |
There was a problem hiding this comment.
Sync blocking Redis calls in async endpoints
Low Severity
refresh_webrtc_session and deregister_webrtc_session are synchronous functions that make multiple blocking Redis calls (zscore, zadd, expire, zrem), but they're called from async def FastAPI endpoints. This blocks the event loop for the duration of each Redis round-trip. Since the heartbeat endpoint is called every 30 seconds per active session, this could accumulate under load.
Additional Locations (1)
…streams-and-update
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
The heartbeat and session-end endpoints were returning plain dicts with
{"status": "error"} on auth failure, which FastAPI serialized as HTTP 200.
This caused the watchdog client to log these as successful heartbeats,
masking authentication failures from operators.
Now raises HTTPException with 401 for unauthorized and 404 for session
not found.
…-set-rate-limit-to-10-concurrent-streams-and-update
…-update' of github.com:roboflow/inference into feat/dg-232-set-rate-limit-to-10-concurrent-streams-and-update



What does this PR do?
Related Issue(s): DG-232
Type of Change
Testing
Test details:
I put max connections=3;
Checklist
Additional Context
Note
Medium Risk
Adds Redis-backed quota enforcement to WebRTC session startup and a new heartbeat/end API, which can block new sessions (HTTP 429) and depends on Redis/heartbeat timing for correct slot cleanup.
Overview
Implements a Redis-backed per-workspace concurrent WebRTC stream quota (default 10) enforced when starting Modal-based WebRTC workers; excess sessions raise
WorkspaceStreamQuotaErrorand now return HTTP 429.Adds session tracking + TTL refresh via Redis sorted sets (
register_webrtc_session,refresh_webrtc_session,deregister_webrtc_session) and introduces new HTTP endpointsPOST /webrtc/session/heartbeatandPOST /webrtc/session/heartbeat/endfor Modal workers to keep sessions alive and free quota slots on shutdown.Extends the Modal worker
Watchdogto periodically POST heartbeats (and send an end signal on stop) and passesworkspace_id/session_idthroughWebRTCWorkerRequest; also guardsheartbeat_callbackcalls to handle it beingNone.Written by Cursor Bugbot for commit bfd3f93. This will update automatically on new commits. Configure here.