From 8b52cd95a027eff45cf2c4dc8db6c478463f5993 Mon Sep 17 00:00:00 2001 From: kirito632 Date: Fri, 22 May 2026 15:52:22 +0800 Subject: [PATCH 1/2] fix(stream): filter consumers by group name in GetConsumerInfo to prevent cross-group leakage --- src/types/redis_stream.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 580629c9fb1..0c6aa8b08c1 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1389,6 +1389,11 @@ rocksdb::Status Stream::GetConsumerInfo( continue; } + // Iterate bounds may include consumers from other groups; verify group name explicitly. + if (groupNameFromInternalKey(iter->key()) != group_name) { + continue; + } + std::string consumer_name = consumerNameFromInternalKey(iter->key()); StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString()); std::pair tmp_item(consumer_name, c_metadata); From 19e00c2fd5959ee37db31f56ffdf2ee210e813fe Mon Sep 17 00:00:00 2001 From: kirito632 Date: Fri, 22 May 2026 15:58:07 +0800 Subject: [PATCH 2/2] feat(stream): support XACKDEL and XDELEX commands --- src/commands/cmd_stream.cc | 138 +++++ src/storage/batch_extractor.cc | 21 +- src/types/redis_stream.cc | 568 +++++++++++++++++++ src/types/redis_stream.h | 31 + src/types/redis_stream_base.h | 12 + tests/gocase/unit/type/stream/stream_test.go | 354 ++++++++++++ 6 files changed, 1123 insertions(+), 1 deletion(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 73a5279c8eb..a0ecb862994 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -268,6 +268,142 @@ class CommandXDel : public Commander { std::vector ids_; }; +class CommandXAckDel : public Commander { + public: + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + stream_name_ = GET_OR_RET(parser.TakeStr()); + group_name_ = GET_OR_RET(parser.TakeStr()); + + option_ = redis::StreamDeleteOption::KeepRef; + + while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) { + if (parser.EatEqICase("KEEPREF")) { + option_ = redis::StreamDeleteOption::KeepRef; + } else if (parser.EatEqICase("DELREF")) { + option_ = redis::StreamDeleteOption::DelRef; + } else if (parser.EatEqICase("ACKED")) { + option_ = redis::StreamDeleteOption::Acked; + } else { + return parser.InvalidSyntax(); + } + } + + if (!parser.EatEqICase("IDS")) { + return {Status::RedisParseErr, "syntax error, expected IDS keyword"}; + } + + auto numids_result = parser.TakeInt(); + if (!numids_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + int64_t numids = numids_result.GetValue(); + if (numids <= 0) { + return {Status::RedisParseErr, "numids must be positive"}; + } + + for (int64_t i = 0; i < numids; i++) { + auto id_str = GET_OR_RET(parser.TakeStr()); + redis::StreamEntryID id; + auto s = ParseStreamEntryID(id_str, &id); + if (!s.IsOK()) return s; + entry_ids_.emplace_back(id); + } + + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::Stream stream_db(srv->storage, conn->GetNamespace()); + std::vector results; + + auto s = stream_db.DeleteEntriesAndAck(ctx, stream_name_, group_name_, entry_ids_, option_, &results); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + output->append(redis::MultiLen(results.size())); + for (int r : results) { + output->append(redis::Integer(r)); + } + + return Status::OK(); + } + + private: + std::string stream_name_; + std::string group_name_; + redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef; + std::vector entry_ids_; +}; + +class CommandXDelEx : public Commander { + public: + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + stream_name_ = GET_OR_RET(parser.TakeStr()); + + option_ = redis::StreamDeleteOption::KeepRef; + + while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) { + if (parser.EatEqICase("KEEPREF")) { + option_ = redis::StreamDeleteOption::KeepRef; + } else if (parser.EatEqICase("DELREF")) { + option_ = redis::StreamDeleteOption::DelRef; + } else if (parser.EatEqICase("ACKED")) { + option_ = redis::StreamDeleteOption::Acked; + } else { + return parser.InvalidSyntax(); + } + } + + if (!parser.EatEqICase("IDS")) { + return {Status::RedisParseErr, "syntax error, expected IDS keyword"}; + } + + auto numids_result = parser.TakeInt(); + if (!numids_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + int64_t numids = numids_result.GetValue(); + if (numids <= 0) { + return {Status::RedisParseErr, "numids must be positive"}; + } + + for (int64_t i = 0; i < numids; i++) { + auto id_str = GET_OR_RET(parser.TakeStr()); + redis::StreamEntryID id; + auto s = ParseStreamEntryID(id_str, &id); + if (!s.IsOK()) return s; + entry_ids_.emplace_back(id); + } + + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::Stream stream_db(srv->storage, conn->GetNamespace()); + std::vector results; + + auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + output->append(redis::MultiLen(results.size())); + for (int r : results) { + output->append(redis::Integer(r)); + } + + return Status::OK(); + } + + private: + std::string stream_name_; + redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef; + std::vector entry_ids_; +}; + class CommandXClaim : public Commander { public: Status Parse(const std::vector &args) override { @@ -1907,6 +2043,8 @@ class CommandXSetId : public Commander { REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xadd", -5, "write", 1, 1, 1), MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xackdel", -6, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xdelex", -5, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xautoclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 8dfd4258e7e..dc2c17a70c1 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -397,8 +397,27 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S Slice encoded_id = ikey.GetSubKey(); redis::StreamEntryID entry_id; GetFixed64(&encoded_id, &entry_id.ms); + + if (entry_id.ms == UINT64_MAX) { + return rocksdb::Status::OK(); + } + GetFixed64(&encoded_id, &entry_id.seq); - command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()}; + std::string entry_id_str = entry_id.ToString(); + std::string user_key = ikey.GetKey().ToString(); + + auto args = log_data_.GetArguments(); + if (!args->empty()) { + if ((*args)[0] == "XACKDEL" && args->size() >= 3) { + command_args = {(*args)[0], user_key, (*args)[1], (*args)[2], "IDS", "1", entry_id_str}; + } else if ((*args)[0] == "XDELEX" && args->size() >= 2) { + command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str}; + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } } if (!command_args.empty()) { diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 0c6aa8b08c1..252db22c130 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -394,6 +395,573 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context &ctx, const Slice &stre return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +rocksdb::Status Stream::getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names) { + group_names->clear(); + + std::string subkey_type_delimiter; + PutFixed64(&subkey_type_delimiter, UINT64_MAX); + PutFixed8(&subkey_type_delimiter, static_cast(StreamSubkeyType::StreamConsumerGroupMetadata)); + + std::string next_version_prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamConsumerGroupMetadata) { + continue; + } + group_names->push_back(groupNameFromInternalKey(iter->key())); + } + return iter->status(); +} + +rocksdb::Status Stream::deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, + uint64_t *deleted_cnt) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, entry_key); + if (!s.ok()) return s; + (*deleted_cnt)++; + + if (metadata->max_deleted_entry_id < id) { + metadata->max_deleted_entry_id = id; + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::processOneEntryDeletion( + engine::Context &ctx, const rocksdb::ReadOptions &read_options, const std::string &ns_key, StreamMetadata *metadata, + const StreamEntryID &id, const std::vector &all_groups, StreamDeleteOption option, + rocksdb::WriteBatchBase *batch, bool *batch_modified, uint64_t *deleted_cnt, StreamEntryDeleteResult *result, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements) { + std::string entry_key = internalKeyFromEntryID(ns_key, *metadata, id); + std::string value; + auto s = storage_->Get(ctx, read_options, stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (s.IsNotFound()) { + if (option == StreamDeleteOption::DelRef) { + s = cleanPelFromAllGroups(ctx, ns_key, *metadata, id, batch, batch_modified, all_groups, group_pending_decrements, + consumer_pending_decrements); + if (!s.ok()) return s; + } + *result = StreamEntryDeleteResult::kEntryNotFound; + return rocksdb::Status::OK(); + } + + switch (option) { + case StreamDeleteOption::KeepRef: + s = deleteEntryAndUpdateMeta(batch, entry_key, id, metadata, deleted_cnt); + if (!s.ok()) return s; + *batch_modified = true; + *result = StreamEntryDeleteResult::kEntryDeleted; + break; + + case StreamDeleteOption::DelRef: + s = deleteEntryAndUpdateMeta(batch, entry_key, id, metadata, deleted_cnt); + if (!s.ok()) return s; + s = cleanPelFromAllGroups(ctx, ns_key, *metadata, id, batch, batch_modified, all_groups, group_pending_decrements, + consumer_pending_decrements); + if (!s.ok()) return s; + *batch_modified = true; + *result = StreamEntryDeleteResult::kEntryDeleted; + break; + + case StreamDeleteOption::Acked: { + if (all_groups.empty()) { + *result = StreamEntryDeleteResult::kEntrySkipped; + return rocksdb::Status::OK(); + } + bool all_acked = false; + s = isAckedByAllGroups(ctx, ns_key, *metadata, id, all_groups, &all_acked); + if (!s.ok()) return s; + if (all_acked) { + s = deleteEntryAndUpdateMeta(batch, entry_key, id, metadata, deleted_cnt); + if (!s.ok()) return s; + *batch_modified = true; + *result = StreamEntryDeleteResult::kEntryDeleted; + } else { + *result = StreamEntryDeleteResult::kEntrySkipped; + } + break; + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements) { + for (const auto &group_name : group_names) { + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, pel_key); + if (!s.ok()) return s; + *batch_modified = true; + + auto pel_entry = decodeStreamPelEntryValue(pel_value); + (*group_pending_decrements)[group_name]++; + (*consumer_pending_decrements)[group_name][pel_entry.consumer_name]++; + } else if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements) { + for (const auto &[group_name, decrement] : group_pending_decrements) { + auto group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string group_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &group_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value); + group_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta)); + if (!s.ok()) return s; + } + } + + for (const auto &[group_name, consumers] : consumer_pending_decrements) { + for (const auto &[consumer_name, decrement] : consumers) { + auto consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &consumer_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value); + consumer_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta)); + if (!s.ok()) return s; + } + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::isAckedByAllGroups(engine::Context &ctx, const std::string &ns_key, + const StreamMetadata &metadata, const StreamEntryID &id, + const std::vector &all_groups, bool *all_acked) { + *all_acked = true; + for (const auto &group_name : all_groups) { + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + *all_acked = false; + return rocksdb::Status::OK(); + } else if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::DeleteEntriesWithOption(engine::Context &ctx, const Slice &stream_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results) { + results->assign(ids.size(), static_cast(StreamEntryDeleteResult::kEntryNotFound)); + + std::string ns_key = AppendNamespacePrefix(stream_name); + + StreamMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok()) { + return s.IsNotFound() ? rocksdb::Status::OK() : s; + } + + if (ids.empty()) { + return rocksdb::Status::OK(); + } + + auto batch = storage_->GetWriteBatchBase(); + std::string option_str; + switch (option) { + case StreamDeleteOption::DelRef: + option_str = "DELREF"; + break; + case StreamDeleteOption::Acked: + option_str = "ACKED"; + break; + default: + option_str = "KEEPREF"; + break; + } + WriteBatchLogData log_data(kRedisStream, {"XDELEX", option_str}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + + std::vector all_groups; + bool need_groups = (option == StreamDeleteOption::DelRef || option == StreamDeleteOption::Acked); + if (need_groups) { + s = getGroupNames(ctx, ns_key, metadata, &all_groups); + if (!s.ok()) return s; + } + + uint64_t deleted_cnt = 0; + bool batch_modified = false; + std::map group_pending_decrements; + std::map> consumer_pending_decrements; + + std::unordered_set seen_keys; + seen_keys.reserve(ids.size()); + std::unordered_set deleted_entry_keys; + deleted_entry_keys.reserve(ids.size()); + StreamEntryID orig_first_entry_id = metadata.first_entry_id; + StreamEntryID orig_last_entry_id = metadata.last_entry_id; + + for (size_t i = 0; i < ids.size(); i++) { + const auto &id = ids[i]; + + std::string entry_key = internalKeyFromEntryID(ns_key, metadata, id); + + if (!seen_keys.insert(entry_key).second) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + StreamEntryDeleteResult result = StreamEntryDeleteResult::kEntryNotFound; + s = processOneEntryDeletion(ctx, read_options, ns_key, &metadata, id, all_groups, option, batch.Get(), + &batch_modified, &deleted_cnt, &result, &group_pending_decrements, + &consumer_pending_decrements); + if (!s.ok()) return s; + if (result == StreamEntryDeleteResult::kEntryDeleted) { + deleted_entry_keys.insert(entry_key); + } + (*results)[i] = static_cast(result); + } + + if (deleted_cnt > 0) { + metadata.size -= deleted_cnt; + + if (metadata.size == 0) { + metadata.first_entry_id.Clear(); + metadata.last_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } else { + bool first_deleted = deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, orig_first_entry_id)) > 0; + bool last_deleted = deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, orig_last_entry_id)) > 0; + + if (first_deleted) { + iter->SeekToFirst(); + while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { + iter->Next(); + } + if (iter->Valid()) { + metadata.first_entry_id = entryIDFromInternalKey(iter->key()); + metadata.recorded_first_entry_id = metadata.first_entry_id; + } else { + metadata.first_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } + } + if (last_deleted) { + iter->SeekToLast(); + while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { + iter->Prev(); + } + if (iter->Valid()) { + metadata.last_entry_id = entryIDFromInternalKey(iter->key()); + } else { + metadata.last_entry_id.Clear(); + } + } + } + + std::string bytes; + metadata.Encode(&bytes); + s = batch->Put(metadata_cf_handle_, ns_key, bytes); + if (!s.ok()) return s; + } + + if (!group_pending_decrements.empty()) { + s = flushPendingNumberUpdates(ctx, ns_key, metadata, batch.Get(), group_pending_decrements, + consumer_pending_decrements); + if (!s.ok()) return s; + } + + if (!batch_modified) { + return rocksdb::Status::OK(); + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status Stream::DeleteEntriesAndAck(engine::Context &ctx, const Slice &stream_name, + const std::string &group_name, const std::vector &ids, + StreamDeleteOption option, std::vector *results) { + results->assign(ids.size(), static_cast(StreamEntryDeleteResult::kEntryNotFound)); + + std::string ns_key = AppendNamespacePrefix(stream_name); + + StreamMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok()) { + return s.IsNotFound() ? rocksdb::Status::NotFound("NOGROUP No such consumer group '" + group_name + "'") : s; + } + + if (ids.empty()) { + return rocksdb::Status::OK(); + } + + std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string get_group_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value); + if (!s.ok()) { + if (s.IsNotFound()) { + return rocksdb::Status::NotFound("NOGROUP No such consumer group '" + group_name + "'"); + } + return s; + } + + auto batch = storage_->GetWriteBatchBase(); + std::string option_str; + switch (option) { + case StreamDeleteOption::DelRef: + option_str = "DELREF"; + break; + case StreamDeleteOption::Acked: + option_str = "ACKED"; + break; + default: + option_str = "KEEPREF"; + break; + } + WriteBatchLogData log_data(kRedisStream, {"XACKDEL", group_name, option_str}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + + std::vector all_groups; + bool need_groups = (option == StreamDeleteOption::DelRef || option == StreamDeleteOption::Acked); + if (need_groups) { + s = getGroupNames(ctx, ns_key, metadata, &all_groups); + if (!s.ok()) return s; + } + + std::map consumer_acknowledges; + std::map other_group_pending_decrements; + std::map> other_consumer_pending_decrements; + uint64_t deleted_cnt = 0; + uint64_t acknowledged_cnt = 0; + bool batch_modified = false; + + std::unordered_set seen_keys; + seen_keys.reserve(ids.size()); + std::unordered_set deleted_entry_keys; + deleted_entry_keys.reserve(ids.size()); + StreamEntryID orig_first_entry_id = metadata.first_entry_id; + StreamEntryID orig_last_entry_id = metadata.last_entry_id; + + for (size_t i = 0; i < ids.size(); i++) { + const auto &id = ids[i]; + + std::string entry_key = internalKeyFromEntryID(ns_key, metadata, id); + + if (!seen_keys.insert(entry_key).second) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + std::string value; + s = storage_->Get(ctx, read_options, stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + bool stream_entry_exists = s.ok(); + + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + bool pel_entry_exists = s.ok(); + + if (!stream_entry_exists && !pel_entry_exists) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + bool pel_cleaned = false; + if (pel_entry_exists) { + s = batch->Delete(stream_cf_handle_, pel_key); + if (!s.ok()) return s; + pel_cleaned = true; + acknowledged_cnt++; + batch_modified = true; + + auto pel_entry = decodeStreamPelEntryValue(pel_value); + consumer_acknowledges[pel_entry.consumer_name]++; + } + + const std::vector *groups_for_cleanup = &all_groups; + std::vector filtered_groups; + if ((option == StreamDeleteOption::DelRef || option == StreamDeleteOption::Acked) && pel_entry_exists) { + for (const auto &g : all_groups) { + if (g != group_name) { + filtered_groups.push_back(g); + } + } + groups_for_cleanup = &filtered_groups; + } + + StreamEntryDeleteResult del_result = StreamEntryDeleteResult::kEntryNotFound; + s = processOneEntryDeletion(ctx, read_options, ns_key, &metadata, id, *groups_for_cleanup, option, batch.Get(), + &batch_modified, &deleted_cnt, &del_result, &other_group_pending_decrements, + &other_consumer_pending_decrements); + if (!s.ok()) return s; + + if (del_result == StreamEntryDeleteResult::kEntrySkipped && option == StreamDeleteOption::Acked && + pel_entry_exists && filtered_groups.empty() && !all_groups.empty()) { + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + del_result = StreamEntryDeleteResult::kEntryDeleted; + } + + if (del_result == StreamEntryDeleteResult::kEntryDeleted) { + deleted_entry_keys.insert(entry_key); + } + + if (del_result != StreamEntryDeleteResult::kEntryNotFound) { + (*results)[i] = static_cast(del_result); + } else if (pel_cleaned) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } + } + + if (deleted_cnt > 0 || acknowledged_cnt > 0) { + if (deleted_cnt > 0) { + metadata.size -= deleted_cnt; + + if (metadata.size == 0) { + metadata.first_entry_id.Clear(); + metadata.last_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } else { + bool first_deleted = + deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, orig_first_entry_id)) > 0; + bool last_deleted = deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, orig_last_entry_id)) > 0; + + if (first_deleted) { + iter->SeekToFirst(); + while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { + iter->Next(); + } + if (iter->Valid()) { + metadata.first_entry_id = entryIDFromInternalKey(iter->key()); + metadata.recorded_first_entry_id = metadata.first_entry_id; + } else { + metadata.first_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } + } + if (last_deleted) { + iter->SeekToLast(); + while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { + iter->Prev(); + } + if (iter->Valid()) { + metadata.last_entry_id = entryIDFromInternalKey(iter->key()); + } else { + metadata.last_entry_id.Clear(); + } + } + } + + std::string bytes; + metadata.Encode(&bytes); + s = batch->Put(metadata_cf_handle_, ns_key, bytes); + if (!s.ok()) return s; + } + + if (acknowledged_cnt > 0) { + StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value); + group_metadata.pending_number -= acknowledged_cnt; + std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); + s = batch->Put(stream_cf_handle_, group_key, group_value); + if (!s.ok()) return s; + + for (const auto &[consumer_name, ack_count] : consumer_acknowledges) { + auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_meta_original; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); + consumer_metadata.pending_number -= ack_count; + s = batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata)); + if (!s.ok()) return s; + } + } + } + + if (!other_group_pending_decrements.empty()) { + s = flushPendingNumberUpdates(ctx, ns_key, metadata, batch.Get(), other_group_pending_decrements, + other_consumer_pending_decrements); + if (!s.ok()) return s; + } + } + + if (!batch_modified) { + return rocksdb::Status::OK(); + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, const uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 08cae0b1d48..94db98a8397 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -22,10 +22,12 @@ #include +#include #include #include #include +#include "common/db_util.h" #include "storage/redis_db.h" #include "storage/redis_metadata.h" @@ -53,6 +55,12 @@ class Stream : public SubKeyScanner { uint64_t *deleted_cnt); rocksdb::Status DeletePelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::vector &entry_ids, uint64_t *acknowledged); + rocksdb::Status DeleteEntriesWithOption(engine::Context &ctx, const Slice &stream_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results); + rocksdb::Status DeleteEntriesAndAck(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results); rocksdb::Status ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, @@ -100,6 +108,8 @@ class Stream : public SubKeyScanner { static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value); std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const std::string &consumer_name) const; + rocksdb::Status getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names); std::string consumerNameFromInternalKey(rocksdb::Slice key) const; static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata); static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value); @@ -109,6 +119,27 @@ class Stream : public SubKeyScanner { std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const StreamEntryID &id); StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name); + + rocksdb::Status deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, uint64_t *deleted_cnt); + rocksdb::Status processOneEntryDeletion( + engine::Context &ctx, const rocksdb::ReadOptions &read_options, const std::string &ns_key, + StreamMetadata *metadata, const StreamEntryID &id, const std::vector &all_groups, + StreamDeleteOption option, rocksdb::WriteBatchBase *batch, bool *batch_modified, uint64_t *deleted_cnt, + StreamEntryDeleteResult *result, std::map *group_pending_decrements, + std::map> *consumer_pending_decrements); + rocksdb::Status cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements); + rocksdb::Status isAckedByAllGroups(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + const StreamEntryID &id, const std::vector &all_groups, + bool *all_acked); + rocksdb::Status flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements); static std::string encodeStreamPelEntryValue(const StreamPelEntry &pel_entry); static StreamPelEntry decodeStreamPelEntryValue(const std::string &value); StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const; diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index f7991f573e3..1e9830249b1 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -203,6 +203,18 @@ enum class StreamSubkeyType { StreamPelEntry = 3, }; +enum class StreamDeleteOption { + KeepRef = 0, + DelRef = 1, + Acked = 2, +}; + +enum class StreamEntryDeleteResult : int { + kEntryNotFound = -1, + kEntryDeleted = 1, + kEntrySkipped = 2, +}; + struct StreamPelEntry { uint64_t last_delivery_time_ms; uint64_t last_delivery_count; diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 00354848ca9..a6eacb68549 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2643,6 +2643,360 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, rdb.Del(ctx, streamKey).Err()) }) + + t.Run("XACKDEL DELREF cross-group PEL cleanup", func(t *testing.T) { + streamName := "xackdel_delref_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + otherPending, err := rdb.XPending(ctx, streamName, otherGroup).Result() + require.NoError(t, err) + require.Equal(t, int64(0), otherPending.Count) + + infoGroups := rdb.XInfoGroups(ctx, streamName).Val() + require.Len(t, infoGroups, 2) + require.Equal(t, int64(0), infoGroups[0].Pending) + require.Equal(t, int64(0), infoGroups[1].Pending) + + consumers1 := rdb.XInfoConsumers(ctx, streamName, groupName).Val() + require.Len(t, consumers1, 1) + require.Equal(t, "c1", consumers1[0].Name) + require.Equal(t, int64(0), consumers1[0].Pending) + + consumers2 := rdb.XInfoConsumers(ctx, streamName, otherGroup).Val() + require.Len(t, consumers2, 1) + require.Equal(t, "c2", consumers2[0].Name) + require.Equal(t, int64(0), consumers2[0].Pending) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED all groups have acked deletes entry", func(t *testing.T) { + streamName := "xackdel_acked_all_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + require.NoError(t, rdb.XAck(ctx, streamName, otherGroup, "1-0").Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL ACKED with other group not acked returns 2", func(t *testing.T) { + streamName := "xackdel_acked_notall_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL NOGROUP error on non-existent stream", func(t *testing.T) { + streamName := "xackdel_nogroup_" + strconv.Itoa(rand.Int()) + + _, err := rdb.Do(ctx, "XACKDEL", streamName, "nonexistent", "KEEPREF", "IDS", "1", "1-0").Result() + require.Error(t, err) + require.Contains(t, err.Error(), "NOGROUP") + }) + + t.Run("XDELEX KEEPREF deletes entry without PEL cleanup", func(t *testing.T) { + streamName := "xdelex_keepref_" + strconv.Itoa(rand.Int()) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX DELREF deletes entry with PEL cleanup", func(t *testing.T) { + streamName := "xdelex_delref_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with no groups returns 2", func(t *testing.T) { + streamName := "xdelex_acked_nogroup_" + strconv.Itoa(rand.Int()) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with group acked deletes entry", func(t *testing.T) { + streamName := "xdelex_acked_group_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with multi-group some acked returns 2", func(t *testing.T) { + streamName := "xdelex_acked_multigroup_notall_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with multi-group all acked deletes entry", func(t *testing.T) { + streamName := "xdelex_acked_multigroup_all_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + require.NoError(t, rdb.XAck(ctx, streamName, otherGroup, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XACKDEL DELREF duplicate IDs are idempotent", func(t *testing.T) { + streamName := "xackdel_dedup_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XACKDEL", streamName, groupName, "DELREF", "IDS", "2", "1-0", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(-1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {