-
Notifications
You must be signed in to change notification settings - Fork 385
ipc::channel
mutouyun edited this page Dec 9, 2025
·
1 revision
多写多读(N-to-N)IPC通讯通道。多个生产者可以向多个消费者广播消息。
using channel = chan<relat::multi, relat::multi, trans::broadcast>;ipc::channel是ipc::chan的特化类型,配置为:
- 多个写者(multi writer):允许多个发送端
- 多个读者(multi reader):支持多个接收端
- 广播模式(broadcast):所有接收端都会收到相同的消息
| 特性 | ipc::route | ipc::channel |
|---|---|---|
| 写者数量 | 单个 | 多个 |
| 读者数量 | 多个 | 多个 |
| 使用场景 | 一对多广播 | 多对多通讯 |
| 线程安全 | 单写者,无需锁 | 多写者,内部使用锁 |
ipc::channel的接口与ipc::route完全相同。所有方法的使用方式和参数都一致。
class channel {
public:
channel() noexcept = default;
explicit channel(char const * name, unsigned mode = ipc::sender);
channel(prefix pref, char const * name, unsigned mode = ipc::sender);
channel(channel&& rhs) noexcept;
~channel();
// ... 所有方法与 ipc::route 相同 ...
// 详见:ipc::route 文档
};#include "libipc/ipc.h"
#include <thread>
#include <iostream>
#include <chrono>
// 生产者线程
void producer(int id) {
// 注意:channel作为sender时,通常也需要receiver模式来接收确认消息
ipc::channel ch { "my-channel", ipc::sender | ipc::receiver };
for (int i = 0; i < 5; ++i) {
std::string msg = "Producer " + std::to_string(id) +
": Message " + std::to_string(i);
// 尝试发送消息
while (!ch.send(msg)) {
// 等待接收者连接
// 注意:如果本身也是receiver,需要等待至少2个连接(包括自己)
ch.wait_for_recv(2);
}
std::cout << "Sent: " << msg << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
// 消费者线程
void consumer(int id) {
// channel作为receiver时,也可以发送消息(如确认消息)
ipc::channel ch { "my-channel", ipc::sender | ipc::receiver };
for (int i = 0; i < 10; ++i) { // 接收更多消息(2个生产者 × 5条)
auto buf = ch.recv(5000); // 5秒超时
if (buf.empty()) {
std::cout << "Consumer " << id << ": timeout" << std::endl;
continue;
}
char* str = buf.get<char*>();
std::cout << "Consumer " << id << " received: " << str << std::endl;
// 发送确认消息
std::string ack = "ACK from consumer " + std::to_string(id);
ch.try_send(ack);
}
}
int main() {
// 启动2个生产者和2个消费者
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join();
p2.join();
c1.join();
c2.join();
return 0;
}// 客户端:发送请求并等待响应
void client(int id) {
ipc::channel ch { "rpc-channel", ipc::sender | ipc::receiver };
ch.wait_for_recv(2); // 等待服务端就绪
// 发送请求
std::string request = "Request from client " + std::to_string(id);
ch.send(request);
std::cout << "Sent request: " << request << std::endl;
// 等待响应
auto buf = ch.recv(3000); // 3秒超时
if (!buf.empty()) {
std::cout << "Received response: " << buf.get<char*>() << std::endl;
}
}
// 服务端:接收请求并发送响应
void server() {
ipc::channel ch { "rpc-channel", ipc::sender | ipc::receiver };
while (true) {
// 接收请求
auto buf = ch.recv(10000); // 10秒超时
if (buf.empty()) {
std::cout << "Server timeout" << std::endl;
break;
}
char* request = buf.get<char*>();
std::cout << "Server received: " << request << std::endl;
// 处理请求并发送响应
std::string response = "Response to: " + std::string(request);
while (!ch.send(response)) {
ch.wait_for_recv(2);
}
}
}
int main() {
std::thread s(server);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::thread c1(client, 1);
std::thread c2(client, 2);
c1.join();
c2.join();
s.join();
return 0;
}channel支持自定义配置,可以实现单播模式的工作队列:
// 定义单播模式的channel(消息只被一个接收者消费)
using work_queue = ipc::chan<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>;
// 生产者:添加任务到队列
void task_producer(int id) {
work_queue wq { "work-queue" };
for (int i = 0; i < 10; ++i) {
std::string task = "Task-" + std::to_string(id) + "-" + std::to_string(i);
while (!wq.send(task)) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "Producer " << id << " added: " << task << std::endl;
}
}
// 工作者:从队列获取并处理任务
void worker(int id) {
work_queue wq { "work-queue", ipc::receiver };
while (true) {
auto buf = wq.recv(5000); // 5秒超时
if (buf.empty()) {
std::cout << "Worker " << id << ": no more tasks" << std::endl;
break;
}
char* task = buf.get<char*>();
std::cout << "Worker " << id << " processing: " << task << std::endl;
// 模拟任务处理
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main() {
// 启动3个工作者
std::thread w1(worker, 1);
std::thread w2(worker, 2);
std::thread w3(worker, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 启动2个生产者
std::thread p1(task_producer, 1);
std::thread p2(task_producer, 2);
p1.join();
p2.join();
w1.join();
w2.join();
w3.join();
return 0;
}ipc::channel支持通过模板参数自定义行为:
// 定义不同类型的通道
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
using chan = ipc::chan<Rp, Rc, Ts>;
// 1. 单播模式:消息只被一个接收者消费(工作队列)
using msg_queue = chan<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>;
// 2. 点对点通道:单发送者,单接收者,单播
using msg_pipe = chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
// 3. 标准广播channel(默认)
using broadcast_channel = chan<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>;根据源码,以下组合是被支持的:
// 单写单读 + 单播
chan<relat::single, relat::single, trans::unicast>
// 单写多读 + 单播
chan<relat::single, relat::multi, trans::unicast>
// 多写多读 + 单播(工作队列)
chan<relat::multi, relat::multi, trans::unicast>
// 单写多读 + 广播(即 ipc::route)
chan<relat::single, relat::multi, trans::broadcast>
// 多写多读 + 广播(即 ipc::channel)
chan<relat::multi, relat::multi, trans::broadcast>在使用ipc::channel时,如果节点同时作为sender和receiver,需要注意wait_for_recv()的参数:
ipc::channel ch { "my-channel", ipc::sender | ipc::receiver };
// 错误:等待1个连接会立即返回(因为自己就是一个receiver)
ch.wait_for_recv(1); // 立即返回
// 正确:需要等待至少2个连接(包括自己)
ch.wait_for_recv(2); // 等待另一个receiver连接原因:当channel以ipc::receiver模式创建时,它自身就算作一个接收者连接。
-
多写者开销:
ipc::channel支持多写者,内部使用互斥锁保护写操作,因此比ipc::route有额外开销 - 广播开销:广播模式下,每条消息会被所有接收者接收,内存开销与接收者数量成正比
- 单播优势:单播模式下,消息只被一个接收者消费,适合工作队列场景
-
ipc::channel的所有方法都是线程安全的(多写者支持) - 广播模式下,所有接收者都会收到相同的消息
- 单播模式下,每条消息只被一个接收者消费(先到先得)
- 如果队列满且发送超时,
send()会覆盖最旧的消息;try_send()会直接返回失败 - channel对象是移动语义,不支持拷贝
- 使用完毕后,channel会自动断开连接并清理资源
- 目前最多支持32个接收者
- 当节点同时作为sender和receiver时,注意
wait_for_recv()的参数计数
-
ipc::route- 单写多读路由 -
ipc::buffer- 数据缓冲区 - 全局常量定义 - 超时和队列大小配置