@@ -42,18 +42,22 @@ def _create_malloc_release() -> Callable[[], int] | None:
# Result of _create_malloc_release(): either a zero-argument callable
# returning an int, or None (per that factory's return annotation).
_malloc_release = _create_malloc_release()


# Maximum time, in seconds, a request may wait for memory to free up
# before it is rejected.
BACKPRESSURE_TIMEOUT = 30
46+
47+
4548class ConcurrencyLimiter :
46- """Memory-based concurrency limiter.
49+ """Memory-based concurrency limiter with backpressure .
4750
48- Tracks reserved memory across concurrent requests and rejects new requests
49- when the configured limit would be exceeded .
51+ Tracks reserved memory across concurrent requests. When the limit would be
52+ exceeded, waits for memory to free up instead of rejecting immediately .
5053 """
5154
def __init__(self, limit_mb: int = 128) -> None:
    """Set up an empty limiter capped at ``limit_mb`` megabytes.

    Args:
        limit_mb: Memory budget in megabytes. ``try_acquire`` and
            ``release`` treat a budget of zero or less as "limiting
            disabled".
    """
    bytes_per_mb = 1024 * 1024

    # The condition is built on the limiter's own lock, so holding the
    # condition is the same as holding the lock that guards _active_bytes.
    lock = asyncio.Lock()
    self._lock = lock
    self._condition = asyncio.Condition(lock)

    # Nothing is reserved until the first successful try_acquire().
    self._active_bytes = 0

    self._limit_mb = limit_mb
    self._limit_bytes = limit_mb * bytes_per_mb
    # Record the configured limit on the MEMORY_LIMIT_BYTES metric.
    MEMORY_LIMIT_BYTES.set(self._limit_bytes)
5862
5963 @property
@@ -76,39 +80,54 @@ def set_memory_limit(self, limit_mb: int) -> None:
7680 MEMORY_LIMIT_BYTES .set (self ._limit_bytes )
7781
async def try_acquire(self, bytes_needed: int) -> int:
    """Reserve memory for a request, applying backpressure when at capacity.

    The reservation is ``bytes_needed`` clamped to at most the configured
    limit and raised to at least ``MIN_RESERVATION``. If it does not fit
    under the limit right now, the caller waits on the condition (woken by
    ``release``) for up to ``BACKPRESSURE_TIMEOUT`` seconds in total.

    Args:
        bytes_needed: Estimated memory footprint of the request, in bytes.

    Returns:
        The number of bytes actually reserved (the amount to hand back to
        ``release``), or 0 when limiting is disabled (limit <= 0).

    Raises:
        S3Error: ``slow_down`` when the reservation still does not fit once
            the timeout has elapsed, or immediately when the reservation is
            larger than the whole limit and therefore can never fit.
    """
    if self._limit_bytes <= 0:
        return 0

    to_reserve = max(MIN_RESERVATION, min(bytes_needed, self._limit_bytes))

    # Inside a coroutine the running loop is the right clock source;
    # fetch it once instead of on every iteration.
    loop = asyncio.get_running_loop()

    async with self._condition:
        started = loop.time()
        deadline = started + BACKPRESSURE_TIMEOUT

        while self._active_bytes + to_reserve > self._limit_bytes:
            remaining = deadline - loop.time()
            # A reservation bigger than the entire limit (MIN_RESERVATION
            # above the limit, or the limit lowered while waiting) cannot
            # be satisfied by any release, so waiting would only add latency.
            unsatisfiable = to_reserve > self._limit_bytes

            if unsatisfiable or remaining <= 0:
                active_mb = self._active_bytes / 1024 / 1024
                request_mb = to_reserve / 1024 / 1024
                limit_mb = self._limit_bytes / 1024 / 1024
                logger.warning(
                    "MEMORY_REJECTED",
                    active_mb=round(active_mb, 2),
                    requested_mb=round(request_mb, 2),
                    limit_mb=round(limit_mb, 2),
                    # Time actually spent waiting, not the configured maximum.
                    waited_sec=round(loop.time() - started, 1),
                )
                MEMORY_REJECTIONS.inc()
                raise S3Error.slow_down(
                    f"Memory limit: {active_mb:.0f}MB + {request_mb:.0f}MB > {limit_mb:.0f}MB"
                )

            logger.info(
                "MEMORY_BACKPRESSURE",
                active_mb=round(self._active_bytes / 1024 / 1024, 2),
                requested_mb=round(to_reserve / 1024 / 1024, 2),
                limit_mb=round(self._limit_bytes / 1024 / 1024, 2),
                remaining_sec=round(remaining, 1),
            )
            try:
                # Condition.wait() drops the lock while sleeping and holds
                # it again by the time control returns here, timeout or not.
                await asyncio.wait_for(self._condition.wait(), timeout=remaining)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is the builtin TimeoutError only from
                # Python 3.11 on; catching the asyncio name works everywhere.
                # The loop re-checks capacity and rejects if the deadline passed.
                pass

        self._active_bytes += to_reserve
        MEMORY_RESERVED_BYTES.set(self._active_bytes)
        return to_reserve
103121
104122 async def release (self , bytes_reserved : int ) -> None :
105- """Release reserved memory and trigger OS memory release ."""
123+ """Release reserved memory and wake waiting requests ."""
106124 if self ._limit_bytes <= 0 or bytes_reserved <= 0 :
107125 return
108126
109- async with self ._lock :
127+ async with self ._condition :
110128 self ._active_bytes = max (0 , self ._active_bytes - bytes_reserved )
111129 MEMORY_RESERVED_BYTES .set (self ._active_bytes )
130+ self ._condition .notify_all ()
112131
113132 # Run garbage collection and release memory to OS
114133 gc .collect (0 )
@@ -124,7 +143,7 @@ async def release(self, bytes_reserved: int) -> None:
124143
125144
# Shared limiter instance backing the module-level functions. Its budget
# comes from S3PROXY_MEMORY_LIMIT_MB (megabytes) and falls back to 64 when
# the variable is unset.
_default = ConcurrencyLimiter(
    limit_mb=int(os.environ.get("S3PROXY_MEMORY_LIMIT_MB", "64")),
)
128147
129148
130149def estimate_memory_footprint (method : str , content_length : int ) -> int :
0 commit comments