Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2025 Chunel Feng
Copyright (c) 2026 Chunel Feng

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,11 @@ class GElement : public GElementObject,

private:
/** 状态相关信息 */
CBool done_ { false }; // 判定被执行结束
std::atomic<CBool> done_ { false }; // 判定被执行结束
CBool visible_ { true }; // 判定可见的,如果被删除的话,则认为是不可见的
CBool is_init_ { false }; // 判断是否init
GElementType element_type_ { GElementType::ELEMENT }; // 用于区分element 内部类型
std::atomic<GElementState> cur_state_ { GElementState::NORMAL }; // 当前执行状态
std::atomic<GElementState> cur_state_ { GElementState::NORMAL }; // 当前执行状态
internal::GElementShape shape_ { internal::GElementShape::NORMAL }; // 元素位置类型

/** 配置相关信息 */
Expand All @@ -468,7 +468,7 @@ class GElement : public GElementObject,
CBool is_prepared_ { false }; // 判断是否已经执行过 prepareRun() 方法

/** 图相关信息 */
std::atomic<CSize> left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息
std::atomic<CSize> left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息
USmallVector<GElement *> run_before_ {}; // 被依赖的节点(后继)
USmallVector<GElement *> dependence_ {}; // 依赖的节点信息(前驱)
GElement* belong_ { nullptr }; // 从属的element 信息,如为nullptr,则表示从属于 pipeline
Expand Down
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ CStatus GSome<TriggerNum>::run() {
CGRAPH_FUNCTION_BEGIN

left_num_ = TriggerNum; // 还剩n个,就完成当前GSome的执行逻辑
cur_status_ = CStatus();
cur_status_.reset();

/**
* 1. 并发的执行,其中的所有逻辑信息
Expand All @@ -53,8 +53,8 @@ CStatus GSome<TriggerNum>::run() {
*/
for (auto* element : group_elements_arr_) {
thread_pool_->commit([this, element] {
auto curStatus = element->fatProcessor(CFunctionType::RUN);
{
const auto& curStatus = element->fatProcessor(CFunctionType::RUN);
CGRAPH_UNIQUE_LOCK lock(lock_);
cur_status_ += curStatus;
left_num_--;
Expand All @@ -68,7 +68,7 @@ CStatus GSome<TriggerNum>::run() {
return left_num_ <= 0 || cur_status_.isErr();
});

for (GElementPtr element : group_elements_arr_) {
for (auto* element : group_elements_arr_) {
if (!element->done_) {
element->cur_state_.store(GElementState::TIMEOUT, std::memory_order_release);
}
Expand Down
19 changes: 8 additions & 11 deletions src/GraphCtrl/GraphStage/GStage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@ GStagePtr GStage::setThreshold(CInt threshold) {


CVoid GStage::waiting() {
{
CGRAPH_LOCK_GUARD wm(waiting_mutex_);
cur_value_++;
if (cur_value_ >= threshold_) {
// 如果超过了 threshold,则打开全部
launch(param_);
cur_value_ = 0;
locker_.cv_.notify_all();
return;
}
CGRAPH_UNIQUE_LOCK lk(locker_.mtx_);
cur_value_++;
if (cur_value_ >= threshold_) {
// 如果超过了 threshold,则打开全部
launch(param_);
cur_value_ = 0;
locker_.cv_.notify_all();
return;
}

CGRAPH_UNIQUE_LOCK lk(locker_.mtx_);
locker_.cv_.wait(lk, [this] {
return 0 == cur_value_ || cur_value_ >= threshold_;
});
Expand Down
1 change: 0 additions & 1 deletion src/GraphCtrl/GraphStage/GStage.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class GStage : public GStageObject {
CInt cur_value_ { 0 }; // 当前值
GStageParamPtr param_ { nullptr }; // 参数信息
UCvMutex locker_;
std::mutex waiting_mutex_;

friend class GStageManager;
friend class CAllocator;
Expand Down
6 changes: 3 additions & 3 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class UThreadPrimary : public UThreadBase {
if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_),
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty(); });
[this] { return 0 == cur_empty_epoch_ || !wsq_.isEmpty() || !done_; });
cur_empty_epoch_ = 0;
}
}
Expand Down Expand Up @@ -254,8 +254,8 @@ class UThreadPrimary : public UThreadBase {
}

private:
CInt index_ {0}; // 线程index
std::atomic<CInt> cur_empty_epoch_ {0 }; // 当前空转的轮数信息
CInt index_ { 0 }; // 线程index
std::atomic<CInt> cur_empty_epoch_ { 0 }; // 当前空转的轮数信息
UWorkStealingQueue<UTask> wsq_ {}; // 内部队列信息
std::vector<UThreadPrimary *>* pool_threads_ {}; // 用于存放线程池中的线程信息
std::vector<CInt> steal_targets_ {}; // 被偷的目标信息
Expand Down
39 changes: 33 additions & 6 deletions src/UtilsCtrl/Timer/UTimeCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,46 @@ class UTimeCounter : public UtilsObject {
start_ts_ = std::chrono::steady_clock::now();
}

explicit UTimeCounter(const std::string& key) {
start_ts_ = std::chrono::steady_clock::now();

explicit UTimeCounter(const std::string& key, const CMSec minShowSpan = 0) {
key_ = key;
start_ts_ = std::chrono::steady_clock::now();
min_show_span_ = minShowSpan;
}


/**
* 获取间隔
* @return
*/
CMSec getSpan() const {
const std::chrono::duration<double, std::milli>& span = std::chrono::steady_clock::now() - start_ts_;
return static_cast<CMSec>(span.count());
}


/**
* 重置信息
* @return
*/
CMSec reset() {
const std::chrono::duration<double, std::milli>& span = std::chrono::steady_clock::now() - start_ts_;
start_ts_ = std::chrono::steady_clock::now();
return static_cast<CMSec>(span.count());
}


~UTimeCounter() override {
std::chrono::duration<double, std::milli> span = std::chrono::steady_clock::now() - start_ts_;
CGraph::CGRAPH_ECHO("[%s]: time counter is : [%0.2lf] ms", key_.c_str(), span.count());
const std::chrono::duration<double, std::milli>& span = std::chrono::steady_clock::now() - start_ts_;
if (static_cast<CMSec>(span.count()) >= min_show_span_) {
CGRAPH_ECHO("[%s]: time counter is : [%0.2lf] ms", key_.c_str(), span.count());
}
}

private:
std::chrono::steady_clock::time_point start_ts_;
std::string key_;
std::chrono::steady_clock::time_point start_ts_ {};
std::string key_ {};
CMSec min_show_span_ {0};
};

CGRAPH_NAMESPACE_END
Expand Down
19 changes: 18 additions & 1 deletion test/Functional/test-functional-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,24 @@ void test_functional_01() {

{
UTimeCounter counter("test_functional_01");
status = pipeline->process(runTimes);
{
UTimeCounter ic("test_functional_init_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL);
status += pipeline->init();
}

for (auto x = 0; x < runTimes; x++) {
UTimeCounter rc("test_functional_run_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL);
status += pipeline->run();
if (rc.getSpan() >= CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL) {
std::cout << " [timeout] test_functional_01 times = " << x << " span = " << rc.getSpan() << std::endl;
break;
}
}

{
UTimeCounter dc("test_functional_destroy_01", CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL);
status += pipeline->destroy();
}
}

if (status.isErr()) {
Expand Down
2 changes: 1 addition & 1 deletion test/Functional/test-functional-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace CGraph;
void test_functional_02() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
const int runTimes = 5000;
const int runTimes = 50000;

GElementPtr a,b,c,d,e,f,g,h,i,j,k,l,m,n = nullptr;
GElementPtr region1, region2, cluster1, cluster2 = nullptr;
Expand Down
Loading