From e52f8de1576f9fe9f69ebc8abc9e98603bab1319 Mon Sep 17 00:00:00 2001 From: guozhihao Date: Tue, 11 Mar 2025 00:25:51 +0800 Subject: [PATCH 1/3] feat: add blpop cmd --- src/cmd_list.cc | 43 +++++++++++++++++++++++-- src/storage/include/storage/storage.h | 3 ++ src/storage/src/redis.h | 1 + src/storage/src/redis_lists.cc | 46 +++++++++++++++++++++++++++ src/storage/src/storage.cc | 7 ++++ 5 files changed, 98 insertions(+), 2 deletions(-) diff --git a/src/cmd_list.cc b/src/cmd_list.cc index cd2980e..dd8fce6 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -176,9 +176,48 @@ void RPopCmd::DoCmd(PClient* client) { BLPopCmd::BLPopCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} -bool BLPopCmd::DoInitial(PClient* client) { return true; } +bool BLPopCmd::DoInitial(PClient* client) { + std::vector keys(client->argv_.begin() + 1, client->argv_.end() - 1); + client->SetKey(keys); + + int64_t timeout = 0; + if (!kstd::String2int(client->argv_.back(), &timeout)) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + if (timeout < 0) { + client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value"); + return false; + } + if (timeout > 0) { + auto now = std::chrono::system_clock::now(); + expire_time_ = + std::chrono::time_point_cast(now).time_since_epoch().count() + timeout * 1000; + } + return true; +} -void BLPopCmd::DoCmd(PClient* client) {} +void BLPopCmd::DoCmd(PClient* client) { + std::vector elements; + std::vector list_keys(client->Keys().begin(), client->Keys().end()); + storage::MultiScopeRecordLock(STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys); + for (auto& list_key : list_keys) { + storage::Status s = + STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPopWithoutLock(list_key, 1, &elements); + if (s.ok()) { + client->AppendArrayLen(2); + client->AppendString(list_key); + client->AppendString(elements[0]); + return; + } else if (s.IsNotFound()) { + continue; + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + return; + } + } + BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BLPop); +} BRPopCmd::BRPopCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index e3c70a1..710bfa0 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -573,6 +573,9 @@ class Storage { // Removes and returns the first elements of the list stored at key. Status LPop(const Slice& key, int64_t count, std::vector* elements); + Status LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); + + // Removes and returns the last elements of the list stored at key. Status RPop(const Slice& key, int64_t count, std::vector* elements); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index abc275f..94a225d 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -243,6 +243,7 @@ class Redis { const std::string& value, int64_t* ret); Status LLen(const Slice& key, uint64_t* len); Status LPop(const Slice& key, int64_t count, std::vector* elements); + Status LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); Status LPush(const Slice& key, const std::vector& values, uint64_t* ret); Status LPushx(const Slice& key, const std::vector& values, uint64_t* len); Status LRange(const Slice& key, int64_t start, int64_t stop, std::vector* ret); diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index cff6792..ebb7333 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -268,6 +268,52 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector* el return s; } +Status Redis::LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + uint32_t statistic = 0; + elements->clear(); + + auto batch = Batch::CreateBatch(this); + + std::string meta_value; + + BaseMetaKey base_meta_key(key); + Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); + if (s.ok()) { + if (IsStale(meta_value)) { + return Status::NotFound(); + } else if (!ExpectedMetaValue(DataType::kLists, meta_value)) { + return Status::InvalidArgument(fmt::format("WRONGTYPE, key: {}, expect type: {}, get type: {}", key.ToString(), + DataTypeStrings[static_cast(DataType::kLists)], + DataTypeStrings[static_cast(GetMetaValueType(meta_value))])); + } else { + ParsedListsMetaValue parsed_lists_meta_value(&meta_value); + auto size = static_cast(parsed_lists_meta_value.Count()); + uint64_t version = parsed_lists_meta_value.Version(); + int32_t start_index = 0; + auto stop_index = static_cast(count <= size ? count - 1 : size - 1); + int32_t cur_index = 0; + ListsDataKey lists_data_key(key, version, parsed_lists_meta_value.LeftIndex() + 1); + rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[kListsDataCF]); + for (iter->Seek(lists_data_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { + statistic++; + ParsedBaseDataValue parsed_base_data_value(iter->value()); + elements->push_back(parsed_base_data_value.UserValue().ToString()); + batch->Delete(kListsDataCF, iter->key()); + + parsed_lists_meta_value.ModifyCount(-1); + parsed_lists_meta_value.ModifyLeftIndex(-1); + } + batch->Put(kMetaCF, base_meta_key.Encode(), meta_value); + delete iter; + } + } + if (batch->Count() != 0U) { + s = batch->Commit(); + UpdateSpecificKeyStatistics(DataType::kLists, key.ToString(), statistic); + } + return s; +} + Status Redis::LPush(const Slice& key, const std::vector& values, uint64_t* ret) { *ret = 0; auto batch = Batch::CreateBatch(this); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 62c458a..10c6cb2 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -900,6 +900,13 @@ Status Storage::LPop(const Slice& key, int64_t count, std::vector* return inst->LPop(key, count, elements); } +Status Storage::LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + elements->clear(); + auto& inst = GetDBInstance(key); + return inst->RPopWithoutLock(key, count, elements); +} + + Status Storage::RPop(const Slice& key, int64_t count, std::vector* elements) { elements->clear(); auto& inst = GetDBInstance(key); From 60a9d0cffdb59ff2d992da0aec8900a6cce33849 Mon Sep 17 00:00:00 2001 From: guozhihao Date: Tue, 11 Mar 2025 00:38:26 +0800 Subject: [PATCH 2/3] feat: add blpop cmd --- src/storage/src/storage.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 10c6cb2..ea8d820 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -903,7 +903,7 @@ Status Storage::LPop(const Slice& key, int64_t count, std::vector* Status Storage::LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { elements->clear(); auto& inst = GetDBInstance(key); - return inst->RPopWithoutLock(key, count, elements); + return inst->LPopWithoutLock(key, count, elements); } From 922e2668c0dbc3e47ea7e9dff6fa83607234a16d Mon Sep 17 00:00:00 2001 From: guozhihao Date: Tue, 11 Mar 2025 22:38:10 +0800 Subject: [PATCH 3/3] fix: format --- src/storage/include/storage/storage.h | 1 - src/storage/src/storage.cc | 1 - 2 files changed, 2 deletions(-) diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 710bfa0..83a0ff7 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -575,7 +575,6 @@ class Storage { Status LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); - // Removes and returns the last elements of the list stored at key. Status RPop(const Slice& key, int64_t count, std::vector* elements); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index ea8d820..23ab82f 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -906,7 +906,6 @@ Status Storage::LPopWithoutLock(const Slice& key, int64_t count, std::vectorLPopWithoutLock(key, count, elements); } - Status Storage::RPop(const Slice& key, int64_t count, std::vector* elements) { elements->clear(); auto& inst = GetDBInstance(key);