diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 73a5279c8eb..9016f679764 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -268,6 +268,79 @@ class CommandXDel : public Commander { std::vector ids_; }; +class CommandXDelEx : public Commander { + public: + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + stream_name_ = GET_OR_RET(parser.TakeStr()); + + option_ = redis::StreamDeleteOption::KeepRef; + bool has_ids = false; + + while (parser.Good()) { + if (parser.EatEqICase("KEEPREF")) { + option_ = redis::StreamDeleteOption::KeepRef; + } else if (parser.EatEqICase("DELREF")) { + option_ = redis::StreamDeleteOption::DelRef; + } else if (parser.EatEqICase("ACKED")) { + option_ = redis::StreamDeleteOption::Acked; + } else if (parser.EatEqICase("IDS")) { + if (has_ids) { + return parser.InvalidSyntax(); + } + has_ids = true; + + auto numids_result = parser.TakeInt(); + if (!numids_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + int64_t numids = numids_result.GetValue(); + if (numids <= 0) { + return {Status::RedisParseErr, "numids must be positive"}; + } + + for (int64_t i = 0; i < numids; i++) { + auto id_str = GET_OR_RET(parser.TakeStr()); + redis::StreamEntryID id; + auto s = ParseStreamEntryID(id_str, &id); + if (!s.IsOK()) return s; + entry_ids_.emplace_back(id); + } + } else { + return parser.InvalidSyntax(); + } + } + + if (!has_ids) { + return {Status::RedisParseErr, "syntax error, expected IDS keyword"}; + } + + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::Stream stream_db(srv->storage, conn->GetNamespace()); + std::vector results; + + auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + output->append(redis::MultiLen(results.size())); + for (int r : results) { + output->append(redis::Integer(r)); + } + + return Status::OK(); + } + + private: + std::string stream_name_; + redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef; + std::vector entry_ids_; +}; + class CommandXClaim : public Commander { public: Status Parse(const std::vector &args) override { @@ -1907,6 +1980,7 @@ class CommandXSetId : public Commander { REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xadd", -5, "write", 1, 1, 1), MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xdelex", -5, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xautoclaim", -6, "write", 1, 1, 1), MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 8dfd4258e7e..ccfa5be6e38 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -20,12 +20,15 @@ #include "batch_extractor.h" +#include + #include "cluster/redis_slot.h" #include "logging.h" #include "parse_util.h" #include "server/redis_reply.h" #include "server/server.h" #include "types/redis_bitmap.h" +#include "types/redis_stream_base.h" void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { // Currently, we only have two kinds of log data @@ -38,6 +41,8 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { // Redis type log data if (auto s = log_data_.Decode(blob); !s.IsOK()) { WARN("Failed to decode Redis type log: {}", s.Msg()); + } else { + seen_xdelex_entry_keys_.clear(); } } } @@ -266,6 +271,21 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic break; } } else if (column_family_id == static_cast(ColumnFamilyID::Stream)) { + InternalKey ikey(key, is_slot_id_encoded_); + Slice entry_id_check = ikey.GetSubKey(); + uint64_t delimiter = 0; + GetFixed64(&entry_id_check, &delimiter); + if (delimiter == UINT64_MAX) { + return rocksdb::Status::OK(); + } + + user_key = ikey.GetKey().ToString(); + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args); if (!s.IsOK()) { ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg()); @@ -397,8 +417,70 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S Slice encoded_id = ikey.GetSubKey(); redis::StreamEntryID entry_id; GetFixed64(&encoded_id, &entry_id.ms); + + if (entry_id.ms == UINT64_MAX) { + // DELREF may remove dangling PELs without a stream entry deletion. + auto args = log_data_.GetArguments(); + if (!args->empty() && (*args)[0] == "XDELEX" && args->size() >= 2 && (*args)[1] == "DELREF") { + uint8_t type_delimiter = 0; + if (!GetFixed8(&encoded_id, &type_delimiter)) { + return rocksdb::Status::OK(); + } + if (type_delimiter == static_cast(redis::StreamSubkeyType::StreamPelEntry)) { + uint64_t group_name_len = 0; + if (!GetFixed64(&encoded_id, &group_name_len)) { + return rocksdb::Status::OK(); + } + if (group_name_len > encoded_id.size() || encoded_id.size() - group_name_len < 16) { + return rocksdb::Status::OK(); + } + encoded_id.remove_prefix(group_name_len); + + if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) { + return rocksdb::Status::OK(); + } + std::string entry_id_str = entry_id.ToString(); + + std::string user_key = ikey.GetKey().ToString(); + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + + std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str; + if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) { + command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str}; + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); + } + } + } + return rocksdb::Status::OK(); + } + GetFixed64(&encoded_id, &entry_id.seq); - command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()}; + std::string entry_id_str = entry_id.ToString(); + std::string user_key = ikey.GetKey().ToString(); + + auto key_slot_id = GetSlotIdFromKey(user_key); + if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) { + return rocksdb::Status::OK(); + } + ns = ikey.GetNamespace().ToString(); + + auto args = log_data_.GetArguments(); + if (!args->empty()) { + if ((*args)[0] == "XDELEX" && args->size() >= 2) { + std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str; + if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) { + command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str}; + } + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } + } else { + command_args = {"XDEL", user_key, entry_id_str}; + } } if (!command_args.empty()) { diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h index 4f5775046cf..28d54d4f3f2 100644 --- a/src/storage/batch_extractor.h +++ b/src/storage/batch_extractor.h @@ -22,6 +22,7 @@ #include #include +#include #include #include "cluster/cluster_defs.h" @@ -54,4 +55,5 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler { bool is_slot_id_encoded_ = false; SlotRange slot_range_; bool to_redis_; + std::unordered_set seen_xdelex_entry_keys_; }; diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 580629c9fb1..5adefc2a232 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -394,6 +395,349 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context &ctx, const Slice &stre return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +rocksdb::Status Stream::getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names) { + group_names->clear(); + + std::string subkey_type_delimiter; + PutFixed64(&subkey_type_delimiter, UINT64_MAX); + PutFixed8(&subkey_type_delimiter, static_cast(StreamSubkeyType::StreamConsumerGroupMetadata)); + + std::string next_version_prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = + InternalKey(ns_key, subkey_type_delimiter, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + // Group metadata keys sort before consumer/PEL keys, so no more groups once we see a different type. + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamConsumerGroupMetadata) { + break; + } + group_names->push_back(groupNameFromInternalKey(iter->key())); + } + return iter->status(); +} + +rocksdb::Status Stream::deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, + uint64_t *deleted_cnt) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, entry_key); + if (!s.ok()) return s; + (*deleted_cnt)++; + + if (metadata->max_deleted_entry_id < id) { + metadata->max_deleted_entry_id = id; + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements) { + for (const auto &group_name : group_names) { + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + rocksdb::Status s = batch->Delete(stream_cf_handle_, pel_key); + if (!s.ok()) return s; + *batch_modified = true; + + auto pel_entry = decodeStreamPelEntryValue(pel_value); + (*group_pending_decrements)[group_name]++; + (*consumer_pending_decrements)[group_name][pel_entry.consumer_name]++; + } else if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements) { + for (const auto &[group_name, decrement] : group_pending_decrements) { + auto group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string group_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &group_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value); + group_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta)); + if (!s.ok()) return s; + } + } + + for (const auto &[group_name, consumers] : consumer_pending_decrements) { + for (const auto &[consumer_name, decrement] : consumers) { + auto consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_value; + auto s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, consumer_key, &consumer_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value); + consumer_meta.pending_number -= decrement; + s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta)); + if (!s.ok()) return s; + } + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::isAckedByAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + const std::vector &group_names, + const std::unordered_map &last_delivered_ids_by_group, bool *all_acked) { + *all_acked = true; + for (const auto &group_name : group_names) { + auto it = last_delivered_ids_by_group.find(group_name); + if (it != last_delivered_ids_by_group.end() && id > it->second) { + *all_acked = false; + return rocksdb::Status::OK(); + } + + std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string pel_value; + auto pel_s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, pel_key, &pel_value); + if (pel_s.ok()) { + *all_acked = false; + return rocksdb::Status::OK(); + } + if (!pel_s.IsNotFound()) { + return pel_s; + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Stream::DeleteEntriesWithOption(engine::Context &ctx, const Slice &stream_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results) { + results->assign(ids.size(), static_cast(StreamEntryDeleteResult::kEntryNotFound)); + + std::string ns_key = AppendNamespacePrefix(stream_name); + + StreamMetadata metadata(false); + rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); + if (!s.ok()) { + return s.IsNotFound() ? rocksdb::Status::OK() : s; + } + + if (ids.empty()) { + return rocksdb::Status::OK(); + } + + auto batch = storage_->GetWriteBatchBase(); + std::string option_str; + switch (option) { + case StreamDeleteOption::DelRef: + option_str = "DELREF"; + break; + case StreamDeleteOption::Acked: + option_str = "ACKED"; + break; + default: + option_str = "KEEPREF"; + break; + } + WriteBatchLogData log_data(kRedisStream, {"XDELEX", option_str}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + rocksdb::Slice lower_bound(prefix_key); + read_options.iterate_lower_bound = &lower_bound; + + auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); + + std::vector group_names; + bool need_groups = (option == StreamDeleteOption::DelRef || option == StreamDeleteOption::Acked); + if (need_groups) { + s = getGroupNames(ctx, ns_key, metadata, &group_names); + if (!s.ok()) return s; + } + + std::unordered_map last_delivered_ids_by_group; + if (option == StreamDeleteOption::Acked) { + for (const auto &group_name : group_names) { + std::string group_metadata_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string group_metadata_value; + s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_metadata_key, &group_metadata_value); + if (s.ok()) { + auto group_metadata = decodeStreamConsumerGroupMetadataValue(group_metadata_value); + last_delivered_ids_by_group[group_name] = group_metadata.last_delivered_id; + } else if (!s.IsNotFound()) { + return s; + } + } + } + + uint64_t deleted_cnt = 0; + bool batch_modified = false; + std::map group_pending_decrements; + std::map> consumer_pending_decrements; + + std::unordered_set seen_entry_keys; + seen_entry_keys.reserve(ids.size()); + std::unordered_set deleted_entry_keys; + deleted_entry_keys.reserve(ids.size()); + StreamEntryID original_first_entry_id = metadata.first_entry_id; + StreamEntryID original_last_entry_id = metadata.last_entry_id; + + for (size_t i = 0; i < ids.size(); i++) { + const auto &id = ids[i]; + + std::string entry_key = internalKeyFromEntryID(ns_key, metadata, id); + + if (!seen_entry_keys.insert(entry_key).second) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + std::string value; + s = storage_->Get(ctx, read_options, stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (s.IsNotFound()) { + // DELREF cleans dangling PELs; ACKED preserves them without the entry. + if (option == StreamDeleteOption::DelRef) { + s = cleanPelFromAllGroups(ctx, ns_key, metadata, id, batch.Get(), &batch_modified, group_names, + &group_pending_decrements, &consumer_pending_decrements); + if (!s.ok()) return s; + } + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryNotFound); + continue; + } + + switch (option) { + case StreamDeleteOption::KeepRef: + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + break; + + case StreamDeleteOption::DelRef: + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + s = cleanPelFromAllGroups(ctx, ns_key, metadata, id, batch.Get(), &batch_modified, group_names, + &group_pending_decrements, &consumer_pending_decrements); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + break; + + case StreamDeleteOption::Acked: { + if (group_names.empty()) { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntrySkipped); + break; + } + + bool all_acked = false; + s = isAckedByAllGroups(ctx, ns_key, metadata, id, group_names, last_delivered_ids_by_group, &all_acked); + if (!s.ok()) return s; + if (all_acked) { + s = deleteEntryAndUpdateMeta(batch.Get(), entry_key, id, &metadata, &deleted_cnt); + if (!s.ok()) return s; + batch_modified = true; + deleted_entry_keys.insert(entry_key); + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntryDeleted); + } else { + (*results)[i] = static_cast(StreamEntryDeleteResult::kEntrySkipped); + } + break; + } + } + } + + if (deleted_cnt > 0) { + metadata.size -= deleted_cnt; + + if (metadata.size == 0) { + metadata.first_entry_id.Clear(); + metadata.last_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } else { + bool first_deleted = + deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, original_first_entry_id)) > 0; + bool last_deleted = + deleted_entry_keys.count(internalKeyFromEntryID(ns_key, metadata, original_last_entry_id)) > 0; + + if (first_deleted) { + iter->SeekToFirst(); + while (iter->Valid() && (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry || + deleted_entry_keys.count(iter->key().ToString()) > 0)) { + iter->Next(); + } + if (iter->Valid()) { + metadata.first_entry_id = entryIDFromInternalKey(iter->key()); + metadata.recorded_first_entry_id = metadata.first_entry_id; + } else { + metadata.first_entry_id.Clear(); + metadata.recorded_first_entry_id.Clear(); + } + } + if (last_deleted) { + iter->SeekToLast(); + while (iter->Valid() && (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry || + deleted_entry_keys.count(iter->key().ToString()) > 0)) { + iter->Prev(); + } + if (iter->Valid()) { + metadata.last_entry_id = entryIDFromInternalKey(iter->key()); + } else { + metadata.last_entry_id.Clear(); + } + } + } + + std::string bytes; + metadata.Encode(&bytes); + s = batch->Put(metadata_cf_handle_, ns_key, bytes); + if (!s.ok()) return s; + } + + if (!group_pending_decrements.empty()) { + s = flushPendingNumberUpdates(ctx, ns_key, metadata, batch.Get(), group_pending_decrements, + consumer_pending_decrements); + if (!s.ok()) return s; + } + + if (!batch_modified) { + return rocksdb::Status::OK(); + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, const uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, @@ -1389,6 +1733,11 @@ rocksdb::Status Stream::GetConsumerInfo( continue; } + // Iterate bounds may include consumers from other groups; verify group name explicitly. + if (groupNameFromInternalKey(iter->key()) != group_name) { + continue; + } + std::string consumer_name = consumerNameFromInternalKey(iter->key()); StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString()); std::pair tmp_item(consumer_name, c_metadata); diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 08cae0b1d48..20177d535dd 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -22,8 +22,10 @@ #include +#include #include #include +#include #include #include "storage/redis_db.h" @@ -53,6 +55,9 @@ class Stream : public SubKeyScanner { uint64_t *deleted_cnt); rocksdb::Status DeletePelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::vector &entry_ids, uint64_t *acknowledged); + rocksdb::Status DeleteEntriesWithOption(engine::Context &ctx, const Slice &stream_name, + const std::vector &ids, StreamDeleteOption option, + std::vector *results); rocksdb::Status ClaimPelEntries(engine::Context &ctx, const Slice &stream_name, const std::string &group_name, const std::string &consumer_name, uint64_t min_idle_time_ms, const std::vector &entry_ids, const StreamClaimOptions &options, @@ -100,6 +105,8 @@ class Stream : public SubKeyScanner { static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value); std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const std::string &consumer_name) const; + rocksdb::Status getGroupNames(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + std::vector *group_names); std::string consumerNameFromInternalKey(rocksdb::Slice key) const; static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata); static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value); @@ -109,6 +116,22 @@ class Stream : public SubKeyScanner { std::string internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata, const std::string &group_name, const StreamEntryID &id); StreamEntryID groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name); + + rocksdb::Status deleteEntryAndUpdateMeta(rocksdb::WriteBatchBase *batch, const std::string &entry_key, + const StreamEntryID &id, StreamMetadata *metadata, uint64_t *deleted_cnt); + rocksdb::Status cleanPelFromAllGroups( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, const StreamEntryID &id, + rocksdb::WriteBatchBase *batch, bool *batch_modified, const std::vector &group_names, + std::map *group_pending_decrements, + std::map> *consumer_pending_decrements); + rocksdb::Status isAckedByAllGroups(engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, + const StreamEntryID &id, const std::vector &group_names, + const std::unordered_map &last_delivered_ids_by_group, + bool *all_acked); + rocksdb::Status flushPendingNumberUpdates( + engine::Context &ctx, const std::string &ns_key, const StreamMetadata &metadata, rocksdb::WriteBatchBase *batch, + const std::map &group_pending_decrements, + const std::map> &consumer_pending_decrements); static std::string encodeStreamPelEntryValue(const StreamPelEntry &pel_entry); static StreamPelEntry decodeStreamPelEntryValue(const std::string &value); StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const; diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index f7991f573e3..1e9830249b1 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -203,6 +203,18 @@ enum class StreamSubkeyType { StreamPelEntry = 3, }; +enum class StreamDeleteOption { + KeepRef = 0, + DelRef = 1, + Acked = 2, +}; + +enum class StreamEntryDeleteResult : int { + kEntryNotFound = -1, + kEntryDeleted = 1, + kEntrySkipped = 2, +}; + struct StreamPelEntry { uint64_t last_delivery_time_ms; uint64_t last_delivery_count; diff --git a/tests/cppunit/types/stream_test.cc b/tests/cppunit/types/stream_test.cc index 96c22c54392..e0f533c9000 100644 --- a/tests/cppunit/types/stream_test.cc +++ b/tests/cppunit/types/stream_test.cc @@ -2691,3 +2691,17 @@ TEST_F(RedisStreamTest, DestroyGroupDoesNotAffectOtherGroups) { s = stream_->Del(*ctx_, stream_name); EXPECT_TRUE(s.ok()); } + +TEST_F(RedisStreamTest, DeleteEntriesWithOptionEmptyIDs) { + redis::StreamAddOptions add_options; + add_options.next_id_strategy = *ParseNextStreamEntryIDStrategy("12345-6789"); + std::vector values = {"key1", "val1"}; + redis::StreamEntryID id; + auto s = stream_->Add(*ctx_, name_, add_options, values, &id); + EXPECT_TRUE(s.ok()); + + std::vector results; + s = stream_->DeleteEntriesWithOption(*ctx_, name_, {}, redis::StreamDeleteOption::KeepRef, &results); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(results.empty()); +} diff --git a/tests/gocase/unit/server/poll_updates_test.go b/tests/gocase/unit/server/poll_updates_test.go index 4d76499f9cc..9829ea5dc1b 100644 --- a/tests/gocase/unit/server/poll_updates_test.go +++ b/tests/gocase/unit/server/poll_updates_test.go @@ -283,6 +283,7 @@ func TestPollUpdates_WithRESPFormat(t *testing.T) { pollUpdates = parsePollUpdatesResult(t, result.(map[any]any), true) require.Len(t, pollUpdates.Updates, 1) require.EqualValues(t, []any{RESPFormat{ + Namespace: "default", Commands: [][]string{ {"XADD", "stream", id, "field", "value"}, {"XDEL", "stream", id}, @@ -290,6 +291,61 @@ func TestPollUpdates_WithRESPFormat(t *testing.T) { }, pollUpdates.Updates) }) + t.Run("Stream XDELEX KEEPREF entry deletion", func(t *testing.T) { + streamName := "stream_xdelex_keepref" + require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: map[string]interface{}{"field": "value"}, + }).Err()) + _, err := rdb0.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + + result, err := rdb0.Do(ctx, "POLLUPDATES", pollUpdates.NextSeq, "MAX", 10, "FORMAT", "RESP").Result() + require.NoError(t, err) + + pollUpdates = parsePollUpdatesResult(t, result.(map[any]any), true) + require.Len(t, pollUpdates.Updates, 1) + require.EqualValues(t, []any{RESPFormat{ + Namespace: "default", + Commands: [][]string{ + {"XADD", streamName, "1-0", "field", "value"}, + {"XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0"}, + }, + }}, pollUpdates.Updates) + }) + + t.Run("Stream XDELEX DELREF dangling PEL", func(t *testing.T) { + streamName := "stream_xdelex" + require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: map[string]interface{}{"field": "value"}, + }).Err()) + require.NoError(t, rdb0.XGroupCreateMkStream(ctx, streamName, "group", "0").Err()) + _, err := rdb0.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", Consumer: "consumer", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + require.Equal(t, int64(1), rdb0.XDel(ctx, streamName, "1-0").Val()) + _, err = rdb0.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + + result, err := rdb0.Do(ctx, "POLLUPDATES", pollUpdates.NextSeq, "MAX", 10, "FORMAT", "RESP").Result() + require.NoError(t, err) + + pollUpdates = parsePollUpdatesResult(t, result.(map[any]any), true) + require.Len(t, pollUpdates.Updates, 1) + require.EqualValues(t, []any{RESPFormat{ + Namespace: "default", + Commands: [][]string{ + {"XADD", streamName, "1-0", "field", "value"}, + {"XDEL", streamName, "1-0"}, + {"XDELEX", streamName, "DELREF", "IDS", "1", "1-0"}, + }}, + }, pollUpdates.Updates) + }) + t.Run("JSON type", func(t *testing.T) { require.NoError(t, rdb0.JSONSet(ctx, "json", "$", `{"field": "value"}`).Err()) require.NoError(t, rdb0.JSONDel(ctx, "json", "$.field").Err()) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 00354848ca9..8a0019cdb32 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -767,6 +767,509 @@ var streamTests = func(t *testing.T, configs util.KvrocksServerConfigs) { require.NoError(t, err) require.Equal(t, 3, val) }) + + t.Run("XDELEX KEEPREF deletes entry without PEL cleanup", func(t *testing.T) { + streamName := "xdelex_keepref_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX DELREF deletes entry with PEL cleanup", func(t *testing.T) { + streamName := "xdelex_delref_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + r, err := rdb.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX duplicate IDs are idempotent", func(t *testing.T) { + streamName := "xdelex_dedup_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "2", "1-0", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(-1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with no groups returns 2", func(t *testing.T) { + streamName := "xdelex_acked_nogroup_" + strconv.Itoa(rand.Int()) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with group acked deletes entry", func(t *testing.T) { + streamName := "xdelex_acked_group_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with multi-group some acked returns 2", func(t *testing.T) { + streamName := "xdelex_acked_multigroup_notall_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with undelivered group returns 2", func(t *testing.T) { + streamName := "xdelex_acked_undelivered_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(2)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with multi-group all acked deletes entry", func(t *testing.T) { + streamName := "xdelex_acked_multigroup_all_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + otherGroup := "otherGroup" + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, otherGroup, "0").Err()) + + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "c1", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: otherGroup, + Consumer: "c2", + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XAck(ctx, streamName, groupName, "1-0").Err()) + require.NoError(t, rdb.XAck(ctx, streamName, otherGroup, "1-0").Err()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX DELREF dangling PEL after XDEL returns -1", func(t *testing.T) { + streamName := "xdelex_dangling_pel_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + require.Equal(t, int64(1), rdb.XDel(ctx, streamName, "1-0").Val()) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX option can follow IDS block", func(t *testing.T) { + streamName := "xdelex_option_after_ids_" + strconv.Itoa(rand.Int()) + groupName := "myGroup" + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + + r, err := rdb.Do(ctx, "XDELEX", streamName, "IDS", "1", "1-0", "DELREF").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + pending, err := rdb.XPending(ctx, streamName, groupName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), pending.Count) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with invalid option returns error", func(t *testing.T) { + streamName := "xdelex_badopt_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "INVALID", "IDS", "1", "1-0").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX missing IDS returns error", func(t *testing.T) { + streamName := "xdelex_noids_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with non-integer numids returns error", func(t *testing.T) { + streamName := "xdelex_badnumids_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "abc").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with zero numids returns error", func(t *testing.T) { + streamName := "xdelex_zeronumids_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "0").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with negative numids returns error", func(t *testing.T) { + streamName := "xdelex_negnumids_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "-1").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with invalid entry ID returns error", func(t *testing.T) { + streamName := "xdelex_badid_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + _, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "bad-id").Result() + require.Error(t, err) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX KEEPREF non-existent entry returns -1", func(t *testing.T) { + streamName := "xdelex_notfound_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"field", "value"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "999-999").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + // Verify entry 1-0 still exists. + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX KEEPREF delete first entry recalculates boundary", func(t *testing.T) { + streamName := "xdelex_boundary_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "3-0", Values: []string{"f", "v"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(2), rdb.XLen(ctx, streamName).Val()) + // XRANGE should start from 2-0 now. + msgs, err := rdb.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + require.Equal(t, "2-0", msgs[0].ID) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX KEEPREF delete last entry with consumer groups recalculates boundary", func(t *testing.T) { + streamName := "xdelex_boundary_last_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "3-0", Values: []string{"f", "v"}, + }).Err()) + // Group/PEL keys sort after entry keys; SeekToLast must filter them out. + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g1", "0").Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "3-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(2), rdb.XLen(ctx, streamName).Val()) + // XRANGE should end at 2-0 now, not parse garbage from group keys. + msgs, err := rdb.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + require.Equal(t, 2, len(msgs)) + require.Equal(t, "2-0", msgs[1].ID) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX with multiple entries in one call", func(t *testing.T) { + streamName := "xdelex_multi_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "3-0", Values: []string{"f", "v"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "3", "1-0", "999-999", "3-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(-1), int64(1)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX delete all entries empties stream", func(t *testing.T) { + streamName := "xdelex_empty_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "2", "1-0", "2-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + info, err := rdb.XInfoStream(ctx, streamName).Result() + require.NoError(t, err) + require.Equal(t, int64(0), info.Length) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX DELREF removes PEL from all groups", func(t *testing.T) { + streamName := "xdelex_delref_multigrp_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g1", "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g2", "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "g1", Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "g2", Consumer: "c2", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + r, err := rdb.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(0), rdb.XLen(ctx, streamName).Val()) + p1, err := rdb.XPending(ctx, streamName, "g1").Result() + require.NoError(t, err) + require.Equal(t, int64(0), p1.Count) + p2, err := rdb.XPending(ctx, streamName, "g2").Result() + require.NoError(t, err) + require.Equal(t, int64(0), p2.Count) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with group created at $ allows deletion", func(t *testing.T) { + streamName := "xdelex_acked_dollar_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g1", "0").Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g2", "$").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "g1", Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + require.NoError(t, rdb.XAck(ctx, streamName, "g1", "1-0").Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX ACKED with mixed results in same call", func(t *testing.T) { + streamName := "xdelex_acked_mixed_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g1", "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "g1", Consumer: "c1", Streams: []string{streamName, ">"}, Count: 2, + }).Result() + require.NoError(t, err) + require.NoError(t, rdb.XAck(ctx, streamName, "g1", "1-0").Err()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "ACKED", "IDS", "2", "1-0", "2-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1), int64(2)}, r) + require.Equal(t, int64(1), rdb.XLen(ctx, streamName).Val()) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) + + t.Run("XDELEX on non-existent stream returns -1", func(t *testing.T) { + streamName := "xdelex_nonexist_" + strconv.Itoa(rand.Int()) + r, err := rdb.Do(ctx, "XDELEX", streamName, "KEEPREF", "IDS", "1", "1-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(-1)}, r) + }) + + t.Run("XDELEX DELREF delete last entry with consumer groups recalculates boundary", func(t *testing.T) { + streamName := "xdelex_boundary_last_delref_" + strconv.Itoa(rand.Int()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "1-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "2-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, ID: "3-0", Values: []string{"f", "v"}, + }).Err()) + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, "g1", "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "g1", Consumer: "c1", Streams: []string{streamName, ">"}, Count: 1, + }).Result() + require.NoError(t, err) + r, err := rdb.Do(ctx, "XDELEX", streamName, "DELREF", "IDS", "1", "3-0").Result() + require.NoError(t, err) + require.Equal(t, []interface{}{int64(1)}, r) + require.Equal(t, int64(2), rdb.XLen(ctx, streamName).Val()) + msgs, err := rdb.XRange(ctx, streamName, "-", "+").Result() + require.NoError(t, err) + require.Equal(t, 2, len(msgs)) + require.Equal(t, "2-0", msgs[1].ID) + require.NoError(t, rdb.Del(ctx, streamName).Err()) + }) } // streamSimulateXRANGE simulates Redis XRANGE implementation in Golang.