-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRedisUtil.cpp
More file actions
158 lines (146 loc) · 5.93 KB
/
RedisUtil.cpp
File metadata and controls
158 lines (146 loc) · 5.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#include "stdafx.h"
#include "RedisUtil.h"
std::vector<std::unique_ptr<cpp_redis::client>> RedisUtil::availableClients;
std::mutex RedisUtil::clientsMutex;
std::mutex RedisUtil::initedMutex;
bool RedisUtil::initialized = false;
void RedisUtil::initClients() {
std::lock_guard<std::mutex> lock(initedMutex);
if (!initialized) {
for (int i = 0; i < AppConfig::REDIS_POOL_SIZE; ++i) {
auto client = std::make_unique<cpp_redis::client>();
//lambda表达式闭包:
client->connect(AppConfig::REDIS_HOST, AppConfig::REDIS_PORT, [](const std::string& host, std::size_t port, cpp_redis::client::connect_state status) {
if (status == cpp_redis::client::connect_state::ok) {
spdlog::info("连接到 Redis 服务器 {}:{} 成功", host, port);
}
else {
spdlog::error("连接到 Redis 服务器 {}:{} 失败", host, port);
}
});
client->auth("", [](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("Redis 认证失败: {}", reply.as_string());
}
else {
spdlog::debug("Redis 认证成功");
}
});
client->select(AppConfig::REDIS_DB, [](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("选择 Redis 数据库失败: {}", reply.as_string());
}
else {
spdlog::debug("选择 Redis 数据库{}成功", AppConfig::REDIS_DB);
}
});
availableClients.push_back(std::move(client));
}
initialized = true;
//spdlog::debug("Redis连接池初始化成功,共{}个连接", AppConfig::REDIS_POOL_SIZE);
}
}
std::optional<json> RedisUtil::pullMsg(const std::string& queue) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
std::string msg;
clientPtr->rpop(queue, [&msg](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("获取消息错误: {}", reply.as_string());
msg = "";
}
else {
msg = reply.as_array()[0].as_string();
}
});
clientPtr->sync_commit();
return msg.empty() ? std::nullopt : std::make_optional(json::parse(msg));
RedisUtil::returnClient(std::move(clientPtr));
}
void RedisUtil::pushMsg(const std::string& queue, const std::string& jsonStrItem) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
clientPtr->lpushx(queue, jsonStrItem, [](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("推送消息错误: {}", reply.as_string());
}
});
clientPtr->sync_commit();
RedisUtil::returnClient(std::move(clientPtr));
}
std::vector<std::string> RedisUtil::listMsg(const std::string& queue) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
std::vector<std::string> items;
std::promise<void> promise;
std::future<void> future = promise.get_future();
clientPtr->lrange(queue, 0, -1, [&items, &promise](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("获取列表错误: {}", reply.as_string());
}
else {
// 假设返回的是字符串列表
const std::vector<cpp_redis::reply>& replies = reply.as_array();
for (const auto& item : replies) {
items.push_back(item.as_string());
}
}
promise.set_value();
});
clientPtr->sync_commit();
future.wait(); // 等待异步回调完成
returnClient(std::move(clientPtr));
return items;
}
void RedisUtil::clearMsg(const std::string& queue) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
clientPtr->del({ queue }, [](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("清空列表错误: {}", reply.as_string());
}
});
clientPtr->sync_commit();
returnClient(std::move(clientPtr));
}
std::optional<std::string> RedisUtil::getStr(const std::string& key) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
std::string value;
clientPtr->get(key, [&value](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("获取字符串错误: {}", reply.as_string());
value = "";
}
else {
value = reply.as_string();
}
});
clientPtr->sync_commit();
RedisUtil::returnClient(std::move(clientPtr));
return value.empty() ? std::nullopt : std::make_optional(value);
}
void RedisUtil::setStr(const std::string& key, const std::string& value) {
std::unique_ptr<cpp_redis::client> clientPtr = getClient();
clientPtr->set(key, value, [](const cpp_redis::reply& reply) {
if (reply.is_error()) {
spdlog::error("设置字符串错误: {}", reply.as_string());
}
});
clientPtr->sync_commit();
RedisUtil::returnClient(std::move(clientPtr));
}
// 从连接池中获取一个 client 对象
std::unique_ptr<cpp_redis::client> RedisUtil::getClient() {
std::lock_guard<std::mutex> lock(clientsMutex);
if (!availableClients.empty()) {
std::unique_ptr<cpp_redis::client> client = std::move(availableClients.back());
availableClients.pop_back();
//spdlog::debug("借走了,还有{}个可用", RedisUtil::availableClients.size());
return client;
}
throw std::runtime_error("无连接可用.");
}
// 归还 client 对象到连接池
void RedisUtil::returnClient(std::unique_ptr<cpp_redis::client> client) {
std::lock_guard<std::mutex> lock(clientsMutex);
if (client) {
availableClients.push_back(std::move(client));
}
//spdlog::debug("增加了,还有{}个可用", RedisUtil::availableClients.size());
}