-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathManager.h
More file actions
189 lines (148 loc) · 5.77 KB
/
Manager.h
File metadata and controls
189 lines (148 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#ifndef MANAGER_H
#define MANAGER_H
#include <any>
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "Channel.h"
#include "ThreadPool.h"
// NOTE(review): a using-directive at header scope exposes every std::chrono
// name to all files that include this header. The declarations below rely on
// it (unqualified high_resolution_clock, duration_cast, seconds), so it cannot
// be dropped without qualifying those uses.
using namespace std::chrono;
// Forward declarations: only pointers to these types are needed in this header.
class Process;
class Event;
// Callback invoked with the event that is being delivered.
using EventHandler = std::function<void(std::shared_ptr<Event>)>;
// Callback taking a typed channel payload. It is not referenced by the code
// visible in this header; presumably used by includers -- TODO confirm.
template<typename T>
using ChannelHandler = std::function<void(T)>;
/**
 * Central registry for event listeners, channel listeners and channels.
 *
 * The constructor is private and copying is disabled; the only way to obtain
 * an object is getInstance(). Events are queued by publishEvent() and handled
 * later by run(), while channel payloads are handed to the thread pool by
 * publishToChannel().
 */
class Manager {
public:
    // Non-copyable: a single shared instance is handed out by getInstance().
    Manager(const Manager &) = delete;

    Manager &operator=(const Manager &) = delete;

    /// Access point for the shared Manager object.
    static Manager &getInstance();

    /// Gives callers access to the thread pool used for channel listeners.
    ThreadPool &getThreadPool() {
        return *threadPool;
    }

    /// Registers `handler` for events published under `eventType`.
    void subscribeEvent(const std::string &eventType, const EventHandler &handler);

    /// Queues `event` for every handler registered under `eventType`.
    void publishEvent(const std::string &eventType, std::shared_ptr<Event> event);

    /// Registers a typed listener for payloads published on `channelName`.
    template<typename T>
    void subscribeChannel(const std::string &channelName, std::function<void(T)> listener);

    /// Hands `data` to every listener registered on `channelName`.
    template<typename T>
    void publishToChannel(const std::string &channelName, T data);

    /// Returns the channel stored under `channelName`, creating it if absent.
    template<typename T>
    Channel<T> &getOrCreateChannel(const std::string &channelName);

    /// Stores a new Channel<T> under `channelName`.
    template<typename T>
    void createChannel(const std::string &channelName);

    /// Runs the event loop until `runtime` has elapsed.
    void run(high_resolution_clock::duration runtime);

private:
    // The pool is built with the argument 4 (presumably the number of worker
    // threads; the original comment calls it an assumed pool size).
    Manager() : threadPool(std::make_unique<ThreadPool>(4)) {}

    // Channels by name, stored type-erased as shared_ptr<void>.
    std::map<std::string, std::shared_ptr<void>> channels;
    // Processes by name; no function visible in this header touches this map.
    std::map<std::string, std::shared_ptr<Process>> processes;
    // Event handlers grouped by event type.
    std::map<std::string, std::vector<EventHandler>> eventListeners;
    // Channel listeners grouped by channel name, wrapped to accept std::any.
    std::map<std::string, std::vector<std::function<void(std::any)>>> channelListeners;

    // Pending (event, handler) pairs waiting to be processed by run().
    std::queue<std::pair<std::shared_ptr<Event>, EventHandler>> eventTaskQueue;

    // Thread pool that executes channel listeners.
    std::unique_ptr<ThreadPool> threadPool;

    // Mutex for the event-side state.
    std::mutex eventMutex;
    // Mutex for the channel-side state.
    std::mutex channelMutex;

    // Time at which run() started, and the time elapsed since then.
    high_resolution_clock::time_point _start_time;
    high_resolution_clock::duration _elapsed;
};
/**
 * Returns the shared Manager instance.
 *
 * The instance is a function-local static: it is constructed on the first
 * call and the same object is returned by every later call.
 *
 * The definition is marked inline because it lives in a header; a non-inline
 * definition here is emitted in every translation unit that includes the
 * header and fails at link time with multiple-definition errors.
 *
 * @return Reference to the single Manager object.
 */
inline Manager &Manager::getInstance() {
    static Manager instance;
    return instance;
}
/**
 * Registers a handler for events published under `eventType`.
 *
 * Handlers are appended, so several handlers may be registered for the same
 * event type (registering the same handler twice stores it twice).
 * The listener table is modified while holding eventMutex.
 *
 * Marked inline because this non-template member is defined in a header;
 * without it, including the header from several translation units violates
 * the one-definition rule.
 *
 * @param eventType Key under which the handler is stored.
 * @param handler   Callback copied into the listener table.
 */
inline void Manager::subscribeEvent(const std::string &eventType, const EventHandler &handler) {
    std::lock_guard<std::mutex> lock(eventMutex);
    eventListeners[eventType].push_back(handler);
}
/**
 * Queues `event` once for every handler registered under `eventType`.
 *
 * Handlers are not invoked here: each (event, handler) pair is appended to
 * eventTaskQueue and executed later by the event loop in run(). If no handler
 * is registered for `eventType`, nothing is queued.
 * The whole operation runs while holding eventMutex.
 *
 * Marked inline because this non-template member is defined in a header;
 * without it, including the header from several translation units violates
 * the one-definition rule.
 *
 * @param eventType Key used to look up the registered handlers.
 * @param event     Event shared with every queued task.
 */
inline void Manager::publishEvent(const std::string &eventType, std::shared_ptr<Event> event) {
    std::lock_guard<std::mutex> lock(eventMutex);

    // Single lookup: reuse the iterator instead of find() followed by operator[].
    const auto found = eventListeners.find(eventType);
    if (found == eventListeners.end()) {
        return;
    }

    for (const auto &handler : found->second) {
        // Deferred dispatch: the event loop calls handler(event) later.
        eventTaskQueue.emplace(event, handler);
        std::cout << "Task added to queue" << std::endl;
    }
}
template<typename T>
void Manager::subscribeChannel(const std::string &channelName, std::function<void(T)> listener) {
// 将listener封装为接受std::any的函数,然后存储
std::function<void(std::any)> anyListener = [listener](std::any data) {
listener(std::any_cast<T>(data));
};
channelListeners[channelName].push_back(anyListener);
}
template<typename T>
void Manager::publishToChannel(const std::string &channelName, T data) {
if (channelListeners.find(channelName) != channelListeners.end()) {
for (auto &listener: channelListeners[channelName]) {
// 将数据封装为std::any类型
std::any anyData = data;
// 使用线程池异步执行监听器
std::cout << "Enqueueing channel listener" << std::endl;
getThreadPool().enqueue([listener, anyData]() {
// 在这个lambda表达式中调用监听器
listener(anyData);
});
// 同步调用监听器
// std::cout << "Calling channel listener" << std::endl;
// listener(anyData);
}
}
}
/**
 * Returns the channel stored under `channelName`, creating a Channel<T>
 * constructed from `channelName` when no entry exists yet.
 *
 * The lookup and the insertion happen while holding channelMutex.
 *
 * NOTE(review): channels are stored as shared_ptr<void>, so the stored type
 * is not checked. Requesting an existing name with a different T than it was
 * created with casts to the wrong type (undefined behavior).
 * NOTE(review): the returned reference is kept alive only by the map entry;
 * createChannel() with the same name replaces that entry.
 *
 * @tparam T          Element type of the channel.
 * @param channelName Key of the channel.
 * @return Reference to the channel owned by this Manager.
 */
template<typename T>
Channel<T> &Manager::getOrCreateChannel(const std::string &channelName) {
    std::lock_guard<std::mutex> lock(channelMutex);

    auto it = channels.find(channelName);
    if (it == channels.end()) {
        // Insert and keep the iterator so the entry is not looked up again.
        it = channels.emplace(channelName, std::make_shared<Channel<T>>(channelName)).first;
    }
    return *std::static_pointer_cast<Channel<T>>(it->second);
}
template<typename T>
void Manager::createChannel(const std::string &channelName) {
std::lock_guard<std::mutex> lock(channelMutex);
auto channel = std::make_shared<Channel<T>>(channelName);
channels[channelName] = channel;
}
// 事件循环
/**
 * Event loop: processes queued (event, handler) tasks until `runtime` has
 * elapsed since the call started.
 *
 * Each iteration logs the queue size and the elapsed time, stops once the
 * elapsed time reaches `runtime`, sleeps 200 ms when the queue is empty, and
 * otherwise removes the oldest task and calls its handler on the calling
 * thread. Because of the sleep and the handler run time, the loop may end
 * later than `runtime`.
 *
 * eventTaskQueue is only touched while holding eventMutex, the same mutex
 * publishEvent() holds when it pushes tasks. The handler itself runs with the
 * mutex released, so a handler may publish further events.
 *
 * Marked inline because this non-template member is defined in a header;
 * without it, including the header from several translation units violates
 * the one-definition rule.
 *
 * @param runtime How long the loop keeps running.
 */
inline void Manager::run(high_resolution_clock::duration runtime) {
    _start_time = high_resolution_clock::now();

    while (true) {
        std::cout << "Event loop running" << std::endl;

        // Read the size under the lock; print after releasing it.
        const auto queued = [this] {
            std::lock_guard<std::mutex> lock(eventMutex);
            return eventTaskQueue.size();
        }();
        std::cout << "Event queue size: " << queued << std::endl;

        // Refresh the elapsed time on every iteration.
        _elapsed = high_resolution_clock::now() - _start_time;
        std::cout << "Elapsed time: " << duration_cast<seconds>(_elapsed).count() << "s" << std::endl;

        // Stop once the requested run time has been reached.
        if (_elapsed >= runtime) {
            break;
        }

        // Take the oldest task out of the queue while holding the lock.
        std::pair<std::shared_ptr<Event>, EventHandler> task;
        bool hasTask = false;
        {
            std::lock_guard<std::mutex> lock(eventMutex);
            if (!eventTaskQueue.empty()) {
                task = std::move(eventTaskQueue.front());
                eventTaskQueue.pop();
                hasTask = true;
            }
        }

        if (!hasTask) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            continue;
        }

        std::cout << "Processing event task" << std::endl;
        // This label is printed for every task, whatever the event type is.
        std::cout << "StatusChangeEvent " << std::endl;
        task.second(task.first);
        std::cout << "event Task processed" << std::endl;
    }

    std::cout << "Event loop stopped" << std::endl;
}
#endif // MANAGER_H