Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datasketches/src/frequencies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mod serde;
mod serialization;
mod sketch;

pub use self::serde::ItemsSerde;
Copy link
Copy Markdown
Member Author

@tisonkun tisonkun Dec 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To @PsiACE,

There is no need for `ItemsSerde` now that serialization is implemented directly on the concrete `FrequentItemsSketch<i64>` and `FrequentItemsSketch<String>` types.

But the C++ impl does have a Serde abstraction you may investigate in https://github.com/apache/datasketches-cpp/blob/7bb979d3/common/include/serde.hpp

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @AlexanderSaydakov I'd appreciate it if you can share the story and usage of serde.hpp.

pub use self::sketch::ErrorType;
pub use self::sketch::FrequentItemsSketch;
pub use self::sketch::Row;
26 changes: 13 additions & 13 deletions datasketches/src/frequencies/reverse_purge_item_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(super) struct ReversePurgeItemHashMap<T> {
lg_length: u8,
load_threshold: usize,
keys: Vec<Option<T>>,
values: Vec<i64>,
values: Vec<u64>,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All offsets and counts are non-negative, so use `u64` in all of these places.

states: Vec<u16>,
num_active: usize,
}
Expand All @@ -59,7 +59,7 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
}

/// Returns the value for `key`, or zero if the key is not present.
pub fn get(&self, key: &T) -> i64 {
pub fn get(&self, key: &T) -> u64 {
let probe = self.hash_probe(key);
if self.states[probe] > 0 {
return self.values[probe];
Expand All @@ -68,7 +68,7 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
}

/// Adds `adjust_amount` to the value for `key`, inserting if absent.
pub fn adjust_or_put_value(&mut self, key: T, adjust_amount: i64) {
pub fn adjust_or_put_value(&mut self, key: T, adjust_amount: u64) {
let mask = self.keys.len() - 1;
let mut probe = (hash_item(&key) as usize) & mask;
let mut drift: usize = 1;
Expand All @@ -95,20 +95,20 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
}

/// Removes all keys with non-positive counts.
pub fn keep_only_positive_counts(&mut self) {
fn keep_only_positive_counts(&mut self) {
let len = self.keys.len();
let mut first_probe = len - 1;
while self.states[first_probe] > 0 {
first_probe -= 1;
}
for probe in (0..first_probe).rev() {
if self.states[probe] > 0 && self.values[probe] <= 0 {
if self.states[probe] > 0 && self.values[probe] == 0 {
self.hash_delete(probe);
self.num_active -= 1;
}
}
for probe in (first_probe..len).rev() {
if self.states[probe] > 0 && self.values[probe] <= 0 {
if self.states[probe] > 0 && self.values[probe] == 0 {
self.hash_delete(probe);
self.num_active -= 1;
}
Expand All @@ -118,16 +118,16 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
/// Shifts all values by `adjust_amount`.
///
/// This is used during purges to decrement counters.
pub fn adjust_all_values_by(&mut self, adjust_amount: i64) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop `pub` when a method is only used within the same file. Narrower visibility makes the code easier to reason about.

for value in &mut self.values {
*value += adjust_amount;
fn adjust_all_values_by(&mut self, adjust_amount: u64) {
for value in self.values.iter_mut() {
*value = value.saturating_sub(adjust_amount);
}
}

/// Purges the map by estimating the median count and removing non-positive entries.
///
/// Returns the estimated median value that was subtracted from all counts.
pub fn purge(&mut self, sample_size: usize) -> i64 {
pub fn purge(&mut self, sample_size: usize) -> u64 {
let limit = sample_size.min(self.num_active).min(MAX_SAMPLE_SIZE);
let mut samples = Vec::with_capacity(limit);
let mut i = 0usize;
Expand All @@ -140,7 +140,7 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
let mid = samples.len() / 2;
samples.select_nth_unstable(mid);
let median = samples[mid];
self.adjust_all_values_by(-median);
self.adjust_all_values_by(median);
self.keep_only_positive_counts();
median
}
Expand Down Expand Up @@ -206,7 +206,7 @@ impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
}

/// Returns the active values in the map.
pub fn active_values(&self) -> Vec<i64> {
pub fn active_values(&self) -> Vec<u64> {
if self.num_active == 0 {
return Vec::new();
}
Expand Down Expand Up @@ -292,7 +292,7 @@ impl<'a, T> ReversePurgeItemIter<'a, T> {
}

impl<'a, T> Iterator for ReversePurgeItemIter<'a, T> {
type Item = (&'a T, i64);
type Item = (&'a T, u64);

fn next(&mut self) -> Option<Self::Item> {
if self.count >= self.map.num_active {
Expand Down
10 changes: 0 additions & 10 deletions datasketches/src/frequencies/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@ use crate::error::Error;
use crate::frequencies::serialization::read_i64_le;
use crate::frequencies::serialization::read_u32_le;

/// Built-in serializers for frequency sketch items.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ItemsSerde {
/// i64 items compatible with ArrayOfLongsSerDe in Java.
Int64,
/// UTF-8 strings compatible with ArrayOfStringsSerDe in Java.
String,
}

pub(crate) fn serialize_string_items(items: &[String]) -> Vec<u8> {
let total_len: usize = items.iter().map(|item| 4 + item.len()).sum();
let mut out = Vec::with_capacity(total_len);
Expand Down
21 changes: 18 additions & 3 deletions datasketches/src/frequencies/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub const STREAM_WEIGHT_LONG: usize = 16;
/// Offset of offset (fourth pre-long).
pub const OFFSET_LONG: usize = 24;

/// Read a u32 value from bytes at the given offset (little-endian).
/// Read a u32 value from bytes at the given offset (little-endian).
#[inline]
pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
u32::from_le_bytes([
Expand All @@ -76,14 +76,29 @@ pub fn read_i64_le(bytes: &[u8], offset: usize) -> i64 {
])
}

/// Decodes a little-endian `u64` from the eight bytes of `bytes` starting at `offset`.
///
/// # Panics
///
/// Panics if `bytes` does not contain eight bytes starting at `offset`.
#[inline]
pub fn read_u64_le(bytes: &[u8], offset: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&bytes[offset..offset + 8]);
    u64::from_le_bytes(raw)
}

/// Stores `value` as four little-endian bytes in `bytes`, beginning at `offset`.
///
/// # Panics
///
/// Panics if `bytes` does not contain four bytes starting at `offset`.
#[inline]
pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let target = &mut bytes[offset..offset + 4];
    target.copy_from_slice(&encoded);
}

/// Write an i64 value to bytes at the given offset (little-endian).
/// Write a u64 value to bytes at the given offset (little-endian).
#[inline]
pub fn write_i64_le(bytes: &mut [u8], offset: usize, value: i64) {
pub fn write_u64_le(bytes: &mut [u8], offset: usize, value: u64) {
bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
}
75 changes: 26 additions & 49 deletions datasketches/src/frequencies/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
use std::hash::Hash;

use crate::error::Error;
use crate::error::ErrorKind;
use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap;
use crate::frequencies::serde::ItemsSerde;
use crate::frequencies::serde::deserialize_i64_items;
use crate::frequencies::serde::deserialize_string_items;
use crate::frequencies::serde::serialize_i64_items;
Expand Down Expand Up @@ -52,8 +50,8 @@ pub enum ErrorType {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row<T> {
item: T,
estimate: i64,
upper_bound: i64,
estimate: u64,
upper_bound: u64,
lower_bound: u64,
}

Expand All @@ -64,12 +62,12 @@ impl<T> Row<T> {
}

/// Returns the estimated frequency.
pub fn estimate(&self) -> i64 {
pub fn estimate(&self) -> u64 {
self.estimate
}

/// Returns the upper bound for the frequency.
pub fn upper_bound(&self) -> i64 {
pub fn upper_bound(&self) -> u64 {
self.upper_bound
}

Expand All @@ -91,8 +89,8 @@ impl<T> Row<T> {
pub struct FrequentItemsSketch<T> {
lg_max_map_size: u8,
cur_map_cap: usize,
offset: i64,
stream_weight: i64,
offset: u64,
stream_weight: u64,
sample_size: usize,
hash_map: ReversePurgeItemHashMap<T>,
}
Expand Down Expand Up @@ -124,14 +122,14 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
/// Returns the total weight of the stream.
///
/// This is the sum of all counts passed to `update` and `update_with_count`.
pub fn total_weight(&self) -> i64 {
pub fn total_weight(&self) -> u64 {
self.stream_weight
}

/// Returns the estimated frequency for an item.
///
/// If the item is tracked, this is `item_count + offset`. Otherwise it is zero.
pub fn estimate(&self, item: &T) -> i64 {
pub fn estimate(&self, item: &T) -> u64 {
let value = self.hash_map.get(item);
if value > 0 { value + self.offset } else { 0 }
}
Expand All @@ -141,15 +139,14 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
/// This value is never negative and is guaranteed to be no larger than the true frequency.
/// If the item is not tracked, the lower bound is zero.
pub fn lower_bound(&self, item: &T) -> u64 {
let value = self.hash_map.get(item);
value.max(0) as u64
self.hash_map.get(item)
}

/// Returns the guaranteed upper bound frequency for an item.
///
/// This value is guaranteed to be no smaller than the true frequency.
/// If the item is tracked, this is `item_count + offset`.
pub fn upper_bound(&self, item: &T) -> i64 {
pub fn upper_bound(&self, item: &T) -> u64 {
self.hash_map.get(item) + self.offset
}

Expand All @@ -158,7 +155,7 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
///
/// This is equivalent to the maximum distance between the upper bound and the lower bound
/// for any item.
pub fn maximum_error(&self) -> i64 {
pub fn maximum_error(&self) -> u64 {
self.offset
}

Expand Down Expand Up @@ -208,12 +205,8 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {

/// Updates the sketch with an item and count.
///
/// # Panics
///
/// Panics if `count` is negative.
///
/// A count of zero is a no-op.
pub fn update_with_count(&mut self, item: T, count: i64) {
pub fn update_with_count(&mut self, item: T, count: u64) {
if count == 0 {
return;
}
Expand Down Expand Up @@ -266,7 +259,7 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
pub fn frequent_items_with_threshold(
&self,
error_type: ErrorType,
threshold: i64,
threshold: u64,
) -> Vec<Row<T>>
where
T: Clone,
Expand All @@ -285,7 +278,7 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
item: item.clone(),
estimate: upper,
upper_bound: upper,
lower_bound: lower.max(0) as u64,
lower_bound: lower,
});
}
}
Expand Down Expand Up @@ -357,12 +350,12 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.lg_length();
out[FLAGS_BYTE] = 0;
write_u32_le(&mut out, ACTIVE_ITEMS_INT, active_items as u32);
write_i64_le(&mut out, STREAM_WEIGHT_LONG, self.stream_weight);
write_i64_le(&mut out, OFFSET_LONG, self.offset);
write_u64_le(&mut out, STREAM_WEIGHT_LONG, self.stream_weight);
write_u64_le(&mut out, OFFSET_LONG, self.offset);

let mut offset = PREAMBLE_LONGS_NONEMPTY as usize * 8;
for value in values {
write_i64_le(&mut out, offset, value);
write_u64_le(&mut out, offset, value);
offset += 8;
}
out[offset..offset + items_bytes.len()].copy_from_slice(&items_bytes);
Expand Down Expand Up @@ -415,8 +408,8 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
return Err(Error::insufficient_data("full preamble"));
}
let active_items = read_u32_le(bytes, ACTIVE_ITEMS_INT) as usize;
let stream_weight = read_i64_le(bytes, STREAM_WEIGHT_LONG);
let offset_val = read_i64_le(bytes, OFFSET_LONG);
let stream_weight = read_u64_le(bytes, STREAM_WEIGHT_LONG);
let offset_val = read_u64_le(bytes, OFFSET_LONG);
let values_offset = PREAMBLE_LONGS_NONEMPTY as usize * 8;
let values_bytes = active_items
.checked_mul(8)
Expand All @@ -427,7 +420,7 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
}
let mut values = Vec::with_capacity(active_items);
for i in 0..active_items {
values.push(read_i64_le(bytes, values_offset + i * 8));
values.push(read_u64_le(bytes, values_offset + i * 8));
}
let (items, consumed) = deserialize_items(&bytes[items_offset..], active_items)?;
if items.len() != active_items {
Expand All @@ -454,17 +447,9 @@ impl FrequentItemsSketch<i64> {
self.serialize_inner(serialize_i64_items)
}

/// Deserializes a sketch from bytes using the selected serializer.
///
/// Returns an error if `serde` does not match the sketch item type.
pub fn deserialize(bytes: &[u8], serde: ItemsSerde) -> Result<Self, Error> {
match serde {
ItemsSerde::Int64 => Self::deserialize_inner(bytes, deserialize_i64_items),
ItemsSerde::String => Err(Error::new(
ErrorKind::InvalidArgument,
"ItemsSerde::String cannot deserialize i64 items",
)),
}
/// Deserializes a sketch from bytes.
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
Self::deserialize_inner(bytes, deserialize_i64_items)
}
}

Expand All @@ -474,17 +459,9 @@ impl FrequentItemsSketch<String> {
self.serialize_inner(serialize_string_items)
}

/// Deserializes a sketch from bytes using the selected serializer.
///
/// Returns an error if `serde` does not match the sketch item type.
pub fn deserialize(bytes: &[u8], serde: ItemsSerde) -> Result<Self, Error> {
match serde {
ItemsSerde::String => Self::deserialize_inner(bytes, deserialize_string_items),
ItemsSerde::Int64 => Err(Error::new(
ErrorKind::InvalidArgument,
"ItemsSerde::Int64 cannot deserialize String items",
)),
}
/// Deserializes a sketch from bytes.
pub fn deserialize(bytes: &[u8]) -> Result<Self, Error> {
Self::deserialize_inner(bytes, deserialize_string_items)
}
}

Expand Down
3 changes: 3 additions & 0 deletions datasketches/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ pub mod theta;

mod codec;
mod hash;
mod resize;

pub use self::resize::ResizeFactor;
Loading