Skip to content

Commit 97e811c

Browse files
fix: accurate memory accounting for encrypted GETs and streaming PUTs
- Fix concurrency limiter to reject requests exceeding budget instead of silently capping reservations
- Fix PUT memory estimate: streaming PUTs hold buffer + ciphertext (16MB not 8MB)
- Add dynamic memory acquisition in GET handler for encrypted decrypts (ciphertext + plaintext buffered simultaneously)
- Add container-based OOM proof test (256MB container, 5GB+ data)
- Split CI into parallel unit/integration jobs, separate OOM workflow
- Add make test-integration target
1 parent d0291cd commit 97e811c

File tree

9 files changed

+225
-118
lines changed

9 files changed

+225
-118
lines changed

.github/workflows/oom-test.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
name: OOM Proof Test
2+
3+
on:
4+
push:
5+
branches: [main]
6+
paths:
7+
- 's3proxy/**'
8+
- 'tests/**'
9+
workflow_dispatch:
10+
11+
jobs:
12+
oom-test:
13+
runs-on: ubuntu-latest
14+
timeout-minutes: 20
15+
steps:
16+
- uses: actions/checkout@v6
17+
18+
- name: Set up Python
19+
uses: actions/setup-python@v6
20+
with:
21+
python-version: '3.14'
22+
cache: 'pip'
23+
24+
- name: Install uv
25+
run: pip install uv
26+
27+
- name: Install dependencies
28+
run: uv sync --extra dev
29+
30+
- name: OOM proof test (256MB container)
31+
run: make test-oom

.github/workflows/test.yml

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ on:
1111
workflow_dispatch:
1212

1313
jobs:
14-
tests:
14+
unit:
1515
runs-on: ubuntu-latest
16-
timeout-minutes: 30
16+
timeout-minutes: 10
1717
steps:
1818
- uses: actions/checkout@v6
1919

@@ -29,8 +29,26 @@ jobs:
2929
- name: Install dependencies
3030
run: uv sync --extra dev
3131

32-
- name: Run all tests
33-
run: make test-all
32+
- name: Run unit tests
33+
run: make test-unit
3434

35-
- name: OOM proof test (128MB container)
36-
run: make test-oom
35+
integration:
36+
runs-on: ubuntu-latest
37+
timeout-minutes: 20
38+
steps:
39+
- uses: actions/checkout@v6
40+
41+
- name: Set up Python
42+
uses: actions/setup-python@v6
43+
with:
44+
python-version: '3.14'
45+
cache: 'pip'
46+
47+
- name: Install uv
48+
run: pip install uv
49+
50+
- name: Install dependencies
51+
run: uv sync --extra dev
52+
53+
- name: Run integration tests
54+
run: make test-integration

Makefile

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: test test-all test-unit test-run test-oom e2e cluster lint
1+
.PHONY: test test-all test-unit test-integration test-run test-oom e2e cluster lint
22

33
# Lint: ruff check + format check
44
lint:
@@ -12,7 +12,17 @@ test: test-unit
1212
test-unit:
1313
uv run pytest -m "not e2e and not ha" -v -n auto
1414

15-
# Run all tests with containers (parallel execution)
15+
# Run integration tests (needs minio/redis containers)
16+
test-integration:
17+
@docker compose -f tests/docker-compose.yml down 2>/dev/null || true
18+
@docker compose -f tests/docker-compose.yml up -d
19+
@sleep 3
20+
@AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin uv run pytest -m "e2e" -v -n auto --dist loadgroup; \
21+
EXIT_CODE=$$?; \
22+
docker compose -f tests/docker-compose.yml down; \
23+
exit $$EXIT_CODE
24+
25+
# Run all tests with containers (unit + integration)
1626
test-all:
1727
@docker compose -f tests/docker-compose.yml down 2>/dev/null || true
1828
@docker compose -f tests/docker-compose.yml up -d
@@ -33,7 +43,7 @@ test-run:
3343
docker compose -f tests/docker-compose.yml down; \
3444
exit $$EXIT_CODE
3545

36-
# OOM proof test: runs s3proxy in a 128MB container and hammers it
46+
# OOM proof test: runs s3proxy in a 256MB container and hammers it
3747
test-oom:
3848
@docker compose -f tests/docker-compose.yml --profile oom down 2>/dev/null || true
3949
@docker compose -f tests/docker-compose.yml --profile oom up -d --build

s3proxy/concurrency.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,21 @@ async def try_acquire(self, bytes_needed: int) -> int:
8484
if self._limit_bytes <= 0:
8585
return 0
8686

