-
Notifications
You must be signed in to change notification settings - Fork 385
ipc::sync::semaphore
mutouyun edited this page Dec 9, 2025
·
1 revision
进程间信号量,用于控制对共享资源的访问数量。支持命名和匿名两种模式。
namespace ipc {
namespace sync {
class semaphore {
public:
semaphore();
explicit semaphore(char const *name, std::uint32_t count = 0);
~semaphore();
void const *native() const noexcept;
void *native() noexcept;
bool valid() const noexcept;
bool open(char const *name, std::uint32_t count = 0) noexcept;
void close() noexcept;
void clear() noexcept;
static void clear_storage(char const * name) noexcept;
bool wait(std::uint64_t tm = ipc::invalid_value) noexcept;
bool post(std::uint32_t count = 1) noexcept;
};
} // namespace sync
} // namespace ipc| 成员 | 说明 |
|---|---|
semaphore |
构造函数 |
~semaphore |
析构函数 |
open |
打开一个命名的semaphore |
close |
关闭semaphore |
clear |
清理semaphore资源 |
clear_storage |
清理指定名称的semaphore存储 |
wait |
等待信号量(P操作,减1) |
post |
释放信号量(V操作,加N) |
valid |
检查semaphore是否有效 |
native |
获取原生句柄 |
/*A*/ semaphore();
/*B*/ explicit semaphore(char const *name, std::uint32_t count = 0);- A. 默认构造函数,创建一个匿名semaphore(仅用于进程内同步)
- B. 创建或打开一个命名的semaphore(用于进程间同步)
| 参数 | 说明 |
|---|---|
name |
char const *,semaphore的名称。使用相同名称可以在不同进程间共享semaphore |
count |
std::uint32_t,初始计数值。默认为0 |
~semaphore();析构函数。自动释放semaphore资源。
bool open(char const *name, std::uint32_t count = 0) noexcept;打开一个命名的semaphore。如果semaphore不存在,会自动创建并设置初始计数值。
| 参数 | 说明 |
|---|---|
name |
char const *,semaphore的名称 |
count |
std::uint32_t,初始计数值(仅在创建时有效) |
| 返回值 | 说明 |
|---|---|
true |
打开成功 |
false |
打开失败 |
void close() noexcept;关闭semaphore。释放相关资源。
void clear() noexcept;清理已打开semaphore的所有资源,包括共享内存。
static void clear_storage(char const * name) noexcept;静态方法。清理指定名称的semaphore存储资源。
| 参数 | 说明 |
|---|---|
name |
char const *,要清理的semaphore名称 |
bool wait(std::uint64_t tm = ipc::invalid_value) noexcept;等待信号量(P操作,减1)。如果当前计数为0,则阻塞等待直到计数大于0或超时。
| 参数 | 说明 |
|---|---|
tm |
std::uint64_t,超时时间(毫秒)。默认为ipc::invalid_value(无限等待) |
| 返回值 | 说明 |
|---|---|
true |
成功获得信号量(计数减1) |
false |
超时未获得信号量 |
bool post(std::uint32_t count = 1) noexcept;释放信号量(V操作,加N)。增加信号量的计数,唤醒等待的线程/进程。
| 参数 | 说明 |
|---|---|
count |
std::uint32_t,要增加的计数值。默认为1 |
| 返回值 | 说明 |
|---|---|
true |
操作成功 |
false |
操作失败 |
bool valid() const noexcept;检查semaphore是否有效(已打开)。
| 返回值 | 说明 |
|---|---|
true |
semaphore有效 |
false |
semaphore无效或未打开 |
/*A*/ void const *native() const noexcept;
/*B*/ void *native() noexcept;获取平台相关的原生semaphore句柄。
#include "libipc/semaphore.h"
#include "libipc/shm.h"
#include <iostream>
#include <thread>
constexpr int BUFFER_SIZE = 10;
struct shared_buffer {
int data[BUFFER_SIZE];
int in; // 生产者写入位置
int out; // 消费者读取位置
};
// 生产者
void producer(int id) {
ipc::sync::semaphore empty("empty_slots", BUFFER_SIZE); // 空槽位数
ipc::sync::semaphore full("full_slots", 0); // 满槽位数
ipc::sync::mutex mtx("buffer_mutex");
ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
auto* buffer = static_cast<shared_buffer*>(shm.get());
for (int i = 0; i < 20; ++i) {
int item = id * 1000 + i;
// 等待空槽位
if (!empty.wait(5000)) {
std::cout << "Producer " << id << ": timeout waiting for empty slot" << std::endl;
continue;
}
// 加锁访问共享缓冲区
mtx.lock();
buffer->data[buffer->in] = item;
buffer->in = (buffer->in + 1) % BUFFER_SIZE;
std::cout << "Producer " << id << " produced: " << item << std::endl;
mtx.unlock();
// 增加满槽位计数
full.post();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 消费者
void consumer(int id) {
ipc::sync::semaphore empty("empty_slots", BUFFER_SIZE);
ipc::sync::semaphore full("full_slots", 0);
ipc::sync::mutex mtx("buffer_mutex");
ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
auto* buffer = static_cast<shared_buffer*>(shm.get());
for (int i = 0; i < 20; ++i) {
// 等待满槽位
if (!full.wait(5000)) {
std::cout << "Consumer " << id << ": timeout waiting for full slot" << std::endl;
continue;
}
// 加锁访问共享缓冲区
mtx.lock();
int item = buffer->data[buffer->out];
buffer->out = (buffer->out + 1) % BUFFER_SIZE;
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
mtx.unlock();
// 增加空槽位计数
empty.post();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}
int main() {
// 初始化共享缓冲区
{
ipc::shm::handle shm("shared_buffer", sizeof(shared_buffer));
auto* buffer = static_cast<shared_buffer*>(shm.get());
buffer->in = 0;
buffer->out = 0;
}
std::thread p1(producer, 1);
std::thread p2(producer, 2);
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
p1.join();
p2.join();
c1.join();
c2.join();
// 清理
ipc::sync::semaphore::clear_storage("empty_slots");
ipc::sync::semaphore::clear_storage("full_slots");
ipc::sync::mutex::clear_storage("buffer_mutex");
return 0;
}// 限制同时访问资源的进程/线程数量
class resource_pool {
ipc::sync::semaphore sem_;
public:
explicit resource_pool(const char* name, int max_users)
: sem_(name, max_users) {}
bool acquire(std::uint64_t timeout_ms = ipc::invalid_value) {
return sem_.wait(timeout_ms);
}
void release() {
sem_.post();
}
};
// 使用示例
void worker(int id, resource_pool& pool) {
if (pool.acquire(2000)) { // 最多等待2秒
std::cout << "Worker " << id << " acquired resource" << std::endl;
// 使用资源
std::this_thread::sleep_for(std::chrono::milliseconds(500));
pool.release();
std::cout << "Worker " << id << " released resource" << std::endl;
} else {
std::cout << "Worker " << id << " timeout" << std::endl;
}
}
int main() {
resource_pool pool("db_connections", 3); // 最多3个并发连接
std::vector<std::thread> workers;
for (int i = 0; i < 10; ++i) {
workers.emplace_back(worker, i, std::ref(pool));
}
for (auto& t : workers) {
t.join();
}
return 0;
}// 使用信号量作为事件通知机制
void event_waiter() {
ipc::sync::semaphore event("event_signal", 0);
std::cout << "Waiting for event..." << std::endl;
if (event.wait(5000)) {
std::cout << "Event received!" << std::endl;
} else {
std::cout << "Event timeout!" << std::endl;
}
}
void event_signaler() {
std::this_thread::sleep_for(std::chrono::seconds(2));
ipc::sync::semaphore event("event_signal", 0);
std::cout << "Signaling event..." << std::endl;
event.post(); // 触发事件
}
int main() {
std::thread waiter(event_waiter);
std::thread signaler(event_signaler);
waiter.join();
signaler.join();
ipc::sync::semaphore::clear_storage("event_signal");
return 0;
}// 一次性唤醒多个等待者
void batch_wake_up() {
ipc::sync::semaphore sem("batch_sem", 0);
// 启动5个等待者
std::vector<std::thread> waiters;
for (int i = 0; i < 5; ++i) {
waiters.emplace_back([i, &sem]() {
std::cout << "Waiter " << i << " waiting..." << std::endl;
sem.wait();
std::cout << "Waiter " << i << " woke up!" << std::endl;
});
}
std::this_thread::sleep_for(std::chrono::seconds(1));
// 一次性唤醒所有等待者
std::cout << "Waking up all waiters..." << std::endl;
sem.post(5); // 增加5个计数
for (auto& t : waiters) {
t.join();
}
ipc::sync::semaphore::clear_storage("batch_sem");
}// 使用信号量模拟互斥锁(初始值为1)
ipc::sync::semaphore mutex_sem("mutex_sim", 1);
// 加锁
mutex_sem.wait();
// 临界区
// ...
// 解锁
mutex_sem.post();// 控制读者数量
ipc::sync::semaphore read_sem("readers", 5); // 最多5个并发读者
// 读者
read_sem.wait();
// 读取数据
// ...
read_sem.post();// 等待所有线程/进程到达屏障点
void barrier_sync(int thread_id, int total_threads) {
static ipc::sync::semaphore barrier("barrier_sem", 0);
static ipc::sync::mutex counter_mtx("counter_mtx");
static ipc::shm::handle shm("barrier_counter", sizeof(int));
auto* count = static_cast<int*>(shm.get());
// 增加到达计数
counter_mtx.lock();
(*count)++;
bool is_last = (*count == total_threads);
counter_mtx.unlock();
if (is_last) {
// 最后一个到达的线程唤醒所有等待者
std::cout << "Thread " << thread_id << " is last, releasing all" << std::endl;
barrier.post(total_threads - 1);
} else {
// 其他线程等待
std::cout << "Thread " << thread_id << " waiting at barrier" << std::endl;
barrier.wait();
}
std::cout << "Thread " << thread_id << " passed barrier" << std::endl;
}- 基于POSIX命名信号量
- 信号量名称需要以'/'开头
- 使用
sem_open(),sem_wait(),sem_post()等API
- 基于Windows Semaphore对象
- 通过命名Semaphore实现进程间同步
- 最大计数值受系统限制
-
初始计数值:
- 用作互斥锁时,设置为1
- 用作资源池时,设置为资源数量
- 用作事件通知时,设置为0
-
死锁避免:确保每个
wait()都有对应的post() - 计数溢出:注意不要让计数值超过平台限制
-
清理资源:程序退出前调用
clear()或clear_storage() - 超时使用:生产环境建议使用明确的超时值而非无限等待
- 命名规范:不同的semaphore使用不同的名称以避免冲突
-
原子性:
wait()和post()操作都是原子的
-
ipc::sync::mutex- 进程间互斥锁 -
ipc::sync::condition- 进程间条件变量