Skip to content

Commit 09db25b

Browse files
Make manifest cache size configurable (#2993)
<!-- Thanks for opening a pull request! -->
<!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
<!-- Closes #${2952} -->
Closes #2952

# Rationale for this change

Through discussion in issue #2325, we realized that there was a memory leak in the manifest cache. PR #2951 fixed this memory leak, but we decided that it would be best for developer experience if users could configure the cache for their needs. This includes the ability to configure the manifest cache size, including setting it to `0` to disable caching.

## Are these changes tested?

Yes - unit tests are included to verify these changes

## Are there any user-facing changes?

Yes:

- `manifest-cache-size` can be used in `.pyiceberg.yaml` to configure the size of the manifest cache
- `PYICEBERG_MANIFEST_CACHE_SIZE` can be used to configure the size of the manifest cache

<!-- In the case of user-facing changes, please add the changelog label. -->

---------

Co-authored-by: Kevin Liu <kevin.jq.liu@gmail.com>
1 parent 6c3c2f3 commit 09db25b

4 files changed

Lines changed: 277 additions & 35 deletions

File tree

mkdocs/docs/configuration.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,29 @@ The environment variable picked up by Iceberg starts with `PYICEBERG_` and then
4848

4949
For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-key-id` on the `default` catalog.
5050

51+
## Manifest Caching
52+
53+
PyIceberg caches `ManifestFile` objects locally and uses an LRU policy to bound the cache size. By default, up to `128`
54+
distinct manifest files are retained.
55+
56+
You can tune the `manifest-cache-size` configuration in `.pyiceberg.yaml`:
57+
58+
```yaml
59+
manifest-cache-size: 256
60+
```
61+
62+
Permitted values: any non-negative integer. Set the value to `0` to disable manifest caching entirely.
63+
64+
You can also set it with the `PYICEBERG_MANIFEST_CACHE_SIZE` environment variable:
65+
66+
```sh
67+
export PYICEBERG_MANIFEST_CACHE_SIZE=256
68+
```
69+
70+
The memory used by this cache depends on the size and number of distinct manifests your workload touches. Lower the value
71+
if you want a tighter memory bound, or call `clear_manifest_cache()` to proactively release cached manifest metadata in
72+
long-lived processes.
73+
5174
## Tables
5275

5376
Iceberg tables support table properties to configure table behavior.

pyiceberg/manifest.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
StringType,
5252
StructType,
5353
)
54+
from pyiceberg.utils.config import Config
5455

5556
UNASSIGNED_SEQ = -1
5657
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
@@ -891,17 +892,70 @@ def __hash__(self) -> int:
891892
return hash(self.manifest_path)
892893

893894

894-
# Global cache for ManifestFile objects, keyed by manifest_path.
895-
# This deduplicates ManifestFile objects across manifest lists, which commonly
896-
# share manifests after append operations.
897-
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)
895+
class _ManifestCache:
896+
"""Process-wide ManifestFile cache keyed by manifest_path.
898897
899-
# Lock for thread-safe cache access
900-
_manifest_cache_lock = threading.RLock()
898+
Consecutive snapshots often reference the same manifests after append
899+
operations, so reusing ManifestFile instances avoids retaining duplicate
900+
objects.
901+
"""
902+
903+
DEFAULT_SIZE = 128
904+
905+
_cache: LRUCache[str, ManifestFile] | None
906+
907+
def __init__(self) -> None:
908+
self.maxsize = self._load_configured_size()
909+
self._cache = LRUCache(maxsize=self.maxsize) if self.maxsize > 0 else None
910+
self._lock = threading.RLock()
911+
912+
@classmethod
913+
def _load_configured_size(cls) -> int:
914+
configured_size = Config().get_int("manifest-cache-size")
915+
if configured_size is None:
916+
return cls.DEFAULT_SIZE
917+
if configured_size < 0:
918+
raise ValueError(
919+
f"manifest-cache-size should be a non-negative integer or left unset. Current value: {configured_size}"
920+
)
921+
return configured_size
922+
923+
def clear(self) -> None:
924+
with self._lock:
925+
if self._cache is not None:
926+
self._cache.clear()
927+
928+
def get_or_cache(self, manifest_file: ManifestFile) -> ManifestFile:
929+
if self._cache is None:
930+
return manifest_file
931+
932+
with self._lock:
933+
manifest_path = manifest_file.manifest_path
934+
if manifest_path in self._cache:
935+
return self._cache[manifest_path]
936+
937+
self._cache[manifest_path] = manifest_file
938+
return manifest_file
939+
940+
def __len__(self) -> int:
941+
with self._lock:
942+
return len(self._cache) if self._cache is not None else 0
943+
944+
945+
_manifest_cache = _ManifestCache()
946+
947+
948+
def clear_manifest_cache() -> None:
949+
"""Clear cached ManifestFile objects.
950+
951+
This is primarily useful in long-lived or memory-sensitive processes that
952+
want to release cached manifest metadata between bursts of table reads.
953+
"""
954+
_manifest_cache.clear()
901955

902956

903957
def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
904-
"""Read manifests from a manifest list, deduplicating ManifestFile objects via cache.
958+
"""Read manifests from a manifest list, reusing cached ManifestFile objects.
905959
906960
Caches individual ManifestFile objects by manifest_path. This is memory-efficient
907961
because consecutive manifest lists typically share most of their manifests:
@@ -927,17 +981,7 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
927981
file = io.new_input(manifest_list)
928982
manifest_files = list(read_manifest_list(file))
929983

930-
result = []
931-
with _manifest_cache_lock:
932-
for manifest_file in manifest_files:
933-
manifest_path = manifest_file.manifest_path
934-
if manifest_path in _manifest_cache:
935-
result.append(_manifest_cache[manifest_path])
936-
else:
937-
_manifest_cache[manifest_path] = manifest_file
938-
result.append(manifest_file)
939-
940-
return tuple(result)
984+
return tuple(_manifest_cache.get_or_cache(manifest_file) for manifest_file in manifest_files)
941985

942986

943987
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:

tests/benchmark/test_memory_benchmark.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@
3232
import pyarrow as pa
3333
import pytest
3434

35+
from pyiceberg import manifest as manifest_module
3536
from pyiceberg.catalog.memory import InMemoryCatalog
36-
from pyiceberg.manifest import _manifest_cache
37+
from pyiceberg.manifest import clear_manifest_cache
3738

3839

3940
def generate_test_dataframe() -> pa.Table:
@@ -64,16 +65,16 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
6465
@pytest.fixture(autouse=True)
6566
def clear_caches() -> None:
6667
"""Clear caches before each test."""
67-
_manifest_cache.clear()
68+
clear_manifest_cache()
6869
gc.collect()
6970

7071

7172
@pytest.mark.benchmark
7273
def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
7374
"""Benchmark memory growth of manifest cache during repeated appends.
7475
75-
This test reproduces the issue from GitHub #2325 where each append creates
76-
a new manifest list entry in the cache, causing memory to grow.
76+
This test reproduces the issue from GitHub #2325 where the old cache stored
77+
each manifest list result, causing memory to grow.
7778
7879
With the old caching strategy (tuple per manifest list), memory grew as O(N²).
7980
With the new strategy (individual ManifestFile objects), memory grows as O(N).
@@ -95,7 +96,7 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
9596
# Sample memory at intervals
9697
if (i + 1) % 10 == 0:
9798
current, _ = tracemalloc.get_traced_memory()
98-
cache_size = len(_manifest_cache)
99+
cache_size = len(manifest_module._manifest_cache)
99100

100101
memory_samples.append((i + 1, current, cache_size))
101102
print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
@@ -150,13 +151,13 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) ->
150151

151152
gc.collect()
152153
before_clear_memory, _ = tracemalloc.get_traced_memory()
153-
cache_size_before = len(_manifest_cache)
154+
cache_size_before = len(manifest_module._manifest_cache)
154155
print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
155156
print(f" Cache size: {cache_size_before}")
156157

157158
# Phase 2: Clear cache and GC
158159
print("\nPhase 2: Clearing cache and running GC...")
159-
_manifest_cache.clear()
160+
clear_manifest_cache()
160161
gc.collect()
161162
gc.collect() # Multiple GC passes for thorough cleanup
162163

@@ -192,6 +193,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
192193
ManifestEntry,
193194
ManifestEntryStatus,
194195
_manifests,
196+
clear_manifest_cache,
195197
write_manifest,
196198
write_manifest_list,
197199
)
@@ -245,7 +247,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
245247
num_lists = 10
246248
print(f"Creating {num_lists} manifest lists with overlapping manifests...")
247249

248-
_manifest_cache.clear()
250+
clear_manifest_cache()
249251

250252
for i in range(num_lists):
251253
list_path = f"{tmp_dir}/manifest-list_{i}.avro"
@@ -265,7 +267,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
265267
_manifests(io, list_path)
266268

267269
# Analyze cache efficiency
268-
cache_entries = len(_manifest_cache)
270+
cache_entries = len(manifest_module._manifest_cache)
269271
# List i contains manifests 0..i, so only the first num_lists manifests are actually used
270272
manifests_actually_used = num_lists
271273

0 commit comments

Comments
 (0)