87-
to_reserve = max(MIN_RESERVATION, min(bytes_needed, self._limit_bytes))
87+
to_reserve = max(MIN_RESERVATION, bytes_needed)
88+
89+
# Single request exceeds entire budget — can never fit, reject immediately
90+
if to_reserve > self._limit_bytes:
91+
request_mb = to_reserve / 1024 / 1024
92+
limit_mb = self._limit_bytes / 1024 / 1024
93+
logger.warning(
94+
"MEMORY_TOO_LARGE",
95+
requested_mb=round(request_mb, 2),
96+
limit_mb=round(limit_mb, 2),
97+
)
98+
MEMORY_REJECTIONS.inc()
99+
raise S3Error.slow_down(
100+
f"Request needs {request_mb:.0f}MB but budget is {limit_mb:.0f}MB"
101+
)
88102

89103
async with self._condition:
90104
deadline = asyncio.get_event_loop().time() + BACKPRESSURE_TIMEOUT
@@ -147,7 +161,12 @@ async def release(self, bytes_reserved: int) -> None:
147161

148162

149163
def estimate_memory_footprint(method: str, content_length: int) -> int:
150-
"""Estimate memory needed for a request."""
164+
"""Estimate memory needed for a request.
165+
166+
Streaming PUTs hold an 8MB plaintext buffer + 8MB ciphertext simultaneously,
167+
so large PUTs need 2x MAX_BUFFER_SIZE. Small PUTs buffer the whole body + ciphertext.
168+
GETs reserve a baseline here; encrypted GETs acquire additional memory in the handler.
169+
"""
151170
if method in ("HEAD", "DELETE"):
152171
return 0
153172
if method == "GET":
@@ -156,7 +175,7 @@ def estimate_memory_footprint(method: str, content_length: int) -> int:
156175
return MIN_RESERVATION
157176
if content_length <= MAX_BUFFER_SIZE:
158177
return max(MIN_RESERVATION, content_length * 2)
159-
return MAX_BUFFER_SIZE
178+
return MAX_BUFFER_SIZE * 2
160179

161180

162181
# Module-level convenience functions delegating to the default instance

s3proxy/handlers/objects/get.py

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from fastapi.responses import StreamingResponse
1212
from structlog.stdlib import BoundLogger
1313

