Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,142 @@ class CommandXDel : public Commander {
std::vector<redis::StreamEntryID> ids_;
};

class CommandXAckDel : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());
group_name_ = GET_OR_RET(parser.TakeStr());

option_ = redis::StreamDeleteOption::KeepRef;

while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) {
if (parser.EatEqICase("KEEPREF")) {
option_ = redis::StreamDeleteOption::KeepRef;
} else if (parser.EatEqICase("DELREF")) {
option_ = redis::StreamDeleteOption::DelRef;
} else if (parser.EatEqICase("ACKED")) {
option_ = redis::StreamDeleteOption::Acked;
} else {
return parser.InvalidSyntax();
}
}

if (!parser.EatEqICase("IDS")) {
return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
}

auto numids_result = parser.TakeInt<int64_t>();
if (!numids_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
int64_t numids = numids_result.GetValue();
if (numids <= 0) {
return {Status::RedisParseErr, "numids must be positive"};
}

for (int64_t i = 0; i < numids; i++) {
auto id_str = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) return s;
entry_ids_.emplace_back(id);
}

return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<int> results;

auto s = stream_db.DeleteEntriesAndAck(ctx, stream_name_, group_name_, entry_ids_, option_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(results.size()));
for (int r : results) {
output->append(redis::Integer(r));
}

return Status::OK();
}

private:
std::string stream_name_;
std::string group_name_;
redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;
std::vector<redis::StreamEntryID> entry_ids_;
};

class CommandXDelEx : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());

option_ = redis::StreamDeleteOption::KeepRef;

while (parser.Good() && !util::EqualICase(parser.RawPeek(), "IDS")) {
if (parser.EatEqICase("KEEPREF")) {
option_ = redis::StreamDeleteOption::KeepRef;
} else if (parser.EatEqICase("DELREF")) {
option_ = redis::StreamDeleteOption::DelRef;
} else if (parser.EatEqICase("ACKED")) {
option_ = redis::StreamDeleteOption::Acked;
} else {
return parser.InvalidSyntax();
}
}

if (!parser.EatEqICase("IDS")) {
return {Status::RedisParseErr, "syntax error, expected IDS keyword"};
}

auto numids_result = parser.TakeInt<int64_t>();
if (!numids_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
int64_t numids = numids_result.GetValue();
if (numids <= 0) {
return {Status::RedisParseErr, "numids must be positive"};
}

for (int64_t i = 0; i < numids; i++) {
auto id_str = GET_OR_RET(parser.TakeStr());
redis::StreamEntryID id;
auto s = ParseStreamEntryID(id_str, &id);
if (!s.IsOK()) return s;
entry_ids_.emplace_back(id);
}

return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<int> results;

auto s = stream_db.DeleteEntriesWithOption(ctx, stream_name_, entry_ids_, option_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(results.size()));
for (int r : results) {
output->append(redis::Integer(r));
}

return Status::OK();
}

private:
std::string stream_name_;
redis::StreamDeleteOption option_ = redis::StreamDeleteOption::KeepRef;
std::vector<redis::StreamEntryID> entry_ids_;
};

class CommandXClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1907,6 +2043,8 @@ class CommandXSetId : public Commander {
REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAckDel>("xackdel", -6, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXDelEx>("xdelex", -5, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
Expand Down
21 changes: 20 additions & 1 deletion src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,27 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
Slice encoded_id = ikey.GetSubKey();
redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);

if (entry_id.ms == UINT64_MAX) {
return rocksdb::Status::OK();
}

GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
std::string entry_id_str = entry_id.ToString();
std::string user_key = ikey.GetKey().ToString();

auto args = log_data_.GetArguments();
if (!args->empty()) {
if ((*args)[0] == "XACKDEL" && args->size() >= 3) {
command_args = {(*args)[0], user_key, (*args)[1], (*args)[2], "IDS", "1", entry_id_str};
} else if ((*args)[0] == "XDELEX" && args->size() >= 2) {
command_args = {(*args)[0], user_key, (*args)[1], "IDS", "1", entry_id_str};
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
} else {
command_args = {"XDEL", user_key, entry_id_str};
}
}

if (!command_args.empty()) {
Expand Down
Loading
Loading