-
Notifications
You must be signed in to change notification settings - Fork 385
ipc::route
mutouyun edited this page Dec 9, 2025
·
1 revision
单写多读(1-to-N)IPC通讯路由。一个生产者可以向多个消费者广播消息。
using route = chan<relat::single, relat::multi, trans::broadcast>;ipc::route是ipc::chan的特化类型,配置为:
- 单个写者(single writer):只允许一个发送端
- 多个读者(multi reader):支持多个接收端
- 广播模式(broadcast):所有接收端都会收到相同的消息
class route {
public:
route() noexcept = default;
explicit route(char const * name, unsigned mode = ipc::sender);
route(prefix pref, char const * name, unsigned mode = ipc::sender);
route(route&& rhs) noexcept;
~route();
void swap(route& rhs) noexcept;
route& operator=(route rhs) noexcept;
char const * name() const noexcept;
void release() noexcept;
void clear() noexcept;
static void clear_storage(char const * name) noexcept;
static void clear_storage(prefix pref, char const * name) noexcept;
ipc::handle_t handle() const noexcept;
bool valid() const noexcept;
unsigned mode() const noexcept;
route clone() const;
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver);
bool connect(prefix pref, char const * name, unsigned mode = ipc::sender | ipc::receiver);
bool reconnect(unsigned mode);
void disconnect();
std::size_t recv_count() const;
bool wait_for_recv(std::size_t r_count, std::uint64_t tm = invalid_value) const;
static bool wait_for_recv(char const * name, std::size_t r_count, std::uint64_t tm = invalid_value);
bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout);
bool send(buff_t const & buff, std::uint64_t tm = default_timeout);
bool send(std::string const & str, std::uint64_t tm = default_timeout);
bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout);
bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout);
bool try_send(std::string const & str, std::uint64_t tm = default_timeout);
buff_t recv(std::uint64_t tm = invalid_value);
buff_t try_recv();
};| 成员 | 说明 |
|---|---|
route |
构造函数 |
~route |
析构函数 |
connect |
连接到指定名称的路由 |
reconnect |
使用新模式重新连接 |
disconnect |
断开连接 |
send |
发送数据(超时后强制发送) |
try_send |
尝试发送数据(超时返回失败) |
recv |
接收数据 |
try_recv |
尝试接收数据(非阻塞) |
recv_count |
获取当前接收者数量 |
wait_for_recv |
等待指定数量的接收者连接 |
valid |
检查route是否有效 |
name |
获取route名称 |
mode |
获取连接模式 |
clone |
克隆route对象 |
release |
释放内存(不等待断开连接) |
clear |
清理共享内存 |
clear_storage |
清理指定名称的共享内存 |
/*A*/ route() noexcept = default;
/*B*/ explicit route(char const * name, unsigned mode = ipc::sender);
/*C*/ route(prefix pref, char const * name, unsigned mode = ipc::sender);
/*D*/ route(route&& rhs) noexcept;- A. 默认构造函数,创建一个未连接的route
- B. 使用指定名称和模式构造route
- C. 使用前缀和名称构造route
- D. 移动构造函数
| 参数 | 说明 |
|---|---|
name |
char const *,route的名称 |
mode |
unsigned,连接模式:ipc::sender(发送者)或 ipc::receiver(接收者) |
pref |
prefix,名称前缀 |
rhs |
route&&,另一个route对象的右值引用 |
~route();析构函数。自动断开连接并清理资源。
/*A*/ bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver);
/*B*/ bool connect(prefix pref, char const * name, unsigned mode = ipc::sender | ipc::receiver);连接到指定名称的route。
| 参数 | 说明 |
|---|---|
name |
char const *,route的名称 |
pref |
prefix,名称前缀 |
mode |
unsigned,连接模式 |
| 返回值 | 说明 |
|---|---|
true |
连接成功 |
false |
连接失败 |
bool reconnect(unsigned mode);使用新模式重新连接。
| 参数 | 说明 |
|---|---|
mode |
unsigned,新的连接模式 |
| 返回值 | 说明 |
|---|---|
true |
重连成功 |
false |
重连失败 |
void disconnect();断开当前连接。
/*A*/ bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout);
/*B*/ bool send(buff_t const & buff, std::uint64_t tm = default_timeout);
/*C*/ bool send(std::string const & str, std::uint64_t tm = default_timeout);发送数据。如果超时,此函数会强制发送数据(可能覆盖旧数据)。
| 参数 | 说明 |
|---|---|
data |
void const *,要发送的数据指针 |
size |
std::size_t,数据大小(字节) |
buff |
buff_t const &,buffer对象 |
str |
std::string const &,字符串对象 |
tm |
std::uint64_t,超时时间(毫秒),默认为default_timeout (100ms) |
| 返回值 | 说明 |
|---|---|
true |
发送成功 |
false |
发送失败 |
/*A*/ bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout);
/*B*/ bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout);
/*C*/ bool try_send(std::string const & str, std::uint64_t tm = default_timeout);尝试发送数据。如果超时,直接返回失败,不会强制发送。
| 参数 | 说明 |
|---|---|
同send
|
参数含义与send相同 |
| 返回值 | 说明 |
|---|---|
true |
发送成功 |
false |
发送失败或超时 |
buff_t recv(std::uint64_t tm = invalid_value);接收数据。阻塞等待直到有数据到达或超时。
| 参数 | 说明 |
|---|---|
tm |
std::uint64_t,超时时间(毫秒),默认为invalid_value(无限等待) |
| 返回值 | 说明 |
|---|---|
buff_t |
接收到的数据buffer,如果失败则为空buffer |
buff_t try_recv();尝试接收数据(非阻塞)。立即返回,不等待。
| 返回值 | 说明 |
|---|---|
buff_t |
接收到的数据buffer,如果没有数据则为空buffer |
std::size_t recv_count() const;获取当前连接的接收者数量。
| 返回值 | 说明 |
|---|---|
std::size_t |
当前接收者数量 |
/*A*/ bool wait_for_recv(std::size_t r_count, std::uint64_t tm = invalid_value) const;
/*B*/ static bool wait_for_recv(char const * name, std::size_t r_count, std::uint64_t tm = invalid_value);等待指定数量的接收者连接。
| 参数 | 说明 |
|---|---|
r_count |
std::size_t,期望的接收者数量 |
tm |
std::uint64_t,超时时间(毫秒),默认为invalid_value(无限等待) |
name |
char const *,route名称(静态方法) |
| 返回值 | 说明 |
|---|---|
true |
达到指定数量的接收者 |
false |
超时未达到 |
bool valid() const noexcept;检查route是否有效(已连接)。
char const * name() const noexcept;获取route的名称。
unsigned mode() const noexcept;获取当前连接模式。
route clone() const;克隆route对象,创建一个具有相同名称和模式的新连接。
void release() noexcept;释放内存,不等待连接断开。
void clear() noexcept;清理已打开handle下的共享内存文件。
/*A*/ static void clear_storage(char const * name) noexcept;
/*B*/ static void clear_storage(prefix pref, char const * name) noexcept;清理指定名称的共享内存文件。
#include "libipc/ipc.h"
#include <thread>
#include <iostream>
// 生产者线程
void producer() {
ipc::route rt { "my-route" }; // 默认为sender模式
// 等待至少2个接收者连接
if (!rt.wait_for_recv(2, 5000)) { // 等待5秒
std::cout << "Timeout: not enough receivers" << std::endl;
return;
}
// 发送消息
for (int i = 0; i < 10; ++i) {
std::string msg = "Message " + std::to_string(i);
if (rt.send(msg)) {
std::cout << "Sent: " << msg << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 发送结束信号
rt.send(ipc::buff_t('\0'));
}
// 消费者线程
void consumer(int id) {
ipc::route rt { "my-route", ipc::receiver };
while (true) {
auto buf = rt.recv();
if (buf.empty()) {
std::cout << "Consumer " << id << ": connection closed" << std::endl;
break;
}
char* str = buf.get<char*>();
if (str[0] == '\0') {
std::cout << "Consumer " << id << ": received exit signal" << std::endl;
break;
}
std::cout << "Consumer " << id << " received: " << str << std::endl;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer, 1);
std::thread t3(consumer, 2);
std::thread t4(consumer, 3);
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}// 非阻塞发送
ipc::route sender_rt { "my-route" };
if (sender_rt.try_send("Quick message")) {
std::cout << "Sent immediately" << std::endl;
} else {
std::cout << "Queue full, message not sent" << std::endl;
}
// 非阻塞接收
ipc::route receiver_rt { "my-route", ipc::receiver };
auto buf = receiver_rt.try_recv();
if (!buf.empty()) {
std::cout << "Received: " << buf.get<char*>() << std::endl;
} else {
std::cout << "No message available" << std::endl;
}ipc::route rt { "my-route" };
// 发送时设置超时为500ms
if (!rt.send("Time-sensitive data", 500)) {
std::cout << "Send timeout" << std::endl;
}
// 接收时设置超时为1000ms
auto buf = rt.recv(1000);
if (buf.empty()) {
std::cout << "Receive timeout" << std::endl;
}-
ipc::route只支持一个发送者,多个发送者会导致未定义行为 - 所有接收者都会收到相同的消息(广播模式)
- 如果队列满且发送超时,
send()会覆盖最旧的消息;try_send()会直接返回失败 - 建议在发送前使用
wait_for_recv()确保有接收者在监听 - route对象是移动语义,不支持拷贝
- 使用完毕后,route会自动断开连接并清理资源
- 目前最多支持32个接收者