Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,41 @@ CVoid GDynamicEngine::analysisDagType(const GSortedGElementPtrSet& elements) {
dag_type_ = internal::GEngineDagType::ALL_SERIAL;
} else if (total_element_arr_.size() == total_end_size_ && front_element_arr_.size() == total_end_size_) {
dag_type_ = internal::GEngineDagType::ALL_PARALLEL;
analysisParallelMatrix();
} else {
dag_type_ = internal::GEngineDagType::COMMON;
}
}


CVoid GDynamicEngine::analysisParallelMatrix() {
parallel_element_matrix_.clear();
const auto& config = thread_pool_->getConfig();
CSize thdSize = config.default_thread_size_ + config.secondary_thread_size_;
CGRAPH_THROW_EXCEPTION_BY_CONDITION(thdSize <= 0,
"default thread size cannot smaller than 1");

CSize taskNumPerThd = total_end_size_ / thdSize + (CSize)(0 != total_end_size_ % thdSize);
CGRAPH_THROW_EXCEPTION_BY_CONDITION(taskNumPerThd == 0,
"task number per thread is 0");
CGRAPH_THROW_EXCEPTION_BY_CONDITION(total_end_size_ <= 1,
"total end size <= 1, should not enter all parallel path");
if (1 == taskNumPerThd) {
// 如果线程数比 task数量都多,则直接放到一个 arr里就好了
parallel_element_matrix_.push_back(total_element_arr_);
return;
}

CSize curIndex = 0;
while (curIndex < total_end_size_) {
CSize curEnd = curIndex + taskNumPerThd < total_end_size_ ? curIndex + taskNumPerThd : total_end_size_ ;
GElementPtrArr curArr(total_element_arr_.data() + curIndex, total_element_arr_.data() + curEnd);
parallel_element_matrix_.push_back(curArr);
curIndex += taskNumPerThd;
}
}


CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
if (unlikely(cur_status_.isErr())) {
return;
Expand Down Expand Up @@ -207,11 +236,17 @@ CVoid GDynamicEngine::parallelRunAll() {
#else
CVoid GDynamicEngine::parallelRunAll() {
parallel_run_num_ = 0;
for (GElementPtr element : total_element_arr_) {
thread_pool_->execute([this, element] {
parallelRunOne(element);
}, element->binding_index_);
for (int i = 0; i < parallel_element_matrix_.size(); i++) {
const auto& curArr = parallel_element_matrix_[i];
for (auto element : curArr) {
thread_pool_->executeWithTid([this, element] {
parallelRunOne(element); },
1 == parallel_element_matrix_.size() ? CGRAPH_SECONDARY_THREAD_COMMON_ID : i,
element == curArr.front() || element == curArr.back(),
element == curArr.front());
}
}
thread_pool_->wakeupAllThread();

{
CGRAPH_UNIQUE_LOCK lock(locker_.mtx_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class GDynamicEngine : public GEngine {
*/
CVoid analysisDagType(const GSortedGElementPtrSet& elements);

/**
* 解析纯并行的情况下,元素矩阵
* @return
*/
CVoid analysisParallelMatrix();

/**
* 动态图运行
* @param
Expand Down Expand Up @@ -91,6 +97,8 @@ class GDynamicEngine : public GEngine {
CSize finished_end_size_ = 0; // 执行结束节点数量
CStatus cur_status_; // 当前全局的状态信息
std::atomic<CSize> parallel_run_num_ {0}; // 纯并行时,执行的个数信息
GElementPtrMat2D parallel_element_matrix_ {}; // 纯并行时,记录

internal::GEngineDagType dag_type_ = { internal::GEngineDagType::COMMON }; // 当前元素的排布形式

UCvMutex locker_;
Expand Down
15 changes: 15 additions & 0 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ class UThreadBase : public UThreadObject {
total_task_num_ = 0;
}


/**
* 唤醒当前线程
* @return
*/
CBool wakeup() {
CBool result = false;
if (!is_running_) {
cv_.notify_one();
result = true;
}
return result;
}


/**
* 执行单个消息
* @return
Expand Down
11 changes: 11 additions & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,15 @@ CVoid UThreadPool::monitor() {
}
}


CVoid UThreadPool::wakeupAllThread() {
for (auto& pt : primary_threads_) {
pt->wakeup();
}

for (auto& st : secondary_threads_) {
st->wakeup();
}
}

CGRAPH_NAMESPACE_END
18 changes: 18 additions & 0 deletions src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ class UThreadPool : public UThreadObject {
CVoid execute(FunctionType&& task,
CIndex index = CGRAPH_DEFAULT_TASK_STRATEGY);

/**
* 异步写入特定thread id,执行信息
* @tparam FunctionType
* @param task
* @param tid
* @param enable
* @param lockable
* @return
*/
template<typename FunctionType>
CVoid executeWithTid(FunctionType&& task, CIndex tid, CBool enable, CBool lockable);

/**
* 执行任务组信息
* 取taskGroup内部ttl和入参ttl的最小值,为计算ttl标准
Expand Down Expand Up @@ -164,6 +176,12 @@ class UThreadPool : public UThreadObject {
*/
CStatus releaseSecondaryThread(CInt size);

/**
* 通知所有thread 开启
* @return
*/
CVoid wakeupAllThread();

protected:
/**
* 根据传入的策略信息,确定最终执行方式
Expand Down
18 changes: 12 additions & 6 deletions src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ auto UThreadPool::commitWithTid(const FunctionType& func, CIndex tid, CBool enab
std::packaged_task<ResultType()> task(std::move(func));
std::future<ResultType> result(task.get_future());

if (tid >= 0 && tid < config_.default_thread_size_) {
primary_threads_[tid]->pushTask(std::move(task), enable, lockable);
} else {
// 如果超出主线程的范围,则默认写入 pool 通用的任务队列中
task_queue_.push(func);
}
execute(std::move(task), tid, enable, lockable);
return result;
}

Expand Down Expand Up @@ -72,6 +67,17 @@ CVoid UThreadPool::execute(FunctionType&& task, CIndex index) {
}
}


template<typename FunctionType>
CVoid UThreadPool::executeWithTid(FunctionType&& task, CIndex tid, CBool enable, CBool lockable) {
if (likely(tid >= 0 && tid < config_.default_thread_size_)) {
primary_threads_[tid]->pushTask(std::forward<FunctionType>(task), enable, lockable);
} else {
// 如果超出主线程的范围,则默认写入 pool 通用的任务队列中
task_queue_.push(std::forward<FunctionType>(task));
}
}

CGRAPH_NAMESPACE_END

#endif // CGRAPH_UTHREADPOOL_INL
Loading