Merged
2 changes: 1 addition & 1 deletion pingora-cache/Cargo.toml
@@ -29,7 +29,7 @@ http = { workspace = true }
indexmap = "1"
once_cell = { workspace = true }
regex = "1"
blake2 = "0.10"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
serde = { version = "1.0", features = ["derive"] }
rmp-serde = "1.3.0"
bytes = { workspace = true }
70 changes: 35 additions & 35 deletions pingora-cache/src/key.rs
@@ -16,10 +16,10 @@

use super::*;

use blake2::{Blake2b, Digest};
use http::Extensions;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter, Result as FmtResult};
use xxhash_rust::xxh3::Xxh3;

// 16-byte / 128-bit key: large enough to avoid collision
const KEY_SIZE: usize = 16;
@@ -63,10 +63,11 @@ pub trait CacheHashKey {
fn combined_bin(&self) -> HashBinary {
let key = self.primary_bin();
if let Some(v) = self.variance_bin() {
let mut hasher = Blake2b128::new();
hasher.update(key);
hasher.update(v);
hasher.finalize().into()
let mut hasher = Xxh3::new();
hasher.update(&key);
hasher.update(&v);
let hash = hasher.digest128();
hash.to_le_bytes()
} else {
// if there is no variance, combined_bin should return the same as primary_bin
key
@@ -180,38 +181,37 @@ impl CacheHashKey for CompactCacheKey {
}

/*
* We use blake2 hashing, which is faster and more secure, to replace md5.
* We have not given too much thought on whether non-crypto hash can be safely
* use because hashing performance is not critical.
* Note: we should avoid hashes like ahash which does not have consistent output
* across machines because it is designed purely for in memory hashtable
*/

// hash output: we use 128 bits (16 bytes) hash which will map to 32 bytes hex string
pub(crate) type Blake2b128 = Blake2b<blake2::digest::consts::U16>;
* We use xxHash3, which is a fast non-cryptographic hash function.
* Cache keys don't require cryptographic security, and xxHash3 is ~10x faster
* than Blake2 while providing excellent collision resistance for cache use cases.
* We use the 128-bit variant (xxh3_128) for consistency with the previous Blake2b128.
* Note: we avoid hashes like ahash which don't have consistent output across
* machines because they're designed purely for in-memory hashtables.
*/

/// helper function: hash str to u8
pub fn hash_u8(key: &str) -> u8 {
let mut hasher = Blake2b128::new();
hasher.update(key);
let raw = hasher.finalize();
raw[0]
let mut hasher = Xxh3::new();
hasher.update(key.as_bytes());
let hash = hasher.digest128();
(hash & 0xFF) as u8
}

/// helper function: hash key (String or Bytes) to [HashBinary]
pub fn hash_key<K: AsRef<[u8]>>(key: K) -> HashBinary {
let mut hasher = Blake2b128::new();
let mut hasher = Xxh3::new();
hasher.update(key.as_ref());
let raw = hasher.finalize();
raw.into()
let hash = hasher.digest128();
hash.to_le_bytes()
}

impl CacheKey {
fn primary_hasher(&self) -> Blake2b128 {
let mut hasher = Blake2b128::new();
fn primary_hash(&self) -> HashBinary {
let mut hasher = Xxh3::new();
hasher.update(&self.namespace);
hasher.update(&self.primary);
hasher
let hash = hasher.digest128();
hash.to_le_bytes()
}

/// Create a default [CacheKey] from a request, which just takes its URI as the primary key.
@@ -271,7 +271,7 @@ impl CacheHashKey for CacheKey {
if let Some(primary_bin_override) = self.primary_bin_override {
primary_bin_override
} else {
self.primary_hasher().finalize().into()
self.primary_hash()
}
}

@@ -299,7 +299,7 @@ mod tests {
extensions: Extensions::new(),
};
let hash = key.primary();
assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e");
assert_eq!(hash, "3393a146a6429236209bd346d394feb9");
assert!(key.variance().is_none());
assert_eq!(key.combined(), hash);
let compact = key.to_compact();
@@ -350,39 +350,39 @@ mod tests {
extensions: Extensions::new(),
};
let hash = key.primary();
assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e");
assert_eq!(hash, "3393a146a6429236209bd346d394feb9");
assert_eq!(key.variance().unwrap(), "00000000000000000000000000000000");
assert_eq!(key.combined(), "004174d3e75a811a5b44c46b3856f3ee");
assert_eq!(key.combined(), "b03c278e7fd4cc0630a352947348e37f");
let compact = key.to_compact();
assert_eq!(compact.primary(), "ac10f2aef117729f8dad056b3059eb7e");
assert_eq!(compact.primary(), "3393a146a6429236209bd346d394feb9");
assert_eq!(
compact.variance().unwrap(),
"00000000000000000000000000000000"
);
assert_eq!(compact.combined(), "004174d3e75a811a5b44c46b3856f3ee");
assert_eq!(compact.combined(), "b03c278e7fd4cc0630a352947348e37f");
}

#[test]
fn test_cache_key_vary_hash_override() {
let key = CacheKey {
namespace: Vec::new(),
primary: b"saaaad".to_vec(),
primary_bin_override: str2hex("ac10f2aef117729f8dad056b3059eb7e"),
primary_bin_override: str2hex("3393a146a6429236209bd346d394feb9"),
variance: Some([0u8; 16]),
user_tag: "1".into(),
extensions: Extensions::new(),
};
let hash = key.primary();
assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e");
assert_eq!(hash, "3393a146a6429236209bd346d394feb9");
assert_eq!(key.variance().unwrap(), "00000000000000000000000000000000");
assert_eq!(key.combined(), "004174d3e75a811a5b44c46b3856f3ee");
assert_eq!(key.combined(), "b03c278e7fd4cc0630a352947348e37f");
let compact = key.to_compact();
assert_eq!(compact.primary(), "ac10f2aef117729f8dad056b3059eb7e");
assert_eq!(compact.primary(), "3393a146a6429236209bd346d394feb9");
assert_eq!(
compact.variance().unwrap(),
"00000000000000000000000000000000"
);
assert_eq!(compact.combined(), "004174d3e75a811a5b44c46b3856f3ee");
assert_eq!(compact.combined(), "b03c278e7fd4cc0630a352947348e37f");
}

#[test]
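The key.rs change swaps Blake2b128 for xxHash3's 128-bit variant while keeping the same 16-byte HashBinary layout, so only the stored hash values change (hence the updated test constants). Below is a minimal, self-contained sketch of the new derivation path, assuming xxhash-rust 0.8 with the xxh3 feature as added in Cargo.toml above; `primary_hash`, `combined_bin`, and `hex` here are illustrative stand-ins, not the exact pingora-cache API.

```rust
use xxhash_rust::xxh3::Xxh3;

type HashBinary = [u8; 16];

// Mirrors the new CacheKey::primary_hash: stream namespace then primary
// through one xxh3 hasher, take the 128-bit digest as little-endian bytes.
fn primary_hash(namespace: &[u8], primary: &[u8]) -> HashBinary {
    let mut hasher = Xxh3::new();
    hasher.update(namespace);
    hasher.update(primary);
    hasher.digest128().to_le_bytes()
}

// Mirrors combined_bin: when a variance exists, hash primary + variance again.
fn combined_bin(primary: HashBinary, variance: Option<HashBinary>) -> HashBinary {
    match variance {
        Some(v) => {
            let mut hasher = Xxh3::new();
            hasher.update(&primary);
            hasher.update(&v);
            hasher.digest128().to_le_bytes()
        }
        // no variance: the combined key is the primary key itself
        None => primary,
    }
}

// Render as the 32-char lowercase hex string the tests assert against.
fn hex(bin: &HashBinary) -> String {
    bin.iter().map(|b| format!("{b:02x}")).collect()
}

fn main() {
    let primary = primary_hash(b"", b"/static/app.js"); // hypothetical key
    println!("{}", hex(&combined_bin(primary, None)));
    // Unlike ahash, xxh3 output is stable across processes and machines,
    // so these keys are safe to persist or share between nodes.
}
```

Note that consecutive update calls hash the concatenation of their inputs, so the namespace/primary boundary is not delimited; this matches the behavior of the Blake2 code being replaced.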
58 changes: 33 additions & 25 deletions pingora-cache/src/memory.rs
@@ -19,7 +19,7 @@
//TODO: Mark this module #[test] only

use super::*;
use crate::key::CompactCacheKey;
use crate::key::{CompactCacheKey, HashBinary};
use crate::storage::{streaming_write::U64WriteId, HandleHit, HandleMiss};
use crate::trace::SpanHandle;

@@ -37,7 +37,7 @@ type BinaryMeta = (Vec<u8>, Vec<u8>);

pub(crate) struct CacheObject {
pub meta: BinaryMeta,
pub body: Arc<Vec<u8>>,
pub body: Bytes,
}

pub(crate) struct TempObject {
@@ -56,10 +56,10 @@ impl TempObject {
bytes_written: Arc::new(tx),
}
}
// this is not at all optimized
fn make_cache_object(&self) -> CacheObject {
let meta = self.meta.clone();
let body = Arc::new(self.body.read().clone());
// Convert Vec<u8> to Bytes for zero-copy slicing
let body = Bytes::from(self.body.read().clone());
CacheObject { meta, body }
}
}
@@ -68,8 +68,12 @@ impl TempObject {
///
/// For testing only, not for production use.
pub struct MemCache {
pub(crate) cached: Arc<RwLock<HashMap<String, CacheObject>>>,
pub(crate) temp: Arc<RwLock<HashMap<String, HashMap<u64, TempObject>>>>,
// Use HashBinary ([u8; 16]) keys instead of String for better performance:
// - Avoids hex string conversion (6% CPU savings)
// - Smaller key size (16 bytes vs 32+ bytes)
// - No heap allocation for keys
pub(crate) cached: Arc<RwLock<HashMap<HashBinary, CacheObject>>>,
pub(crate) temp: Arc<RwLock<HashMap<HashBinary, HashMap<u64, TempObject>>>>,
pub(crate) last_temp_id: AtomicU64,
}

@@ -96,7 +100,7 @@ enum PartialState {
}

pub struct CompleteHit {
body: Arc<Vec<u8>>,
body: Bytes,
done: bool,
range_start: usize,
range_end: usize,
@@ -108,9 +112,8 @@ impl CompleteHit {
None
} else {
self.done = true;
Some(Bytes::copy_from_slice(
&self.body.as_slice()[self.range_start..self.range_end],
))
// Zero-copy slice instead of copy_from_slice
Some(self.body.slice(self.range_start..self.range_end))
}
}

@@ -236,12 +239,12 @@ pub struct MemMissHandler {
body: Arc<RwLock<Vec<u8>>>,
bytes_written: Arc<watch::Sender<PartialState>>,
// these are used only in finish() to move data from temp to cache
key: String,
key: HashBinary,
temp_id: U64WriteId,
// key -> cache object
cache: Arc<RwLock<HashMap<String, CacheObject>>>,
cache: Arc<RwLock<HashMap<HashBinary, CacheObject>>>,
// key -> (temp writer id -> temp object) to support concurrent writers
temp: Arc<RwLock<HashMap<String, HashMap<u64, TempObject>>>>,
temp: Arc<RwLock<HashMap<HashBinary, HashMap<u64, TempObject>>>>,
}

#[async_trait]
@@ -273,7 +276,7 @@ impl HandleMiss for MemMissHandler {
.unwrap()
.make_cache_object();
let size = cache_object.body.len(); // FIXME: this is just the body size; also track meta size
self.cache.write().insert(self.key.clone(), cache_object);

self.cache.write().insert(self.key, cache_object);
self.temp
.write()
.get_mut(&self.key)
@@ -313,7 +317,8 @@ impl Storage for MemCache {
key: &CacheKey,
_trace: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
let hash = key.combined();
// Use combined_bin() to get binary hash directly, avoiding hex string conversion
let hash = key.combined_bin();
// always prefer partial read otherwise fresh asset will not be visible on expired asset
// until it is fully updated
// no preference on which partial read we get (if there are multiple writers)
@@ -345,7 +350,7 @@ impl Storage for MemCache {
streaming_write_tag: Option<&[u8]>,
_trace: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
let hash = key.combined();
let hash = key.combined_bin();
let write_tag: U64WriteId = streaming_write_tag
.expect("tag must be set during streaming write")
.try_into()
@@ -365,14 +370,15 @@
meta: &CacheMeta,
_trace: &SpanHandle,
) -> Result<MissHandler> {
let hash = key.combined();
let hash = key.combined_bin();
let meta = meta.serialize()?;
let temp_obj = TempObject::new(meta);
let temp_id = self.last_temp_id.fetch_add(1, Ordering::Relaxed);
let miss_handler = MemMissHandler {
body: temp_obj.body.clone(),
bytes_written: temp_obj.bytes_written.clone(),
key: hash.clone(),

key: hash,
cache: self.cached.clone(),
temp: self.temp.clone(),
temp_id: temp_id.into(),
@@ -393,7 +399,7 @@ impl Storage for MemCache {
) -> Result<bool> {
// This usually purges the primary key because, without a lookup, the variance key is usually
// empty
let hash = key.combined();
let hash = key.combined_bin();
let temp_removed = self.temp.write().remove(&hash).is_some();
let cache_removed = self.cached.write().remove(&hash).is_some();
Ok(temp_removed || cache_removed)
@@ -405,7 +411,7 @@ impl Storage for MemCache {
meta: &CacheMeta,
_trace: &SpanHandle,
) -> Result<bool> {
let hash = key.combined();
let hash = key.combined_bin();
if let Some(obj) = self.cached.write().get_mut(&hash) {
obj.meta = meta.serialize()?;
Ok(true)
@@ -598,7 +604,7 @@ mod test {
let cache = &MEM_CACHE;

let key = CacheKey::new("", "a", "1").to_compact();
let hash = key.combined();
let hash = key.combined_bin();
let meta = (
"meta_key".as_bytes().to_vec(),
"meta_value".as_bytes().to_vec(),
@@ -607,7 +613,8 @@
let temp_obj = TempObject::new(meta);
let mut map = HashMap::new();
map.insert(0, temp_obj);
cache.temp.write().insert(hash.clone(), map);

cache.temp.write().insert(hash, map);

assert!(cache.temp.read().contains_key(&hash));

@@ -625,17 +632,18 @@
let cache = &MEM_CACHE;

let key = CacheKey::new("", "a", "1").to_compact();
let hash = key.combined();
let hash = key.combined_bin();
let meta = (
"meta_key".as_bytes().to_vec(),
"meta_value".as_bytes().to_vec(),
);
let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
let cache_obj = CacheObject {
meta,
body: Arc::new(body),
body: Bytes::from(body),
};
cache.cached.write().insert(hash.clone(), cache_obj);

cache.cached.write().insert(hash, cache_obj);

assert!(cache.cached.read().contains_key(&hash));

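Two independent changes land in memory.rs: map keys become HashBinary ([u8; 16]) instead of hex Strings, and bodies become Bytes instead of Arc<Vec<u8>>, so range reads are refcount bumps rather than copies. A small sketch of the latter, assuming only the bytes crate; `read_range` is a hypothetical helper standing in for the logic inside CompleteHit::read:

```rust
use bytes::Bytes;

// Bytes::slice returns a view into the same refcounted allocation, so
// serving a byte range no longer memcpys the body the way
// Bytes::copy_from_slice(&body[start..end]) did.
fn read_range(body: &Bytes, start: usize, end: usize) -> Bytes {
    body.slice(start..end) // O(1), zero-copy
}

fn main() {
    let body = Bytes::from(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    let range = read_range(&body, 2, 5);
    assert_eq!(range.as_ref(), &[3, 4, 5]);
    // `body` and `range` share one allocation; dropping either is cheap.
}
```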
17 changes: 9 additions & 8 deletions pingora-cache/src/variance.rs
@@ -1,8 +1,8 @@
use std::{borrow::Cow, collections::BTreeMap};

use blake2::Digest;
use xxhash_rust::xxh3::Xxh3;

use crate::key::{Blake2b128, HashBinary};
use crate::key::HashBinary;

/// A builder for variance keys, used for distinguishing multiple cached assets
/// at the same URL. This is intended to be easily passed to helper functions,
@@ -44,14 +44,15 @@ impl<'a> VarianceBuilder<'a> {
pub fn finalize(self) -> Option<HashBinary> {
const SALT: &[u8; 1] = &[0u8; 1];
if self.has_variance() {
let mut hash = Blake2b128::new();
let mut hasher = Xxh3::new();
for (name, value) in self.values.iter() {
hash.update(name.as_bytes());
hash.update(SALT);
hash.update(value);
hash.update(SALT);
hasher.update(name.as_bytes());
hasher.update(SALT);
hasher.update(value);
hasher.update(SALT);
}
Some(hash.finalize().into())
let hash = hasher.digest128();
Some(hash.to_le_bytes())
} else {
None
}
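variance.rs gets the same mechanical swap: the builder's salted name/value stream now feeds an Xxh3 hasher instead of Blake2b128. A sketch of the finalize logic under the same single zero-byte SALT; `variance_hash` is a hypothetical free-function version of VarianceBuilder::finalize:

```rust
use xxhash_rust::xxh3::Xxh3;

// The zero-byte SALT delimits each name and value, so that e.g.
// ("ab", "c") and ("a", "bc") produce different digests.
fn variance_hash(values: &[(&str, &[u8])]) -> Option<[u8; 16]> {
    const SALT: &[u8; 1] = &[0u8; 1];
    if values.is_empty() {
        return None; // no variance: caller falls back to the primary key
    }
    let mut hasher = Xxh3::new();
    for (name, value) in values {
        hasher.update(name.as_bytes());
        hasher.update(SALT);
        hasher.update(value);
        hasher.update(SALT);
    }
    Some(hasher.digest128().to_le_bytes())
}

fn main() {
    let gzip = variance_hash(&[("accept-encoding", b"gzip" as &[u8])]);
    let br = variance_hash(&[("accept-encoding", b"br" as &[u8])]);
    assert_ne!(gzip, br); // different variants map to different cache entries
}
```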