From 333a9b1168e478bbb824f06c9f2ddbad25481292 Mon Sep 17 00:00:00 2001 From: kirito632 Date: Fri, 22 May 2026 15:52:22 +0800 Subject: [PATCH 1/2] fix(stream): filter consumers by group name in GetConsumerInfo to prevent cross-group leakage --- src/types/redis_stream.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 580629c9fb1..0c6aa8b08c1 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1389,6 +1389,11 @@ rocksdb::Status Stream::GetConsumerInfo( continue; } + // Iterate bounds may include consumers from other groups; verify group name explicitly. + if (groupNameFromInternalKey(iter->key()) != group_name) { + continue; + } + std::string consumer_name = consumerNameFromInternalKey(iter->key()); StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString()); std::pair tmp_item(consumer_name, c_metadata); From 18e8c869df4bc14a1a3a22b5586c0bbd6f3aa57d Mon Sep 17 00:00:00 2001 From: kirito632 Date: Tue, 26 May 2026 11:21:04 +0800 Subject: [PATCH 2/2] feat(stream): support XACKDEL command --- src/commands/cmd_stream.cc | 74 +++ src/storage/batch_extractor.cc | 82 ++- src/storage/batch_extractor.h | 2 + src/types/redis_stream.cc | 420 +++++++++++++ src/types/redis_stream.h | 23 + src/types/redis_stream_base.h | 12 + tests/gocase/unit/server/poll_updates_test.go | 1 + tests/gocase/unit/type/stream/stream_test.go | 577 ++++++++++++++++++ 8 files changed, 1190 insertions(+), 1 deletion(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 73a5279c8eb..9b480b469a6 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -268,6 +268,79 @@ class CommandXDel : public Commander { std::vector ids_; }; +class CommandXAckDel : public Commander { + public: + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + stream_name_ = GET_OR_RET(parser.TakeStr()); + group_name_ = GET_OR_RET(parser.TakeStr()); + + option_ = redis::StreamDeleteOption::KeepRef; + + while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) { + if (parser.EatEqICase("KEEPREF")) { + option_ = redis::StreamDeleteOption::KeepRef; + } else if (parser.EatEqICase("DELREF")) { + option_ = redis::StreamDeleteOption::DelRef; + } else if (parser.EatEqICase("ACKED")) { + option_ = redis::StreamDeleteOption::Acked; + } else { + return parser.InvalidSyntax(); + } + } + + if (!parser.EatEqICase("IDS")) { + return {Status::RedisParseErr, "syntax error, expected IDS keyword"}; + } + + auto numids_result = parser.TakeInt(); + if (!numids_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + int64_t numids = numids_result.GetValue(); + if (numids <= 0) { + return {Status::RedisParseErr, "numids must be positive"}; + } + + for (int64_t i = 0; i < numids; i++) { + auto id_str = GET_OR_RET(parser.TakeStr()); + redis::StreamEntryID id; + auto s = ParseStreamEntryID(id_str, &id); + if (!s.IsOK()) return s; + entry_ids_.emplace_back(id); + } + + if (parser.Good()) { + return {Status::RedisParseErr, "syntax error, unexpected trailing arguments after IDs"}; + } + + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::Stream stream_db(srv->storage, conn->GetNamespace()); + std::vector results; + + auto s = stream_db.DeleteEntriesAndAck(ctx, stream_name_, group_name_, entry_ids_, option_, &results); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + output->append(redis::MultiLen(results.size())); + for (int r : results) { + output->append(redis::Integer(r)); + } + + return Status::OK(); + } + + private: + std::string stream_name_; + std::string group_name_; + redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef; + std::vector entry_ids_; +}; + class CommandXClaim : public Commander { public: Status Parse(const std::vector &args) override { @@ -1907,6 +1980,7 @@ class CommandXSetId : public Commander { REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xadd", -5, "write", 1, 1, 1), MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xackdel", -6, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xautoclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 8dfd4258e7e..670fb2bb7bc 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -26,6 +26,7 @@ #include "server/redis_reply.h" #include "server/server.h" #include "types/redis_bitmap.h" +#include "types/redis_stream_base.h" void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { // Currently, we only have two kinds of log data @@ -38,6 +39,8 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { // Redis type log data if (auto s = log_data_.Decode(blob); !s.IsOK()) { WARN("Failed to decode Redis type log: {}", s.Msg()); + } else { + seen_xackdel_entry_keys_.clear(); } } } @@ -266,6 +269,21 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic break; } } else if (column_family_id == static_cast(ColumnFamilyID::Stream)) { + InternalKey ikey(key, is_slot_id_encoded_); + Slice entry_id_check = ikey.GetSubKey(); + uint64_t delimiter = 0; + GetFixed64(&entry_id_check, &delimiter); + if (delimiter == UINT64_MAX) { + return rocksdb::Status::OK(); + } + + user_key = ikey.GetKey().ToString(); + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args); if (!s.IsOK()) { ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg()); @@ -397,8 +415,70 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S Slice encoded_id = ikey.GetSubKey(); redis::StreamEntryID entry_id; GetFixed64(&encoded_id, &entry_id.ms); + + if (entry_id.ms == UINT64_MAX) { + // PEL / group / consumer metadata sub-key. Only PEL deletions + // produce XACKDEL commands for replication. + auto args = log_data_.GetArguments(); + if (!args->empty() && (*args)[0] == "XACKDEL" && args->size() >= 3) { + uint8_t type_delimiter = 0; + if (!GetFixed8(&encoded_id, &type_delimiter)) { + return rocksdb::Status::OK(); + } + if (type_delimiter == static_cast(redis::StreamSubkeyType::StreamPelEntry)) { + uint64_t group_name_len = 0; + if (!GetFixed64(&encoded_id, &group_name_len)) { + return rocksdb::Status::OK(); + } + if (group_name_len > encoded_id.size() || encoded_id.size() - group_name_len < 16) { + return rocksdb::Status::OK(); + } + encoded_id.remove_prefix(group_name_len); + + if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) { + return rocksdb::Status::OK(); + } + std::string entry_id_str = entry_id.ToString(); + + std::string user_key = ikey.GetKey().ToString(); + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + std::string dedup_key = ns + '\0' + user_key + '\0' + (*args)[1] + '\0' + entry_id_str; + if (seen_xackdel_entry_keys_.insert(std::move(dedup_key)).second) { + command_args = {(*args)[0], user_key, (*args)[1], (*args)[2], "IDS", "1", entry_id_str}; + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); + } + } + } + return rocksdb::Status::OK(); + } + GetFixed64(&encoded_id, &entry_id.seq); - command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()}; + std::string entry_id_str = entry_id.ToString(); + std::string user_key = ikey.GetKey().ToString(); + + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + + auto args = log_data_.GetArguments(); + if (!args->empty()) { + if ((*args)[0] == "XACKDEL" && args->size() >= 3) { + std::string dedup_key = ns + '\0' + user_key + '\0' + (*args)[1] + '\0' + entry_id_str; + if (seen_xackdel_entry_keys_.insert(std::move(dedup_key)).second) { + command_args = {(*args)[0], user_key, (*args)[1], (*args)[2], "IDS", "1", entry_id_str}; + } + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } } if (!command_args.empty()) { diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h index 4f5775046cf..fef55a1c08a 100644 --- a/src/storage/batch_extractor.h +++ b/src/storage/batch_extractor.h @@ -22,6 +22,7 @@ #include #include +#include #include #include "cluster/cluster_defs.h" @@ -54,4 +55,5 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler { bool is_slot_id_encoded_ = false; SlotRange slot_range_; bool to_redis_; + std::unordered_set seen_xackdel_entry_keys_; }; diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 0c6aa8b08c1..d76a15beb14 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -394,6 +395,425 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context &ctx, const Slice &stre return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +rocksdb::Status Stream::getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names) { + group_names->clear(); + + std::string subkey_type_delimiter; + PutFixed64(&subkey_type_delimiter, UINT64_MAX); + PutFixed8(&subkey_type_delimiter, static_cast(StreamSubkeyType::StreamConsumerGroupMetadata)); + + std::string next_version_prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamConsumerGroupMetadata) { + continue; + } + group_names->push_back(groupNameFromInternalKey(iter->key())); + } + return iter->status(); +} + +rocksdb::Status Stream::deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, + uint64_t *deleted_cnt) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, entry_key); + if (!s.ok()) return s; + (*deleted_cnt)++; + + if (metadata->max_deleted_entry_id < id) { + metadata->max_deleted_entry_id = id; + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements) { + for (const auto &group_name : group_names) { + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, pel_key); + if (!s.ok()) return s; + *batch_modified = true; + + auto pel_entry = decodeStreamPelEntryValue(pel_value); + (*group_pending_decrements)[group_name]++; + (*consumer_pending_decrements)[group_name][pel_entry.consumer_name]++; + } else if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements) { + for (const auto &[group_name, decrement] : group_pending_decrements) { + auto group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string group_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &group_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value); + group_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta)); + if (!s.ok()) return s; + } + } + + for (const auto &[group_name, consumers] : consumer_pending_decrements) { + for (const auto &[consumer_name, decrement] : consumers) { + auto consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &consumer_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value); + consumer_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta)); + if (!s.ok()) return s; + } + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::isAckedByAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + const std::vector &group_names, + const std::unordered_map &last_delivered_ids_by_group, bool *all_acked) { + *all_acked = true; + for (const auto &group_name : group_names) { + // A group with last_delivered_id < id has never delivered this entry, + // so it cannot be considered acknowledged. + auto it = last_delivered_ids_by_group.find(group_name); + if (it != last_delivered_ids_by_group.end() && id > it->second) { + *all_acked = false; + return rocksdb::Status::OK(); + } + + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + *all_acked = false; + return rocksdb::Status::OK(); + } else if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::DeleteEntriesAndAck(engine::Context &ctx, const Slice &stream_name, + const std::string &group_name, const std::vector &ids, + StreamDeleteOption option, std::vector *results) { + results->assign(ids.size(), static_cast(StreamEntryDeleteResult::kEntryNotFound)); + + std::string ns_key = AppendNamespacePrefix(stream_name); + + StreamMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok()) { + if (s.IsNotFound()) { + // Missing keys return per-ID not-found results, not a command error. + return rocksdb::Status::OK(); + } + return s; + } + + if (ids.empty()) { + return rocksdb::Status::OK(); + } + + std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string get_group_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value); + if (!s.ok()) { + if (s.IsNotFound()) { + return rocksdb::Status::OK(); + } + return s; + } + + auto batch = storage_->GetWriteBatchBase(); + std::string option_str; + switch (option) { + case StreamDeleteOption::DelRef: + option_str = "DELREF"; + break; + case StreamDeleteOption::Acked: + option_str = "ACKED"; + break; + default: + option_str = "KEEPREF"; + break; + } + WriteBatchLogData log_data(kRedisStream, {"XACKDEL", group_name, option_str}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + + std::vector all_groups; + bool need_groups = (option == StreamDeleteOption::DelRef || option == StreamDeleteOption::Acked); + if (need_groups) { + s = getGroupNames(ctx, ns_key, metadata, &all_groups); + if (!s.ok()) return s; + } + + // Prefetch last-delivered IDs so ACKED can distinguish acked entries + // from entries that were never delivered. + std::unordered_map last_delivered_ids_by_group; + if (option == StreamDeleteOption::Acked) { + for (const auto &candidate_group_name : all_groups) { + std::string group_metadata_key = internalKeyFromGroupName(ns_key, metadata, candidate_group_name); + std::string group_metadata_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_metadata_key, &group_metadata_value); + if (s.ok()) { + auto group_metadata = decodeStreamConsumerGroupMetadataValue(group_metadata_value); + last_delivered_ids_by_group[candidate_group_name] = group_metadata.last_delivered_id; + } else if (!s.IsNotFound()) { + return s; + } + } + } + + std::map consumer_acknowledges; + std::map other_group_pending_decrements; + std::map> other_consumer_pending_decrements; + uint64_t deleted_cnt = 0; + uint64_t acknowledged_cnt = 0; + bool batch_modified = false; + + std::unordered_set seen_entry_keys; + seen_entry_keys.reserve(ids.size()); + std::unordered_set deleted_entry_keys; + deleted_entry_keys.reserve(ids.size()); + StreamEntryID original_first_entry_id = metadata.first_entry_id; + StreamEntryID original_last_entry_id = metadata.last_entry_id; + + for (size_t i = 0; i < ids.size(); i++) { + const auto &id = ids[i]; + + std::string entry_key = internalKeyFromEntryID(ns_key, metadata, id); + + if (!seen_entry_keys.insert(entry_key).second) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + // Look up the current group's PEL entry first. + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.IsNotFound()) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + s = batch->Delete(stream_cf_handle_, pel_key); + if (!s.ok()) return s; + acknowledged_cnt++; + batch_modified = true; + + auto pel_entry = decodeStreamPelEntryValue(pel_value); + consumer_acknowledges[pel_entry.consumer_name]++; + + std::string value; + s = storage_->Get(ctx, read_options, stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + bool stream_entry_exists = s.ok(); + + std::vector other_groups; + if (need_groups) { + for (const auto &candidate_group_name : all_groups) { + if (candidate_group_name != group_name) other_groups.push_back(candidate_group_name); + } + } + + if (!stream_entry_exists) { + if (option == StreamDeleteOption::DelRef && !other_groups.empty()) { + s = cleanPelFromAllGroups(ctx, ns_key, metadata, id, batch.Get(), &batch_modified, other_groups, + &other_group_pending_decrements, &other_consumer_pending_decrements); + if (!s.ok()) return s; + } + + if (option == StreamDeleteOption::Acked) { + bool all_other_acked = true; + if (!other_groups.empty()) { + s = isAckedByAllGroups(ctx, ns_key, metadata, id, other_groups, last_delivered_ids_by_group, + &all_other_acked); + if (!s.ok()) return s; + } + (*results)[i] = static_cast(all_other_acked ? StreamEntryDeleteResult::kEntryDeleted + : StreamEntryDeleteResult::kEntrySkipped); + } else { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } + continue; + } + + if (option == StreamDeleteOption::KeepRef) { + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } else if (option == StreamDeleteOption::DelRef) { + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + + if (!other_groups.empty()) { + s = cleanPelFromAllGroups(ctx, ns_key, metadata, id, batch.Get(), &batch_modified, other_groups, + &other_group_pending_decrements, &other_consumer_pending_decrements); + if (!s.ok()) return s; + } + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } else { // StreamDeleteOption::Acked + bool all_other_acked = true; + if (!other_groups.empty()) { + s = isAckedByAllGroups(ctx, ns_key, metadata, id, other_groups, last_delivered_ids_by_group, &all_other_acked); + if (!s.ok()) return s; + } + + if (all_other_acked) { + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } else { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntrySkipped); + } + } + } + + if (deleted_cnt > 0 || acknowledged_cnt > 0 || !other_group_pending_decrements.empty()) { + if (deleted_cnt > 0) { + metadata.size -= deleted_cnt; + + if (metadata.size == 0) { + metadata.first_entry_id.Clear(); + metadata.last_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } else { + bool first_deleted = + deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, original_first_entry_id)) > 0; + bool last_deleted = + deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, original_last_entry_id)) > 0; + + if (first_deleted) { + iter->SeekToFirst(); + while (iter->Valid() && (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry || + deleted_entry_keys.count(iter->key().ToString()) > 0)) { + iter->Next(); + } + if (iter->Valid()) { + metadata.first_entry_id = entryIDFromInternalKey(iter->key()); + metadata.recorded_first_entry_id = metadata.first_entry_id; + } else { + metadata.first_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } + } + if (last_deleted) { + iter->SeekToLast(); + while (iter->Valid() && (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry || + deleted_entry_keys.count(iter->key().ToString()) > 0)) { + iter->Prev(); + } + if (iter->Valid()) { + metadata.last_entry_id = entryIDFromInternalKey(iter->key()); + } else { + metadata.last_entry_id.Clear(); + } + } + } + + std::string bytes; + metadata.Encode(&bytes); + s = batch->Put(metadata_cf_handle_, ns_key, bytes); + if (!s.ok()) return s; + } + + if (acknowledged_cnt > 0) { + StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value); + group_metadata.pending_number -= acknowledged_cnt; + std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); + s = batch->Put(stream_cf_handle_, group_key, group_value); + if (!s.ok()) return s; + + for (const auto &[consumer_name, ack_count] : consumer_acknowledges) { + auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_meta_original; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); + consumer_metadata.pending_number -= ack_count; + s = batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata)); + if (!s.ok()) return s; + } + } + } + + if (!other_group_pending_decrements.empty()) { + s = flushPendingNumberUpdates(ctx, ns_key, metadata, batch.Get(), other_group_pending_decrements, + other_consumer_pending_decrements); + if (!s.ok()) return s; + } + } + + if (!batch_modified) { + return rocksdb::Status::OK(); + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, const uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 08cae0b1d48..6a1442c7030 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -22,8 +22,10 @@ #include +#include #include #include +#include #include #include "storage/redis_db.h" @@ -53,6 +55,9 @@ class Stream : public SubKeyScanner { uint64_t *deleted_cnt); rocksdb::Status DeletePelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::vector &entry_ids, uint64_t *acknowledged); + rocksdb::Status DeleteEntriesAndAck(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results); rocksdb::Status ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, @@ -100,6 +105,8 @@ class Stream : public SubKeyScanner { static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value); std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const std::string &consumer_name) const; + rocksdb::Status getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names); std::string consumerNameFromInternalKey(rocksdb::Slice key) const; static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata); static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value); @@ -109,6 +116,22 @@ class Stream : public SubKeyScanner { std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const StreamEntryID &id); StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name); + + rocksdb::Status deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, uint64_t *deleted_cnt); + rocksdb::Status cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements); + rocksdb::Status isAckedByAllGroups(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + const StreamEntryID &id, const std::vector &group_names, + const std::unordered_map &last_delivered_ids_by_group, + bool *all_acked); + rocksdb::Status flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements); static std::string encodeStreamPelEntryValue(const StreamPelEntry &pel_entry); static StreamPelEntry decodeStreamPelEntryValue(const std::string &value); StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const; diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index f7991f573e3..1e9830249b1 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -203,6 +203,18 @@ enum class StreamSubkeyType { StreamPelEntry = 3, }; +enum class StreamDeleteOption { + KeepRef = 0, + DelRef = 1, + Acked = 2, +}; + +enum class StreamEntryDeleteResult : int { + kEntryNotFound = -1, + kEntryDeleted = 1, + kEntrySkipped = 2, +}; + struct StreamPelEntry { uint64_t last_delivery_time_ms; uint64_t last_delivery_count; diff --git a/tests/gocase/unit/server/poll_updates_test.go b/tests/gocase/unit/server/poll_updates_test.go index 4d76499f9cc..d000cd8868a 100644 --- a/tests/gocase/unit/server/poll_updates_test.go +++ b/tests/gocase/unit/server/poll_updates_test.go @@ -283,6 +283,7 @@ func TestPollUpdates_WithRESPFormat(t *testing.T) { pollUpdates = parsePollUpdatesResult(t, result.(map[any]any), true) require.Len(t, pollUpdates.Updates, 1) require.EqualValues(t, []any{RESPFormat{ + Namespace: "default", Commands: [][]string{ {"XADD", "stream", id, "field", "value"}, {"XDEL", "stream", id}, diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 00354848ca9..d3070fb6ddc 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2643,6 +2643,583 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, rdb.Del(ctx, streamKey).Err()) }) + + t.Run("XACKDEL DELREF cross-group PEL cleanup", func(t *testing.T) { + streamName := "xackdel_delref_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + otherPending, err := rdb.XPending(ctx, streamName, otherGroup).Result() + require.NoError(t, err) + require.Equal(t, int64(0), otherPending.Count) + + infoGroups := rdb.XInfoGroups(ctx, streamName).Val() + require.Len(t, infoGroups, 2) + require.Equal(t, int64(0), infoGroups[0].Pending) + require.Equal(t, int64(0), infoGroups[1].Pending) + + consumers1 := rdb.XInfoConsumers(ctx, streamName, groupName).Val() + require.Len(t, consumers1, 1) + require.Equal(t, "c1", consumers1[0].Name) + require.Equal(t, int64(0), consumers1[0].Pending) + + consumers2 := rdb.XInfoConsumers(ctx, streamName, otherGroup).Val() + require.Len(t, consumers2, 1) + require.Equal(t, "c2", consumers2[0].Name) + require.Equal(t, int64(0), consumers2[0].Pending) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED all groups have acked deletes entry", func(t *testing.T) { + streamName := "xackdel_acked_all_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, otherGroup, "1-0").Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED with other group not acked returns 2", func(t *testing.T) { + streamName := "xackdel_acked_notall_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + // Both groups still have pending entries; XACKDEL acks current group but + // other group still references the entry, so it cannot be deleted. + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL non-existent key returns array of -1", func(t *testing.T) { + streamName := "xackdel_nokey_" + strconv.Itoa(rand.Int()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, "nonexistent", "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + }) + + t.Run("XACKDEL non-existent group on existing stream returns array of -1", func(t *testing.T) { + streamName := "xackdel_nogroup_" + strconv.Itoa(rand.Int()) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, "nonexistent", "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL DELREF duplicate IDs are idempotent", func(t *testing.T) { + streamName := "xackdel_dedup_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "2", "1-0", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(-1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL KEEPREF deletes entry", func(t *testing.T) { + streamName := "xackdel_keepref_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + // Create a pending entry so the ID is in the group's PEL. + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL KEEPREF ID not pending in group does not delete entry", func(t *testing.T) { + streamName := "xackdel_keepref_notpending_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL DELREF dangling PEL after stream entry deleted by XDEL", func(t *testing.T) { + streamName := "xackdel_delref_dangling_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + // Both groups read the entry so both have PEL entries. + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + // Delete the stream entry directly, leaving dangling PEL entries. + require.NoError(t, rdb.XDel(ctx, streamName, "1-0").Err()) + + // XACKDEL DELREF from the first group should clean up all dangling PEL entries. + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + otherPending, err := rdb.XPending(ctx, streamName, otherGroup).Result() + require.NoError(t, err) + require.Equal(t, int64(0), otherPending.Count) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL KEEPREF acks current group PEL", func(t *testing.T) { + streamName := "xackdel_keepref_ack_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + // Create a pending entry by reading. + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + // KEEPREF must still ACK the current group's pending entry. + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL KEEPREF non-existent entry returns -1", func(t *testing.T) { + streamName := "xackdel_keepref_notfound_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "999-999").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL KEEPREF delete first entry recalculates boundary", func(t *testing.T) { + streamName := "xackdel_boundary_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "3-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + // Create a pending entry so the ID is in the group's PEL. + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 3, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(2), rdb.XLen(ctx, streamName).Val()) + + msgs, err := rdb.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + require.Equal(t, "2-0", msgs[0].ID) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL DELREF single group PEL cleanup", func(t *testing.T) { + streamName := "xackdel_delref_single_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + consumers := rdb.XInfoConsumers(ctx, streamName, groupName).Val() + require.Len(t, consumers, 1) + require.Equal(t, "c1", consumers[0].Name) + require.Equal(t, int64(0), consumers[0].Pending) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED already acked in current group returns -1", func(t *testing.T) { + streamName := "xackdel_acked_single_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + // Ack the entry first; the PEL entry for the current group is now gone. + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + // XACKDEL must find the ID in the current group's PEL; since it is already + // acked, it returns -1 and does not delete the stream entry. + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED single group pending acknowledges and deletes entry", func(t *testing.T) { + streamName := "xackdel_acked_pending_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + pendingBefore, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(1), pendingBefore.Count) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pendingAfter, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pendingAfter.Count) + + consumers := rdb.XInfoConsumers(ctx, streamName, groupName).Val() + require.Len(t, consumers, 1) + require.Equal(t, int64(0), consumers[0].Pending) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL with invalid option returns error", func(t *testing.T) { + streamName := "xackdel_badopt_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "INVALID", "IDS", "1", "1-0").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL missing IDS returns error", func(t *testing.T) { + streamName := "xackdel_noids_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL with non-integer numids returns error", func(t *testing.T) { + streamName := "xackdel_badnumids_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "abc").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL with zero numids returns error", func(t *testing.T) { + streamName := "xackdel_zeronumids_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "0").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL with negative numids returns error", func(t *testing.T) { + streamName := "xackdel_negnumids_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "-1").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL with invalid entry ID returns error", func(t *testing.T) { + streamName := "xackdel_badid_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "KEEPREF", "IDS", "1", "bad-id").Result() + require.Error(t, err) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {