Skip to content

Commit cb8a3ca

Browse files
committed
wip: add get_validate_value_or_guard_async
1 parent d3046f3 commit cb8a3ca

4 files changed

Lines changed: 103 additions & 14 deletions

File tree

src/shard.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,10 +1050,16 @@ impl<
10501050
Ok(())
10511051
}
10521052

1053+
/// Upserts a placeholder, optionally validating existing resident values
1054+
///
1055+
/// Returns:
1056+
/// - `Ok((token, value))` if a valid resident was found
1057+
/// - `Err((placeholder, is_new))` where `is_new` indicates if this is a newly created placeholder
10531058
pub fn upsert_placeholder<Q>(
10541059
&mut self,
10551060
hash: u64,
10561061
key: &Q,
1062+
validator: &mut impl FnMut(&Val) -> bool,
10571063
) -> Result<(Token, &Val), (Plh, bool)>
10581064
where
10591065
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
@@ -1063,6 +1069,40 @@ impl<
10631069
let (entry, _) = self.entries.get_mut(idx).unwrap();
10641070
match entry {
10651071
Entry::Resident(resident) => {
1072+
if !validator(&resident.value) {
1073+
let old_state = resident.state;
1074+
let old_key = &resident.key;
1075+
let old_value = &resident.value;
1076+
let weight = self.weighter.weight(old_key, old_value);
1077+
1078+
let shared = Plh::new(hash, idx);
1079+
*entry = Entry::Placeholder(Placeholder {
1080+
key: key.to_owned(),
1081+
hot: old_state,
1082+
shared: shared.clone(),
1083+
});
1084+
1085+
match old_state {
1086+
ResidentState::Hot => {
1087+
self.num_hot -= 1;
1088+
self.weight_hot -= weight;
1089+
if weight != 0 {
1090+
self.hot_head = self.entries.unlink(idx);
1091+
}
1092+
}
1093+
ResidentState::Cold => {
1094+
self.num_cold -= 1;
1095+
self.weight_cold -= weight;
1096+
if weight != 0 {
1097+
self.cold_head = self.entries.unlink(idx);
1098+
}
1099+
}
1100+
}
1101+
1102+
record_miss_mut!(self);
1103+
return Err((shared, false)); // false = replaced existing
1104+
}
1105+
10661106
if *resident.referenced.get_mut() < MAX_F {
10671107
*resident.referenced.get_mut() += 1;
10681108
}

src/sync.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,14 +435,42 @@ impl<
435435
&'a self,
436436
key: &Q,
437437
) -> Result<Val, PlaceholderGuard<'a, Key, Val, We, B, L>>
438+
where
439+
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
440+
{
441+
self.get_validate_value_or_guard_async(key, |_| true).await
442+
}
443+
444+
/// Gets an item from the cache with key `key`.
445+
///
446+
/// If the corresponding value isn't present in the cache, this functions returns a guard
447+
/// that can be used to insert the value once it's computed.
448+
/// While the returned guard is alive, other calls with the same key using the
449+
/// `get_value_guard` or `get_or_insert` family of functions will wait until the guard
450+
/// is dropped or the value is inserted.
451+
pub async fn get_validate_value_or_guard_async<'a, Q>(
452+
&'a self,
453+
key: &Q,
454+
mut validation: impl FnMut(&Val) -> bool + Unpin,
455+
) -> Result<Val, PlaceholderGuard<'a, Key, Val, We, B, L>>
438456
where
439457
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
440458
{
441459
let (shard, hash) = self.shard_for(key).unwrap();
442-
if let Some(v) = shard.read().get(hash, key) {
443-
return Ok(v.clone());
460+
461+
// Try fast path with read lock first
462+
{
463+
let reader = shard.read();
464+
if let Some(v) = reader.get(hash, key) {
465+
if validation(v) {
466+
return Ok(v.clone());
467+
}
468+
// Validation failed, fall through to JoinFuture
469+
}
470+
// No entry found or validation failed, let JoinFuture handle everything
444471
}
445-
JoinFuture::new(&self.lifecycle, shard, hash, key).await
472+
473+
JoinFuture::new(&self.lifecycle, shard, hash, key, validation).await
446474
}
447475

448476
/// Gets or inserts an item in the cache with key `key`.

src/sync_placeholder.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ impl<
224224
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
225225
{
226226
let mut shard_guard = shard.write();
227-
let shared = match shard_guard.upsert_placeholder(hash, key) {
227+
let shared = match shard_guard.upsert_placeholder(hash, key, &mut |_| true) {
228228
Ok((_, v)) => return GuardResult::Value(v.clone()),
229229
Err((shared, true)) => {
230230
return GuardResult::Guard(Self::start_loading(lifecycle, shard, shared));
@@ -413,11 +413,12 @@ impl<Key, Val, We, B, L> std::fmt::Debug for PlaceholderGuard<'_, Key, Val, We,
413413
}
414414

415415
/// Future that results in an Ok(Value) or Err(Guard)
416-
pub struct JoinFuture<'a, 'b, Q: ?Sized, Key, Val, We, B, L> {
416+
pub struct JoinFuture<'a, 'b, Q: ?Sized, Key, Val, We, B, L, F: FnMut(&'a Val) -> bool> {
417417
lifecycle: &'a L,
418418
shard: &'a RwLock<CacheShard<Key, Val, We, B, L, SharedPlaceholder<Val>>>,
419419
state: JoinFutureState<'b, Q, Val>,
420420
notified: AtomicBool,
421+
validation: F,
421422
}
422423

423424
enum JoinFutureState<'b, Q: ?Sized, Val> {
@@ -432,18 +433,22 @@ enum JoinFutureState<'b, Q: ?Sized, Val> {
432433
Done,
433434
}
434435

435-
impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L> JoinFuture<'a, 'b, Q, Key, Val, We, B, L> {
436+
impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L, F: FnMut(&'a Val) -> bool>
437+
JoinFuture<'a, 'b, Q, Key, Val, We, B, L, F>
438+
{
436439
pub fn new(
437440
lifecycle: &'a L,
438441
shard: &'a RwLock<CacheShard<Key, Val, We, B, L, SharedPlaceholder<Val>>>,
439442
hash: u64,
440443
key: &'b Q,
441-
) -> JoinFuture<'a, 'b, Q, Key, Val, We, B, L> {
444+
validation: F,
445+
) -> JoinFuture<'a, 'b, Q, Key, Val, We, B, L, F> {
442446
Self {
443447
lifecycle,
444448
shard,
445449
state: JoinFutureState::Created { hash, key },
446450
notified: Default::default(),
451+
validation,
447452
}
448453
}
449454

@@ -480,7 +485,10 @@ impl<'a, 'b, Q: ?Sized, Key, Val, We, B, L> JoinFuture<'a, 'b, Q, Key, Val, We,
480485
}
481486
}
482487

483-
impl<Q: ?Sized, Key, Val, We, B, L> Drop for JoinFuture<'_, '_, Q, Key, Val, We, B, L> {
488+
impl<'a, Q: ?Sized, Key, Val, We, B, L, F> Drop for JoinFuture<'a, '_, Q, Key, Val, We, B, L, F>
489+
where
490+
F: FnMut(&'a Val) -> bool,
491+
{
484492
#[inline]
485493
fn drop(&mut self) {
486494
if matches!(self.state, JoinFutureState::Pending { .. }) {
@@ -497,7 +505,8 @@ impl<
497505
We: Weighter<Key, Val>,
498506
B: BuildHasher,
499507
L: Lifecycle<Key, Val>,
500-
> Future for JoinFuture<'a, '_, Q, Key, Val, We, B, L>
508+
F: FnMut(&Val) -> bool + Unpin,
509+
> Future for JoinFuture<'a, '_, Q, Key, Val, We, B, L, F>
501510
{
502511
type Output = Result<Val, PlaceholderGuard<'a, Key, Val, We, B, L>>;
503512

@@ -509,7 +518,7 @@ impl<
509518
JoinFutureState::Created { hash, key } => {
510519
debug_assert!(!this.notified.load(Ordering::Acquire));
511520
let mut shard_guard = shard.write();
512-
match shard_guard.upsert_placeholder(*hash, *key) {
521+
match shard_guard.upsert_placeholder(*hash, *key, &mut this.validation) {
513522
Ok((_, v)) => {
514523
this.state = JoinFutureState::Done;
515524
Poll::Ready(Ok(v.clone()))

src/unsync.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,10 @@ impl<Key: Eq + Hash, Val, We: Weighter<Key, Val>, B: BuildHasher, L: Lifecycle<K
249249
where
250250
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
251251
{
252-
let idx = match self.shard.upsert_placeholder(self.shard.hash(key), key) {
252+
let idx = match self
253+
.shard
254+
.upsert_placeholder(self.shard.hash(key), key, &mut |_| true)
255+
{
253256
Ok((idx, _)) => idx,
254257
Err((plh, _)) => {
255258
let v = with()?;
@@ -275,7 +278,10 @@ impl<Key: Eq + Hash, Val, We: Weighter<Key, Val>, B: BuildHasher, L: Lifecycle<K
275278
where
276279
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
277280
{
278-
let idx = match self.shard.upsert_placeholder(self.shard.hash(key), key) {
281+
let idx = match self
282+
.shard
283+
.upsert_placeholder(self.shard.hash(key), key, &mut |_| true)
284+
{
279285
Ok((idx, _)) => idx,
280286
Err((plh, _)) => {
281287
let v = with()?;
@@ -297,7 +303,10 @@ impl<Key: Eq + Hash, Val, We: Weighter<Key, Val>, B: BuildHasher, L: Lifecycle<K
297303
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
298304
{
299305
// TODO: this could be using a simpler entry API
300-
match self.shard.upsert_placeholder(self.shard.hash(key), key) {
306+
match self
307+
.shard
308+
.upsert_placeholder(self.shard.hash(key), key, &mut |_| true)
309+
{
301310
Ok((_, v)) => unsafe {
302311
// Rustc gets insanely confused about returning from mut borrows
303312
// Safety: v has the same lifetime as self
@@ -323,7 +332,10 @@ impl<Key: Eq + Hash, Val, We: Weighter<Key, Val>, B: BuildHasher, L: Lifecycle<K
323332
Q: Hash + Equivalent<Key> + ToOwned<Owned = Key> + ?Sized,
324333
{
325334
// TODO: this could be using a simpler entry API
326-
match self.shard.upsert_placeholder(self.shard.hash(key), key) {
335+
match self
336+
.shard
337+
.upsert_placeholder(self.shard.hash(key), key, &mut |_| true)
338+
{
327339
Ok((idx, _)) => Ok(self.shard.peek_token_mut(idx).map(RefMut)),
328340
Err((placeholder, _)) => Err(Guard {
329341
cache: self,

0 commit comments

Comments
 (0)