Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,79 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};

class CommandXDelEx : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());

option_ = redis::StreamDeleteOption::KeepRef;
bool has_ids = false;

while (parser.Good()) {
if (parser.EatEqICase("KEEPREF")) {
option_ = redis::StreamDeleteOption::KeepRef;
} else if (parser.EatEqICase("DELREF")) {
option_ = redis::StreamDeleteOption::DelRef;
} else if (parser.EatEqICase("ACKED")) {
option_ = redis::StreamDeleteOption::Acked;
} else if (parser.EatEqICase("IDS")) {
if (has_ids) {
return parser.InvalidSyntax();
}
has_ids = true;

auto numids_result = parser.TakeInt<int64_t>();
if (!numids_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
int64_t numids = numids_result.GetValue();
if (numids <= 0) {
return {Status::RedisParseErr, "numids must be positive"};
}

for (int64_t i = 0; i < numids; i++) {
auto id_str = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) return s;
entry_ids_.emplace_back(id);
}
} else {
return parser.InvalidSyntax();
}
}

if (!has_ids) {
return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
}

return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<int> results;

auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(results.size()));
for (int r : results) {
output->append(redis::Integer(r));
}

return Status::OK();
}

private:
std::string stream_name_;
redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;
std::vector<redis::StreamEntryID> entry_ids_;
};

class CommandXClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1907,6 +1980,7 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXDelEx>("xdelex", -5, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
Expand Down
84 changes: 83 additions & 1 deletion src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

#include "batch_extractor.h"

#include <utility>

#include "cluster/redis_slot.h"
#include "logging.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "types/redis_bitmap.h"
#include "types/redis_stream_base.h"

void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
// Currently, we only have two kinds of log data
Expand All @@ -38,6 +41,8 @@ void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
// Redis type log data
if (auto s = log_data_.Decode(blob); !s.IsOK()) {
WARN("Failed to decode Redis type log: {}", s.Msg());
} else {
seen_xdelex_entry_keys_.clear();
}
}
}
Expand Down Expand Up @@ -266,6 +271,21 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
break;
}
} else if (column_family_id == static_cast<uint32_t>(ColumnFamilyID::Stream)) {
InternalKey ikey(key, is_slot_id_encoded_);
Slice entry_id_check = ikey.GetSubKey();
uint64_t delimiter = 0;
GetFixed64(&entry_id_check, &delimiter);
if (delimiter == UINT64_MAX) {
return rocksdb::Status::OK();
}

user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto s = ExtractStreamAddCommand(is_slot_id_encoded_, key, value, &command_args);
if (!s.IsOK()) {
ERROR("Failed to parse write_batch in PutCF. Type=Stream: {}", s.Msg());
Expand Down Expand Up @@ -397,8 +417,70 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);

if (entry_id.ms == UINT64_MAX) {
// DELREF may remove dangling PELs without a stream entry deletion.
auto args = log_data_.GetArguments();
if (!args->empty() && (*args)[0] == "XDELEX" && args->size() >= 2 && (*args)[1] == "DELREF") {
uint8_t type_delimiter = 0;
if (!GetFixed8(&encoded_id, &type_delimiter)) {
return rocksdb::Status::OK();
}
if (type_delimiter == static_cast<uint8_t>(redis::StreamSubkeyType::StreamPelEntry)) {
uint64_t group_name_len = 0;
if (!GetFixed64(&encoded_id, &group_name_len)) {
return rocksdb::Status::OK();
}
if (group_name_len > encoded_id.size() || encoded_id.size() - group_name_len < 16) {
return rocksdb::Status::OK();
}
encoded_id.remove_prefix(group_name_len);

if (!GetFixed64(&encoded_id, &entry_id.ms) || !GetFixed64(&encoded_id, &entry_id.seq)) {
return rocksdb::Status::OK();
}
std::string entry_id_str = entry_id.ToString();

std::string user_key = ikey.GetKey().ToString();
auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str;
if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) {
command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
}
}
return rocksdb::Status::OK();
}

GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
std::string entry_id_str = entry_id.ToString();
std::string user_key = ikey.GetKey().ToString();

auto key_slot_id = GetSlotIdFromKey(user_key);
if (slot_range_.IsValid() && !slot_range_.Contains(key_slot_id)) {
return rocksdb::Status::OK();
}
ns = ikey.GetNamespace().ToString();

auto args = log_data_.GetArguments();
if (!args->empty()) {
if ((*args)[0] == "XDELEX" && args->size() >= 2) {
std::string dedup_key = ns + '\0' + user_key + '\0' + entry_id_str;
if (seen_xdelex_entry_keys_.insert(std::move(dedup_key)).second) {
command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
}

if (!command_args.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/batch_extractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <map>
#include <string>
#include <unordered_set>
#include <vector>

#include "cluster/cluster_defs.h"
Expand Down Expand Up @@ -54,4 +55,5 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
bool is_slot_id_encoded_ = false;
SlotRange slot_range_;
bool to_redis_;
std::unordered_set<std::string> seen_xdelex_entry_keys_;
};
Loading
Loading