14-
from ... import crypto
14+
from ... import concurrency, crypto
15+
from ...concurrency import MAX_BUFFER_SIZE
1516
from ...errors import S3Error
1617
from ...s3client import S3Client, S3Credentials
1718
from ...state import (
@@ -147,37 +148,51 @@ async def _decrypt_single_object(
147148
) -> Response:
148149
logger.info("GET_ENCRYPTED_SINGLE", bucket=bucket, key=key)
149150
resp = await client.get_object(bucket, key)
150-
wrapped_dek = base64.b64decode(wrapped_dek_b64)
151-
# Read and close body to release aioboto3/aiohttp resources
152-
async with resp["Body"] as body:
153-
ciphertext = await body.read()
154-
plaintext = crypto.decrypt_object(ciphertext, wrapped_dek, self.settings.kek)
155-
del ciphertext # Free memory
151+
content_length = resp.get("ContentLength", 0)
156152

157-
content_type = head_resp.get("ContentType", "application/octet-stream")
158-
cache_control = head_resp.get("CacheControl")
159-
expires = head_resp.get("Expires")
153+
# Encrypted decrypts buffer ciphertext + plaintext simultaneously.
154+
# Acquire additional memory beyond the initial MAX_BUFFER_SIZE reservation.
155+
additional = max(0, content_length * 2 - MAX_BUFFER_SIZE)
156+
extra_reserved = 0
157+
try:
158+
if additional > 0:
159+
extra_reserved = await concurrency.try_acquire_memory(additional)
160+
161+
wrapped_dek = base64.b64decode(wrapped_dek_b64)
162+
async with resp["Body"] as body:
163+
ciphertext = await body.read()
164+
plaintext = crypto.decrypt_object(ciphertext, wrapped_dek, self.settings.kek)
165+
del ciphertext
166+
167+
content_type = head_resp.get("ContentType", "application/octet-stream")
168+
cache_control = head_resp.get("CacheControl")
169+
expires = head_resp.get("Expires")
170+
171+
if range_header:
172+
start, end = self._parse_range(range_header, len(plaintext))
173+
headers = self._build_headers(
174+
content_type=content_type,
175+
content_length=end - start + 1,
176+
last_modified=last_modified,
177+
cache_control=cache_control,
178+
expires=expires,
179+
)
180+
headers["Content-Range"] = f"bytes {start}-{end}/{len(plaintext)}"
181+
return Response(
182+
content=plaintext[start : end + 1], status_code=206, headers=headers
183+
)
160184

161-
if range_header:
162-
start, end = self._parse_range(range_header, len(plaintext))
163185
headers = self._build_headers(
164186
content_type=content_type,
165-
content_length=end - start + 1,
187+
content_length=len(plaintext),
166188
last_modified=last_modified,
167189
cache_control=cache_control,
168190
expires=expires,
169191
)
170-
headers["Content-Range"] = f"bytes {start}-{end}/{len(plaintext)}"
171-
return Response(content=plaintext[start : end + 1], status_code=206, headers=headers)
172-
173-
headers = self._build_headers(
174-
content_type=content_type,
175-
content_length=len(plaintext),
176-
last_modified=last_modified,
177-
cache_control=cache_control,
178-
expires=expires,
179-
)
180-
return Response(content=plaintext, headers=headers)
192+
return Response(content=plaintext, headers=headers)
193+
finally:
194+
if extra_reserved > 0:
195+
await concurrency.release_memory(extra_reserved)
181196

182197
async def _get_multipart(
183198
self,
@@ -378,13 +393,17 @@ async def _fetch_internal_part(
378393
ct_end: int,
379394
dek: bytes,
380395
) -> bytes:
396+
expected_size = ct_end - ct_start + 1
397+
additional = max(0, expected_size * 2 - MAX_BUFFER_SIZE)
398+
extra_reserved = 0
381399
try:
400+
if additional > 0:
401+
extra_reserved = await concurrency.try_acquire_memory(additional)
402+
382403
resp = await client.get_object(bucket, key, f"bytes={ct_start}-{ct_end}")
383-
# Read and close body to release aioboto3/aiohttp resources
384404
async with resp["Body"] as body:
385405
ciphertext = await body.read()
386406

387-
expected_size = ct_end - ct_start + 1
388407
if len(ciphertext) < crypto.ENCRYPTION_OVERHEAD or len(ciphertext) != expected_size:
389408
logger.error(
390409
"GET_CIPHERTEXT_SIZE_MISMATCH",
@@ -419,6 +438,9 @@ async def _fetch_internal_part(
419438
f"range {ct_start}-{ct_end} invalid"
420439
) from e
421440
raise
441+
finally:
442+
if extra_reserved > 0:
443+
await concurrency.release_memory(extra_reserved)
422444

423445
async def _fetch_and_decrypt_part(
424446
self,
@@ -445,12 +467,21 @@ async def _fetch_and_decrypt_part(
445467

446468
self._validate_ciphertext_range(bucket, key, part_num, 0, ct_end, actual_size)
447469

448-
resp = await client.get_object(bucket, key, f"bytes={ct_start}-{ct_end}")
449-
# Read and close body to release aioboto3/aiohttp resources
450-
async with resp["Body"] as body:
451-
ciphertext = await body.read()
452-
decrypted = crypto.decrypt(ciphertext, dek)
453-
return decrypted[off_start : off_end + 1]
470+
part_size = part_meta.ciphertext_size
471+
additional = max(0, part_size * 2 - MAX_BUFFER_SIZE)
472+
extra_reserved = 0
473+
try:
474+
if additional > 0:
475+
extra_reserved = await concurrency.try_acquire_memory(additional)
476+
477+
resp = await client.get_object(bucket, key, f"bytes={ct_start}-{ct_end}")
478+
async with resp["Body"] as body:
479+
ciphertext = await body.read()
480+
decrypted = crypto.decrypt(ciphertext, dek)
481+
return decrypted[off_start : off_end + 1]
482+
finally:
483+
if extra_reserved > 0:
484+
await concurrency.release_memory(extra_reserved)
454485

455486
def _build_response_headers(self, resp: dict, last_modified: str | None) -> dict[str, str]:
456487
return self._build_headers(

tests/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ services:
2222
context: ../
2323
dockerfile: Dockerfile
2424
container_name: s3proxy-test-server
25-
mem_limit: 128m
25+
mem_limit: 256m
2626
ports:
2727
- "4433:4433"
2828
environment:
2929
S3PROXY_ENCRYPT_KEY: "test-encryption-key-32-bytes!!"
3030
S3PROXY_HOST: "http://minio:9000"
3131
S3PROXY_REGION: "us-east-1"
32-
S3PROXY_MEMORY_LIMIT_MB: "16"
32+
S3PROXY_MEMORY_LIMIT_MB: "48"
3333
S3PROXY_LOG_LEVEL: "WARNING"
3434
AWS_ACCESS_KEY_ID: minioadmin
3535
AWS_SECRET_ACCESS_KEY: minioadmin

0 commit comments

Comments
 (0)