From 2156a557ae4553b84e8178348cc6881d5c3d1d79 Mon Sep 17 00:00:00 2001 From: KrKOo Date: Fri, 15 May 2026 20:15:17 +0200 Subject: [PATCH 1/5] add request queue limiter --- litellm/proxy/hooks/__init__.py | 3 + litellm/proxy/hooks/request_queue_limiter.py | 664 +++++++++++++++++++ 2 files changed, 667 insertions(+) create mode 100644 litellm/proxy/hooks/request_queue_limiter.py diff --git a/litellm/proxy/hooks/__init__.py b/litellm/proxy/hooks/__init__.py index 790ebcd8791..b10f4cecf88 100644 --- a/litellm/proxy/hooks/__init__.py +++ b/litellm/proxy/hooks/__init__.py @@ -10,6 +10,7 @@ from .parallel_request_limiter import _PROXY_MaxParallelRequestsHandler from .parallel_request_limiter_v3 import _PROXY_MaxParallelRequestsHandler_v3 from .responses_id_security import ResponsesIDSecurity +from .request_queue_limiter import _PROXY_RequestQueueLimiter ### CHECK IF ENTERPRISE HOOKS ARE AVAILABLE #### @@ -27,6 +28,7 @@ "litellm_skills": SkillsInjectionHook, "max_iterations_limiter": _PROXY_MaxIterationsHandler, "max_budget_per_session_limiter": _PROXY_MaxBudgetPerSessionHandler, + "request_queue_limiter": _PROXY_RequestQueueLimiter, } ## FEATURE FLAG HOOKS ## @@ -46,6 +48,7 @@ def get_proxy_hook( "managed_files", "parallel_request_limiter", "cache_control_check", + "request_queue_limiter", ], str, ], diff --git a/litellm/proxy/hooks/request_queue_limiter.py b/litellm/proxy/hooks/request_queue_limiter.py new file mode 100644 index 00000000000..c952d263aa9 --- /dev/null +++ b/litellm/proxy/hooks/request_queue_limiter.py @@ -0,0 +1,664 @@ +""" +Redis-based request queue limiter for LiteLLM proxy. 
+ +This module provides a queue-based rate limiting mechanism that: +- Allows up to MAX_TOTAL_REQUESTS per API key - GLOBAL +- Processes maximum MAX_CONCURRENT_REQUESTS concurrently per API key - GLOBAL +- Queues remaining requests in Redis using FIFO ordering +- Releases queued requests when running requests complete +- Waits internally for queued requests to be processed + +The queue is GLOBAL per API key, not per-model. + +Key principle: The queue list stores ONLY pending request IDs. The running counter +(separate key) tracks executing requests. No markers are stored in the queue. +""" + +import asyncio +import os +import uuid +from typing import TYPE_CHECKING, Any, Optional, Union + +from fastapi import HTTPException + +from litellm._logging import verbose_proxy_logger +from litellm.integrations.custom_logger import CustomLogger + +if TYPE_CHECKING: + from opentelemetry.trace import Span as _Span + + from litellm.caching.caching import DualCache + from litellm.proxy._types import UserAPIKeyAuth + from litellm.proxy.utils import InternalUsageCache as _InternalUsageCache + + Span = Union[_Span, Any] + InternalUsageCache = _InternalUsageCache +else: + Span = Any + InternalUsageCache = Any + DualCache = Any + UserAPIKeyAuth = Any + + +# Default values for queue configuration +MAX_CONCURRENT_REQUESTS = 2 # Maximum requests processed concurrently per API key +QUEUE_POLL_INTERVAL = 2 # Seconds between queue checks +QUEUE_MAX_WAIT_TIME = 600 # Maximum time (seconds) to wait in queue before the request is rejected with HTTP 429 (also used as the Redis key TTL) + +# Lua script for atomically trying to acquire a slot +# Returns: -1 if the request can run immediately, the 1-based queue position (as a string) if queued, -2 if the queue is full +TRY_ACQUIRE_SLOT_LUA = """ +local running_key = KEYS[1] +local queue_key = KEYS[2] +local max_concurrent = tonumber(ARGV[1]) +local max_total = tonumber(ARGV[2]) +local request_id = ARGV[3] +local queue_key_ttl = tonumber(ARGV[4]) + +-- Get current running count +local running_count = 
tonumber(redis.call('GET', running_key)) or 0 + +-- Get current total count (running + queued) +local queue_length = redis.call('LLEN', queue_key) +local total_count = running_count + queue_length + +-- Check if we can run immediately +if running_count < max_concurrent then + -- Increment running count and set TTL. Queue stores only pending requests. + redis.call('INCR', running_key) + redis.call('EXPIRE', running_key, queue_key_ttl) + return -1 +end + +-- Check if we can queue +if total_count < max_total then + -- Add to queue and return queue position, set TTL on queue + redis.call('RPUSH', queue_key, request_id) + redis.call('EXPIRE', queue_key, queue_key_ttl) + return tostring(queue_length + 1) +end + +-- Queue is full +return -2 +""" + + +# This decrements the running count only. Queue promotion is handled by polling in _wait_for_slot(). +# ARGV[1] = queue_key_ttl (TTL in seconds) +RELEASE_SLOT_LUA = """ +local running_key = KEYS[1] +local queue_key = KEYS[2] +local queue_key_ttl = tonumber(ARGV[1]) + +-- Decrement running count and refresh TTL (don't go below 0) +local running_count = tonumber(redis.call('GET', running_key)) or 0 +if running_count > 0 then + redis.call('DECR', running_key) + redis.call('EXPIRE', running_key, queue_key_ttl) +end + +-- Queue promotion is handled by CHECK_AND_PROMOTE_LUA during polling in _wait_for_slot(). +-- We do not manipulate the queue here to avoid race conditions. 
+return 'OK' +""" + +# Lua script for checking if a request is at the front of the queue and can run +CHECK_AND_PROMOTE_LUA = """ +local running_key = KEYS[1] +local queue_key = KEYS[2] +local request_id = ARGV[1] +local max_concurrent = tonumber(ARGV[2]) +local queue_key_ttl = tonumber(ARGV[3]) + +-- Get current running count +local running_count = tonumber(redis.call('GET', running_key)) or 0 + +-- Check if we can run now +if running_count >= max_concurrent then + return 0 +end + +-- Check if this request is at the front of the queue +local first_item = redis.call('LINDEX', queue_key, 0) +if first_item == request_id then + -- Remove from queue and increment running count, refresh TTL on both keys + redis.call('LPOP', queue_key) + redis.call('INCR', running_key) + redis.call('EXPIRE', running_key, queue_key_ttl) + redis.call('EXPIRE', queue_key, queue_key_ttl) + return 1 +end + +return 0 +""" + +# Lua script for cleaning up a queued request (removes from queue without affecting running count) +CLEANUP_QUEUED_LUA = """ +local queue_key = KEYS[1] +local request_id = ARGV[1] +local queue_key_ttl = tonumber(ARGV[2]) + +-- Remove the request from queue and refresh TTL (queue only stores pending request IDs) +local removed = redis.call('LREM', queue_key, 1, request_id) +redis.call('EXPIRE', queue_key, queue_key_ttl) + +return tostring(removed) +""" + + +class _PROXY_RequestQueueLimiter(CustomLogger): + """ + Redis-based request queue limiter for LiteLLM proxy. + + This limiter manages request queuing using Redis, allowing: + - Up to MAX_CONCURRENT_REQUESTS to run concurrently per API key + - Up to MAX_TOTAL_REQUESTS total (running + queued) per API key + - FIFO queue for pending requests + - Internal waiting for queued requests + + The queue is GLOBAL per API key, not per-model. + """ + + def __init__(self, internal_usage_cache: InternalUsageCache): + """ + Initialize the request queue limiter. 
+ + Args: + internal_usage_cache: The internal usage cache instance with Redis access + """ + super().__init__() + self.internal_usage_cache = internal_usage_cache + + # Register Lua scripts with Redis + self._try_acquire_script = None + self._release_slot_script = None + self._check_and_promote_script = None + self._cleanup_queued_script = None + + if self.internal_usage_cache.dual_cache.redis_cache is not None: + try: + self._try_acquire_script = ( + self.internal_usage_cache.dual_cache.redis_cache.async_register_script( + TRY_ACQUIRE_SLOT_LUA + ) + ) + self._release_slot_script = ( + self.internal_usage_cache.dual_cache.redis_cache.async_register_script( + RELEASE_SLOT_LUA + ) + ) + self._check_and_promote_script = ( + self.internal_usage_cache.dual_cache.redis_cache.async_register_script( + CHECK_AND_PROMOTE_LUA + ) + ) + self._cleanup_queued_script = ( + self.internal_usage_cache.dual_cache.redis_cache.async_register_script( + CLEANUP_QUEUED_LUA + ) + ) + verbose_proxy_logger.debug( + "RequestQueueLimiter: Lua scripts registered successfully" + ) + except Exception as e: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Failed to register Lua scripts: {str(e)}" + ) + + # Configuration constants + self.max_concurrent_requests = int( + os.getenv("LITELLM_QUEUE_MAX_CONCURRENT_REQUESTS", MAX_CONCURRENT_REQUESTS) + ) + + # Queue wait configuration + self.queue_poll_interval = int(os.getenv("LITELLM_QUEUE_POLL_INTERVAL", QUEUE_POLL_INTERVAL)) + self.max_queue_wait_time = int(os.getenv("LITELLM_QUEUE_MAX_WAIT_TIME", QUEUE_MAX_WAIT_TIME)) + + def _get_queue_keys( + self, api_key: str + ) -> tuple[str, str]: + """ + Get the Redis keys for the running counter, queue, and notify channel for an API key. + + Uses {} hash tag syntax for Redis cluster compatibility. 
+ + Args: + api_key: The API key to get keys for + + Returns: + Tuple of (running_key, queue_key, notify_channel) + """ + # Use hash tag to ensure keys are in same slot for Redis cluster + running_key = f"{{{api_key}}}:queue:running" + queue_key = f"{{{api_key}}}:queue:pending" + return running_key, queue_key + + def _get_request_id(self, data: dict) -> str: + """Extract the request ID from the data dictionary if available.""" + return data["metadata"]["headers"].get("x-request-id") or str(uuid.uuid4()) + + def _set_counter_incremented_flag(self, data: dict) -> None: + """Set a flag in metadata to indicate the counter was incremented. + + This flag is passed through to async_log_failure_event via the standard_logging_object. + """ + if "metadata" not in data: + data["metadata"] = {} + if "user_api_key_auth_metadata" not in data["metadata"]: + data["metadata"]["user_api_key_auth_metadata"] = {} + data["metadata"]["user_api_key_auth_metadata"]["request_queue_counter_incremented"] = True + + async def _wait_for_slot( + self, + api_key: str, + request_id: str, + ) -> None: + """ + Wait for a slot to become available in the queue. + + This method polls Redis to check if the request has been promoted + from the queue to running state. + + Args: + api_key: The API key for this request + request_id: The unique request ID + + Raises: + HTTPException: 429 if the request times out waiting in queue + """ + running_key, queue_key = self._get_queue_keys(api_key) + + event_loop = asyncio.get_event_loop() + start_time = event_loop.time() + + while True: + # Check elapsed time + elapsed = event_loop.time() - start_time + if elapsed > self.max_queue_wait_time: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Request {request_id[:8]} timed out waiting in queue after {elapsed:.1f}s" + ) + # Clean up this request from the queue + await self._cleanup_queued_request(api_key, request_id) + raise HTTPException( + status_code=429, + detail=( + f"Request timed out waiting in queue. 
Waited {elapsed:.1f}s. " + f"Maximum wait time is {self.max_queue_wait_time}s." + ), + headers={ + "retry-after": "10", + "x-rate-limit-queue-status": "timeout", + }, + ) + + # Check if we can run now + if self._check_and_promote_script is not None: + try: + result = await self._check_and_promote_script( + keys=[running_key, queue_key], + args=[request_id, self.max_concurrent_requests, self.max_queue_wait_time], + ) + + if result == 1: + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Request {request_id[:8]} promoted to running after waiting" + ) + # Set flag to indicate counter was incremented when queued request was promoted + # Note: We can't directly set the flag here since we don't have access to `data` dict + # The flag will be set in async_pre_call_hook after _wait_for_slot returns successfully + return # Slot acquired, exit wait loop + + # Still waiting, update position info + if result == 0: + # Get current queue position + if self.internal_usage_cache.dual_cache.redis_cache is not None: + queue_items = self.internal_usage_cache.dual_cache.redis_cache.redis_client.lrange(queue_key, 0, -1) + for idx, item in enumerate(queue_items): + if item == request_id: + current_position = idx + 1 + break + except Exception as e: + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Check/promote script failed: {str(e)}" + ) + + await asyncio.sleep(self.queue_poll_interval) + + async def _cleanup_queued_request( + self, + api_key: str, + request_id: str, + ) -> None: + """ + Clean up a queued request that didn't complete (e.g., client disconnected, timeout). + + This method removes a request from the queue. The queue only stores pending + request IDs, so no running count adjustment is needed. 
+ + Args: + api_key: The API key associated with the request + request_id: The unique request ID to clean up + """ + try: + _, queue_key = self._get_queue_keys(api_key) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Cleaning up queued request {request_id[:8]} for API key {api_key[:8]}" + ) + + if self._cleanup_queued_script is not None: + try: + await self._cleanup_queued_script( + keys=[queue_key], + args=[request_id, self.max_queue_wait_time], + ) + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Cleanup completed for request {request_id[:8]}" + ) + except Exception as e: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Failed to cleanup queued request: {str(e)}" + ) + + except Exception as e: + verbose_proxy_logger.exception( + f"RequestQueueLimiter: Error in cleanup: {str(e)}" + ) + + def do_rate_limit_check(self, user_api_key_dict: UserAPIKeyAuth) -> bool: + """ + Synchronous rate limit check to determine if the request should be processed. + + Args: + user_api_key_dict: The user API key authentication dictionary + + Returns: + True if the request should be processed, False if it should be rejected immediately + """ + max_parallel_requests = user_api_key_dict.max_parallel_requests + if max_parallel_requests is None: + return True # No limit for this user, allow request to proceed + + if max_parallel_requests <= self.max_concurrent_requests: + return True + + return True + + async def async_pre_call_hook( + self, + user_api_key_dict: UserAPIKeyAuth, + cache: DualCache, + data: dict, + call_type: str, + ) -> Optional[Union[Exception, str, dict]]: + """ + Pre-call hook to try to acquire a slot in the queue. + + This hook is called before the LLM API call is made. It attempts to: + 1. Acquire a running slot if available (running < MAX_CONCURRENT_REQUESTS) + 2. Queue the request if running slots are full but total < MAX_TOTAL_REQUESTS + 3. Wait for a slot to become available (internal waiting) + 4. 
Raise HTTPException 429 if the queue is full or wait times out + + Args: + user_api_key_dict: User API key authentication dictionary + cache: Dual cache instance + data: Request data dictionary + call_type: Type of call being made + + Returns: + - None: Allow request to proceed + - Raises HTTPException 429: If queue is full or timeout + + Raises: + HTTPException: 429 if the request queue is full or times out + """ + if not self.do_rate_limit_check(user_api_key_dict): + return None + + max_parallel_requests = user_api_key_dict.max_parallel_requests + try: + # Get the API key from the user_api_key_dict + api_key = getattr(user_api_key_dict, "api_key", None) + if not api_key: + verbose_proxy_logger.debug( + "RequestQueueLimiter: No API key found, skipping queue check" + ) + return None + + request_id = self._get_request_id(data) + + # Get Redis keys for this API key + running_key, queue_key = self._get_queue_keys(api_key) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Trying to acquire slot for API key {api_key[:8]}..." + ) + + # Try to acquire a slot using Lua script + if self._try_acquire_script is None: + verbose_proxy_logger.debug( + "RequestQueueLimiter: Lua scripts not registered, allowing request" + ) + return None + + + try: + result = await self._try_acquire_script( + keys=[running_key, queue_key], + args=[ + self.max_concurrent_requests, + max_parallel_requests, + request_id, + self.max_queue_wait_time, + ], + ) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Acquire result for {api_key[:8]}: {result}" + ) + + if result == -2: + # Queue is full, reject the request + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Queue full for API key {api_key[:8]}, rejecting request" + ) + raise HTTPException( + status_code=429, + detail=( + f"Request queue is full. Maximum {max_parallel_requests} " + f"requests allowed (running + queued). Please try again later." 
+ ), + headers={ + "retry-after": "30", + "x-rate-limit-queue-status": "full", + "x-rate-limit-max-concurrent": str(self.max_concurrent_requests), + "x-rate-limit-max-total": str(max_parallel_requests), + }, + ) + + elif result == -1: + # Request can run immediately - counter was incremented by Lua script + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Request {request_id[:8]} allowed to run immediately for API key {api_key[:8]}" + ) + # Set flag to indicate counter was incremented + self._set_counter_incremented_flag(data) + return None + else: + # Request is queued, result is the queue position + queue_position = int(result) + verbose_proxy_logger.info( + f"RequestQueueLimiter: Request {request_id[:8]} queued at position {queue_position} for API key {api_key[:8]}. Waiting for slot..." + ) + + # Wait for a slot to become available (internal waiting) + await self._wait_for_slot(api_key, request_id) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Request {request_id[:8]} acquired slot after waiting" + ) + # Counter was incremented when queued request was promoted via CHECK_AND_PROMOTE_LUA + self._set_counter_incremented_flag(data) + return None + + except HTTPException: + # Re-raise HTTP exceptions + raise + except Exception as e: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Lua script execution failed: {str(e)}, falling back to allowing request" + ) + # If Lua script fails, allow the request to proceed + return None + except HTTPException: + # Re-raise HTTP exceptions + raise + except Exception as e: + verbose_proxy_logger.exception( + f"RequestQueueLimiter: Error in pre_call_hook: {str(e)}" + ) + # On error, allow the request to proceed + return None + + async def async_post_call_success_hook( + self, + data: dict, + user_api_key_dict: UserAPIKeyAuth, + response: Any, + ) -> None: + """ + Post-call success hook to release a slot when a request completes successfully. + + This hook decrements the running count only. 
Queue promotion is handled by + the CHECK_AND_PROMOTE_LUA script during polling in _wait_for_slot(). + + Args: + data: Request data dictionary + user_api_key_dict: User API key authentication dictionary + response: The response object from the LLM API + """ + + counter_incremented = data.get("metadata", {}).get("user_api_key_auth_metadata", {}).get("request_queue_counter_incremented", False) + if not counter_incremented: + verbose_proxy_logger.debug( + "RequestQueueLimiter: Counter was not incremented for this request, skipping slot release - success" + ) + return + + try: + # Get the API key from the user_api_key_dict + api_key = getattr(user_api_key_dict, "api_key", None) + if not api_key: + verbose_proxy_logger.debug( + "RequestQueueLimiter: No API key found in success hook, skipping slot release" + ) + return + + # Get Redis keys for this API key + running_key, queue_key = self._get_queue_keys(api_key) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Releasing slot for API key {api_key[:8]} after successful request" + ) + + # Release the slot using Lua script + if self._release_slot_script is not None: + try: + result = await self._release_slot_script( + keys=[running_key, queue_key], + args=[self.max_queue_wait_time], + ) + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Slot released for API key {api_key[:8]}, next queued: {result}" + ) + except Exception as e: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Failed to release slot: {str(e)}" + ) + else: + verbose_proxy_logger.debug( + "RequestQueueLimiter: Release script not registered" + ) + + except Exception as e: + verbose_proxy_logger.exception( + f"RequestQueueLimiter: Error in post_call_success_hook: {str(e)}" + ) + + async def async_log_failure_event( + self, + kwargs: dict, + response_obj: Any, + start_time: Any, + end_time: Any, + ) -> None: + """ + Log failure event to release a slot when a request fails. + + This hook decrements the running count only. 
Queue promotion is handled by + the CHECK_AND_PROMOTE_LUA script during polling in _wait_for_slot(). + + Args: + kwargs: Request kwargs from the logging system + response_obj: The response object (may be None for failures) + start_time: Request start time + end_time: Request end time + """ + + try: + # Get metadata from standard_logging_object + standard_logging_object = kwargs.get("standard_logging_object") or {} + standard_logging_metadata = standard_logging_object.get("metadata") or {} + + # Get the API key from metadata + api_key = standard_logging_metadata.get("user_api_key_hash") + + if not api_key: + verbose_proxy_logger.debug( + "RequestQueueLimiter: No API key found in failure event, skipping slot release - failure" + ) + return + + # Check if the counter was incremented for this request + # The flag is stored in user_api_key_auth_metadata which is passed from pre_call_hook + user_api_key_auth_metadata = standard_logging_metadata.get("user_api_key_auth_metadata") or {} + counter_incremented = user_api_key_auth_metadata.get("request_queue_counter_incremented", False) + + if not counter_incremented: + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Counter was not incremented for this request, skipping decrement" + ) + return + + # Get Redis keys for this API key + running_key, queue_key = self._get_queue_keys(api_key) + + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Releasing slot for API key {api_key[:8]} after failed request" + ) + + # Release the slot using Lua script + if self._release_slot_script is not None: + try: + result = await self._release_slot_script( + keys=[running_key, queue_key], + args=[self.max_queue_wait_time], + ) + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Slot released for API key {api_key[:8]} after failure, next queued: {result}" + ) + except Exception as e: + verbose_proxy_logger.warning( + f"RequestQueueLimiter: Failed to release slot after failure: {str(e)}" + ) + else: + verbose_proxy_logger.debug( + 
"RequestQueueLimiter: Release script not registered" + ) + + except Exception as e: + verbose_proxy_logger.exception( + f"RequestQueueLimiter: Error in async_log_failure_event: {str(e)}" + ) From 2392421a10364ca86d6dfb416ba7ddfa62b4bf57 Mon Sep 17 00:00:00 2001 From: KrKOo Date: Fri, 15 May 2026 21:26:06 +0200 Subject: [PATCH 2/5] disable parallel_request_limiter --- litellm/proxy/hooks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/litellm/proxy/hooks/__init__.py b/litellm/proxy/hooks/__init__.py index b10f4cecf88..9c1d69dafba 100644 --- a/litellm/proxy/hooks/__init__.py +++ b/litellm/proxy/hooks/__init__.py @@ -22,7 +22,7 @@ # List of all available hooks that can be enabled PROXY_HOOKS = { "max_budget_limiter": _PROXY_MaxBudgetLimiter, - "parallel_request_limiter": _PROXY_MaxParallelRequestsHandler_v3, + #"parallel_request_limiter": _PROXY_MaxParallelRequestsHandler_v3, "cache_control_check": _PROXY_CacheControlCheck, "responses_id_security": ResponsesIDSecurity, "litellm_skills": SkillsInjectionHook, @@ -46,7 +46,7 @@ def get_proxy_hook( Literal[ "max_budget_limiter", "managed_files", - "parallel_request_limiter", + #"parallel_request_limiter", "cache_control_check", "request_queue_limiter", ], From dc0d43726eb5366162a0ccd8daf1efd95c2c2c4a Mon Sep 17 00:00:00 2001 From: KrKOo Date: Fri, 15 May 2026 21:46:48 +0200 Subject: [PATCH 3/5] request_queue_limiter always checks max_parallel_prequests --- litellm/proxy/hooks/request_queue_limiter.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/litellm/proxy/hooks/request_queue_limiter.py b/litellm/proxy/hooks/request_queue_limiter.py index c952d263aa9..0e1efb1bb4a 100644 --- a/litellm/proxy/hooks/request_queue_limiter.py +++ b/litellm/proxy/hooks/request_queue_limiter.py @@ -378,10 +378,7 @@ def do_rate_limit_check(self, user_api_key_dict: UserAPIKeyAuth) -> bool: """ max_parallel_requests = user_api_key_dict.max_parallel_requests if max_parallel_requests is 
None: - return True # No limit for this user, allow request to proceed - - if max_parallel_requests <= self.max_concurrent_requests: - return True + return False # No limit for this user, allow request to proceed return True From eaa3ca79a239634db954ffaceada9741f8b0c5d2 Mon Sep 17 00:00:00 2001 From: KrKOo Date: Wed, 20 May 2026 16:14:02 +0200 Subject: [PATCH 4/5] always generate new request id for queue tracking --- litellm/proxy/hooks/request_queue_limiter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/litellm/proxy/hooks/request_queue_limiter.py b/litellm/proxy/hooks/request_queue_limiter.py index 0e1efb1bb4a..baca2d42e2b 100644 --- a/litellm/proxy/hooks/request_queue_limiter.py +++ b/litellm/proxy/hooks/request_queue_limiter.py @@ -235,7 +235,7 @@ def _get_queue_keys( def _get_request_id(self, data: dict) -> str: """Extract the request ID from the data dictionary if available.""" - return data["metadata"]["headers"].get("x-request-id") or str(uuid.uuid4()) + return str(uuid.uuid4()) def _set_counter_incremented_flag(self, data: dict) -> None: """Set a flag in metadata to indicate the counter was incremented. 
From cb22bc505b7152411a4ed4aa81d94732412e6642 Mon Sep 17 00:00:00 2001 From: KrKOo Date: Thu, 21 May 2026 15:13:00 +0200 Subject: [PATCH 5/5] fix: async_post_call_success_hook -> async_log_success_event --- litellm/proxy/hooks/request_queue_limiter.py | 52 +++++++++++--------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/litellm/proxy/hooks/request_queue_limiter.py b/litellm/proxy/hooks/request_queue_limiter.py index baca2d42e2b..741e4d369c9 100644 --- a/litellm/proxy/hooks/request_queue_limiter.py +++ b/litellm/proxy/hooks/request_queue_limiter.py @@ -519,11 +519,12 @@ async def async_pre_call_hook( # On error, allow the request to proceed return None - async def async_post_call_success_hook( + async def async_log_success_event( self, - data: dict, - user_api_key_dict: UserAPIKeyAuth, - response: Any, + kwargs: dict, + response_obj: Any, + start_time: Any, + end_time: Any, ) -> None: """ Post-call success hook to release a slot when a request completes successfully. @@ -532,24 +533,34 @@ async def async_post_call_success_hook( the CHECK_AND_PROMOTE_LUA script during polling in _wait_for_slot(). 
Args: - data: Request data dictionary - user_api_key_dict: User API key authentication dictionary - response: The response object from the LLM API + kwargs: Request kwargs from the logging system + response_obj: The response object from the LLM API + start_time: Request start time + end_time: Request end time """ - - counter_incremented = data.get("metadata", {}).get("user_api_key_auth_metadata", {}).get("request_queue_counter_incremented", False) - if not counter_incremented: - verbose_proxy_logger.debug( - "RequestQueueLimiter: Counter was not incremented for this request, skipping slot release - success" - ) - return + verbose_proxy_logger.debug( + "RequestQueueLimiter: In async_log_success_event" + ) try: - # Get the API key from the user_api_key_dict - api_key = getattr(user_api_key_dict, "api_key", None) + standard_logging_object = kwargs.get("standard_logging_object") or {} + standard_logging_metadata = standard_logging_object.get("metadata") or {} + api_key = standard_logging_metadata.get("user_api_key_hash") + if not api_key: verbose_proxy_logger.debug( - "RequestQueueLimiter: No API key found in success hook, skipping slot release" + "RequestQueueLimiter: No API key found in success event, skipping slot release" + ) + return + + # Check if the counter was incremented for this request + # The flag is stored in user_api_key_auth_metadata which is passed from pre_call_hook + user_api_key_auth_metadata = standard_logging_metadata.get("user_api_key_auth_metadata") or {} + counter_incremented = user_api_key_auth_metadata.get("request_queue_counter_incremented", False) + + if not counter_incremented: + verbose_proxy_logger.debug( + f"RequestQueueLimiter: Counter was not incremented for this request, skipping decrement" ) return @@ -581,7 +592,7 @@ async def async_post_call_success_hook( except Exception as e: verbose_proxy_logger.exception( - f"RequestQueueLimiter: Error in post_call_success_hook: {str(e)}" + f"RequestQueueLimiter: Error in 
async_log_success_event: {str(e)}" ) async def async_log_failure_event( @@ -605,16 +616,13 @@ async def async_log_failure_event( """ try: - # Get metadata from standard_logging_object standard_logging_object = kwargs.get("standard_logging_object") or {} standard_logging_metadata = standard_logging_object.get("metadata") or {} - - # Get the API key from metadata api_key = standard_logging_metadata.get("user_api_key_hash") if not api_key: verbose_proxy_logger.debug( - "RequestQueueLimiter: No API key found in failure event, skipping slot release - failure" + "RequestQueueLimiter: No API key found in failure event, skipping slot release" ) return