Skip to content

Commit fd3c971

Browse files
committed
allow multiple queries in process stream tool
1 parent f68d883 commit fd3c971

File tree

1 file changed

+34
-25
lines changed

1 file changed

+34
-25
lines changed

tools/process_stream.cpp

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,28 @@ void track_insertions(uint64_t total, GraphSketchDriver<CCSketchAlg> *driver,
6161
}
6262

6363
int main(int argc, char **argv) {
64-
if (argc != 4) {
65-
std::cout << "ERROR: Incorrect number of arguments!" << std::endl;
66-
std::cout << "Arguments: stream_file, graph_workers, reader_threads" << std::endl;
64+
if (argc != 5) {
65+
std::cerr << "ERROR: Incorrect number of arguments!" << std::endl;
66+
std::cerr << "Arguments: stream_file, num_queries, graph_workers, reader_threads" << std::endl;
6767
exit(EXIT_FAILURE);
6868
}
6969

7070
shutdown = false;
7171
std::string stream_file = argv[1];
72-
int num_threads = std::atoi(argv[2]);
72+
int num_queries = std::atoi(argv[2]);
73+
if (num_queries < 1 || num_queries > 1000) {
74+
std::cerr << "ERROR: Invalid number of queries! Must be > 0 and <= 1000" << std::endl;
75+
exit(EXIT_FAILURE);
76+
}
77+
int num_threads = std::atoi(argv[3]);
7378
if (num_threads < 1) {
74-
std::cout << "ERROR: Invalid number of graph workers! Must be > 0." << std::endl;
79+
std::cerr << "ERROR: Invalid number of graph workers! Must be > 0." << std::endl;
7580
exit(EXIT_FAILURE);
7681
}
77-
size_t reader_threads = std::atol(argv[3]);
82+
size_t reader_threads = std::atol(argv[4]);
83+
84+
double query_percent = 1.0 / num_queries;
85+
size_t queries_in_stream = num_queries - 1;
7886

7987
BinaryFileStream stream(stream_file);
8088
node_id_t num_nodes = stream.vertices();
@@ -85,23 +93,41 @@ int main(int argc, char **argv) {
8593
std::cout << std::endl;
8694

8795
auto driver_config = DriverConfiguration().gutter_sys(CACHETREE).worker_threads(num_threads);
88-
auto cc_config = CCAlgConfiguration().batch_factor(1);
96+
auto cc_config = CCAlgConfiguration().batch_factor(1.0/5);
8997
CCSketchAlg cc_alg{num_nodes, get_seed(), cc_config};
9098
GraphSketchDriver<CCSketchAlg> driver{&cc_alg, &stream, driver_config, reader_threads};
9199

92100
auto ins_start = std::chrono::steady_clock::now();
93101
std::thread querier(track_insertions, num_updates, &driver, ins_start);
94102

103+
for (size_t q = 0; q < queries_in_stream; q++) {
104+
driver.process_stream_until((q+1) * query_percent * num_updates);
105+
auto cc_start = std::chrono::steady_clock::now();
106+
driver.prep_query(CONNECTIVITY);
107+
auto CC_num = cc_alg.connected_components().size();
108+
std::chrono::duration<double> cc_time = std::chrono::steady_clock::now() - cc_start;
109+
std::chrono::duration<double> flush_time = driver.flush_end - driver.flush_start;
110+
std::chrono::duration<double> cc_alg_time = cc_alg.cc_alg_end - cc_alg.cc_alg_start;
111+
112+
std::cout << "Query " << q + 1 << std::endl;
113+
std::cout << "Total CC query latency: " << cc_time.count() << std::endl;
114+
std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
115+
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
116+
std::cout << "Connected Components: " << CC_num << std::endl;
117+
}
118+
119+
// finish the stream
95120
driver.process_stream_until(END_OF_STREAM);
96121

97122
auto cc_start = std::chrono::steady_clock::now();
98123
driver.prep_query(CONNECTIVITY);
99124
auto CC_num = cc_alg.connected_components().size();
100125
std::chrono::duration<double> cc_time = std::chrono::steady_clock::now() - cc_start;
101-
std::chrono::duration<double> insert_time = driver.flush_end - ins_start;
102126
std::chrono::duration<double> flush_time = driver.flush_end - driver.flush_start;
103127
std::chrono::duration<double> cc_alg_time = cc_alg.cc_alg_end - cc_alg.cc_alg_start;
104128

129+
130+
std::chrono::duration<double> insert_time = driver.flush_end - ins_start;
105131
shutdown = true;
106132
querier.join();
107133

@@ -113,21 +139,4 @@ int main(int argc, char **argv) {
113139
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
114140
std::cout << "Connected Components: " << CC_num << std::endl;
115141
std::cout << "Maximum Memory Usage(MiB): " << get_max_mem_used() << std::endl;
116-
117-
118-
cc_start = std::chrono::steady_clock::now();
119-
driver.prep_query(CONNECTIVITY);
120-
CC_num = cc_alg.connected_components().size();
121-
cc_time = std::chrono::steady_clock::now() - cc_start;
122-
insert_time = driver.flush_end - ins_start;
123-
flush_time = driver.flush_end - driver.flush_start;
124-
cc_alg_time = cc_alg.cc_alg_end - cc_alg.cc_alg_start;
125-
126-
std::cout << "SECOND QUERY" << std::endl;
127-
std::cout << "Total CC query latency: " << cc_time.count() << std::endl;
128-
std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
129-
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
130-
std::cout << "Connected Components: " << CC_num << std::endl;
131-
std::cout << "Maximum Memory Usage(MiB): " << get_max_mem_used() << std::endl;
132-
133142
}

0 commit comments

Comments
 (0)