中文 | English Readme | deepwiki
CGraph, short for Color Graph, is a cross-platform DAG computing framework. It is written by C++11 without any third-party dependencies, Python APIs are also supported.
With the scheduling via GPipeline, the purpose of sequential and concurrent executing elements is realized. You only need to inherit GNode class, implement the run() method in the subclass, and set the dependencies as needed to achieve the graphical execution of tasks.
At the same time, you can also control the graph conditional judgment, loop or concurrent execution logic by setting various GGroups, which containing multi-node information by themselves.
You can transfer your params in many scenes. It is also possible to extend the functions of the above elements horizontally by adding GAspect, to enhance the functions of individual nodes by introducing various GAdapter, or to enrich pipeline schedule by GEvent.
C++ version
#include "CGraph.h"
using namespace CGraph;
class MyNode1 : public GNode {
public:
CStatus run() override {
printf("[%s], sleep for 1 second ...\n", this->getName().c_str());
CGRAPH_SLEEP_SECOND(1)
return CStatus();
}
};
class MyNode2 : public GNode {
public:
CStatus run() override {
printf("[%s], sleep for 2 second ...\n", this->getName().c_str());
CGRAPH_SLEEP_SECOND(2)
return CStatus();
}
};
int main() {
GPipelinePtr pipeline = GPipelineFactory::create();
GElementPtr a, b, c, d = nullptr;
pipeline->registerGElement<MyNode1>(&a, {}, "nodeA");
pipeline->registerGElement<MyNode2>(&b, {a}, "nodeB");
pipeline->registerGElement<MyNode1>(&c, {a}, "nodeC");
pipeline->registerGElement<MyNode2>(&d, {b, c}, "nodeD");
pipeline->process();
GPipelineFactory::remove(pipeline);
return 0;
}
As is shown on the picture, run a firstly. Then, run b and c parallelized. Run d at last after b and c finished.
Python version
import time
from datetime import datetime
from PyCGraph import GNode, GPipeline, CStatus
class MyNode1(GNode):
def run(self):
print("[{0}] {1}, enter MyNode1 run function. Sleep for 1 second ... ".format(datetime.now(), self.getName()))
time.sleep(1)
return CStatus()
class MyNode2(GNode):
def run(self):
print("[{0}] {1}, enter MyNode2 run function. Sleep for 2 second ... ".format(datetime.now(), self.getName()))
time.sleep(2)
return CStatus()
if __name__ == '__main__':
pipeline = GPipeline()
a, b, c, d = MyNode1(), MyNode2(), MyNode1(), MyNode2()
pipeline.registerGElement(a, set(), "nodeA")
pipeline.registerGElement(b, {a}, "nodeB")
pipeline.registerGElement(c, {a}, "nodeC")
pipeline.registerGElement(d, {b, c}, "nodeD")
pipeline.process()