diff --git a/include/khop.h b/include/khop.h
new file mode 100644
index 0000000..ae24b7d
--- /dev/null
+++ b/include/khop.h
@@ -0,0 +1,130 @@
+// Copyright 2024 MIT
+// Authors: Luc Gaitskell
+#pragma once
+#include "utils.h"
+#include "graph.h"
+#include <cstddef>
+#include <ctime>
+#include <limits>
+#include <random>
+#include <vector>
+
+// Process-wide generator: draws the initial seeds and drives the
+// single-threaded samplers. This is a non-inline definition in a header, so
+// each binary must include khop.h from exactly one translation unit. It is not
+// synchronized; parallel code should use a per-thread generator instead.
+std::mt19937 gen_global(std::time(nullptr));
+
+/**
+ * Content below from Miranda Cai
+ */
+
+/**
+ * Sentinel vertex id marking a terminated walk (nothing to continue from).
+ */
+inline vidType invalid_vertex()
+{
+  return (std::numeric_limits<vidType>::max)();
+}
+
+/**
+ * Number of expansion steps performed by the k-hop sampler.
+ */
+inline int steps()
+{
+  return 3;
+}
+
+/**
+ * Creates the initial seeds.
+ *
+ * Seeds are drawn independently (duplicates are possible) from
+ * [0, graph_size) using gen_global.
+ *
+ * @param seeds_size number of seeds to draw
+ * @param graph_size number of vertices in the graph; when it is 0 every seed
+ *                   is invalid_vertex() instead of dividing by zero
+ * @return vector of seeds_size vertex ids
+ */
+inline std::vector<vidType> get_initial_transits(vidType seeds_size, vidType graph_size)
+{
+  const size_t count = seeds_size > 0 ? static_cast<size_t>(seeds_size) : 0;
+  if (graph_size == 0)
+  {
+    return std::vector<vidType>(count, invalid_vertex());
+  }
+  std::vector<vidType> n_ids;
+  n_ids.reserve(count);
+  for (size_t i = 0; i < count; i++)
+  {
+    n_ids.push_back(gen_global() % graph_size);
+  }
+  return n_ids;
+}
+
+/**
+ * For given step, return number of samples to take.
+ * Step of -1 for original sample transits.
+ *
+ * @param step step index, or -1 for the seed level
+ * @return 1 for the seed level, 25 for step 0, 10 for every other step
+ */
+inline int sample_size(int step)
+{
+  if (step == -1)
+    return 1;
+  if (step == 0)
+    return 25;
+  return 10;
+}
+
+/**
+ * Picks one entry of the neighbor list returned by
+ * g.N_vbyte(transit, "streamvbyte").
+ *
+ * @param g graph to sample from
+ * @param transit vertex to step from
+ * @param gen generator used to pick the neighbor index
+ * @return the chosen neighbor, or invalid_vertex() when `transit` is already
+ *         invalid or has no neighbors
+ */
+inline vidType sample_next_vbyte(Graph &g, vidType transit, std::mt19937 &gen)
+{
+  // Same guard as sample_next, so callers may pass a terminated walk to either.
+  if (transit == invalid_vertex())
+  {
+    return invalid_vertex();
+  }
+  auto adj_transit = g.N_vbyte(transit, "streamvbyte");
+  vidType src_degree = adj_transit.size();
+  if (src_degree == 0)
+  {
+    return invalid_vertex();
+  }
+  const vidType idx = gen() % src_degree;
+  return adj_transit.data()[idx];
+}
+
+/**
+ * Picks one entry of the neighbor list g.N(transit).
+ *
+ * @param g graph to sample from
+ * @param transit vertex to step from
+ * @param gen generator used to pick the neighbor index
+ * @return the chosen neighbor, or invalid_vertex() when `transit` is already
+ *         invalid or has no neighbors
+ */
+inline vidType sample_next(Graph &g, vidType transit, std::mt19937 &gen)
+{
+  if (transit == invalid_vertex())
+  {
+    return invalid_vertex();
+  }
+  vidType src_degree = g.N(transit).size();
+  if (src_degree == 0)
+  {
+    return invalid_vertex();
+  }
+  const vidType idx = gen() % src_degree;
+  return g.N(transit, idx);
+}
diff --git a/src/sampling_random_walk/Makefile b/src/sampling_random_walk/Makefile
new file mode 100644
index 0000000..524a9c9
--- /dev/null
+++ b/src/sampling_random_walk/Makefile
@@ -0,0 +1,36 @@
+include ../common.mk
+OBJS = graph.o VertexSet.o
+CGOBJS = graph_compressed.o cgr_decoder.o vbyte_decoder.o
+CGCUOBJS = graph_gpu_compressed.o cgr_decoder_gpu.o
+COBJS = cgr_encoder.o unary_encoder.o vbyte_encoder.o
+NVFLAGS += -dc
+CXXFLAGS += -Wno-narrowing
+#LIBS += $(SIMDCAI_LIB)
+NVINCLUDES += -I./gpu_kernels
+INCLUDES +=-I$(CILK_CLANG)/include/cilk
+vpath %.cc ../common
+vpath %.cc ../structure
+vpath %.cu ../structure
+# VPATH+=../common
+BIN = ../../bin
+
+# `all` and `clean` name actions, not files.
+.PHONY: all clean
+
+all: $(OBJS) khop_demo
+
+# Each binary is linked here and then moved into $(BIN).
+khop_demo: $(OBJS) $(CGOBJS) $(COBJS) compressor.o khop_demo.o
+	$(CXX) $(CXXFLAGS) $(INCLUDES) compressor.o khop_demo.o $(OBJS) $(CGOBJS) $(COBJS) -o $@ $(LIBS)
+	mv $@ $(BIN)
+
+rwalk_cpu: $(OBJS) $(CGOBJS) $(COBJS) compressor.o random_walk_cpu.o
+	$(CXX) $(CXXFLAGS) $(INCLUDES) compressor.o random_walk_cpu.o $(OBJS) $(CGOBJS) $(COBJS) -o $@ $(LIBS)
+	mv $@ $(BIN)
+
+rwalk_omp: $(OBJS) $(CGOBJS) $(COBJS) compressor.o random_walk_omp.o
+	$(CXX) $(CXXFLAGS) $(INCLUDES) compressor.o random_walk_omp.o $(OBJS) $(CGOBJS) $(COBJS) -o $@ $(LIBS)
+	mv $@ $(BIN)
+
+clean:
+	rm -f *.o
diff --git a/src/sampling_random_walk/README.md b/src/sampling_random_walk/README.md
new file mode 100644
index 0000000..3128e2b
--- /dev/null
+++ b/src/sampling_random_walk/README.md
@@ -0,0 +1,47 @@
+### Usage:
+
+```
+rwalk_cpu <graph_prefix> <streamvbyte|uncompressed> <steps> [n_samples]
+rwalk_omp <graph_prefix> <streamvbyte|uncompressed> <steps> [n_samples] [n_threads]
+```
+
+### Compressed:
+
+```
+make rwalk_cpu && ../../bin/rwalk_cpu /home/lgaitskell/data-xhchen/tester/vbyte streamvbyte 30 4000000
+
+make rwalk_cpu && ../../bin/rwalk_cpu /home/lgaitskell/data-xhchen/livej/dag-streamvbyte streamvbyte 30 4000000
+
+make rwalk_omp && ../../bin/rwalk_omp /home/lgaitskell/data-xhchen/livej/dag-streamvbyte streamvbyte 30 4000000 16
+```
+
+### Uncompressed:
+
+```
+make rwalk_cpu && ../../bin/rwalk_cpu /home/lgaitskell/data-xhchen/livej/graph uncompressed 30 4000000
+## takes 6.86327 sec
+make rwalk_omp && ../../bin/rwalk_omp /home/lgaitskell/data-xhchen/livej/graph uncompressed 30 4000000 16
+## takes 8.76526 sec
+```
+
+### Profiling with VTune:
+
+```
+interact -n 16 -p parallel -t 2:00:00
+cd scratch4/GraphAIBench/src/sampling_random_walk/
+
+make rwalk_omp && /home/lgaitskell/intel/oneapi/vtune/2024.1/bin64/vtune -collect hotspots ../../bin/rwalk_omp /home/lgaitskell/data-xhchen/livej/graph uncompressed 30 4000000 16
+```
+
+### Datasets:
+
+```
+~/data-xhchen/livej/graph
+~/data-xhchen/orkut/graph
+~/data-xhchen/twitter40/graph
+~/data-xhchen/friendster/graph
+~/data-xhchen/uk2007/graph
+~/data-xhchen/gsh-2015/graph
+~/data-xhchen/clueweb12/graph
+~/data-xhchen/uk-2014-csgr/graph
+```
diff --git a/src/sampling_random_walk/khop_demo.cc b/src/sampling_random_walk/khop_demo.cc
new file mode 100644
index 0000000..760e674
--- /dev/null
+++ b/src/sampling_random_walk/khop_demo.cc
@@ -0,0 +1,98 @@
+#include "graph.h"
+#include "compressor.hh"
+#include "khop.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <vector>
+
+/**
+ * k-hop neighborhood sampling using sample_next_vbyte.
+ *
+ * `transits` is laid out level by level: the seeds first, then for every step
+ * each entry of the previous level expanded into sample_size(step) samples.
+ * Entry `idx` of a level descends from entry `idx / sample_size(step)` of the
+ * previous level.
+ *
+ * @param g graph to sample from
+ * @param n_samples number of sample groups; negative values are treated as 0
+ * @param n_threads accepted for interface compatibility; this solver runs on
+ *                  the calling thread only
+ */
+void kHopSolver(Graph &g, int n_samples, int n_threads)
+{
+  (void)n_threads;
+  // Sizes are size_t: with the current fan-outs (1, 25, 10, 10) the result
+  // holds 2776 entries per sample group, which overflows int for large inputs.
+  const size_t n_seeds = static_cast<size_t>(sample_size(-1)) * std::max(n_samples, 0);
+  size_t total_count = n_seeds;
+  size_t level_count = n_seeds;
+  for (int step = 0; step < steps(); step++)
+  {
+    level_count *= sample_size(step);
+    total_count += level_count;
+  }
+
+  std::vector<vidType> inits = get_initial_transits(static_cast<vidType>(n_seeds), g.V());
+  std::vector<vidType> transits(total_count, 0);
+  std::copy(inits.begin(), inits.end(), transits.begin());
+  std::cout << "...initialized starting transits..." << std::endl;
+
+  Timer t;
+  t.Start();
+  // sample for defined number of steps
+  size_t prev_begin = 0;        // start of the level being expanded
+  size_t prev_count = n_seeds;  // number of entries in that level
+  for (int step = 0; step < steps(); step++)
+  {
+    std::cout << "STEP " << step << std::endl;
+    const size_t fanout = sample_size(step);
+    const size_t cur_begin = prev_begin + prev_count;
+    const size_t cur_count = prev_count * fanout;
+    // sample every new transit in the step for every sample group
+    for (size_t idx = 0; idx < cur_count; idx++)
+    {
+      // An invalid parent yields an invalid child inside sample_next_vbyte.
+      const vidType old_t = transits[prev_begin + idx / fanout];
+      transits[cur_begin + idx] = sample_next_vbyte(g, old_t, gen_global);
+    }
+    prev_begin = cur_begin;
+    prev_count = cur_count;
+  }
+  t.Stop();
+  std::cout << "result size: " << prev_begin + prev_count << std::endl;
+  std::cout << "Finished sampling in " << t.Seconds() << " sec" << std::endl;
+}
+
+/**
+ * Usage: khop_demo <in_prefix> <compressed_prefix> [n_samples] [n_threads]
+ *
+ * <in_prefix> is kept for command-line compatibility but is not read; the
+ * graph is loaded from <compressed_prefix>.
+ */
+int main(int argc, char *argv[])
+{
+  if (argc < 3)
+  {
+    std::cerr << "Usage: " << argv[0]
+              << " <in_prefix> <compressed_prefix> [n_samples] [n_threads]" << std::endl;
+    return 1;
+  }
+  Graph g;
+  std::string out_prefix = argv[2];
+  std::string scheme = "streamvbyte";
+  bool permutated = false;
+  g.load_compressed_graph(out_prefix, scheme, permutated);
+  g.print_meta_data();
+  std::cout << "LOADED COMPRESSED GRAPH\n"
+            << std::endl;
+
+  std::cout << "Begin sampling compressed graph..." << std::endl;
+  int n_samples = argc >= 4 ? atoi(argv[3]) : 40000;
+  int n_threads = argc >= 5 ? atoi(argv[4]) : 1;
+  kHopSolver(g, n_samples, n_threads);
+  return 0;
+}
diff --git a/src/sampling_random_walk/random_walk_cpu.cc b/src/sampling_random_walk/random_walk_cpu.cc
new file mode 100644
index 0000000..594ef47
--- /dev/null
+++ b/src/sampling_random_walk/random_walk_cpu.cc
@@ -0,0 +1,99 @@
+#include "graph.h"
+#include "compressor.hh"
+#include "khop.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <vector>
+
+/**
+ * Runs n_samples random walks of sample_steps steps each on one thread.
+ *
+ * `transits` is stored step-major: entry `step * n_samples + i` is the vertex
+ * walk `i` occupies after `step` steps, and row 0 holds the seeds. A walk that
+ * reaches a vertex without neighbors stays at invalid_vertex() afterwards.
+ *
+ * @param g graph to walk on
+ * @param vbyte true to sample with sample_next_vbyte, false for sample_next
+ * @param sample_steps walk length; negative values are treated as 0
+ * @param n_samples number of walks; negative values are treated as 0
+ */
+void rWalkSolver(Graph &g, bool vbyte, int sample_steps, int n_samples)
+{
+  const size_t n_walks = std::max(n_samples, 0);
+  const size_t n_steps = std::max(sample_steps, 0);
+  // size_t keeps (steps + 1) * walks from overflowing int on large runs.
+  const size_t total_count = (n_steps + 1) * n_walks;
+
+  // One seed per walk: the layout below has exactly n_walks entries per row.
+  std::vector<vidType> inits = get_initial_transits(static_cast<vidType>(n_walks), g.V());
+  std::vector<vidType> transits(total_count, 0);
+  std::copy(inits.begin(), inits.end(), transits.begin());
+  std::cout << "...initialized starting transits..." << std::endl;
+
+  Timer t;
+  t.Start();
+  // sampling length is set to `sample_steps` for all samples
+  for (size_t step = 0; step < n_steps; step++)
+  {
+    const size_t cur_row = step * n_walks;
+    const size_t next_row = cur_row + n_walks;
+    // advance every walk by one step
+    for (size_t sample_i = 0; sample_i < n_walks; sample_i++)
+    {
+      // Both samplers pass an invalid vertex through unchanged.
+      const vidType sample_transit = transits[cur_row + sample_i];
+      transits[next_row + sample_i] = vbyte ? sample_next_vbyte(g, sample_transit, gen_global)
+                                            : sample_next(g, sample_transit, gen_global);
+    }
+  }
+  t.Stop();
+  std::cout << "result size: " << total_count << std::endl;
+  std::cout << "Finished sampling in " << t.Seconds() << " sec" << std::endl;
+}
+
+/**
+ * Usage: rwalk_cpu <graph_prefix> <streamvbyte|uncompressed> <steps> [n_samples]
+ */
+int main(int argc, char *argv[])
+{
+  if (argc < 4)
+  {
+    std::cerr << "Usage: " << argv[0]
+              << " <graph_prefix> <streamvbyte|uncompressed> <steps> [n_samples]" << std::endl;
+    return 1;
+  }
+  Graph g;
+  std::string in_prefix = argv[1];
+  std::string scheme = argv[2];
+  bool vbyte = (scheme == "streamvbyte");
+  if (vbyte)
+  {
+    bool permutated = false;
+    g.load_compressed_graph(in_prefix, scheme, permutated);
+    std::cout << "Loaded COMPRESSED Graph\n"
+              << std::endl;
+  }
+  else if (scheme == "uncompressed")
+  {
+    g.load_graph(in_prefix);
+    std::cout << "Loaded UNcompressed Graph\n"
+              << std::endl;
+  }
+  else
+  {
+    std::cout << "Incorrect or no scheme specified\n"
+              << std::endl;
+    return 1;
+  }
+  g.print_meta_data();
+
+  int sample_steps = atoi(argv[3]);
+  // argv[4] exists only when argc >= 5.
+  int n_samples = argc >= 5 ? atoi(argv[4]) : 40000;
+  rWalkSolver(g, vbyte, sample_steps, n_samples);
+  return 0;
+}
diff --git a/src/sampling_random_walk/random_walk_omp.cc b/src/sampling_random_walk/random_walk_omp.cc
new file mode 100644
index 0000000..ad5490d
--- /dev/null
+++ b/src/sampling_random_walk/random_walk_omp.cc
@@ -0,0 +1,127 @@
+#include <omp.h>
+#include "graph.h"
+#include "compressor.hh"
+#include "khop.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <vector>
+
+/**
+ * Runs n_samples random walks of sample_steps steps each with OpenMP.
+ *
+ * `transits` is stored step-major: entry `step * n_samples + i` is the vertex
+ * walk `i` occupies after `step` steps, and row 0 holds the seeds. Walks are
+ * distributed over threads and each walk only touches its own column, so no
+ * two threads write the same entry.
+ *
+ * @param g graph to walk on
+ * @param vbyte true to sample with sample_next_vbyte, false for sample_next
+ * @param sample_steps walk length; negative values are treated as 0
+ * @param n_samples number of walks; negative values are treated as 0
+ * @param n_threads requested OpenMP thread count; values below 1 become 1
+ */
+void rWalkOMPSolver(Graph &g, bool vbyte, int sample_steps, int n_samples, int n_threads)
+{
+  n_samples = std::max(n_samples, 0);
+  const size_t n_walks = n_samples;
+  const size_t n_steps = std::max(sample_steps, 0);
+  // size_t keeps (steps + 1) * walks from overflowing int on large runs.
+  const size_t total_count = (n_steps + 1) * n_walks;
+
+  int num_threads = 1;
+  omp_set_num_threads(std::max(n_threads, 1));
+#pragma omp parallel
+  {
+    // Only one thread records the team size; every thread writing it is a race.
+#pragma omp single
+    num_threads = omp_get_num_threads();
+  }
+
+  // One seed per walk: the layout below has exactly n_walks entries per row.
+  std::vector<vidType> inits = get_initial_transits(static_cast<vidType>(n_walks), g.V());
+  std::vector<vidType> transits(total_count, 0);
+#pragma omp parallel for
+  for (int i = 0; i < n_samples; i++)
+  {
+    transits[i] = inits[i];
+  }
+  std::cout << "...initialized starting transits..." << std::endl;
+
+  std::cout << "Begin OpenMP sampling (" << num_threads << " threads)..." << std::endl;
+
+  Timer t;
+  t.Start();
+  // sampling length is set to `sample_steps` for all samples
+#pragma omp parallel
+  {
+    // Per-thread generator seeded with the thread id: no shared generator
+    // state, and the same seeds on every run.
+    std::mt19937 gen(omp_get_thread_num());
+
+#pragma omp for
+    for (int sample_i = 0; sample_i < n_samples; sample_i++)
+    {
+      for (size_t step = 0; step < n_steps; step++)
+      {
+        const size_t cur = step * n_walks + sample_i;
+        // Both samplers pass an invalid vertex through unchanged.
+        const vidType sample_transit = transits[cur];
+        transits[cur + n_walks] = vbyte ? sample_next_vbyte(g, sample_transit, gen)
+                                        : sample_next(g, sample_transit, gen);
+      }
+    }
+  }
+  t.Stop();
+  std::cout << "result size: " << total_count << std::endl;
+  std::cout << "Finished sampling in " << t.Seconds() << " sec" << std::endl;
+}
+
+/**
+ * Usage: rwalk_omp <graph_prefix> <streamvbyte|uncompressed> <steps>
+ *                  [n_samples] [n_threads]
+ */
+int main(int argc, char *argv[])
+{
+  if (argc < 4)
+  {
+    std::cerr << "Usage: " << argv[0]
+              << " <graph_prefix> <streamvbyte|uncompressed> <steps> [n_samples] [n_threads]"
+              << std::endl;
+    return 1;
+  }
+  Graph g;
+  std::string in_prefix = argv[1];
+  std::string scheme = argv[2];
+  bool vbyte = (scheme == "streamvbyte");
+  if (vbyte)
+  {
+    bool permutated = false;
+    g.load_compressed_graph(in_prefix, scheme, permutated);
+    std::cout << "Loaded COMPRESSED Graph\n"
+              << std::endl;
+  }
+  else if (scheme == "uncompressed")
+  {
+    g.load_graph(in_prefix);
+    std::cout << "Loaded UNcompressed Graph\n"
+              << std::endl;
+  }
+  else
+  {
+    std::cout << "Incorrect or no scheme specified\n"
+              << std::endl;
+    return 1;
+  }
+  g.print_meta_data();
+
+  int sample_steps = atoi(argv[3]);
+  // argv[4] / argv[5] exist only when argc >= 5 / argc >= 6.
+  int n_samples = argc >= 5 ? atoi(argv[4]) : 40000;
+  int n_threads = argc >= 6 ? atoi(argv[5]) : 1;
+  rWalkOMPSolver(g, vbyte, sample_steps, n_samples, n_threads);
+  return 0;
+}
diff --git a/src/structure/compressor.cc b/src/structure/compressor.cc
index 2700ce4..343a9a3 100644
--- a/src/structure/compressor.cc
+++ b/src/structure/compressor.cc
@@ -255,6 +255,8 @@ void printusage() {
             << " [-a alignment(0)]\n";
 }
 
+// Standalone entry point disabled so compressor.o can be linked into binaries that define their own main().
+#if 0
 int main(int argc,char *argv[]) {
   int zeta_k = 2, permutate = 0, degree_threshold = 32;
   int alignment = 0; // 0: not aligned; 1: byte aligned; 2: word aligned
@@ -348,3 +350,4 @@ int main(int argc,char *argv[]) {
   std::cout << "compression completed!\n";
   return 0;
 }
+#endif // standalone main()