diff --git a/src/cmd_list.cc b/src/cmd_list.cc index a3d3403..de7e935 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -177,9 +177,48 @@ void RPopCmd::DoCmd(PClient* client) { BLPopCmd::BLPopCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} -bool BLPopCmd::DoInitial(PClient* client) { return true; } +bool BLPopCmd::DoInitial(PClient* client) { + std::vector keys(client->argv_.begin() + 1, client->argv_.end() - 1); + client->SetKey(keys); + + int64_t timeout = 0; + if (!kstd::String2int(client->argv_.back(), &timeout)) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + if (timeout < 0) { + client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value"); + return false; + } + if (timeout > 0) { + auto now = std::chrono::system_clock::now(); + expire_time_ = + std::chrono::time_point_cast(now).time_since_epoch().count() + timeout * 1000; + } + return true; +} -void BLPopCmd::DoCmd(PClient* client) {} +void BLPopCmd::DoCmd(PClient* client) { + std::vector elements; + std::vector list_keys(client->Keys().begin(), client->Keys().end()); + storage::MultiScopeRecordLock(STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys); + for (auto& list_key : list_keys) { + storage::Status s = + STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPopWithoutLock(list_key, 1, &elements); + if (s.ok()) { + client->AppendArrayLen(2); + client->AppendString(list_key); + client->AppendString(elements[0]); + return; + } else if (s.IsNotFound()) { + continue; + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + return; + } + } + BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BLPop); +} BRPopCmd::BRPopCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index e3c70a1..83a0ff7 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -573,6 +573,8 @@ class Storage { // Removes and returns the first elements of the list stored at key. Status LPop(const Slice& key, int64_t count, std::vector* elements); + Status LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); + // Removes and returns the last elements of the list stored at key. Status RPop(const Slice& key, int64_t count, std::vector* elements); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index abc275f..94a225d 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -243,6 +243,7 @@ class Redis { const std::string& value, int64_t* ret); Status LLen(const Slice& key, uint64_t* len); Status LPop(const Slice& key, int64_t count, std::vector* elements); + Status LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); Status LPush(const Slice& key, const std::vector& values, uint64_t* ret); Status LPushx(const Slice& key, const std::vector& values, uint64_t* len); Status LRange(const Slice& key, int64_t start, int64_t stop, std::vector* ret); diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index cff6792..ebb7333 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -268,6 +268,52 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector* el return s; } +Status Redis::LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + uint32_t statistic = 0; + elements->clear(); + + auto batch = Batch::CreateBatch(this); + + std::string meta_value; + + BaseMetaKey base_meta_key(key); + Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); + if (s.ok()) { + if (IsStale(meta_value)) { + return Status::NotFound(); + } else if (!ExpectedMetaValue(DataType::kLists, meta_value)) { + return Status::InvalidArgument(fmt::format("WRONGTYPE, key: {}, expect type: {}, get type: {}", key.ToString(), + DataTypeStrings[static_cast(DataType::kLists)], + DataTypeStrings[static_cast(GetMetaValueType(meta_value))])); + } else { + ParsedListsMetaValue parsed_lists_meta_value(&meta_value); + auto size = static_cast(parsed_lists_meta_value.Count()); + uint64_t version = parsed_lists_meta_value.Version(); + int32_t start_index = 0; + auto stop_index = static_cast(count <= size ? count - 1 : size - 1); + int32_t cur_index = 0; + ListsDataKey lists_data_key(key, version, parsed_lists_meta_value.LeftIndex() + 1); + rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[kListsDataCF]); + for (iter->Seek(lists_data_key.Encode()); iter->Valid() && cur_index <= stop_index; iter->Next(), ++cur_index) { + statistic++; + ParsedBaseDataValue parsed_base_data_value(iter->value()); + elements->push_back(parsed_base_data_value.UserValue().ToString()); + batch->Delete(kListsDataCF, iter->key()); + + parsed_lists_meta_value.ModifyCount(-1); + parsed_lists_meta_value.ModifyLeftIndex(-1); + } + batch->Put(kMetaCF, base_meta_key.Encode(), meta_value); + delete iter; + } + } + if (batch->Count() != 0U) { + s = batch->Commit(); + UpdateSpecificKeyStatistics(DataType::kLists, key.ToString(), statistic); + } + return s; +} + Status Redis::LPush(const Slice& key, const std::vector& values, uint64_t* ret) { *ret = 0; auto batch = Batch::CreateBatch(this); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index e389516..364da9e 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -899,6 +899,12 @@ Status Storage::LPop(const Slice& key, int64_t count, std::vector* return inst->LPop(key, count, elements); } +Status Storage::LPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + elements->clear(); + auto& inst = GetDBInstance(key); + return inst->LPopWithoutLock(key, count, elements); +} + Status Storage::RPop(const Slice& key, int64_t count, std::vector* elements) { elements->clear(); auto& inst = GetDBInstance(key);