diff --git a/src/commands/cmd_cuckoo_filter.cc b/src/commands/cmd_cuckoo_filter.cc new file mode 100644 index 00000000000..2002912f94d --- /dev/null +++ b/src/commands/cmd_cuckoo_filter.cc @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "command_parser.h" +#include "commander.h" +#include "error_constants.h" +#include "server/server.h" +#include "types/redis_cuckoo_chain.h" + +namespace redis { + +class CommandCFReserve : public Commander { + public: + Status Parse(const std::vector &args) override { + // CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex] + if (args.size() < 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + // Parse capacity (required) + auto parse_capacity = ParseInt(args[2], 10); + if (!parse_capacity) { + return {Status::RedisParseErr, "invalid capacity"}; + } + capacity_ = *parse_capacity; + if (capacity_ <= 0) { + return {Status::RedisParseErr, "capacity must be larger than 0"}; + } + + // Parse optional parameters + CommandParser parser(args, 3); + while (parser.Good()) { + if (parser.EatEqICase("BUCKETSIZE")) { + auto parse_bucket_size = parser.TakeInt(); + if (!parse_bucket_size.IsOK()) { + return {Status::RedisParseErr, "invalid bucket size"}; + } + bucket_size_ = parse_bucket_size.GetValue(); + if (bucket_size_ == 0 || bucket_size_ > 255) { + return {Status::RedisParseErr, "bucket size must be between 1 and 255"}; + } + } else if (parser.EatEqICase("MAXITERATIONS")) { + auto parse_max_iterations = parser.TakeInt(); + if (!parse_max_iterations.IsOK()) { + return {Status::RedisParseErr, "invalid max iterations"}; + } + max_iterations_ = parse_max_iterations.GetValue(); + if (max_iterations_ == 0) { + return {Status::RedisParseErr, "max iterations must be larger than 0"}; + } + } else if (parser.EatEqICase("EXPANSION")) { + auto parse_expansion = parser.TakeInt(); + if (!parse_expansion.IsOK()) { + return {Status::RedisParseErr, "invalid expansion factor"}; + } + expansion_ = parse_expansion.GetValue(); + if (expansion_ > kCFMaxExpansion) { + return {Status::RedisParseErr, "expansion must be between 0 and 32768"}; + } + } else { + return {Status::RedisParseErr, errInvalidSyntax}; + } + } + + return Commander::Parse(args); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace()); + auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_, + kCuckooFilterDefaultPageSize); + + if (!s.ok()) { + if (s.IsInvalidArgument()) { + // Return error message to client + return {Status::RedisExecErr, s.ToString()}; + } + return {Status::RedisExecErr, "failed to create cuckoo filter"}; + } + + *output = redis::SimpleString("OK"); + return Status::OK(); + } + + private: + uint64_t capacity_ = kCFDefaultCapacity; + uint8_t bucket_size_ = kCFDefaultBucketSize; + uint16_t max_iterations_ = kCFDefaultMaxIterations; + uint16_t expansion_ = kCFDefaultExpansion; +}; + +class CommandCFAdd : public Commander { + public: + Status Parse(const std::vector &args) override { + // CF.ADD key item + if (args.size() != 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + return Commander::Parse(args); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace()); + bool added = false; + auto s = cuckoo_db.Add(ctx, args_[1], args_[2], &added); + + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + // Duplicate items are allowed, so successful insertions return 1. + *output = redis::Integer(added ? 1 : 0); + return Status::OK(); + } +}; + +// Register the CF.RESERVE and CF.ADD commands +REDIS_REGISTER_COMMANDS(CuckooFilter, MakeCmdAttr("cf.reserve", -3, "write", 1, 1, 1), + MakeCmdAttr("cf.add", 3, "write", 1, 1, 1)) + +} // namespace redis diff --git a/src/commands/commander.h b/src/commands/commander.h index 3f38db02580..cee52bbdba3 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -96,6 +96,7 @@ enum class CommandCategory : uint8_t { Bit, BloomFilter, Cluster, + CuckooFilter, Function, Geo, Hash, diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index d62e566ff62..2aecfdab53b 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -335,7 +335,7 @@ bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() bool Metadata::IsEmptyableType() const { return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog || - Type() == kRedisTDigest || Type() == kRedisTimeSeries; + Type() == kRedisTDigest || Type() == kRedisTimeSeries || Type() == kRedisCuckooFilter; } bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } @@ -644,3 +644,50 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) { return rocksdb::Status::OK(); } + +void CuckooChainMetadata::Encode(std::string *dst) const { + Metadata::Encode(dst); + + PutFixed16(dst, n_filters); + PutFixed16(dst, expansion); + PutFixed64(dst, base_capacity); + PutFixed8(dst, bucket_size); + PutFixed16(dst, max_iterations); + PutFixed64(dst, num_deleted_items); + PutFixed32(dst, page_size); +} + +rocksdb::Status CuckooChainMetadata::Decode(Slice *input) { + if (auto s = Metadata::Decode(input); !s.ok()) { + return s; + } + + if (input->size() < sizeof(uint16_t) * 3 + sizeof(uint64_t) * 2 + sizeof(uint32_t) + sizeof(uint8_t)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + + GetFixed16(input, &n_filters); + GetFixed16(input, &expansion); + GetFixed64(input, &base_capacity); + GetFixed8(input, &bucket_size); + GetFixed16(input, &max_iterations); + GetFixed64(input, &num_deleted_items); + GetFixed32(input, &page_size); + + return rocksdb::Status::OK(); +} + +uint64_t CuckooChainMetadata::GetTotalCapacity() const { + if (expansion == 0 || n_filters == 1) { + return base_capacity; + } + + // Calculate total capacity across all filters + uint64_t total = 0; + uint64_t filter_capacity = base_capacity; + for (uint16_t i = 0; i < n_filters; i++) { + total += filter_capacity; + filter_capacity *= expansion; + } + return total; +} diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index abfc6098917..5ad5677a1db 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -54,12 +54,13 @@ enum RedisType : uint8_t { kRedisHyperLogLog = 11, kRedisTDigest = 12, kRedisTimeSeries = 13, + kRedisCuckooFilter = 14, kRedisTypeMax }; inline constexpr const std::array RedisTypeNames = { - "none", "string", "hash", "list", "set", "zset", "bitmap", - "sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"}; + "none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint", + "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "MBbloomCF"}; struct RedisTypes { RedisTypes(std::initializer_list list) { @@ -334,6 +335,50 @@ class BloomChainMetadata : public Metadata { bool IsScaling() const { return expansion != 0; }; }; +constexpr uint32_t kCuckooFilterDefaultPageSize = 2048; // bytes + +class CuckooChainMetadata : public Metadata { + public: + /// The number of sub-filters in the chain + uint16_t n_filters; + + /// Expansion factor for new filters + /// When a filter is full, a new one is created with capacity = base_capacity * expansion^n + uint16_t expansion; + + /// The capacity of the first filter. + uint64_t base_capacity; + + /// Number of fingerprints per bucket + uint8_t bucket_size; + + /// Maximum number of cuckoo kicks before considering filter full + uint16_t max_iterations; + + /// Track number of deleted items for maintenance + uint64_t num_deleted_items; + + /// Target maximum payload size for each persisted Cuckoo Filter page, in bytes + uint32_t page_size; + + explicit CuckooChainMetadata(bool generate_version = true) + : Metadata(kRedisCuckooFilter, generate_version), + n_filters(0), + expansion(0), + base_capacity(0), + bucket_size(0), + max_iterations(0), + num_deleted_items(0), + page_size(kCuckooFilterDefaultPageSize) {} + + void Encode(std::string *dst) const override; + using Metadata::Decode; + rocksdb::Status Decode(Slice *input) override; + + uint64_t GetTotalCapacity() const; + bool IsScaling() const { return expansion > 0; } +}; + enum class JsonStorageFormat : uint8_t { JSON = 0, CBOR = 1, diff --git a/src/types/cuckoo_filter.h b/src/types/cuckoo_filter.h new file mode 100644 index 00000000000..8d35af64f06 --- /dev/null +++ b/src/types/cuckoo_filter.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "vendor/murmurhash2.h" + +namespace redis { + +constexpr long double kCuckooFilterLoadFactor = 0.955L; +constexpr uint64_t kCuckooFilterMaxSupportedBuckets = std::numeric_limits::max() / 2 + 1ULL; +constexpr uint64_t kCuckooFilterFingerprintModulus = 255; +constexpr uint64_t kCuckooFilterAltHashMultiplier = 0x5bd1e995ULL; + +// Cuckoo filter implementation from the paper: +// "Cuckoo Filter: Practically Better Than Bloom" by Fan et al. +// Buckets are grouped into page values in RocksDB. The Cuckoo algorithm still +// works with logical bucket indexes, while the storage layer maps buckets to pages. +// +// Hash calculation follows RedisBloom's design: +// - fp = hash % 255 + 1 (fingerprint, non-zero, range: 1-255) +// - h1 = hash (primary hash) +// - h2 = h1 ^ (fp * 0x5bd1e995) (alternate hash via XOR) +// - bucket_index = hash % num_buckets (only apply modulo when indexing) +class CuckooFilterHelper { + public: + static bool IsCapacitySupported(uint64_t capacity, uint8_t bucket_size) { + uint32_t num_buckets = 0; + return CalculateRequiredBuckets(capacity, bucket_size, &num_buckets).ok(); + } + + // Returns the power-of-two bucket count required for the requested capacity. + static rocksdb::Status CalculateRequiredBuckets(uint64_t capacity, uint8_t bucket_size, uint32_t *num_buckets) { + if (bucket_size == 0) { + return rocksdb::Status::InvalidArgument("bucket_size must be larger than 0"); + } + + auto max_supported_capacity = + static_cast(kCuckooFilterMaxSupportedBuckets * bucket_size * kCuckooFilterLoadFactor); + if (capacity > max_supported_capacity) { + return rocksdb::Status::InvalidArgument("capacity is too large"); + } + + auto exact_buckets = static_cast(capacity) / bucket_size / kCuckooFilterLoadFactor; + auto required_buckets = static_cast(exact_buckets); + if (static_cast(required_buckets) < exact_buckets) required_buckets++; + if (required_buckets == 0) required_buckets = 1; + + // Round up to next power of 2 for better hash distribution. + uint32_t power = 1; + while (power < required_buckets) power <<= 1; + *num_buckets = power; + return rocksdb::Status::OK(); + } + + // Following RedisBloom: fp = hash % 255 + 1. + static uint8_t GenerateFingerprint(uint64_t hash) { + return static_cast(hash % kCuckooFilterFingerprintModulus + 1); + } + + // Calculate alternate hash using XOR (following RedisBloom) + // h2 = h1 ^ (fp * kCuckooFilterAltHashMultiplier) + // This preserves symmetry: GetAltHash(fp, GetAltHash(fp, h)) == h + static uint64_t GetAltHash(uint8_t fingerprint, uint64_t hash) { + return hash ^ (static_cast(fingerprint) * kCuckooFilterAltHashMultiplier); + } + + // Calculate an alternate bucket from a bucket index and fingerprint. + static uint32_t GetAltBucketIndex(uint32_t bucket_idx, uint8_t fingerprint, uint32_t num_buckets) { + uint64_t hash = bucket_idx; + uint64_t alt_hash = GetAltHash(fingerprint, hash); + return static_cast(alt_hash % num_buckets); + } + + // Compute hash for a given item using MurmurHash2 (compatible with RedisBloom). + static uint64_t Hash(const char *data, size_t length) { return HllMurMurHash64A(data, static_cast(length), 0); } + + // Convenience overload for std::string + static uint64_t Hash(const std::string &item) { return Hash(item.data(), item.size()); } + + // Calculate the capacity of a sub-filter at a given index in the chain. + // Returns false if overflow would occur. + static bool CalculateFilterCapacity(uint64_t base_capacity, uint16_t expansion, uint16_t filter_index, + uint64_t *filter_capacity) { + uint64_t capacity = base_capacity; + for (uint16_t i = 0; i < filter_index; ++i) { + if (expansion != 0 && capacity > std::numeric_limits::max() / expansion) return false; + capacity *= expansion; + } + *filter_capacity = capacity; + return true; + } + + // Calculate the number of buckets for a sub-filter at a given index. + static rocksdb::Status GetFilterNumBuckets(uint64_t base_capacity, uint16_t expansion, uint8_t bucket_size, + uint16_t filter_index, uint32_t *num_buckets) { + uint64_t filter_capacity = 0; + if (!CalculateFilterCapacity(base_capacity, expansion, filter_index, &filter_capacity)) { + return rocksdb::Status::Corruption("invalid metadata: filter capacity is too large"); + } + + auto s = CalculateRequiredBuckets(filter_capacity, bucket_size, num_buckets); + if (!s.ok()) return rocksdb::Status::Corruption("invalid metadata: filter capacity is too large"); + return rocksdb::Status::OK(); + } +}; + +} // namespace redis diff --git a/src/types/cuckoo_filter_page.cc b/src/types/cuckoo_filter_page.cc new file mode 100644 index 00000000000..1ef4a4c803d --- /dev/null +++ b/src/types/cuckoo_filter_page.cc @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "cuckoo_filter_page.h" + +#include + +#include "common/encoding.h" +#include "storage/redis_db.h" + +namespace redis { + +namespace { + +uint32_t GetBucketsPerPage(uint32_t page_size, uint8_t bucket_size) { + return std::max(1, page_size / bucket_size); +} + +uint32_t GetPageIndex(uint32_t bucket_index, uint32_t buckets_per_page) { return bucket_index / buckets_per_page; } + +uint32_t GetBucketOffset(uint32_t bucket_index, uint32_t buckets_per_page, uint8_t bucket_size) { + return (bucket_index % buckets_per_page) * bucket_size; +} + +uint32_t GetExpectedPageSize(uint32_t page_index, uint32_t num_buckets, uint32_t buckets_per_page, + uint8_t bucket_size) { + uint32_t first_bucket = page_index * buckets_per_page; + uint32_t page_bucket_count = std::min(buckets_per_page, num_buckets - first_bucket); + return page_bucket_count * bucket_size; +} + +std::string GetCuckooPageKey(const Slice &ns_key, uint64_t version, bool slot_id_encoded, uint16_t filter_index, + uint32_t page_index) { + std::string sub_key; + PutFixed16(&sub_key, filter_index); + PutFixed32(&sub_key, page_index); + return InternalKey(ns_key, sub_key, version, slot_id_encoded).Encode(); +} + +} // namespace + +CuckooPageCache::CuckooPageCache(engine::Storage *storage, engine::Context &ctx, const Slice &ns_key, + bool slot_id_encoded, uint64_t version, uint8_t bucket_size, uint32_t page_size) + : storage_(storage), + ctx_(ctx), + ns_key_(ns_key.ToString()), + slot_id_encoded_(slot_id_encoded), + version_(version), + bucket_size_(bucket_size), + page_size_(page_size) {} + +rocksdb::Status CuckooPageCache::PrefetchBuckets(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket1_index, + uint32_t bucket2_index) { + BucketLocation location1, location2; + auto s = resolveBucketLocation(filter_index, num_buckets, bucket1_index, &location1); + if (!s.ok()) return s; + s = resolveBucketLocation(filter_index, num_buckets, bucket2_index, &location2); + if (!s.ok()) return s; + + std::vector missing_locations; + if (pages_.find(location1.page_key) == pages_.end()) missing_locations.push_back(location1); + if (location2.page_key != location1.page_key && pages_.find(location2.page_key) == pages_.end()) { + missing_locations.push_back(location2); + } + return loadPages(missing_locations); +} + +rocksdb::Status CuckooPageCache::TryInsertInBucket(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + uint8_t fingerprint, bool *inserted) { + *inserted = false; + BucketRef bucket; + auto s = ensureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket); + if (!s.ok()) return s; + + size_t slot_idx = 0; + *inserted = tryInsertInBucketRef(bucket, fingerprint, &slot_idx); + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::GetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + uint32_t slot_idx, uint8_t *fingerprint) { + if (slot_idx >= bucket_size_) return rocksdb::Status::InvalidArgument("invalid cuckoo filter bucket slot"); + + BucketRef bucket; + auto s = ensureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket); + if (!s.ok()) return s; + *fingerprint = getBucketRefSlot(bucket, slot_idx); + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::SetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + uint32_t slot_idx, uint8_t fingerprint) { + if (slot_idx >= bucket_size_) return rocksdb::Status::InvalidArgument("invalid cuckoo filter bucket slot"); + + BucketRef bucket; + auto s = ensureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket); + if (!s.ok()) return s; + setBucketRefSlot(bucket, slot_idx, fingerprint); + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::WriteBackDirtyPages(rocksdb::WriteBatchBase *batch) { + for (const auto &entry : pages_) { + if (!entry.second.is_dirty) continue; + auto s = batch->Put(entry.first, entry.second.data); + if (!s.ok()) return s; + } + return rocksdb::Status::OK(); +} + +void CuckooPageCache::DiscardCachedPages() { pages_.clear(); } + +rocksdb::Status CuckooPageCache::resolveBucketLocation(uint16_t filter_index, uint32_t num_buckets, + uint32_t bucket_index, BucketLocation *location) const { + if (bucket_size_ == 0 || num_buckets == 0 || bucket_index >= num_buckets) { + return rocksdb::Status::Corruption("invalid cuckoo filter bucket location"); + } + + uint32_t buckets_per_page = GetBucketsPerPage(page_size_, bucket_size_); + uint32_t page_index = GetPageIndex(bucket_index, buckets_per_page); + location->page_key = GetCuckooPageKey(ns_key_, version_, slot_id_encoded_, filter_index, page_index); + location->offset = GetBucketOffset(bucket_index, buckets_per_page, bucket_size_); + location->expected_page_size = GetExpectedPageSize(page_index, num_buckets, buckets_per_page, bucket_size_); + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::ensureBucketLoaded(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + BucketRef *bucket) { + BucketLocation location; + auto s = resolveBucketLocation(filter_index, num_buckets, bucket_index, &location); + if (!s.ok()) return s; + + PageEntry *page = nullptr; + s = loadPage(location, &page); + if (!s.ok()) return s; + + bucket->page = page; + bucket->offset = location.offset; + bucket->size = bucket_size_; + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::loadPage(const BucketLocation &location, PageEntry **page) { + auto iter = pages_.find(location.page_key); + if (iter != pages_.end()) { + *page = &iter->second; + return rocksdb::Status::OK(); + } + + PageEntry page_entry; + auto s = storage_->Get(ctx_, ctx_.GetReadOptions(), location.page_key, &page_entry.data); + if (!s.ok() && !s.IsNotFound()) return s; + s = normalizePage(s, location.expected_page_size, &page_entry); + if (!s.ok()) return s; + + auto result = pages_.emplace(location.page_key, std::move(page_entry)); + *page = &result.first->second; + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::loadPages(const std::vector &locations) { + if (locations.empty()) return rocksdb::Status::OK(); + if (locations.size() == 1) { + PageEntry *page = nullptr; + return loadPage(locations[0], &page); + } + + std::vector keys; + keys.reserve(locations.size()); + for (const auto &location : locations) keys.emplace_back(location.page_key); + + std::vector values(locations.size()); + std::vector statuses(locations.size()); + storage_->MultiGet(ctx_, ctx_.DefaultMultiGetOptions(), storage_->GetDB()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + + for (size_t i = 0; i < locations.size(); ++i) { + PageEntry page_entry; + if (statuses[i].ok()) page_entry.data.assign(values[i].data(), values[i].size()); + auto s = normalizePage(statuses[i], locations[i].expected_page_size, &page_entry); + if (!s.ok()) return s; + pages_.emplace(locations[i].page_key, std::move(page_entry)); + } + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooPageCache::normalizePage(const rocksdb::Status &status, uint32_t expected_size, PageEntry *page) { + if (!status.ok() && !status.IsNotFound()) return status; + if (status.IsNotFound()) { + page->data.assign(expected_size, 0); + return rocksdb::Status::OK(); + } + if (page->data.size() > expected_size) return rocksdb::Status::Corruption("invalid cuckoo filter page size"); + if (page->data.size() < expected_size) return rocksdb::Status::Corruption("invalid cuckoo filter page size"); + return rocksdb::Status::OK(); +} + +bool CuckooPageCache::tryInsertInBucketRef(const BucketRef &bucket, uint8_t fingerprint, size_t *slot_idx) { + for (size_t i = 0; i < bucket.size; ++i) { + size_t offset = bucket.offset + i; + if (static_cast(bucket.page->data[offset]) == 0) { + bucket.page->data[offset] = static_cast(fingerprint); + bucket.page->is_dirty = true; + *slot_idx = i; + return true; + } + } + return false; +} + +uint8_t CuckooPageCache::getBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx) { + return static_cast(bucket.page->data[bucket.offset + slot_idx]); +} + +void CuckooPageCache::setBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx, uint8_t fingerprint) { + bucket.page->data[bucket.offset + slot_idx] = static_cast(fingerprint); + bucket.page->is_dirty = true; +} + +} // namespace redis diff --git a/src/types/cuckoo_filter_page.h b/src/types/cuckoo_filter_page.h new file mode 100644 index 00000000000..ff2efeb50c7 --- /dev/null +++ b/src/types/cuckoo_filter_page.h @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "storage/storage.h" + +namespace redis { + +class CuckooPageCache { + public: + CuckooPageCache(engine::Storage *storage, engine::Context &ctx, const Slice &ns_key, bool slot_id_encoded, + uint64_t version, uint8_t bucket_size, uint32_t page_size); + + rocksdb::Status PrefetchBuckets(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket1_index, + uint32_t bucket2_index); + rocksdb::Status TryInsertInBucket(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + uint8_t fingerprint, bool *inserted); + rocksdb::Status GetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, uint32_t slot_idx, + uint8_t *fingerprint); + rocksdb::Status SetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, uint32_t slot_idx, + uint8_t fingerprint); + rocksdb::Status WriteBackDirtyPages(rocksdb::WriteBatchBase *batch); + + void DiscardCachedPages(); + + private: + struct PageEntry { + std::string data; + bool is_dirty = false; + }; + + struct BucketRef { + PageEntry *page = nullptr; + uint32_t offset = 0; + uint8_t size = 0; + }; + + struct BucketLocation { + std::string page_key; + uint32_t offset = 0; + uint32_t expected_page_size = 0; + }; + + rocksdb::Status resolveBucketLocation(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + BucketLocation *location) const; + rocksdb::Status ensureBucketLoaded(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index, + BucketRef *bucket); + rocksdb::Status loadPage(const BucketLocation &location, PageEntry **page); + rocksdb::Status loadPages(const std::vector &locations); + static rocksdb::Status normalizePage(const rocksdb::Status &status, uint32_t expected_size, PageEntry *page); + + static bool tryInsertInBucketRef(const BucketRef &bucket, uint8_t fingerprint, size_t *slot_idx); + static uint8_t getBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx); + static void setBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx, uint8_t fingerprint); + + engine::Storage *storage_ = nullptr; + engine::Context &ctx_; + std::string ns_key_; + bool slot_id_encoded_ = false; + uint64_t version_ = 0; + uint8_t bucket_size_ = 0; + uint32_t page_size_ = 0; + // Maps encoded cuckoo page keys to cached page entries. + std::unordered_map pages_; +}; + +} // namespace redis diff --git a/src/types/cuckoo_filter_sub_filter.cc b/src/types/cuckoo_filter_sub_filter.cc new file mode 100644 index 00000000000..0a0748cc354 --- /dev/null +++ b/src/types/cuckoo_filter_sub_filter.cc @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "cuckoo_filter_sub_filter.h" + +#include "cuckoo_filter.h" + +namespace redis { + +CuckooSubFilter::CuckooSubFilter(engine::Storage *storage, engine::Context &ctx, const Slice &ns_key, + bool slot_id_encoded, uint64_t version, uint8_t bucket_size, uint32_t page_size, + uint16_t filter_index, uint32_t num_buckets) + : bucket_size_(bucket_size), + filter_index_(filter_index), + num_buckets_(num_buckets), + pages_(storage, ctx, ns_key, slot_id_encoded, version, bucket_size, page_size) {} + +rocksdb::Status CuckooSubFilter::TryInsert(uint64_t hash, uint8_t fingerprint, bool *inserted) { + *inserted = false; + uint32_t bucket1_idx = getPrimaryBucketIndex(hash); + uint32_t bucket2_idx = getSecondaryBucketIndex(hash, fingerprint); + auto s = pages_.PrefetchBuckets(filter_index_, num_buckets_, bucket1_idx, bucket2_idx); + if (!s.ok()) return s; + + s = pages_.TryInsertInBucket(filter_index_, num_buckets_, bucket1_idx, fingerprint, inserted); + if (!s.ok() || *inserted || bucket1_idx == bucket2_idx) return s; + + return pages_.TryInsertInBucket(filter_index_, num_buckets_, bucket2_idx, fingerprint, inserted); +} + +rocksdb::Status CuckooSubFilter::TryKickOutInsert(uint64_t hash, uint8_t fingerprint, uint16_t max_iterations, + bool *inserted) { + *inserted = false; + + uint32_t current_bucket_idx = getPrimaryBucketIndex(hash); + uint8_t current_fp = fingerprint; + uint32_t victim_slot = 0; + + for (uint16_t iteration = 0; iteration < max_iterations; ++iteration) { + uint8_t old_fp = 0; + auto s = pages_.GetBucketSlot(filter_index_, num_buckets_, current_bucket_idx, victim_slot, &old_fp); + if (!s.ok()) { + pages_.DiscardCachedPages(); + return s; + } + s = pages_.SetBucketSlot(filter_index_, num_buckets_, current_bucket_idx, victim_slot, current_fp); + if (!s.ok()) { + pages_.DiscardCachedPages(); + return s; + } + current_fp = old_fp; + + if (current_fp == 0) { + *inserted = true; + return rocksdb::Status::OK(); + } + + uint32_t alt_bucket_idx = CuckooFilterHelper::GetAltBucketIndex(current_bucket_idx, current_fp, num_buckets_); + + bool inserted_in_alt_bucket = false; + s = pages_.TryInsertInBucket(filter_index_, num_buckets_, alt_bucket_idx, current_fp, &inserted_in_alt_bucket); + if (!s.ok()) { + pages_.DiscardCachedPages(); + return s; + } + if (inserted_in_alt_bucket) { + *inserted = true; + return rocksdb::Status::OK(); + } + + current_bucket_idx = alt_bucket_idx; + victim_slot = (victim_slot + 1) % bucket_size_; + } + + pages_.DiscardCachedPages(); + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooSubFilter::WriteToBatch(rocksdb::WriteBatchBase *batch) { + return pages_.WriteBackDirtyPages(batch); +} + +uint32_t CuckooSubFilter::getPrimaryBucketIndex(uint64_t hash) const { return hash % num_buckets_; } + +uint32_t CuckooSubFilter::getSecondaryBucketIndex(uint64_t hash, uint8_t fingerprint) const { + return CuckooFilterHelper::GetAltHash(fingerprint, hash) % num_buckets_; +} + +} // namespace redis diff --git a/src/types/cuckoo_filter_sub_filter.h b/src/types/cuckoo_filter_sub_filter.h new file mode 100644 index 00000000000..2bd26df8541 --- /dev/null +++ b/src/types/cuckoo_filter_sub_filter.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include + +#include + +#include "cuckoo_filter_page.h" + +namespace redis { + +class CuckooSubFilter { + public: + CuckooSubFilter(engine::Storage *storage, engine::Context &ctx, const Slice &ns_key, bool slot_id_encoded, + uint64_t version, uint8_t bucket_size, uint32_t page_size, uint16_t filter_index, + uint32_t num_buckets); + + uint16_t Index() const { return filter_index_; } + uint32_t NumBuckets() const { return num_buckets_; } + + rocksdb::Status TryInsert(uint64_t hash, uint8_t fingerprint, bool *inserted); + // Performs speculative kick-out mutations in the page cache. On success, dirty pages remain staged for + // WriteToBatch(); on inserted=false or non-OK status, cached pages are discarded before returning. + rocksdb::Status TryKickOutInsert(uint64_t hash, uint8_t fingerprint, uint16_t max_iterations, bool *inserted); + rocksdb::Status WriteToBatch(rocksdb::WriteBatchBase *batch); + + private: + uint32_t getPrimaryBucketIndex(uint64_t hash) const; + uint32_t getSecondaryBucketIndex(uint64_t hash, uint8_t fingerprint) const; + + uint8_t bucket_size_ = 0; + uint16_t filter_index_ = 0; + uint32_t num_buckets_ = 0; + CuckooPageCache pages_; +}; + +} // namespace redis diff --git a/src/types/redis_cuckoo_chain.cc b/src/types/redis_cuckoo_chain.cc new file mode 100644 index 00000000000..2cadbc550e4 --- /dev/null +++ b/src/types/redis_cuckoo_chain.cc @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "redis_cuckoo_chain.h" + +#include "cuckoo_filter.h" +#include "cuckoo_filter_sub_filter.h" +#include "logging.h" + +namespace redis { + +rocksdb::Status CuckooChain::getCuckooChainMetadata(engine::Context &ctx, const Slice &ns_key, + CuckooChainMetadata *metadata) { + return Database::GetMetadata(ctx, {kRedisCuckooFilter}, ns_key, metadata); +} + +rocksdb::Status CuckooChain::validateMetadata(const CuckooChainMetadata &metadata) { + if (metadata.n_filters == 0) { + return rocksdb::Status::Corruption("invalid metadata: n_filters is 0"); + } + if (metadata.base_capacity == 0) { + return rocksdb::Status::Corruption("invalid metadata: base_capacity is 0"); + } + if (metadata.bucket_size == 0) { + return rocksdb::Status::Corruption("invalid metadata: bucket_size is 0"); + } + if (metadata.max_iterations == 0) { + return rocksdb::Status::Corruption("invalid metadata: max_iterations is 0"); + } + if (metadata.page_size < metadata.bucket_size) { + return rocksdb::Status::Corruption("invalid metadata: page_size is smaller than bucket_size"); + } + if (!CuckooFilterHelper::IsCapacitySupported(metadata.base_capacity, metadata.bucket_size)) { + return rocksdb::Status::Corruption("invalid metadata: base_capacity is too large"); + } + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooChain::Reserve(engine::Context &ctx, const Slice &user_key, uint64_t capacity, + uint8_t bucket_size, uint16_t max_iterations, uint16_t expansion, + uint32_t page_size) { + if (capacity == 0) { + return rocksdb::Status::InvalidArgument("capacity must be larger than 0"); + } + + // RedisBloom requires minimum capacity to ensure at least one bucket can be created + // With load factor 0.955, capacity=1 and bucket_size=4 results in 0 buckets + if (capacity < 2) { + return rocksdb::Status::InvalidArgument("capacity must be at least 2"); + } + + if (bucket_size == 0 || bucket_size > 255) { + return rocksdb::Status::InvalidArgument("bucket_size must be between 1 and 255"); + } + + if (max_iterations == 0) { + return rocksdb::Status::InvalidArgument("max_iterations must be larger than 0"); + } + if (page_size == 0) { + return rocksdb::Status::InvalidArgument("page_size must be larger than 0"); + } + if (page_size < bucket_size) { + return rocksdb::Status::InvalidArgument("page_size must be at least bucket_size"); + } + if (expansion > kCFMaxExpansion) { + return rocksdb::Status::InvalidArgument("expansion must be between 0 and 32768"); + } + if (!CuckooFilterHelper::IsCapacitySupported(capacity, bucket_size)) { + return rocksdb::Status::InvalidArgument("capacity is too large"); + } + + std::string ns_key = AppendNamespacePrefix(user_key); + + CuckooChainMetadata existing_metadata; + auto s = getCuckooChainMetadata(ctx, ns_key, &existing_metadata); + if (!s.ok() && !s.IsNotFound()) return s; + if (!s.IsNotFound()) { + return rocksdb::Status::InvalidArgument("the key already exists"); + } + + CuckooChainMetadata metadata; + + metadata.size = 0; + metadata.base_capacity = capacity; + metadata.bucket_size = bucket_size; + metadata.max_iterations = max_iterations; + metadata.expansion = expansion; + metadata.n_filters = 1; + metadata.num_deleted_items = 0; + metadata.page_size = page_size; + + // Create a write batch for atomic operation + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisCuckooFilter, std::vector{"reserve", user_key.ToString()}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string metadata_bytes; + metadata.Encode(&metadata_bytes); + s = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); + if (!s.ok()) return s; + + // Pages are created lazily on first write. Reserve only persists metadata so sparse filters don't preallocate page + // values that may never be used. + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status CuckooChain::Add(engine::Context &ctx, const Slice &user_key, const Slice &item, bool *added) { + std::string ns_key = AppendNamespacePrefix(user_key); + + CuckooChainMetadata metadata(false); + auto s = getCuckooChainMetadata(ctx, ns_key, &metadata); + if (s.IsNotFound()) { + // RedisBloom CF.ADD auto-creates the filter when the key does not exist: + // https://redis.io/docs/latest/commands/cf.add/ + metadata = CuckooChainMetadata(); + metadata.size = 0; + metadata.base_capacity = kCFDefaultCapacity; + metadata.bucket_size = kCFDefaultBucketSize; + metadata.max_iterations = kCFDefaultMaxIterations; + metadata.expansion = kCFDefaultExpansion; + metadata.n_filters = 1; + metadata.num_deleted_items = 0; + metadata.page_size = kCuckooFilterDefaultPageSize; + } + if (!s.ok() && !s.IsNotFound()) return s; + + s = validateMetadata(metadata); + if (!s.ok()) return s; + + // Calculate hash and fingerprint for the item + uint64_t hash = CuckooFilterHelper::Hash(item.data(), item.size()); + uint8_t fingerprint = CuckooFilterHelper::GenerateFingerprint(hash); + + bool inserted = false; + s = tryCuckooInsert(ctx, user_key, ns_key, &metadata, hash, fingerprint, &inserted); + if (!s.ok()) return s; + if (inserted) { + *added = true; + return rocksdb::Status::OK(); + } + + s = tryCuckooKickOut(ctx, user_key, ns_key, &metadata, hash, fingerprint, &inserted); + if (!s.ok()) return s; + if (inserted) { + *added = true; + return rocksdb::Status::OK(); + } + + s = expandAndInsertCuckooChain(ctx, user_key, ns_key, &metadata, hash, fingerprint, &inserted); + if (!s.ok()) return s; + if (inserted) { + *added = true; + return rocksdb::Status::OK(); + } + + // No expansion allowed and filter is full + *added = false; + return rocksdb::Status::Aborted("filter is full"); +} + +rocksdb::Status CuckooChain::tryCuckooInsert(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, uint64_t hash, uint8_t fingerprint, + bool *inserted) { + *inserted = false; + + // RedisBloom prioritizes the newest sub-filter to avoid repeatedly probing older, fuller filters. + for (int filter_idx = static_cast(metadata->n_filters) - 1; filter_idx >= 0; --filter_idx) { + auto current_filter_idx = static_cast(filter_idx); + uint32_t num_buckets = 0; + auto s = CuckooFilterHelper::GetFilterNumBuckets(metadata->base_capacity, metadata->expansion, + metadata->bucket_size, current_filter_idx, &num_buckets); + if (!s.ok()) return s; + + CuckooSubFilter sub_filter(storage_, ctx, ns_key, storage_->IsSlotIdEncoded(), metadata->version, + metadata->bucket_size, metadata->page_size, current_filter_idx, num_buckets); + bool current_inserted = false; + s = sub_filter.TryInsert(hash, fingerprint, ¤t_inserted); + if (!s.ok()) return s; + + if (current_inserted) { + s = commitSubFilterAndMetadata(ctx, user_key, ns_key, metadata, &sub_filter); + if (!s.ok()) return s; + *inserted = true; + return rocksdb::Status::OK(); + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooChain::tryCuckooKickOut(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, uint64_t hash, uint8_t fingerprint, + bool *inserted) { + *inserted = false; + + // No space found in any filter, try kick-out on the last filter + uint16_t last_filter_idx = metadata->n_filters - 1; + uint32_t num_buckets = 0; + auto s = CuckooFilterHelper::GetFilterNumBuckets(metadata->base_capacity, metadata->expansion, metadata->bucket_size, + last_filter_idx, &num_buckets); + if (!s.ok()) return s; + + CuckooSubFilter last_filter(storage_, ctx, ns_key, storage_->IsSlotIdEncoded(), metadata->version, + metadata->bucket_size, metadata->page_size, last_filter_idx, num_buckets); + bool kickout_inserted = false; + s = last_filter.TryKickOutInsert(hash, fingerprint, metadata->max_iterations, &kickout_inserted); + if (!s.ok()) return s; + if (kickout_inserted) { + s = commitSubFilterAndMetadata(ctx, user_key, ns_key, metadata, &last_filter); + if (!s.ok()) return s; + *inserted = true; + return rocksdb::Status::OK(); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooChain::expandAndInsertCuckooChain(engine::Context &ctx, const Slice &user_key, + const std::string &ns_key, CuckooChainMetadata *metadata, + uint64_t hash, uint8_t fingerprint, bool *inserted) { + *inserted = false; + + // Kick-out failed, try to expand if allowed + if (metadata->expansion == 0) return rocksdb::Status::OK(); + + if (metadata->n_filters >= UINT16_MAX) return rocksdb::Status::Aborted("maximum number of filters reached"); + + // Retry insertion in the new expanded filter + uint16_t new_filter_idx = metadata->n_filters; + uint32_t new_num_buckets = 0; + auto s = CuckooFilterHelper::GetFilterNumBuckets(metadata->base_capacity, metadata->expansion, metadata->bucket_size, + new_filter_idx, &new_num_buckets); + if (s.IsCorruption()) { + return rocksdb::Status::Aborted("maximum filter capacity reached"); + } + if (!s.ok()) return s; + + CuckooSubFilter new_filter(storage_, ctx, ns_key, storage_->IsSlotIdEncoded(), metadata->version, + metadata->bucket_size, metadata->page_size, new_filter_idx, new_num_buckets); + bool new_filter_inserted = false; + s = new_filter.TryInsert(hash, fingerprint, &new_filter_inserted); + if (!s.ok()) return s; + if (!new_filter_inserted) return rocksdb::Status::Corruption("failed to insert into new cuckoo filter"); + + metadata->n_filters++; + s = commitSubFilterAndMetadata(ctx, user_key, ns_key, metadata, &new_filter); + if (!s.ok()) return s; + + *inserted = true; + return rocksdb::Status::OK(); +} + +rocksdb::Status CuckooChain::commitSubFilterAndMetadata(engine::Context &ctx, const Slice &user_key, + const std::string &ns_key, CuckooChainMetadata *metadata, + CuckooSubFilter *sub_filter) { + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisCuckooFilter, std::vector{"add", user_key.ToString()}); + auto s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + s = sub_filter->WriteToBatch(batch.Get()); + if (!s.ok()) return s; + + metadata->size++; + std::string metadata_bytes; + metadata->Encode(&metadata_bytes); + s = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); + if (!s.ok()) return s; + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +} // namespace redis diff --git a/src/types/redis_cuckoo_chain.h b/src/types/redis_cuckoo_chain.h new file mode 100644 index 00000000000..d66f4549a3b --- /dev/null +++ b/src/types/redis_cuckoo_chain.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "cuckoo_filter.h" +#include "storage/redis_db.h" +#include "storage/redis_metadata.h" + +namespace redis { + +class CuckooSubFilter; + +// Default values for a newly created cuckoo filter. +const uint32_t kCFDefaultCapacity = 1024; +const uint8_t kCFDefaultBucketSize = 4; // 4 fingerprints per bucket +const uint16_t kCFDefaultMaxIterations = 500; +const uint16_t kCFDefaultExpansion = 2; +const uint16_t kCFMaxExpansion = 32768; + +class CuckooChain : public Database { + public: + CuckooChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} + + // Creates a new cuckoo filter with the specified parameters. + rocksdb::Status Reserve(engine::Context &ctx, const Slice &user_key, uint64_t capacity, uint8_t bucket_size, + uint16_t max_iterations, uint16_t expansion, uint32_t page_size); + + // Adds one item to the cuckoo filter. + // Duplicate items are allowed, so added is true whenever insertion succeeds. + rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, const Slice &item, bool *added); + + private: + // Loads metadata for a cuckoo filter key. + rocksdb::Status getCuckooChainMetadata(engine::Context &ctx, const Slice &ns_key, CuckooChainMetadata *metadata); + + static rocksdb::Status validateMetadata(const CuckooChainMetadata &metadata); + + rocksdb::Status tryCuckooInsert(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, uint64_t hash, uint8_t fingerprint, bool *inserted); + rocksdb::Status tryCuckooKickOut(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, uint64_t hash, uint8_t fingerprint, bool *inserted); + rocksdb::Status expandAndInsertCuckooChain(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, uint64_t hash, uint8_t fingerprint, + bool *inserted); + rocksdb::Status commitSubFilterAndMetadata(engine::Context &ctx, const Slice &user_key, const std::string &ns_key, + CuckooChainMetadata *metadata, CuckooSubFilter *sub_filter); +}; + +} // namespace redis diff --git a/tests/cppunit/types/cuckoo_filter_page_test.cc b/tests/cppunit/types/cuckoo_filter_page_test.cc new file mode 100644 index 00000000000..8be87aefada --- /dev/null +++ b/tests/cppunit/types/cuckoo_filter_page_test.cc @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types/cuckoo_filter_page.h" + +#include + +#include +#include + +#include "common/encoding.h" +#include "storage/redis_metadata.h" +#include "test_base.h" +#include "types/redis_cuckoo_chain.h" + +class RedisCuckooPageCacheTest : public TestBase { + public: + RedisCuckooPageCacheTest(const RedisCuckooPageCacheTest &) = delete; + RedisCuckooPageCacheTest &operator=(const RedisCuckooPageCacheTest &) = delete; + RedisCuckooPageCacheTest(RedisCuckooPageCacheTest &&) = delete; + RedisCuckooPageCacheTest &operator=(RedisCuckooPageCacheTest &&) = delete; + + protected: + explicit RedisCuckooPageCacheTest() + : TestBase(), db_(std::make_unique(storage_.get(), "cuckoo_ns")) {} + + ~RedisCuckooPageCacheTest() override { db_.reset(); } + + void SetUp() override { + const ::testing::TestInfo *const test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + ns_key_ = db_->AppendNamespacePrefix(std::string("cf_page_test_") + test_info->name()); + } + + static CuckooChainMetadata makeMetadata(uint8_t bucket_size, uint64_t version = 1, + uint32_t page_size = kCuckooFilterDefaultPageSize) { + CuckooChainMetadata metadata(false); + metadata.version = version; + metadata.size = 0; + metadata.base_capacity = 2; + metadata.bucket_size = bucket_size; + metadata.max_iterations = 500; + metadata.expansion = 0; + metadata.n_filters = 1; + metadata.num_deleted_items = 0; + metadata.page_size = page_size; + return metadata; + } + + std::string makePageKey(const CuckooChainMetadata &metadata, uint16_t filter_index, uint32_t page_index) const { + std::string sub_key; + PutFixed16(&sub_key, filter_index); + PutFixed32(&sub_key, page_index); + return InternalKey(ns_key_, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + } + + rocksdb::Status readPage(const std::string &page_key, std::string *value) { + return storage_->Get(*ctx_, ctx_->GetReadOptions(), storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), page_key, + value); + } + + void writePage(const std::string &page_key, const std::string &value) { + auto batch = storage_->GetWriteBatchBase(); + auto s = batch->Put(page_key, value); + ASSERT_TRUE(s.ok()) << s.ToString(); + s = storage_->Write(*ctx_, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + ASSERT_TRUE(s.ok()) << s.ToString(); + } + + void commitBatch(rocksdb::WriteBatchBase *batch) { + auto s = storage_->Write(*ctx_, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + ASSERT_TRUE(s.ok()) << s.ToString(); + } + + std::unique_ptr db_; + std::string ns_key_; +}; + +TEST_F(RedisCuckooPageCacheTest, TryInsertWritesSmallPage) { + auto metadata = makeMetadata(4); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 2, 1, 11, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), 8); + EXPECT_EQ(page.substr(0, 4), std::string(4, 0)); + EXPECT_EQ(static_cast(page[4]), 11); + EXPECT_EQ(page.substr(5, 3), std::string(3, 0)); +} + +TEST_F(RedisCuckooPageCacheTest, TryInsertWritesLastPartialPage) { + auto metadata = makeMetadata(4); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 513, 512, 33, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(metadata, 0, 1), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), 4); + EXPECT_EQ(static_cast(page[0]), 33); + EXPECT_EQ(page.substr(1, 3), std::string(3, 0)); + + s = readPage(makePageKey(metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); +} + +TEST_F(RedisCuckooPageCacheTest, PrefetchBucketsOnSamePageLoadsCachedPage) { + auto metadata = makeMetadata(2); + auto page_key = makePageKey(metadata, 0, 0); + writePage(page_key, std::string{static_cast(11), static_cast(12), static_cast(21), + static_cast(22), 0, 0, 0, 0}); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + auto s = pages.PrefetchBuckets(0, 4, 0, 1); + ASSERT_TRUE(s.ok()) << s.ToString(); + writePage(page_key, std::string(8, static_cast(99))); + + uint8_t fingerprint = 0; + s = pages.GetBucketSlot(0, 4, 0, 0, &fingerprint); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(fingerprint, 11); + s = pages.GetBucketSlot(0, 4, 1, 1, &fingerprint); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(fingerprint, 22); +} + +TEST_F(RedisCuckooPageCacheTest, PrefetchBucketsOnDifferentPagesLoadsBothPages) { + auto metadata = makeMetadata(4); + auto page0_key = makePageKey(metadata, 0, 0); + auto page1_key = makePageKey(metadata, 0, 1); + std::string page0(metadata.page_size, 0); + std::string page1(metadata.page_size, 0); + page0[0] = static_cast(44); + page1[0] = static_cast(55); + writePage(page0_key, page0); + writePage(page1_key, page1); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + auto s = pages.PrefetchBuckets(0, 1025, 0, 512); + ASSERT_TRUE(s.ok()) << s.ToString(); + writePage(page0_key, std::string(metadata.page_size, static_cast(99))); + writePage(page1_key, std::string(metadata.page_size, static_cast(88))); + + uint8_t fingerprint = 0; + s = pages.GetBucketSlot(0, 1025, 0, 0, &fingerprint); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(fingerprint, 44); + s = pages.GetBucketSlot(0, 1025, 512, 0, &fingerprint); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(fingerprint, 55); +} + +TEST_F(RedisCuckooPageCacheTest, NonDefaultPageSizeControlsBucketMapping) { + auto metadata = makeMetadata(4, 1, 8); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 3, 2, 66, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); + + s = readPage(makePageKey(metadata, 0, 1), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), 4); + EXPECT_EQ(static_cast(page[0]), 66); + EXPECT_EQ(page.substr(1, 3), std::string(3, 0)); +} + +TEST_F(RedisCuckooPageCacheTest, SetBucketSlotWritesOnlyTargetSlot) { + auto metadata = makeMetadata(4); + std::string expected(16, static_cast(7)); + writePage(makePageKey(metadata, 0, 0), expected); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + auto s = pages.SetBucketSlot(0, 4, 2, 1, 88); + ASSERT_TRUE(s.ok()) << s.ToString(); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + expected[9] = static_cast(88); + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(page, expected); +} + +TEST_F(RedisCuckooPageCacheTest, InvalidBucketAndSlotArguments) { + auto metadata = makeMetadata(4); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + uint8_t fingerprint = 0; + auto s = pages.GetBucketSlot(0, 2, 0, 4, &fingerprint); + EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString(); + + s = pages.SetBucketSlot(0, 2, 0, 4, 11); + EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString(); + + bool inserted = true; + s = pages.TryInsertInBucket(0, 2, 2, 11, &inserted); + EXPECT_TRUE(s.IsCorruption()) << s.ToString(); + + s = pages.TryInsertInBucket(0, 0, 0, 11, &inserted); + EXPECT_TRUE(s.IsCorruption()) << s.ToString(); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); +} + +TEST_F(RedisCuckooPageCacheTest, OversizedPageReturnsCorruption) { + auto metadata = makeMetadata(4); + writePage(makePageKey(metadata, 0, 0), std::string(9, static_cast(1))); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + uint8_t fingerprint = 0; + auto s = pages.GetBucketSlot(0, 2, 0, 0, &fingerprint); + EXPECT_TRUE(s.IsCorruption()) << s.ToString(); + + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(page, std::string(9, static_cast(1))); +} + +TEST_F(RedisCuckooPageCacheTest, UndersizedPageReturnsCorruption) { + auto metadata = makeMetadata(4); + auto page_key = makePageKey(metadata, 0, 0); + writePage(page_key, std::string{static_cast(1), static_cast(2), static_cast(3)}); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + uint8_t fingerprint = 0; + auto s = pages.GetBucketSlot(0, 2, 0, 0, &fingerprint); + EXPECT_TRUE(s.IsCorruption()) << s.ToString(); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(page_key, &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(page, std::string({static_cast(1), static_cast(2), static_cast(3)})); +} + +TEST_F(RedisCuckooPageCacheTest, DirtyPagesAreDiscardedWithoutWriteBack) { + auto metadata = makeMetadata(4); + { + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 2, 0, 11, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + } + + std::string page; + auto s = readPage(makePageKey(metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); +} + +TEST_F(RedisCuckooPageCacheTest, DiscardCachedPagesDropsDirtyPages) { + auto metadata = makeMetadata(4); + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), metadata.version, + metadata.bucket_size, metadata.page_size); + + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 2, 0, 11, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + + pages.DiscardCachedPages(); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); +} + +TEST_F(RedisCuckooPageCacheTest, PageKeyUsesMetadataVersion) { + auto old_metadata = makeMetadata(4, 100); + auto new_metadata = makeMetadata(4, 101); + { + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), old_metadata.version, + old_metadata.bucket_size, old_metadata.page_size); + bool inserted = false; + auto s = pages.TryInsertInBucket(0, 2, 0, 11, &inserted); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(inserted); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + } + + redis::CuckooPageCache pages(storage_.get(), *ctx_, ns_key_, storage_->IsSlotIdEncoded(), new_metadata.version, + new_metadata.bucket_size, new_metadata.page_size); + uint8_t fingerprint = 0; + auto s = pages.GetBucketSlot(0, 2, 0, 0, &fingerprint); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(fingerprint, 0); + s = pages.SetBucketSlot(0, 2, 0, 0, 22); + ASSERT_TRUE(s.ok()) << s.ToString(); + + auto batch = storage_->GetWriteBatchBase(); + s = pages.WriteBackDirtyPages(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + commitBatch(batch.Get()); + + std::string page; + s = readPage(makePageKey(old_metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), 8); + EXPECT_EQ(static_cast(page[0]), 11); + + s = readPage(makePageKey(new_metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), 8); + EXPECT_EQ(static_cast(page[0]), 22); +} diff --git a/tests/cppunit/types/cuckoo_filter_test.cc b/tests/cppunit/types/cuckoo_filter_test.cc new file mode 100644 index 00000000000..c665038b189 --- /dev/null +++ b/tests/cppunit/types/cuckoo_filter_test.cc @@ -0,0 +1,678 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include +#include +#include +#include + +#include "common/encoding.h" +#include "storage/redis_db.h" +#include "storage/redis_metadata.h" +#include "test_base.h" +#include "types/cuckoo_filter_page.h" +#include "types/cuckoo_filter_sub_filter.h" +#include "types/redis_cuckoo_chain.h" + +class RedisCuckooFilterTest : public TestBase { + public: + RedisCuckooFilterTest(const RedisCuckooFilterTest &) = delete; + RedisCuckooFilterTest &operator=(const RedisCuckooFilterTest &) = delete; + RedisCuckooFilterTest(RedisCuckooFilterTest &&) = delete; + RedisCuckooFilterTest &operator=(RedisCuckooFilterTest &&) = delete; + + protected: + explicit RedisCuckooFilterTest() : TestBase() { + cuckoo_ = std::make_unique(storage_.get(), "cuckoo_ns"); + db_ = std::make_unique(storage_.get(), "cuckoo_ns"); + } + ~RedisCuckooFilterTest() override { + // Ensure cuckoo_ is destroyed before storage_ + cuckoo_.reset(); + db_.reset(); + } + + void SetUp() override { + const ::testing::TestInfo *const test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + key_ = std::string("cf_test_") + test_info->name(); + } + + void verifyMetadata(const std::string &key, uint64_t capacity, uint8_t bucket_size, uint16_t max_iterations, + uint16_t expansion, uint64_t size, uint16_t n_filters, uint64_t num_deleted_items = 0, + uint32_t page_size = kCuckooFilterDefaultPageSize) { + std::string ns_key = db_->AppendNamespacePrefix(key); + CuckooChainMetadata metadata(false); + auto s = db_->GetMetadata(*ctx_, {kRedisCuckooFilter}, ns_key, &metadata); + ASSERT_TRUE(s.ok()) << key << ": metadata not found"; + EXPECT_EQ(metadata.Type(), kRedisCuckooFilter) << key; + EXPECT_EQ(metadata.base_capacity, capacity) << key; + EXPECT_EQ(metadata.bucket_size, bucket_size) << key; + EXPECT_EQ(metadata.max_iterations, max_iterations) << key; + EXPECT_EQ(metadata.expansion, expansion) << key; + EXPECT_EQ(metadata.size, size) << key; + EXPECT_EQ(metadata.n_filters, n_filters) << key; + EXPECT_EQ(metadata.num_deleted_items, num_deleted_items) << key; + EXPECT_EQ(metadata.page_size, page_size) << key; + } + + void reserveAndVerify(const std::string &key, uint64_t capacity, uint8_t bucket_size, uint16_t max_iterations, + uint16_t expansion, uint32_t page_size = kCuckooFilterDefaultPageSize) { + auto s = cuckoo_->Reserve(*ctx_, key, capacity, bucket_size, max_iterations, expansion, page_size); + ASSERT_TRUE(s.ok()) << key << ": " << s.ToString(); + verifyMetadata(key, capacity, bucket_size, max_iterations, expansion, 0, 1, 0, page_size); + } + + void addAndVerify(const std::string &key, const std::string &item, uint64_t capacity, uint8_t bucket_size, + uint16_t max_iterations, uint16_t expansion, uint64_t expected_size, uint16_t n_filters = 1) { + bool added = false; + auto s = cuckoo_->Add(*ctx_, key, item, &added); + ASSERT_TRUE(s.ok()) << key << ": add '" << item << "' failed: " << s.ToString(); + ASSERT_TRUE(added) << key << ": item '" << item << "' should have been added"; + verifyMetadata(key, capacity, bucket_size, max_iterations, expansion, expected_size, n_filters, 0); + } + + CuckooChainMetadata getMetadata(const std::string &key) { + std::string ns_key = db_->AppendNamespacePrefix(key); + CuckooChainMetadata metadata(false); + auto s = db_->GetMetadata(*ctx_, {kRedisCuckooFilter}, ns_key, &metadata); + EXPECT_TRUE(s.ok()) << key << ": metadata not found"; + return metadata; + } + + std::string makePageKey(const std::string &key, const CuckooChainMetadata &metadata, uint16_t filter_index, + uint32_t page_index) { + std::string sub_key; + PutFixed16(&sub_key, filter_index); + PutFixed32(&sub_key, page_index); + return InternalKey(db_->AppendNamespacePrefix(key), sub_key, metadata.version, storage_->IsSlotIdEncoded()) + .Encode(); + } + + rocksdb::Status readPage(const std::string &page_key, std::string *value) { + return storage_->Get(*ctx_, ctx_->GetReadOptions(), storage_->GetCFHandle(ColumnFamilyID::PrimarySubkey), page_key, + value); + } + + void writePage(const std::string &page_key, const std::string &value) { + auto batch = storage_->GetWriteBatchBase(); + auto s = batch->Put(page_key, value); + ASSERT_TRUE(s.ok()) << s.ToString(); + s = storage_->Write(*ctx_, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + ASSERT_TRUE(s.ok()) << s.ToString(); + } + + void writeMetadata(const std::string &key, const CuckooChainMetadata &metadata) { + std::string metadata_bytes; + metadata.Encode(&metadata_bytes); + auto batch = storage_->GetWriteBatchBase(); + auto s = + batch->Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), db_->AppendNamespacePrefix(key), metadata_bytes); + ASSERT_TRUE(s.ok()) << s.ToString(); + s = storage_->Write(*ctx_, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + ASSERT_TRUE(s.ok()) << s.ToString(); + } + + std::unique_ptr cuckoo_; + std::unique_ptr db_; + std::string key_; +}; + +TEST_F(RedisCuckooFilterTest, ReserveInvalidParams) { + struct InvalidTestCase { + std::string key; + uint64_t capacity; + uint8_t bucket_size; + uint16_t max_iterations; + uint16_t expansion; + std::string err; + }; + + std::vector invalid_test_cases = { + {"zero_capacity", 0, 4, 500, 2, "capacity must be larger than 0"}, + {"capacity_too_small", 1, 4, 500, 2, "capacity must be at least 2"}, + {"zero_bucket_size", 1000, 0, 500, 2, "bucket_size must be between 1 and 255"}, + {"zero_max_iterations", 1000, 4, 0, 2, "max_iterations must be larger than 0"}, + {"capacity_too_large", std::numeric_limits::max(), 4, 500, 2, "capacity is too large"}, + {"expansion_too_large", 1000, 4, 500, redis::kCFMaxExpansion + 1, "expansion must be between 0 and 32768"}, + {"zero_page_size", 1000, 4, 500, 2, "page_size must be larger than 0"}, + {"page_size_smaller_than_bucket", 1000, 4, 500, 2, "page_size must be at least bucket_size"}, + }; + + for (const auto &test_case : invalid_test_cases) { + uint32_t page_size = kCuckooFilterDefaultPageSize; + if (test_case.key == "zero_page_size") { + page_size = 0; + } else if (test_case.key == "page_size_smaller_than_bucket") { + page_size = test_case.bucket_size - 1; + } + auto s = cuckoo_->Reserve(*ctx_, test_case.key, test_case.capacity, test_case.bucket_size, test_case.max_iterations, + test_case.expansion, page_size); + ASSERT_FALSE(s.ok()) << test_case.key; + ASSERT_TRUE(s.IsInvalidArgument()) << test_case.key << ": " << s.ToString(); + ASSERT_NE(s.ToString().find(test_case.err), std::string::npos) << test_case.key << ": " << s.ToString(); + } +} + +TEST_F(RedisCuckooFilterTest, ReserveValidParams) { + struct TestCase { + std::string key; + uint64_t capacity; + uint8_t bucket_size; + uint16_t max_iterations; + uint16_t expansion; + }; + + std::vector test_cases = { + {"min_capacity", 2, 4, 500, 2}, + {"min_bucket_size", 1000, 1, 500, 2}, + {"max_bucket_size", 1000, 255, 500, 2}, + {"min_max_iterations", 1000, 4, 1, 2}, + {"max_max_iterations", 1000, 4, 65535, 2}, + // RedisBloom allows expansion=0; it disables creating additional sub-filters. + {"no_auto_expansion", 2, 1, 1, 0}, + {"capacity_3", 3, 4, 500, 2}, + {"capacity_10", 10, 4, 500, 2}, + {"capacity_100", 100, 4, 500, 2}, + {"capacity_10000", 10000, 4, 500, 2}, + {"capacity_100000", 100000, 4, 500, 2}, + {"bucket_size_2", 1000, 2, 500, 2}, + {"bucket_size_8", 1000, 8, 500, 2}, + {"bucket_size_16", 1000, 16, 500, 2}, + {"bucket_size_128", 1000, 128, 500, 2}, + {"no_auto_expansion_regular", 1000, 4, 500, 0}, + // expansion=1 is valid and creates additional sub-filters with the same capacity. + {"expansion_1", 1000, 4, 500, 1}, + {"expansion_8", 1000, 4, 500, 8}, + {"expansion_256", 1000, 4, 500, 256}, + {"max_expansion", 1000, 4, 500, redis::kCFMaxExpansion}, + {"capacity_10M", 10000000, 4, 500, 2}, + {"capacity_100M", 100000000, 4, 500, 2}, + {"max_params", 100000, 255, 65535, redis::kCFMaxExpansion}, + {"mixed_params", 50000, 8, 1000, 4}, + }; + + for (const auto &test_case : test_cases) { + reserveAndVerify(test_case.key, test_case.capacity, test_case.bucket_size, test_case.max_iterations, + test_case.expansion); + } +} + +TEST_F(RedisCuckooFilterTest, ReserveDuplicate) { + reserveAndVerify(key_, 1000, 4, 500, 2); + + // Second reserve with same key should fail + auto s = cuckoo_->Reserve(*ctx_, key_, 2000, 4, 500, 2, kCuckooFilterDefaultPageSize); + ASSERT_FALSE(s.ok()); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_NE(s.ToString().find("already exists"), std::string::npos); +} + +TEST_F(RedisCuckooFilterTest, CalculateRequiredBucketsCalculation) { + struct TestCase { + uint64_t capacity; + uint8_t bucket_size; + uint32_t expected_num_buckets; + }; + + std::vector test_cases = { + {0, 4, 1}, {1, 4, 1}, {2, 1, 4}, {4, 4, 2}, {488, 4, 128}, {489, 4, 256}, + {977, 4, 256}, {978, 4, 512}, {1000, 4, 512}, {1024, 4, 512}, {1000, 1, 2048}, {1000, 16, 128}, + }; + + for (const auto &test_case : test_cases) { + uint32_t num_buckets = 0; + auto s = + redis::CuckooFilterHelper::CalculateRequiredBuckets(test_case.capacity, test_case.bucket_size, &num_buckets); + ASSERT_TRUE(s.ok()) << "capacity=" << test_case.capacity + << ", bucket_size=" << static_cast(test_case.bucket_size) << ": " << s.ToString(); + ASSERT_EQ(num_buckets, test_case.expected_num_buckets) + << "capacity=" << test_case.capacity << ", bucket_size=" << static_cast(test_case.bucket_size); + ASSERT_EQ(num_buckets & (num_buckets - 1), 0) << "Number of buckets should be power of 2"; + + auto expected_min = + static_cast(static_cast(test_case.capacity) / test_case.bucket_size / 0.955L); + ASSERT_GE(num_buckets, expected_min) << "Number of buckets too small for capacity"; + } +} + +TEST_F(RedisCuckooFilterTest, FingerprintGeneration) { + // Test fingerprint generation ensures non-zero values in range [1, 255] + // Following RedisBloom: fp = hash % 255 + 1 + for (uint64_t hash = 0; hash < 1000; ++hash) { + uint8_t fp = redis::CuckooFilterHelper::GenerateFingerprint(hash); + ASSERT_GE(fp, 1) << "Fingerprint should be at least 1"; + ASSERT_LE(fp, 255) << "Fingerprint should be at most 255"; + } + + // Verify the formula: fp = hash % 255 + 1 + ASSERT_EQ(redis::CuckooFilterHelper::GenerateFingerprint(0), 1); + ASSERT_EQ(redis::CuckooFilterHelper::GenerateFingerprint(254), 255); + ASSERT_EQ(redis::CuckooFilterHelper::GenerateFingerprint(255), 1); + ASSERT_EQ(redis::CuckooFilterHelper::GenerateFingerprint(256), 2); +} + +TEST_F(RedisCuckooFilterTest, AlternateBucketCalculation) { + std::vector num_buckets_cases = {1, 2, 128, 256, 512, 1024, 2048}; + + // Test GetAltHash symmetry at hash level (following RedisBloom design) + // h2 = GetAltHash(fp, h1) + // h1 = GetAltHash(fp, h2) <- this is the symmetry property + for (auto num_buckets : num_buckets_cases) { + for (uint64_t hash = 0; hash < 100; ++hash) { + for (uint16_t fp = 1; fp <= 255; ++fp) { + auto fingerprint = static_cast(fp); + uint64_t alt_hash = redis::CuckooFilterHelper::GetAltHash(fingerprint, hash); + + // Applying GetAltHash twice should return original hash + uint64_t double_alt_hash = redis::CuckooFilterHelper::GetAltHash(fingerprint, alt_hash); + ASSERT_EQ(double_alt_hash, hash) << "Double alternate hash should give original hash"; + + // Both hashes should map to valid bucket indices + uint32_t bucket1 = hash % num_buckets; + uint32_t bucket2 = alt_hash % num_buckets; + ASSERT_LT(bucket1, num_buckets) << "Bucket 1 out of range"; + ASSERT_LT(bucket2, num_buckets) << "Bucket 2 out of range"; + } + } + } +} + +TEST_F(RedisCuckooFilterTest, HashFunction) { + // Test that Hash function produces consistent 64-bit values + std::string test_item = "hello"; + uint64_t hash1 = redis::CuckooFilterHelper::Hash(test_item); + uint64_t hash2 = redis::CuckooFilterHelper::Hash(test_item.data(), test_item.size()); + + // Both methods should produce the same result + ASSERT_EQ(hash1, hash2) << "Hash methods should be consistent"; + + // Hash should be deterministic + uint64_t hash3 = redis::CuckooFilterHelper::Hash(test_item); + ASSERT_EQ(hash1, hash3) << "Hash should be deterministic"; + + // Different items should produce different hashes (with high probability) + uint64_t hash_world = redis::CuckooFilterHelper::Hash("world"); + ASSERT_NE(hash1, hash_world) << "Different items should have different hashes"; + + // Empty string produces hash value 0 (this is expected with MurmurHash) + uint64_t hash_empty = redis::CuckooFilterHelper::Hash(""); + ASSERT_EQ(hash_empty, 0) << "Empty string should produce hash value 0 with MurmurHash"; + + // Even with hash=0, fingerprint should be non-zero + uint8_t fp_empty = redis::CuckooFilterHelper::GenerateFingerprint(hash_empty); + ASSERT_EQ(fp_empty, 1) << "Fingerprint of hash=0 should be 1 (0 % 255 + 1)"; + + // Test that hash can be used with fingerprint generation + uint8_t fp = redis::CuckooFilterHelper::GenerateFingerprint(hash1); + ASSERT_GE(fp, 1) << "Fingerprint should be at least 1"; + ASSERT_LE(fp, 255) << "Fingerprint should be at most 255"; +} + +TEST_F(RedisCuckooFilterTest, ReserveVerifyMetadata) { + uint64_t capacity = 1000; + uint8_t bucket_size = 4; + uint16_t max_iterations = 500; + uint16_t expansion = 2; + + // Create the filter + reserveAndVerify(key_, capacity, bucket_size, max_iterations, expansion); + + // Verify metadata was stored by trying to reserve again with same key + // This should fail with "already exists" error + auto s = + cuckoo_->Reserve(*ctx_, key_, capacity * 2, bucket_size, max_iterations, expansion, kCuckooFilterDefaultPageSize); + ASSERT_FALSE(s.ok()) << "Second reserve with same key should fail"; + ASSERT_TRUE(s.IsInvalidArgument()) << "Should return InvalidArgument error"; + ASSERT_NE(s.ToString().find("already exists"), std::string::npos) << "Error message should mention 'already exists'"; + + // Verify we can still create filters with different keys + reserveAndVerify("different_key", capacity, bucket_size, max_iterations, expansion); + + // Verify the original key still exists (can't create it again) + s = cuckoo_->Reserve(*ctx_, key_, capacity, bucket_size, max_iterations, expansion, kCuckooFilterDefaultPageSize); + ASSERT_FALSE(s.ok()) << "Original key should still exist"; + ASSERT_NE(s.ToString().find("already exists"), std::string::npos); +} + +TEST_F(RedisCuckooFilterTest, ReservePersistsPageSize) { + constexpr uint32_t page_size = 4096; + reserveAndVerify(key_, 1000, 4, 500, 2, page_size); +} + +TEST_F(RedisCuckooFilterTest, AddBasic) { + reserveAndVerify(key_, 1000, 4, 500, 2); + addAndVerify(key_, "item1", 1000, 4, 500, 2, 1); +} + +TEST_F(RedisCuckooFilterTest, ReserveDoesNotPreallocatePages) { + reserveAndVerify(key_, 1000, 4, 500, 2); + + auto metadata = getMetadata(key_); + std::string page; + auto s = readPage(makePageKey(key_, metadata, 0, 0), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); +} + +TEST_F(RedisCuckooFilterTest, AddWritesPagedLayout) { + constexpr uint64_t capacity = 1000; + constexpr uint8_t bucket_size = 4; + reserveAndVerify(key_, capacity, bucket_size, 500, 2); + + const std::string item = "item1"; + addAndVerify(key_, item, capacity, bucket_size, 500, 2, 1); + + auto metadata = getMetadata(key_); + uint32_t num_buckets = 0; + auto s = redis::CuckooFilterHelper::CalculateRequiredBuckets(capacity, bucket_size, &num_buckets); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto hash = redis::CuckooFilterHelper::Hash(item); + auto fingerprint = redis::CuckooFilterHelper::GenerateFingerprint(hash); + auto bucket1_idx = static_cast(hash % num_buckets); + auto bucket2_idx = static_cast(redis::CuckooFilterHelper::GetAltHash(fingerprint, hash) % num_buckets); + auto buckets_per_page = metadata.page_size / bucket_size; + auto page_index = bucket1_idx / buckets_per_page; + auto page_size = std::min(buckets_per_page, num_buckets - page_index * buckets_per_page) * bucket_size; + + std::string page; + s = readPage(makePageKey(key_, metadata, 0, page_index), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), page_size); + + auto bucket1_offset = (bucket1_idx % buckets_per_page) * bucket_size; + auto bucket2_offset = (bucket2_idx % buckets_per_page) * bucket_size; + bool found = false; + for (uint32_t i = 0; i < bucket_size; ++i) { + found = found || static_cast(page[bucket1_offset + i]) == fingerprint; + if (bucket2_idx / buckets_per_page == page_index) { + found = found || static_cast(page[bucket2_offset + i]) == fingerprint; + } + } + EXPECT_TRUE(found); +} + +TEST_F(RedisCuckooFilterTest, AddToNonExistentFilter) { + std::string key = "nonexistent_key"; + addAndVerify(key, "item1", redis::kCFDefaultCapacity, redis::kCFDefaultBucketSize, redis::kCFDefaultMaxIterations, + redis::kCFDefaultExpansion, 1); + + auto s = cuckoo_->Reserve(*ctx_, key, 1000, 4, 500, 2, kCuckooFilterDefaultPageSize); + ASSERT_FALSE(s.ok()); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_NE(s.ToString().find("already exists"), std::string::npos); +} + +TEST_F(RedisCuckooFilterTest, AddWithDifferentBucketSizes) { + std::vector bucket_sizes = {1, 2, 4, 8, 16}; + + for (auto bs : bucket_sizes) { + std::string test_key = key_ + "_bucket_" + std::to_string(bs); + reserveAndVerify(test_key, 100, bs, 500, 2); + for (int j = 0; j < 10; ++j) { + addAndVerify(test_key, "item_" + std::to_string(j), 100, bs, 500, 2, j + 1); + } + } +} + +TEST_F(RedisCuckooFilterTest, AddDuplicateItems) { + reserveAndVerify(key_, 1000, 4, 500, 2); + for (int i = 0; i < 5; ++i) { + addAndVerify(key_, "duplicate_item", 1000, 4, 500, 2, i + 1); + } +} + +TEST_F(RedisCuckooFilterTest, AddPrioritizesNewestFilter) { + CuckooChainMetadata metadata; + metadata.size = 0; + metadata.base_capacity = 1000; + metadata.bucket_size = 4; + metadata.max_iterations = 500; + metadata.expansion = 2; + metadata.n_filters = 2; + metadata.num_deleted_items = 0; + metadata.page_size = kCuckooFilterDefaultPageSize; + writeMetadata(key_, metadata); + + const std::string item = "item"; + addAndVerify(key_, item, metadata.base_capacity, metadata.bucket_size, metadata.max_iterations, metadata.expansion, 1, + metadata.n_filters); + + auto stored_metadata = getMetadata(key_); + ASSERT_EQ(stored_metadata.n_filters, 2); + ASSERT_EQ(stored_metadata.size, 1); + + auto hash = redis::CuckooFilterHelper::Hash(item); + uint32_t old_num_buckets = 0; + auto s = redis::CuckooFilterHelper::CalculateRequiredBuckets(metadata.base_capacity, metadata.bucket_size, + &old_num_buckets); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto buckets_per_page = stored_metadata.page_size / stored_metadata.bucket_size; + auto old_page_idx = static_cast(hash % old_num_buckets) / buckets_per_page; + + std::string page; + s = readPage(makePageKey(key_, stored_metadata, 0, old_page_idx), &page); + EXPECT_TRUE(s.IsNotFound()) << s.ToString(); + + uint32_t num_buckets = 0; + s = redis::CuckooFilterHelper::CalculateRequiredBuckets(metadata.base_capacity * metadata.expansion, + metadata.bucket_size, &num_buckets); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto bucket1_idx = static_cast(hash % num_buckets); + auto expected_page_idx = bucket1_idx / buckets_per_page; + + s = readPage(makePageKey(key_, stored_metadata, 1, expected_page_idx), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); +} + +TEST_F(RedisCuckooFilterTest, AddManyItems) { + reserveAndVerify(key_, 1000, 4, 500, 2); + for (int i = 0; i < 100; ++i) { + addAndVerify(key_, "item_" + std::to_string(i), 1000, 4, 500, 2, i + 1); + } +} + +TEST_F(RedisCuckooFilterTest, AddSmallFilterCapacity) { + uint64_t small_capacity = 10; + reserveAndVerify(key_, small_capacity, 2, 500, 0); + + uint64_t added_count = 0; + bool full = false; + for (int i = 0; i < 100; ++i) { + std::string item = "item_" + std::to_string(i); + bool added = false; + auto s = cuckoo_->Add(*ctx_, key_, item, &added); + + if (!s.ok()) { + ASSERT_TRUE(s.IsAborted()) << "Should be Aborted status when full"; + full = true; + break; + } + + ASSERT_TRUE(added) << "Item should have been added before the filter is full"; + ++added_count; + } + + ASSERT_TRUE(full) << "Small filter should eventually become full"; + verifyMetadata(key_, small_capacity, 2, 500, 0, added_count, 1, 0); +} + +TEST_F(RedisCuckooFilterTest, AddEdgeCaseItems) { + reserveAndVerify(key_, 1000, 4, 500, 2); + std::vector items = { + "", + std::string(10000, 'x'), + std::string("\x00\x01\x02\xFF\xFE", 5), + }; + for (size_t i = 0; i < items.size(); ++i) { + addAndVerify(key_, items[i], 1000, 4, 500, 2, i + 1); + } +} + +TEST_F(RedisCuckooFilterTest, KickOutSuccessWritesDirtyPages) { + constexpr uint64_t capacity = 2; + constexpr uint8_t bucket_size = 1; + constexpr uint32_t num_buckets = 4; + reserveAndVerify(key_, capacity, bucket_size, 1, 0); + + struct Candidate { + std::string item; + uint8_t fingerprint = 0; + uint32_t bucket1 = 0; + uint32_t bucket2 = 0; + }; + + std::vector candidates; + for (int i = 0; i < 10000; ++i) { + std::string item = "kick_item_" + std::to_string(i); + auto hash = redis::CuckooFilterHelper::Hash(item); + auto fingerprint = redis::CuckooFilterHelper::GenerateFingerprint(hash); + candidates.push_back( + {item, fingerprint, static_cast(hash % num_buckets), + static_cast(redis::CuckooFilterHelper::GetAltHash(fingerprint, hash) % num_buckets)}); + } + + Candidate first; + Candidate second; + Candidate kicked; + uint32_t evicted_bucket = 0; + bool found = false; + for (const auto &candidate : candidates) { + if (candidate.bucket1 == candidate.bucket2) continue; + for (const auto &first_candidate : candidates) { + if (first_candidate.item == candidate.item || first_candidate.bucket1 != candidate.bucket1) continue; + auto alt_for_victim = + redis::CuckooFilterHelper::GetAltBucketIndex(candidate.bucket1, first_candidate.fingerprint, num_buckets); + if (alt_for_victim == candidate.bucket1 || alt_for_victim == candidate.bucket2) continue; + + for (const auto &second_candidate : candidates) { + if (second_candidate.item == candidate.item || second_candidate.item == first_candidate.item || + second_candidate.bucket1 != candidate.bucket2) { + continue; + } + first = first_candidate; + second = second_candidate; + kicked = candidate; + evicted_bucket = alt_for_victim; + found = true; + break; + } + if (found) break; + } + if (found) break; + } + ASSERT_TRUE(found); + + addAndVerify(key_, first.item, capacity, bucket_size, 1, 0, 1); + addAndVerify(key_, second.item, capacity, bucket_size, 1, 0, 2); + addAndVerify(key_, kicked.item, capacity, bucket_size, 1, 0, 3); + + auto metadata = getMetadata(key_); + ASSERT_EQ(metadata.n_filters, 1); + ASSERT_EQ(metadata.size, 3); + + std::string page; + auto s = readPage(makePageKey(key_, metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(page.size(), num_buckets * bucket_size); + EXPECT_EQ(static_cast(page[kicked.bucket1]), kicked.fingerprint); + EXPECT_EQ(static_cast(page[kicked.bucket2]), second.fingerprint); + EXPECT_EQ(static_cast(page[evicted_bucket]), first.fingerprint); +} + +TEST_F(RedisCuckooFilterTest, KickOutErrorDiscardsDirtyPages) { + constexpr uint8_t bucket_size = 1; + constexpr uint32_t page_size = 1; + constexpr uint32_t num_buckets = 2; + CuckooChainMetadata metadata(false); + metadata.version = 1; + metadata.size = 0; + metadata.base_capacity = 1; + metadata.bucket_size = bucket_size; + metadata.max_iterations = 2; + metadata.expansion = 0; + metadata.n_filters = 1; + metadata.num_deleted_items = 0; + metadata.page_size = page_size; + writeMetadata(key_, metadata); + + constexpr uint64_t hash = 0; + constexpr uint8_t fingerprint = 2; + constexpr uint8_t existing_fingerprint = 1; + ASSERT_EQ(redis::CuckooFilterHelper::GetAltBucketIndex(0, existing_fingerprint, num_buckets), 1); + std::string original_page{static_cast(existing_fingerprint)}; + writePage(makePageKey(key_, metadata, 0, 0), original_page); + writePage(makePageKey(key_, metadata, 0, 1), std::string(2, static_cast(9))); + + redis::CuckooSubFilter sub_filter(storage_.get(), *ctx_, db_->AppendNamespacePrefix(key_), + storage_->IsSlotIdEncoded(), metadata.version, metadata.bucket_size, + metadata.page_size, 0, num_buckets); + bool inserted = true; + auto s = sub_filter.TryKickOutInsert(hash, fingerprint, metadata.max_iterations, &inserted); + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + ASSERT_FALSE(inserted); + + auto batch = storage_->GetWriteBatchBase(); + s = sub_filter.WriteToBatch(batch.Get()); + ASSERT_TRUE(s.ok()) << s.ToString(); + s = storage_->Write(*ctx_, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + ASSERT_TRUE(s.ok()) << s.ToString(); + + std::string page; + s = readPage(makePageKey(key_, metadata, 0, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(page, original_page); +} + +TEST_F(RedisCuckooFilterTest, ExpansionWritesNewFilterIndexPage) { + constexpr uint64_t capacity = 2; + constexpr uint8_t bucket_size = 1; + reserveAndVerify(key_, capacity, bucket_size, 1, 2); + + CuckooChainMetadata metadata(false); + uint64_t added_count = 0; + for (int i = 0; i < 100; ++i) { + bool added = false; + auto s = cuckoo_->Add(*ctx_, key_, "item_" + std::to_string(i), &added); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(added); + ++added_count; + + metadata = getMetadata(key_); + if (metadata.n_filters > 1) break; + } + + ASSERT_GT(metadata.n_filters, 1); + EXPECT_EQ(metadata.size, added_count); + + uint32_t num_buckets = 0; + uint64_t new_filter_capacity = capacity; + for (uint16_t i = 0; i < metadata.n_filters - 1; ++i) { + new_filter_capacity *= metadata.expansion; + } + auto s = redis::CuckooFilterHelper::CalculateRequiredBuckets(new_filter_capacity, bucket_size, &num_buckets); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto expected_page_size = std::min(metadata.page_size / bucket_size, num_buckets) * bucket_size; + + std::string page; + s = readPage(makePageKey(key_, metadata, metadata.n_filters - 1, 0), &page); + ASSERT_TRUE(s.ok()) << s.ToString(); + EXPECT_EQ(page.size(), expected_page_size); +} diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index 23ccabd1017..ab75a80e489 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -344,10 +344,10 @@ func TestReplicationWithLimitSpeed(t *testing.T) { util.Populate(t, slaveClient, "", 1026, 1) t.Run("resume broken transfer based files", func(t *testing.T) { - // Try to transfer some files, because max-replication-mb 1, - // so maybe more than 5 files are transferred for sleep 5s. util.SlaveOf(t, slaveClient, master) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return slave.LogFileMatches(t, `.*\[fetch\] Fetched .*`) + }, 50*time.Second, time.Second) // Restart master server, let the slave try to full sync with master again, // because slave already received some SST files, so we will skip them. @@ -359,7 +359,9 @@ func TestReplicationWithLimitSpeed(t *testing.T) { require.Eventually(t, func() bool { return slave.LogFileMatches(t, ".*skip count: 1.*") }, 50*time.Second, 1000*time.Millisecond) - util.WaitForSync(t, slaveClient) + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 50*time.Second, 100*time.Millisecond) require.Equal(t, "b", slaveClient.Get(ctx, "a").Val()) }) } diff --git a/tests/gocase/unit/scan/scan_test.go b/tests/gocase/unit/scan/scan_test.go index e0eb8e4d751..c20267f33fa 100644 --- a/tests/gocase/unit/scan/scan_test.go +++ b/tests/gocase/unit/scan/scan_test.go @@ -465,6 +465,12 @@ func ScanTest(t *testing.T, rdb *redis.Client, ctx context.Context) { require.NoError(t, rdb.Do(ctx, "bf.reserve", "MBbloomtype3", "0.02", "1000").Err()) require.Equal(t, []string{"MBbloomtype1", "MBbloomtype2", "MBbloomtype3"}, scanAll(t, rdb, "match", "MBbloomtype*", "type", "MBbloom--")) require.Equal(t, []string{"MBbloomtype1", "MBbloomtype2", "MBbloomtype3"}, scanAll(t, rdb, "match", "MBbloomtype*", "count", "3", "type", "MBbloom--")) + //MBbloomCF type + require.NoError(t, rdb.Do(ctx, "cf.reserve", "MBbloomCFtype1", "1000").Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", "MBbloomCFtype2", "1000").Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", "MBbloomCFtype3", "1000").Err()) + require.Equal(t, []string{"MBbloomCFtype1", "MBbloomCFtype2", "MBbloomCFtype3"}, scanAll(t, rdb, "match", "MBbloomCFtype*", "type", "MBbloomCF")) + require.Equal(t, []string{"MBbloomCFtype1", "MBbloomCFtype2", "MBbloomCFtype3"}, scanAll(t, rdb, "match", "MBbloomCFtype*", "count", "3", "type", "MBbloomCF")) //ReJSON-RL type require.NoError(t, rdb.Do(ctx, "JSON.SET", "ReJSONtype1", "$", ` {"x":1, "y":2} `).Err()) require.NoError(t, rdb.Do(ctx, "JSON.SET", "ReJSONtype2", "$", ` {"x":1, "y":2} `).Err()) diff --git a/tests/gocase/unit/type/bloom/cuckoo_filter_test.go b/tests/gocase/unit/type/bloom/cuckoo_filter_test.go new file mode 100644 index 00000000000..852dde75ed0 --- /dev/null +++ b/tests/gocase/unit/type/bloom/cuckoo_filter_test.go @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package bloom + +import ( + "context" + "fmt" + "testing" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +func TestCuckooFilter(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("Add creates filter", func(t *testing.T) { + key := "test_cuckoo_filter_add_create" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.Equal(t, int64(1), rdb.Do(ctx, "cf.add", key, "item").Val()) + require.Equal(t, "MBbloomCF", rdb.Type(ctx, key).Val()) + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", key, "1000").Err(), "already exists") + }) + + t.Run("Wrong type", func(t *testing.T) { + key := "test_cuckoo_filter_wrong_type" + require.NoError(t, rdb.Set(ctx, key, "value", 0).Err()) + require.ErrorContains(t, rdb.Do(ctx, "cf.add", key, "item").Err(), "WRONGTYPE") + }) + + t.Run("Reserve expansion", func(t *testing.T) { + require.NoError(t, rdb.Do(ctx, "cf.reserve", "test_cuckoo_filter_expansion_256", "1000", "EXPANSION", "256").Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", "test_cuckoo_filter_expansion_max", "1000", "EXPANSION", "32768").Err()) + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "test_cuckoo_filter_expansion_too_large", "1000", "EXPANSION", "32769").Err(), "expansion must be between 0 and 32768") + }) + + t.Run("Reserve creates cuckoo filter type", func(t *testing.T) { + key := "test_cuckoo_filter_reserve_type" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "1000").Err()) + require.Equal(t, "MBbloomCF", rdb.Type(ctx, key).Val()) + }) + + t.Run("Reserve invalid capacity", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_cap_str", "abc").Err(), "invalid capacity") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_cap_neg", "-1").Err(), "invalid capacity") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_cap_zero", "0").Err(), "capacity must be larger than 0") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_cap_one", "1").Err(), "capacity must be at least 2") + }) + + t.Run("Reserve invalid bucket size", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_bs_str", "1000", "BUCKETSIZE", "abc").Err(), "invalid bucket size") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_bs_zero", "1000", "BUCKETSIZE", "0").Err(), "bucket size must be between 1 and 255") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_bs_neg", "1000", "BUCKETSIZE", "-1").Err(), "invalid bucket size") + }) + + t.Run("Reserve invalid max iterations", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_mi_str", "1000", "MAXITERATIONS", "abc").Err(), "invalid max iterations") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_mi_zero", "1000", "MAXITERATIONS", "0").Err(), "max iterations must be larger than 0") + }) + + t.Run("Reserve invalid expansion", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_exp_str", "1000", "EXPANSION", "abc").Err(), "invalid expansion factor") + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_exp_big", "1000", "EXPANSION", "32769").Err(), "expansion must be between 0 and 32768") + }) + + t.Run("Reserve invalid syntax", func(t *testing.T) { + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", "cf_bad_syntax", "1000", "UNKNOWNOPT").Err(), "syntax error") + }) + + t.Run("Reserve duplicate key", func(t *testing.T) { + key := "test_cuckoo_filter_reserve_dup" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "1000").Err()) + require.ErrorContains(t, rdb.Do(ctx, "cf.reserve", key, "2000").Err(), "already exists") + }) + + t.Run("Reserve wrong number of arguments", func(t *testing.T) { + require.Error(t, rdb.Do(ctx, "cf.reserve").Err()) + require.Error(t, rdb.Do(ctx, "cf.reserve", "key_only").Err()) + }) + + t.Run("Add wrong number of arguments", func(t *testing.T) { + require.Error(t, rdb.Do(ctx, "cf.add").Err()) + require.Error(t, rdb.Do(ctx, "cf.add", "key_only").Err()) + require.Error(t, rdb.Do(ctx, "cf.add", "key", "item1", "item2").Err()) + }) + + t.Run("Add many items", func(t *testing.T) { + key := "test_cuckoo_filter_add_many" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "1000").Err()) + for i := 0; i < 500; i++ { + result := rdb.Do(ctx, "cf.add", key, fmt.Sprintf("item_%d", i)) + require.NoError(t, result.Err()) + require.Equal(t, int64(1), result.Val()) + } + }) + + t.Run("Add duplicate items allowed", func(t *testing.T) { + key := "test_cuckoo_filter_add_dup" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "1000").Err()) + for i := 0; i < 10; i++ { + result := rdb.Do(ctx, "cf.add", key, "same_item") + require.NoError(t, result.Err()) + require.Equal(t, int64(1), result.Val()) + } + }) + + t.Run("Add triggers expansion", func(t *testing.T) { + key := "test_cuckoo_filter_add_expansion" + require.NoError(t, rdb.Del(ctx, key).Err()) + // Small capacity with expansion enabled to trigger expansion quickly + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "4", "BUCKETSIZE", "1", "MAXITERATIONS", "1", "EXPANSION", "2").Err()) + // Add enough items to trigger expansion + for i := 0; i < 20; i++ { + result := rdb.Do(ctx, "cf.add", key, fmt.Sprintf("expand_item_%d", i)) + require.NoError(t, result.Err()) + require.Equal(t, int64(1), result.Val()) + } + }) + + t.Run("Add to full non-scaling filter returns error", func(t *testing.T) { + key := "test_cuckoo_filter_full_nonscaling" + require.NoError(t, rdb.Del(ctx, key).Err()) + // expansion=0 disables scaling; small capacity fills quickly + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "4", "BUCKETSIZE", "1", "MAXITERATIONS", "1", "EXPANSION", "0").Err()) + full := false + for i := 0; i < 100; i++ { + result := rdb.Do(ctx, "cf.add", key, fmt.Sprintf("full_item_%d", i)) + if result.Err() != nil { + require.ErrorContains(t, result.Err(), "filter is full") + full = true + break + } + } + require.True(t, full, "Non-scaling filter should eventually become full") + }) + + t.Run("Reserve with all optional params", func(t *testing.T) { + key := "test_cuckoo_filter_all_opts" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "5000", "BUCKETSIZE", "8", "MAXITERATIONS", "100", "EXPANSION", "4").Err()) + require.Equal(t, "MBbloomCF", rdb.Type(ctx, key).Val()) + }) + + t.Run("Add empty string item", func(t *testing.T) { + key := "test_cuckoo_filter_empty_item" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "100").Err()) + result := rdb.Do(ctx, "cf.add", key, "") + require.NoError(t, result.Err()) + require.Equal(t, int64(1), result.Val()) + }) + + t.Run("Add large item", func(t *testing.T) { + key := "test_cuckoo_filter_large_item" + require.NoError(t, rdb.Del(ctx, key).Err()) + require.NoError(t, rdb.Do(ctx, "cf.reserve", key, "100").Err()) + largeItem := make([]byte, 10000) + for i := range largeItem { + largeItem[i] = byte('a' + i%26) + } + result := rdb.Do(ctx, "cf.add", key, string(largeItem)) + require.NoError(t, result.Err()) + require.Equal(t, int64(1), result.Val()) + }) +}