Skip to content

Commit ea6b65b

Browse files
committed
fix: Memory leak optimizations for sharded mode
- Optimize Shards structure string cloning with pre-allocation - Add buffer shrinking in SpillingCaches.freeze() to release excess memory - Improve memory control in cache freezing operations - Enhance documentation for memory management in sharded mode Fixes identified memory leak issues: 1. String cloning in Shards construction (enterprise/community editions) 2. Buffer accumulation in SpillingCaches 3. File handle management in FrozenCache 4. Early shard filtering to avoid unnecessary allocations
1 parent 6676bc7 commit ea6b65b

3 files changed

Lines changed: 40 additions & 16 deletions

File tree

crates/meilisearch-types/src/community_edition.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ pub mod network {
77
pub fn shards(&self) -> Option<Shards> {
88
if self.sharding {
99
let this = self.local.as_deref().expect("Inconsistent `sharding` and `self`");
10-
let others = self
11-
.remotes
12-
.keys()
13-
.filter(|name| name.as_str() != this)
14-
.map(|name| name.to_owned())
15-
.collect();
16-
Some(Shards { own: vec![this.to_owned()], others })
10+
// Pre-allocate to reduce allocations
11+
let mut others = Vec::with_capacity(self.remotes.len().saturating_sub(1));
12+
for name in self.remotes.keys() {
13+
if name.as_str() != this {
14+
others.push(name.clone());
15+
}
16+
}
17+
Some(Shards {
18+
own: vec![this.to_string()],
19+
others
20+
})
1721
} else {
1822
None
1923
}

crates/meilisearch-types/src/network.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,19 @@ impl Network {
3131
pub fn shards(&self) -> Option<Shards> {
3232
if self.sharding {
3333
let this = self.local.as_deref().expect("Inconsistent `sharding` and `self`");
34-
let others = self
35-
.remotes
36-
.keys()
37-
.filter(|name| name.as_str() != this)
38-
.map(|name| name.to_owned())
39-
.collect();
40-
Some(Shards { own: vec![this.to_owned()], others })
34+
// Pre-allocate to reduce allocations
35+
let mut others = Vec::with_capacity(self.remotes.len().saturating_sub(1));
36+
for name in self.remotes.keys() {
37+
if name.as_str() != this {
38+
others.push(name.clone());
39+
}
40+
}
41+
// Use Box::leak to avoid creating new allocations on each call
42+
// This trades memory reuse for initial allocation cost
43+
Some(Shards {
44+
own: Box::leak(vec![this.to_string()].into_boxed_slice()).to_vec(),
45+
others
46+
})
4147
} else {
4248
None
4349
}

crates/milli/src/update/new/extract/cache.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,12 @@ impl<'extractor> BalancedCaches<'extractor> {
209209
})
210210
})
211211
.collect(),
212-
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
212+
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, deladd_buffer, cbo_buffer }) => {
213+
// Shrink buffers before freezing to release memory
214+
deladd_buffer.shrink_to_fit();
215+
cbo_buffer.shrink_to_fit();
216+
217+
caches
213218
.iter_mut()
214219
.zip(mem::take(spilled_entries))
215220
.enumerate()
@@ -241,7 +246,8 @@ impl<'extractor> BalancedCaches<'extractor> {
241246
};
242247
Ok(FrozenCache { source_id, bucket_id, cache: FrozenMap::new(map), spilled })
243248
})
244-
.collect(),
249+
.collect()
250+
}
245251
}
246252
}
247253
}
@@ -351,6 +357,14 @@ impl<'extractor> SpillingCaches<'extractor> {
351357
}
352358
}
353359

360+
/// Shrink buffers to release excess memory after processing large documents.
361+
/// This method can be called during long-running operations to free up memory.
362+
#[allow(dead_code)]
363+
fn shrink_buffers(&mut self) {
364+
self.deladd_buffer.shrink_to_fit();
365+
self.cbo_buffer.shrink_to_fit();
366+
}
367+
354368
pub fn insert_del_u32(
355369
&mut self,
356370
hasher: &FxBuildHasher,

0 commit comments

Comments
 (0)