From ee184eaae92aecbb6baa3d41628244ecdb76a003 Mon Sep 17 00:00:00 2001 From: yeshenyong <52573793+yeshenyong@users.noreply.github.com> Date: Sun, 1 Mar 2026 17:47:14 +0800 Subject: [PATCH 1/5] [bugfix] fix GStage data racing (#583) --- src/GraphCtrl/GraphStage/GStage.cpp | 19 ++++++++----------- src/GraphCtrl/GraphStage/GStage.h | 1 - 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/GraphCtrl/GraphStage/GStage.cpp b/src/GraphCtrl/GraphStage/GStage.cpp index 6c78a760..d0d21979 100644 --- a/src/GraphCtrl/GraphStage/GStage.cpp +++ b/src/GraphCtrl/GraphStage/GStage.cpp @@ -31,19 +31,16 @@ GStagePtr GStage::setThreshold(CInt threshold) { CVoid GStage::waiting() { - { - CGRAPH_LOCK_GUARD wm(waiting_mutex_); - cur_value_++; - if (cur_value_ >= threshold_) { - // 如果超过了 threshold,则打开全部 - launch(param_); - cur_value_ = 0; - locker_.cv_.notify_all(); - return; - } + CGRAPH_UNIQUE_LOCK lk(locker_.mtx_); + cur_value_++; + if (cur_value_ >= threshold_) { + // 如果超过了 threshold,则打开全部 + launch(param_); + cur_value_ = 0; + locker_.cv_.notify_all(); + return; } - CGRAPH_UNIQUE_LOCK lk(locker_.mtx_); locker_.cv_.wait(lk, [this] { return 0 == cur_value_ || cur_value_ >= threshold_; }); diff --git a/src/GraphCtrl/GraphStage/GStage.h b/src/GraphCtrl/GraphStage/GStage.h index 18e53594..fdc26dbb 100644 --- a/src/GraphCtrl/GraphStage/GStage.h +++ b/src/GraphCtrl/GraphStage/GStage.h @@ -63,7 +63,6 @@ class GStage : public GStageObject { CInt cur_value_ { 0 }; // 当前值 GStageParamPtr param_ { nullptr }; // 参数信息 UCvMutex locker_; - std::mutex waiting_mutex_; friend class GStageManager; friend class CAllocator; From 6cad07e03b9f68b053783fc10ad4040f1918a67a Mon Sep 17 00:00:00 2001 From: Chunel Date: Sun, 1 Mar 2026 18:16:54 +0800 Subject: [PATCH 2/5] [bugfix] fix destory wait too long bug --- LICENSE | 2 +- src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/LICENSE b/LICENSE index e2b24181..2deadfc2 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2025 Chunel Feng +Copyright (c) 2026 Chunel Feng Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index c8a56c0f..4b2e7b6e 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -119,7 +119,7 @@ class UThreadPrimary : public UThreadBase { if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) { CGRAPH_UNIQUE_LOCK lk(mutex_); cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_), - [this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty(); }); + [this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !done_; }); cur_empty_epoch_ = 0; } } From bf7cb4b8724a41c67703373f2b537db05c7174eb Mon Sep 17 00:00:00 2001 From: yeshenyong <52573793+yeshenyong@users.noreply.github.com> Date: Sun, 1 Mar 2026 20:20:28 +0800 Subject: [PATCH 3/5] [bugfix] fix GSome read node done data racing (#584) --- src/GraphCtrl/GraphElement/GElement.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/GraphCtrl/GraphElement/GElement.h b/src/GraphCtrl/GraphElement/GElement.h index 7c4527d0..c68f8a22 100644 --- a/src/GraphCtrl/GraphElement/GElement.h +++ b/src/GraphCtrl/GraphElement/GElement.h @@ -445,7 +445,7 @@ class GElement : public GElementObject, private: /** 状态相关信息 */ - CBool done_ { false }; // 判定被执行结束 + std::atomic done_ { false }; // 判定被执行结束 CBool visible_ { true }; // 判定可见的,如果被删除的话,则认为是不可见的 CBool is_init_ { false }; // 判断是否init GElementType element_type_ { GElementType::ELEMENT }; // 用于区分element 内部类型 From 404ef378e42b12895fb75d3e2f119a603c02cab6 Mon Sep 17 00:00:00 2001 From: Chunel Date: Sun, 1 Mar 2026 20:26:58 +0800 Subject: [PATCH 4/5] [chore] optimize code style --- src/GraphCtrl/GraphElement/GElement.h | 6 +++--- src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl | 6 +++--- src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/GraphCtrl/GraphElement/GElement.h b/src/GraphCtrl/GraphElement/GElement.h index c68f8a22..7d4247b2 100644 --- a/src/GraphCtrl/GraphElement/GElement.h +++ b/src/GraphCtrl/GraphElement/GElement.h @@ -445,11 +445,11 @@ class GElement : public GElementObject, private: /** 状态相关信息 */ - std::atomic done_ { false }; // 判定被执行结束 + std::atomic done_ { false }; // 判定被执行结束 CBool visible_ { true }; // 判定可见的,如果被删除的话,则认为是不可见的 CBool is_init_ { false }; // 判断是否init GElementType element_type_ { GElementType::ELEMENT }; // 用于区分element 内部类型 - std::atomic cur_state_ { GElementState::NORMAL }; // 当前执行状态 + std::atomic cur_state_ { GElementState::NORMAL }; // 当前执行状态 internal::GElementShape shape_ { internal::GElementShape::NORMAL }; // 元素位置类型 /** 配置相关信息 */ @@ -468,7 +468,7 @@ class GElement : public GElementObject, CBool is_prepared_ { false }; // 判断是否已经执行过 prepareRun() 方法 /** 图相关信息 */ - std::atomic left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息 + std::atomic left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息 USmallVector run_before_ {}; // 被依赖的节点(后继) USmallVector dependence_ {}; // 依赖的节点信息(前驱) GElement* belong_ { nullptr }; // 从属的element 信息,如为nullptr,则表示从属于 pipeline diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl index 4bb653ec..94847139 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl @@ -43,7 +43,7 @@ CStatus GSome::run() { CGRAPH_FUNCTION_BEGIN left_num_ = TriggerNum; // 还剩n个,就完成当前GSome的执行逻辑 - cur_status_ = CStatus(); + cur_status_.reset(); /** * 1. 并发的执行,其中的所有逻辑信息 @@ -53,8 +53,8 @@ CStatus GSome::run() { */ for (auto* element : group_elements_arr_) { thread_pool_->commit([this, element] { - auto curStatus = element->fatProcessor(CFunctionType::RUN); { + const auto& curStatus = element->fatProcessor(CFunctionType::RUN); CGRAPH_UNIQUE_LOCK lock(lock_); cur_status_ += curStatus; left_num_--; @@ -68,7 +68,7 @@ CStatus GSome::run() { return left_num_ <= 0 || cur_status_.isErr(); }); - for (GElementPtr element : group_elements_arr_) { + for (auto* element : group_elements_arr_) { if (!element->done_) { element->cur_state_.store(GElementState::TIMEOUT, std::memory_order_release); } diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index 4b2e7b6e..abe5d995 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -254,8 +254,8 @@ class UThreadPrimary : public UThreadBase { } private: - CInt index_ {0}; // 线程index - std::atomic cur_empty_epoch_ {0 }; // 当前空转的轮数信息 + CInt index_ { 0 }; // 线程index + std::atomic cur_empty_epoch_ { 0 }; // 当前空转的轮数信息 UWorkStealingQueue wsq_ {}; // 内部队列信息 std::vector* pool_threads_ {}; // 用于存放线程池中的线程信息 std::vector steal_targets_ {}; // 被偷的目标信息 From 5be39a8c4a180182ae950554fa9a02b02e8d82c9 Mon Sep 17 00:00:00 2001 From: Chunel Date: Sun, 1 Mar 2026 22:41:03 +0800 Subject: [PATCH 5/5] [test] add timer for test --- src/UtilsCtrl/Timer/UTimeCounter.h | 39 ++++++++++++++++++++++---- test/Functional/test-functional-01.cpp | 19 ++++++++++++- test/Functional/test-functional-02.cpp | 2 +- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/UtilsCtrl/Timer/UTimeCounter.h b/src/UtilsCtrl/Timer/UTimeCounter.h index 67a4be8f..5a8032fc 100644 --- a/src/UtilsCtrl/Timer/UTimeCounter.h +++ b/src/UtilsCtrl/Timer/UTimeCounter.h @@ -23,19 +23,46 @@ class UTimeCounter : public UtilsObject { start_ts_ = std::chrono::steady_clock::now(); } - explicit UTimeCounter(const std::string& key) { - start_ts_ = std::chrono::steady_clock::now(); + + explicit UTimeCounter(const std::string& key, const CMSec minShowSpan = 0) { key_ = key; + start_ts_ = std::chrono::steady_clock::now(); + min_show_span_ = minShowSpan; + } + + + /** + * 获取间隔 + * @return + */ + CMSec getSpan() const { + const std::chrono::duration& span = std::chrono::steady_clock::now() - start_ts_; + return static_cast(span.count()); } + + /** + * 重置信息 + * @return + */ + CMSec reset() { + const std::chrono::duration& span = std::chrono::steady_clock::now() - start_ts_; + start_ts_ = std::chrono::steady_clock::now(); + return static_cast(span.count()); + } + + ~UTimeCounter() override { - std::chrono::duration span = std::chrono::steady_clock::now() - start_ts_; - CGraph::CGRAPH_ECHO("[%s]: time counter is : [%0.2lf] ms", key_.c_str(), span.count()); + const std::chrono::duration& span = std::chrono::steady_clock::now() - start_ts_; + if (static_cast(span.count()) >= min_show_span_) { + CGRAPH_ECHO("[%s]: time counter is : [%0.2lf] ms", key_.c_str(), span.count()); + } } private: - std::chrono::steady_clock::time_point start_ts_; - std::string key_; + std::chrono::steady_clock::time_point start_ts_ {}; + std::string key_ {}; + CMSec min_show_span_ {0}; }; CGRAPH_NAMESPACE_END diff --git a/test/Functional/test-functional-01.cpp b/test/Functional/test-functional-01.cpp index 5b3745ae..b3ea7d30 100644 --- a/test/Functional/test-functional-01.cpp +++ b/test/Functional/test-functional-01.cpp @@ -29,7 +29,24 @@ void test_functional_01() { { UTimeCounter counter("test_functional_01"); - status = pipeline->process(runTimes); + { + UTimeCounter ic("test_functional_init_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL); + status += pipeline->init(); + } + + for (auto x = 0; x < runTimes; x++) { + UTimeCounter rc("test_functional_run_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL); + status += pipeline->run(); + if (rc.getSpan() >= CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL) { + std::cout << " [timeout] test_functional_01 times = " << x << " span = " << rc.getSpan() << std::endl; + break; + } + } + + { + UTimeCounter dc("test_functional_destroy_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL); + status += pipeline->destroy(); + } } if (status.isErr()) { diff --git a/test/Functional/test-functional-02.cpp b/test/Functional/test-functional-02.cpp index 577f9325..7f90f9a2 100644 --- a/test/Functional/test-functional-02.cpp +++ b/test/Functional/test-functional-02.cpp @@ -13,7 +13,7 @@ using namespace CGraph; void test_functional_02() { GPipelinePtr pipeline = GPipelineFactory::create(); CStatus status; - const int runTimes = 5000; + const int runTimes = 50000; GElementPtr a,b,c,d,e,f,g,h,i,j,k,l,m,n = nullptr; GElementPtr region1, region2, cluster1, cluster2 = nullptr;