Skip to content

Commit 5e2007f

Browse files
committed
Merge branch 'main' into v1.0.0
2 parents 65496ef + 7c7575d commit 5e2007f

File tree

10 files changed

+149
-181
lines changed

10 files changed

+149
-181
lines changed

CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,17 @@ if (BUILD_EXE)
151151
add_executable(to_binary_format
152152
tools/to_binary_format.cpp)
153153
target_link_libraries(to_binary_format PRIVATE GraphZeppelinCommon)
154+
155+
# executable for processing a binary graph stream
156+
add_executable(process_stream
157+
tools/process_stream.cpp)
158+
target_link_libraries(process_stream PRIVATE GraphZeppelin)
159+
160+
# tool for validating that a binary stream appears correct
161+
add_executable(validate_binary_stream
162+
tools/validate_binary_stream.cpp
163+
)
164+
target_link_libraries(validate_binary_stream PRIVATE GraphZeppelin)
154165
endif()
155166

156167
if (BUILD_BENCH)

include/binary_graph_stream.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ class BadStreamException : public std::exception {
1111
}
1212
};
1313

14+
// Exception thrown when a read from the binary stream fails for a reason
// other than reaching end-of-file.
class StreamFailedException : public std::exception {
 public:
  // what() is public so that code catching the concrete exception type (and
  // not only std::exception) can retrieve the message.
  // Returns a static, human readable description of the failure.
  const char* what() const noexcept override {
    return "ERROR: read_data() encountered failed stream. Is stream file corrupted?";
  }
};
19+
1420
// A class for reading from a binary graph stream
1521
class BinaryGraphStream {
1622
public:
@@ -56,6 +62,10 @@ class BinaryGraphStream {
5662
// set buf back to the beginning of the buffer read in data
5763
buf = start_buf;
5864
bin_file.read(buf, buf_size);
65+
66+
if (bin_file.fail() && !bin_file.eof()) {
67+
throw StreamFailedException();
68+
}
5969
}
6070
const uint32_t edge_size = sizeof(uint8_t) + 2 * sizeof(uint32_t); // size of binary encoded edge
6171
std::ifstream bin_file; // file to read from
@@ -72,7 +82,7 @@ class BinaryGraphStream_MT {
7282
public:
7383
BinaryGraphStream_MT(std::string file_name, uint32_t _b) {
7484
stream_fd = open(file_name.c_str(), O_RDONLY, S_IRUSR);
75-
if (stream_fd == -1) {
85+
if (stream_fd == -1) {  // open() reports failure with -1; 0 is a valid file descriptor
7686
throw BadStreamException();
7787
}
7888

@@ -167,7 +177,9 @@ class BinaryGraphStream_MT {
167177
data_to_read = end_of_file - read_off; // EOF truncates the read
168178

169179
while (data_read < data_to_read) {
170-
data_read += pread(stream_fd, buf, data_to_read, read_off + data_read); // perform the read
180+
int ret = pread(stream_fd, buf, data_to_read, read_off + data_read); // perform the read
181+
if (ret == -1) throw StreamFailedException();
182+
data_read += ret;
171183
}
172184
return data_read;
173185
}

src/graph_configuration.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@ std::ostream& operator<< (std::ostream &out, const GraphConfiguration &conf) {
5353
out << " Size of groups = " << conf._group_size << std::endl;
5454
out << " On disk data location = " << conf._disk_dir << std::endl;
5555
out << " Backup sketch to RAM = " << (conf._backup_in_mem? "ON" : "OFF") << std::endl;
56-
out << conf._gutter_conf << std::endl;
56+
out << conf._gutter_conf;
5757
return out;
5858
}

tools/experiment/BCHMK_Graph.cpp

Lines changed: 0 additions & 36 deletions
This file was deleted.

tools/experiment/BCHMK_post_proc.cpp

Lines changed: 0 additions & 68 deletions
This file was deleted.

tools/experiment/parallelism.cpp

Lines changed: 0 additions & 40 deletions
This file was deleted.

tools/experiment/run_experiments.sh

Lines changed: 0 additions & 23 deletions
This file was deleted.

tools/process_stream.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#include <graph.h>
2+
#include <binary_graph_stream.h>
3+
4+
int main(int argc, char **argv) {
5+
if (argc != 3) {
6+
std::cout << "ERROR: Incorrect number of arguments!" << std::endl;
7+
std::cout << "Arguments: stream_file, graph_workers" << std::endl;
8+
}
9+
10+
std::string stream_file = argv[1];
11+
int num_threads = std::atoi(argv[2]);
12+
if (num_threads < 1) {
13+
std::cout << "ERROR: Invalid number of graph workers! Must be > 0." << std::endl;
14+
}
15+
16+
BinaryGraphStream stream(stream_file, 1024*32);
17+
node_id_t num_nodes = stream.nodes();
18+
size_t num_updates = stream.edges();
19+
std::cout << "Processing stream: " << stream_file << std::endl;
20+
std::cout << "nodes = " << num_nodes << std::endl;
21+
std::cout << "num_updates = " << num_updates << std::endl;
22+
std::cout << std::endl;
23+
24+
auto config = GraphConfiguration().gutter_sys(STANDALONE).num_groups(num_threads);
25+
config.gutter_conf().gutter_factor(-4);
26+
Graph g{num_nodes, config};
27+
28+
auto ins_start = std::chrono::steady_clock::now();
29+
for (size_t e = 0; e < num_updates; e++)
30+
g.update(stream.get_edge());
31+
32+
auto cc_start = std::chrono::steady_clock::now();
33+
auto CC_num = g.connected_components().size();
34+
std::chrono::duration<double> insert_time = g.flush_end - ins_start;
35+
std::chrono::duration<double> cc_time = std::chrono::steady_clock::now() - cc_start;
36+
double num_seconds = insert_time.count();
37+
std::cout << "Total insertion time was: " << num_seconds << std::endl;
38+
std::cout << "Insertion rate was: " << stream.edges() / num_seconds << std::endl;
39+
std::cout << "CC query latency: " << cc_time.count() << std::endl;
40+
std::cout << "Connected Components: " << CC_num << std::endl;
41+
}

tools/to_binary_format.cpp

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,43 @@
11
#include <iostream>
22
#include <fstream>
33
#include <vector>
4+
#include <errno.h>
5+
#include <string.h>
46
#include <graph_zeppelin_common.h>
57

68
int main(int argc, char **argv) {
7-
if (argc != 2 && argc != 3) {
9+
if (argc < 3 || argc > 5) {
810
std::cout << "Incorrect number of arguments. "
9-
"Expected at either one or two but got " << argc-1 << std::endl;
10-
std::cout << "Arguments are: text_stream [update_type]" << std::endl;
11-
std::cout << "text_stream is the file to parse into binary format" << std::endl;
12-
std::cout << "update_type is a flag. If present then stream indicates insertions vs deletions" << std::endl;
11+
"Expected [2-4] but got " << argc-1 << std::endl;
12+
std::cout << "Arguments are: ascii_stream out_file_name [--update_type] [--silent]" << std::endl;
13+
std::cout << "ascii_stream: The file to parse into binary format" << std::endl;
14+
std::cout << "out_file_name: Where the binary stream will be written" << std::endl;
15+
std::cout << "--update_type: If present then ascii stream indicates insertions vs deletions" << std::endl;
16+
std::cout << "--silent: If present then no warnings are printed when stream corrections are made" << std::endl;
17+
exit(EXIT_FAILURE);
1318
}
1419

1520
std::ifstream txt_file(argv[1]);
16-
std::ofstream out_file("binary_stream.data", std::ios_base::binary);
21+
if (!txt_file) {
22+
std::cerr << "ERROR: could not open input file!" << std::endl;
23+
exit(EXIT_FAILURE);
24+
}
25+
std::ofstream out_file(argv[2], std::ios_base::binary | std::ios_base::out);
26+
if (!out_file) {
27+
std::cerr << "ERROR: could not open output file! " << argv[2] << ": " << strerror(errno) << std::endl;
28+
exit(EXIT_FAILURE);
29+
}
1730

1831
bool update_type = false;
19-
if (argc == 3) {
20-
if (std::string(argv[2]) == "update_type")
32+
bool silent = false;
33+
for (int i = 3; i < argc; i++) {
34+
if (std::string(argv[i]) == "--update_type")
2135
update_type = true;
36+
else if (std::string(argv[i]) == "--silent") {
37+
silent = true;
38+
}
2239
else {
23-
std::cerr << "Did not recognize second argument! Expected 'update_type'";
40+
std::cerr << "Did not recognize argument: " << argv[i] << " Expected '--update_type' or '--silent'";
2441
return EXIT_FAILURE;
2542
}
2643
}
@@ -29,6 +46,16 @@ int main(int argc, char **argv) {
2946
edge_id_t num_edges;
3047

3148
txt_file >> num_nodes >> num_edges;
49+
50+
std::cout << "Parsed ascii stream header. . ." << std::endl;
51+
std::cout << "Number of nodes: " << num_nodes << std::endl;
52+
std::cout << "Number of updates: " << num_edges << std::endl;
53+
if (update_type)
54+
std::cout << "Assuming that update format is: upd_type src dst" << std::endl;
55+
else
56+
std::cout << "Assuming that update format is: src dst" << std::endl;
57+
58+
3259
out_file.write((char *) &num_nodes, sizeof(num_nodes));
3360
out_file.write((char *) &num_edges, sizeof(num_edges));
3461

@@ -48,14 +75,14 @@ int main(int argc, char **argv) {
4875
txt_file >> src >> dst;
4976

5077
if (src > dst) {
51-
if (u != adj_mat[dst][src - dst]) {
78+
if (!silent && u != adj_mat[dst][src - dst]) {
5279
std::cout << "WARNING: update " << u << " " << src << " " << dst;
5380
std::cout << " is double insert or delete before insert. Correcting." << std::endl;
5481
}
5582
u = adj_mat[dst][src - dst];
5683
adj_mat[dst][src - dst] = !adj_mat[dst][src - dst];
5784
} else {
58-
if (u != adj_mat[src][dst - src]) {
85+
if (!silent && u != adj_mat[src][dst - src]) {
5986
std::cout << "WARNING: update " << u << " " << src << " " << dst;
6087
std::cout << " is double insert or delete before insert. Correcting." << std::endl;
6188
}
@@ -68,3 +95,4 @@ int main(int argc, char **argv) {
6895
out_file.write((char *) &dst, sizeof(dst));
6996
}
7097
}
98+

0 commit comments

Comments
 (0)