diff --git a/CMakeLists.txt b/CMakeLists.txt index 35365b270b..be401354e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -383,6 +383,7 @@ set(BUTIL_SOURCES ${PROJECT_SOURCE_DIR}/src/butil/time.cpp ${PROJECT_SOURCE_DIR}/src/butil/zero_copy_stream_as_streambuf.cpp ${PROJECT_SOURCE_DIR}/src/butil/crc32c.cc + ${PROJECT_SOURCE_DIR}/src/butil/redis_cluster_slot.cc ${PROJECT_SOURCE_DIR}/src/butil/containers/case_ignored_flat_map.cpp ${PROJECT_SOURCE_DIR}/src/butil/iobuf.cpp ${PROJECT_SOURCE_DIR}/src/butil/binary_printer.cpp diff --git a/example/http_c++/http_client.cpp b/example/http_c++/http_client.cpp index d9984c886f..8e39ecd239 100644 --- a/example/http_c++/http_client.cpp +++ b/example/http_c++/http_client.cpp @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) { // Initialize the channel, NULL means using default options. // options, see `brpc/channel.h'. - if (channel.Init(url, FLAGS_load_balancer.c_str(), &options) != 0) { + if (channel.Init(url, "", &options) != 0) { LOG(ERROR) << "Fail to initialize channel"; return -1; } diff --git a/example/redis_c++/redis_cli.cpp b/example/redis_c++/redis_cli.cpp index d4cbf63a95..4ceab594e7 100644 --- a/example/redis_c++/redis_cli.cpp +++ b/example/redis_c++/redis_cli.cpp @@ -23,18 +23,38 @@ #include #include #include +#include #include #include +#include DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); DEFINE_string(server, "127.0.0.1:6379", "IP Address of server"); DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); +DEFINE_string(auth, "", "auth..."); +DEFINE_uint64(request_code, 0, "request code"); namespace brpc { const char* logo(); } +static std::string extractSecondWord(const std::string &str) { + std::istringstream iss(str); + std::vector words; + std::string word; + + while (iss >> word) { + words.push_back(word); + } + + if (words.size() >= 2) { + return words[1]; + } else { + return ""; + } +} + // Send `command' to redis-server via `channel' static bool access_redis(brpc::Channel& channel, const char* command) { brpc::RedisRequest request; @@ -44,6 +64,7 @@ static bool access_redis(brpc::Channel& channel, const char* command) { } brpc::RedisResponse response; brpc::Controller cntl; + cntl.set_request_code(butil::cal_slot_num(extractSecondWord(command))); channel.CallMethod(NULL, &cntl, &request, &response, NULL); if (cntl.Failed()) { LOG(ERROR) << "Fail to access redis, " << cntl.ErrorText(); @@ -89,7 +110,11 @@ int main(int argc, char* argv[]) { options.connection_type = FLAGS_connection_type; options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; options.max_retry = FLAGS_max_retry; - if (channel.Init(FLAGS_server.c_str(), &options) != 0) { + if (!FLAGS_auth.empty()) { + brpc::policy::RedisAuthenticator* auth = new brpc::policy::RedisAuthenticator(FLAGS_auth); + options.auth = auth; + } + if (channel.Init(FLAGS_server.c_str(), "c_redis_cluster", &options) != 0) { LOG(ERROR) << "Fail to initialize channel"; return -1; } diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index a94c09b5b8..df77ae8716 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -24,7 +24,6 @@ #include "butil/third_party/murmurhash3/murmurhash3.h" #include "butil/strings/string_util.h" #include "bthread/unstable.h" // bthread_timer_add -#include "brpc/socket_map.h" // SocketMapInsert #include "brpc/compress.h" #include "brpc/global.h" #include "brpc/span.h" diff --git a/src/brpc/channel.h b/src/brpc/channel.h index b7b2ae8f9e..ca8d0aba55 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -32,6 +32,7 @@ #include "brpc/adaptive_connection_type.h" // AdaptiveConnectionType #include "brpc/socket_id.h" // SocketId #include "brpc/controller.h" // brpc::Controller +#include "brpc/socket_map.h" // SocketMapInsert #include "brpc/details/profiler_linker.h" #include "brpc/retry_policy.h" #include "brpc/naming_service_filter.h" diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index b6c8e750fe..c213780907 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -29,7 +29,6 @@ #include "bthread/unstable.h" #include "bvar/bvar.h" #include "brpc/socket.h" -#include "brpc/socket_map.h" #include "brpc/channel.h" #include "brpc/load_balancer.h" #include "brpc/closure_guard.h" @@ -265,6 +264,7 @@ void Controller::ResetPods() { _sender = NULL; _request_code = 0; _single_server_id = INVALID_SOCKET_ID; + _tmp_single_server_id = INVALID_SOCKET_ID; _unfinished_call = NULL; _stream_creator = NULL; _accessed = NULL; @@ -587,6 +587,12 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, return; } + if (saved_error == EMOVED) { + //todo set real serverid.... + _current_call.OnComplete(this, _error_code, info.responded, false); + return IssueRPC(butil::gettimeofday_us()); + } + if ((!_error_code && _retry_policy == NULL) || _current_call.nretry >= _max_retry) { goto END_OF_RPC; @@ -1020,14 +1026,23 @@ void Controller::IssueRPC(int64_t start_realtime_us) { if (SingleServer()) { // Don't use _current_call.peer_id which is set to -1 after construction // of the backup call. - const int rc = Socket::Address(_single_server_id, &tmp_sock); + int rc; + SocketId peer_id; + if (_single_server_id != INVALID_SOCKET_ID) { + rc = Socket::Address(_single_server_id, &tmp_sock); + peer_id = _single_server_id; + } else { + rc = Socket::Address(_tmp_single_server_id, &tmp_sock); + peer_id = _tmp_single_server_id; + _tmp_single_server_id = INVALID_SOCKET_ID; + } if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) { SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64, endpoint2str(_remote_side).c_str(), _single_server_id); tmp_sock.reset(); // Release ref ASAP return HandleSendFailed(); } - _current_call.peer_id = _single_server_id; + _current_call.peer_id = peer_id; } else { LoadBalancer::SelectIn sel_in = { start_realtime_us, true, diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 658cc6957c..44df0b43c8 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -43,6 +43,7 @@ #include "brpc/callback.h" #include "brpc/progressive_attachment.h" // ProgressiveAttachment #include "brpc/progressive_reader.h" // ProgressiveReader +#include "brpc/socket_map.h" #include "brpc/grpc.h" #include "brpc/kvmap.h" @@ -564,6 +565,10 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // -1 means no deadline. int64_t deadline_us() const { return _deadline_us; } + void set_tmp_single_socket_id (const SocketId& id) { + _tmp_single_server_id = id; + } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -679,7 +684,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); void HandleStreamConnection(Socket *host_socket); - bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID; } + bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID || _tmp_single_server_id != INVALID_SOCKET_ID; } void SubmitSpan(); @@ -774,6 +779,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); RPCSender* _sender; uint64_t _request_code; SocketId _single_server_id; + SocketId _tmp_single_server_id; butil::intrusive_ptr _lb; // for passing parameters to created bthread, don't modify it otherwhere. diff --git a/src/brpc/errno.proto b/src/brpc/errno.proto index fccd8edb8d..d5d0468a4c 100644 --- a/src/brpc/errno.proto +++ b/src/brpc/errno.proto @@ -49,6 +49,7 @@ enum Errno { ELIMIT = 2004; // Reached server's limit on resources ECLOSE = 2005; // Close socket initiatively EITP = 2006; // Failed Itp response + EMOVED = 2007; // Failed Itp response // Errno related to RDMA (may happen at both sides) ERDMA = 3001; // RDMA verbs error diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 30c2f1a3b9..00c05c1977 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -39,6 +39,7 @@ #include "brpc/policy/consul_naming_service.h" #include "brpc/policy/discovery_naming_service.h" #include "brpc/policy/nacos_naming_service.h" +#include "brpc/policy/redis_cluster_naming_service.h" // Load Balancers #include "brpc/policy/round_robin_load_balancer.h" @@ -123,6 +124,7 @@ struct GlobalExtensions { , ch_mh_lb(CONS_HASH_LB_MURMUR3) , ch_md5_lb(CONS_HASH_LB_MD5) , ch_ketama_lb(CONS_HASH_LB_KETAMA) + , ch_redis_cluster_lb(CONS_HASH_LB_REDIS_CLUSTER) , constant_cl(0) { } @@ -138,6 +140,7 @@ struct GlobalExtensions { ConsulNamingService cns; DiscoveryNamingService dcns; NacosNamingService nns; + RedisClusterNamingService rcns; RoundRobinLoadBalancer rr_lb; WeightedRoundRobinLoadBalancer wrr_lb; @@ -147,6 +150,7 @@ struct GlobalExtensions { ConsistentHashingLoadBalancer ch_mh_lb; ConsistentHashingLoadBalancer ch_md5_lb; ConsistentHashingLoadBalancer ch_ketama_lb; + ConsistentHashingLoadBalancer ch_redis_cluster_lb; DynPartLoadBalancer dynpart_lb; AutoConcurrencyLimiter auto_cl; @@ -363,6 +367,7 @@ static void GlobalInitializeOrDieImpl() { NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns); NamingServiceExtension()->RegisterOrDie("discovery", &g_ext->dcns); NamingServiceExtension()->RegisterOrDie("nacos", &g_ext->nns); + NamingServiceExtension()->RegisterOrDie("redis_cluster", &g_ext->rcns); // Load Balancers LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb); @@ -373,6 +378,7 @@ static void GlobalInitializeOrDieImpl() { LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb); LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb); LoadBalancerExtension()->RegisterOrDie("c_ketama", &g_ext->ch_ketama_lb); + LoadBalancerExtension()->RegisterOrDie("c_redis_cluster", &g_ext->ch_redis_cluster_lb); LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb); // Compress Handlers diff --git a/src/brpc/policy/consistent_hashing_load_balancer.cpp b/src/brpc/policy/consistent_hashing_load_balancer.cpp index 19c8849c7d..6fc0167917 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.cpp +++ b/src/brpc/policy/consistent_hashing_load_balancer.cpp @@ -121,6 +121,38 @@ bool KetamaReplicaPolicy::Build(ServerId server, return true; } +class RedisClusterPolicy : public ReplicaPolicy { +public: + + virtual bool Build(ServerId server, + size_t num_replicas, + std::vector* replicas) const; + + virtual const char* name() const { return "RedisCluster"; } +}; + +bool RedisClusterPolicy::Build(ServerId server, + size_t, + std::vector* replicas) const { + SocketUniquePtr ptr; + if (Socket::AddressFailedAsWell(server.id, &ptr) == -1) { + return false; + } + replicas->clear(); + std::stringstream ss(server.tag); + int slot_start, slot_end; + char delimiter; + ss >> slot_start >> delimiter >> slot_end; + for (size_t i = slot_start; i <= slot_end; ++i) { + ConsistentHashingLoadBalancer::Node node; + node.hash = i; + node.server_sock = server; + node.server_addr = ptr->remote_side(); + replicas->push_back(node); + } + return true; +} + namespace { pthread_once_t s_replica_policy_once = PTHREAD_ONCE_INIT; @@ -130,7 +162,8 @@ void InitReplicaPolicy() { g_replica_policy = new std::array({ new DefaultReplicaPolicy(MurmurHash32), new DefaultReplicaPolicy(MD5Hash32), - new KetamaReplicaPolicy + new KetamaReplicaPolicy, + new RedisClusterPolicy }); } @@ -245,12 +278,12 @@ size_t ConsistentHashingLoadBalancer::AddServersInBatch( std::sort(add_nodes.begin(), add_nodes.end()); bool executed = false; const size_t ret = _db_hash_ring.ModifyWithForeground(AddBatch, add_nodes, &executed); - CHECK(ret % _num_replicas == 0); - const size_t n = ret / _num_replicas; - LOG_IF(ERROR, n != servers.size()) - << "Fail to AddServersInBatch, expected " << servers.size() - << " actually " << n; - return n; + // CHECK(ret % _num_replicas == 0); + // const size_t n = ret / _num_replicas; + // LOG_IF(ERROR, n != servers.size()) + // << "Fail to AddServersInBatch, expected " << servers.size() + // << " actually " << n; + return add_nodes.size(); } bool ConsistentHashingLoadBalancer::RemoveServer(const ServerId& server) { diff --git a/src/brpc/policy/consistent_hashing_load_balancer.h b/src/brpc/policy/consistent_hashing_load_balancer.h index 5da7548a63..93db8c8dd6 100644 --- a/src/brpc/policy/consistent_hashing_load_balancer.h +++ b/src/brpc/policy/consistent_hashing_load_balancer.h @@ -36,9 +36,10 @@ enum ConsistentHashingLoadBalancerType { CONS_HASH_LB_MURMUR3 = 0, CONS_HASH_LB_MD5 = 1, CONS_HASH_LB_KETAMA = 2, + CONS_HASH_LB_REDIS_CLUSTER = 3, // Identify the last one. - CONS_HASH_LB_LAST = 3 + CONS_HASH_LB_LAST = 4 }; class ConsistentHashingLoadBalancer : public LoadBalancer { diff --git a/src/brpc/policy/redis_cluster_naming_service.cpp b/src/brpc/policy/redis_cluster_naming_service.cpp new file mode 100644 index 0000000000..0b3e0cb7f8 --- /dev/null +++ b/src/brpc/policy/redis_cluster_naming_service.cpp @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/policy/redis_cluster_naming_service.h" +#include +#include "brpc/log.h" +#include "bthread/bthread.h" +#include +#include +#include +#include // gethostbyname_r +#include // strtol +#include // std::string + +namespace brpc { +namespace policy { + +RedisClusterNamingService::RedisClusterNamingService() = default; + +std::pair GetServiceNameAndToken (const std::string& input) { + std::stringstream ss(input); + std::string service_name, token; + std::getline(ss, service_name, '|'); + std::getline(ss, token); + return {service_name, token}; +} + + +int RedisClusterNamingService::GetServers(const char *service_name_and_token, std::vector *servers) { + servers->clear(); + const auto service_name_token_pair = GetServiceNameAndToken (service_name_and_token); + const auto& service_name = service_name_token_pair.first; + const auto& token = service_name_token_pair.second; + + brpc::Channel channel; + + // Initialize the channel, NULL means using default options. + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_REDIS; + options.timeout_ms = 1000; + options.max_retry = 3; + if (!token.empty()) { + brpc::policy::RedisAuthenticator* auth = new brpc::policy::RedisAuthenticator(token); + options.auth = auth; + } + if (channel.Init(service_name.c_str(), &options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + + brpc::RedisRequest request; + if (!request.AddCommand("CLUSTER SLOTS")) { + LOG(ERROR) << "Fail to add command"; + return -1; + } + + brpc::RedisResponse response; + brpc::Controller cntl; + channel.CallMethod(NULL, &cntl, &request, &response, NULL); + if (cntl.Failed()) { + LOG(ERROR) << "Fail to access redis, " << cntl.ErrorText(); + return -1; + } + const auto& reply = response.reply(0); + for (int i = 0; i < reply.size(); i++) { + const auto& slot_start = reply[i][0]; + const auto& slot_end = reply[i][1]; + const std::string tag = std::to_string(slot_start.integer()) + "-" + std::to_string(slot_end.integer()); + + const auto& ip = reply[i][2][0]; + const auto& port = reply[i][2][1]; + butil::EndPoint point; + if (butil::str2endpoint(ip.c_str(), port.integer(), &point) != 0 && + butil::hostname2endpoint(ip.c_str(), port.integer(), &point) != 0) { + LOG(ERROR) << "Invalid address=`" << ip.c_str() << ":" << port.integer() << '\''; + continue; + } + servers->emplace_back(point, tag); + } + return 0; +} + +void RedisClusterNamingService::Describe(std::ostream &os, const DescribeOptions &) const { + os << "redis_cluster"; + return; +} + +NamingService *RedisClusterNamingService::New() const { return new RedisClusterNamingService; } + +void RedisClusterNamingService::Destroy() { delete this; } + +} // namespace policy +} // namespace brpc diff --git a/src/brpc/policy/redis_cluster_naming_service.h b/src/brpc/policy/redis_cluster_naming_service.h new file mode 100644 index 0000000000..f8b63f0dc9 --- /dev/null +++ b/src/brpc/policy/redis_cluster_naming_service.h @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "brpc/periodic_naming_service.h" +#include "butil/unique_ptr.h" + +namespace brpc { +namespace policy { + +class RedisClusterNamingService : public PeriodicNamingService { + public: + RedisClusterNamingService(); + + private: + int GetServers(const char *service_name, std::vector *servers) override; + + void Describe(std::ostream &os, const DescribeOptions &) const override; + + NamingService *New() const override; + + void Destroy() override; +}; + +} // namespace policy +} // namespace brpc diff --git a/src/brpc/policy/redis_protocol.cpp b/src/brpc/policy/redis_protocol.cpp index 94524e8b75..300005f3d1 100644 --- a/src/brpc/policy/redis_protocol.cpp +++ b/src/brpc/policy/redis_protocol.cpp @@ -253,6 +253,35 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); } +static bool GetEndPoint(const char* moved_error, butil::EndPoint *out) { + int space_count = 0; + char ip[256]; + int port; + + for (int i = 0; moved_error[i] != '\0'; ++i) { + if (moved_error[i] == ' ') { + space_count++; + while (moved_error[i + 1] == ' ') { + i++; + } + } + + if (space_count == 2) { + int ip_end = 0; + int j = 0; + for (j = 0; moved_error[i + 1 + j] != ':'; ++j) { + ip[j] = moved_error[i + 1 + j]; + } + ip[j] = '\0'; + ip_end = i + 1 + j; + + sscanf(moved_error + ip_end + 1, "%d", &port); + break; + } + } + return butil::str2endpoint(ip, port, out) == 0 || butil::hostname2endpoint(ip, port, out); +} + void ProcessRedisResponse(InputMessageBase* msg_base) { const int64_t start_parse_us = butil::cpuwide_time_us(); DestroyingPtr msg(static_cast(msg_base)); @@ -274,7 +303,24 @@ void ProcessRedisResponse(InputMessageBase* msg_base) { span->set_response_size(msg->response.ByteSize()); span->set_start_parse_us(start_parse_us); } - const int saved_error = cntl->ErrorCode(); + int saved_error = cntl->ErrorCode(); + + while (msg->response.reply(0).is_error()) { + if (0 != std::strncmp(msg->response.reply(0).error_message(), "MOVED", 5)) { + break; + } + butil::EndPoint end_point; + if (!GetEndPoint(msg->response.reply(0).error_message(), &end_point)) { + break; + } + saved_error = EMOVED; + LOG(INFO) << "moved..: " << msg->response; + SocketId id; + SocketMapFind(end_point, &id); + cntl->set_tmp_single_socket_id(id); + break; + } + if (cntl->response() != NULL) { if (cntl->response()->GetDescriptor() != RedisResponse::descriptor()) { cntl->SetFailed(ERESPONSE, "Must be RedisResponse"); diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index d169afd8a9..05bea9fcb6 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -100,6 +100,14 @@ int SocketMapFind(const SocketMapKey& key, SocketId* id) { return -1; } +int SocketMapFind(const butil::EndPoint& key, SocketId* id) { + SocketMap* m = get_client_side_socket_map(); + if (m) { + return m->Find(key, id); + } + return -1; +} + void SocketMapRemove(const SocketMapKey& key) { SocketMap* m = get_client_side_socket_map(); if (m) { @@ -184,6 +192,10 @@ int SocketMap::Init(const SocketMapOptions& options) { LOG(ERROR) << "Fail to init _map"; return -1; } + if (_point_sock_map.init(_options.suggested_map_size, 70) != 0) { + LOG(ERROR) << "Fail to init _point_sock_map"; + return -1; + } if (_options.idle_timeout_second_dynamic != NULL || _options.idle_timeout_second > 0) { if (bthread_start_background(&_close_idle_thread, NULL, @@ -230,6 +242,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, // removing and inserting it again. But this would make error branches // below have to remove the entry before returning, which is // error-prone. We prefer code maintainability here. + _point_sock_map.erase(key.peer.addr); sc = NULL; } SocketId tmp_id; @@ -252,6 +265,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, ptr->SetHCRelatedRefHeld(); // set held status SingleConnection new_sc = { 1, ptr.release(), 0 }; _map[key] = new_sc; + _point_sock_map[key.peer.addr] = tmp_id; *id = tmp_id; bool need_to_create_bvar = false; if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) { @@ -295,6 +309,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, } else { Socket* const s = sc->socket; _map.erase(key); + _point_sock_map.erase(key.peer.addr); bool need_to_create_bvar = false; if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) { _exposed_in_bvar = true; @@ -324,6 +339,16 @@ int SocketMap::Find(const SocketMapKey& key, SocketId* id) { return -1; } +int SocketMap::Find(const butil::EndPoint& key, SocketId* id) { + BAIDU_SCOPED_LOCK(_mutex); + SocketId* sc = _point_sock_map.seek(key); + if (sc) { + *id = *sc; + return 0; + } + return -1; +} + void SocketMap::List(std::vector* ids) { ids->clear(); BAIDU_SCOPED_LOCK(_mutex); diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 893239461d..956023861d 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -96,6 +96,7 @@ inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { // Find the SocketId associated with `key'. // Return 0 on found, -1 otherwise. int SocketMapFind(const SocketMapKey& key, SocketId* id); +int SocketMapFind(const butil::EndPoint& key, SocketId* id); // Called once when the Socket returned by SocketMapInsert() is not needed. void SocketMapRemove(const SocketMapKey& key); @@ -163,6 +164,7 @@ class SocketMap { void Remove(const SocketMapKey& key, SocketId expected_id); int Find(const SocketMapKey& key, SocketId* id); + int Find(const butil::EndPoint& key, SocketId* id); void List(std::vector* ids); void List(std::vector* pts); const SocketMapOptions& options() const { return _options; } @@ -190,6 +192,7 @@ class SocketMap { SocketMapOptions _options; butil::Mutex _mutex; Map _map; + butil::FlatMap _point_sock_map; bool _exposed_in_bvar; bvar::PassiveStatus* _this_map_bvar; bool _has_close_idle_thread; diff --git a/src/butil/redis_cluster_slot.cc b/src/butil/redis_cluster_slot.cc new file mode 100644 index 0000000000..49fc4fab24 --- /dev/null +++ b/src/butil/redis_cluster_slot.cc @@ -0,0 +1,88 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A portable implementation of crc32c, optimized to handle +// four bytes at a time. + +#include "redis_cluster_slot.h" + +namespace butil { +static const uint16_t crc16tab[256]= { + 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, + 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, + 0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6, + 0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de, + 0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485, + 0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d, + 0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4, + 0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc, + 0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823, + 0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b, + 0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12, + 0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a, + 0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41, + 0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49, + 0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70, + 0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78, + 0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f, + 0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067, + 0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e, + 0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256, + 0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d, + 0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405, + 0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c, + 0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634, + 0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab, + 0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3, + 0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a, + 0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92, + 0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9, + 0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1, + 0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8, + 0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0 +}; + +static uint16_t crc16(const char *buf, int len) { + uint16_t crc = 0; + for (int counter = 0; counter < len; counter++) + crc = static_cast((crc<<8) ^ crc16tab[((crc>>8) ^ *buf++)&0x00FF]); + return crc; +} + +const static int SHARDS = 16383; + +uint64_t cal_slot_num(const std::string &key) { + + const auto *k = key.data(); + auto keylen = static_cast(key.size()); + + // start-end indexes of { and }. + int s = 0; + int e = 0; + + // Search the first occurrence of '{'. + for (s = 0; s < keylen; s++) + if (k[s] == '{') break; + + // No '{' ? Hash the whole key. This is the base case. + if (s == keylen) return crc16(k, keylen) & SHARDS; + + // '{' found? Check if we have the corresponding '}'. + for (e = s + 1; e < keylen; e++) + if (k[e] == '}') break; + + // No '}' or nothing between {} ? Hash the whole key. + if (e == keylen || e == s + 1) return crc16(k, keylen) & SHARDS; + + // If we are here there is both a { and a } on its right. Hash + // what is in the middle between { and }. + return crc16(k + s + 1, e - s - 1) & SHARDS; +} + +} // namespace butil diff --git a/src/butil/redis_cluster_slot.h b/src/butil/redis_cluster_slot.h new file mode 100644 index 0000000000..0b32331f27 --- /dev/null +++ b/src/butil/redis_cluster_slot.h @@ -0,0 +1,18 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + + +#pragma once + +#include +#include + +namespace butil { + uint64_t cal_slot_num(const std::string &key); +} // namespace butil \ No newline at end of file