-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpartitioner.cpp
More file actions
executable file
·176 lines (128 loc) · 4.33 KB
/
partitioner.cpp
File metadata and controls
executable file
·176 lines (128 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#include "partitioner.h"
#include "job_scheduler.h"
#include <cstdio>
#include <cstring>
extern JobScheduler js;
/*
 * Hash function for partitioning.
 * Returns the n least-significant bits (LSB) of key, i.e. key & (2^n - 1).
 *
 * key  value to hash
 * n    number of low bits to keep
 *
 * When n is 64 or larger every bit of key is kept; the mask cannot be built
 * with a shift in that case because shifting a 64-bit value by its full width
 * (or more) is undefined behaviour.
 */
int64_t Partitioner::hash1(uint64_t key, uint64_t n) {
  // Width of uint64_t in bits; shifts by this amount or more are undefined.
  const uint64_t key_bits = 64;
  if (n >= key_bits) {
    return key;
  }

  // e.g. n = 3
  // 1000 - 1 = 111
  const uint64_t mask = (uint64_t{1} << n) - 1;
  return key & mask;  // bitwise AND keeps only the low n bits
}
/*
 * First partitioning pass.
 *
 * Counts how many tuples of r fall into each of the 2^bits_pass1 partitions
 * (partition id = hash1(key, bits_pass1)), then copies the tuples into a new
 * relation so that tuples with the same partition id are stored contiguously.
 *
 * r           relation to partition; its tuples are copied, r itself is not
 *             reordered
 * bits_pass1  number of low key bits that select the partition
 *
 * Returns the reordered copy of r.
 *
 * Side effects: sets partitioningLevel to 1 and replaces hist with the
 * histogram of this pass. The histogram previously held in hist is freed, so
 * pointers obtained earlier through getHistogram() must not be used after
 * this call.
 */
relation Partitioner::partition1(relation& r, int bits_pass1) {
  int64_t r_entries = r.getAmount();

  partitioningLevel = 1;
  int partitions = 1 << bits_pass1;

  // Install the main histogram for this pass. The old one is released only
  // after the new one exists, so hist never dangles if the allocation throws,
  // and repeated calls on the same Partitioner no longer leak it.
  Histogram* previous = hist;
  hist = new Histogram{partitions};
  delete previous;  // deleting a null pointer is a no-op

  // One partial histogram per job.
  // NOTE(review): the slots are left uninitialized here; each HistogramJob is
  // presumably handed its slot by reference and stores the histogram it
  // builds there — confirm against HistogramJob's constructor.
  Histogram* hvector[THREAD_COUNT];

  // Split [0, r_entries) into THREAD_COUNT ranges of per_thread tuples; the
  // last range also takes whatever the integer division left over.
  int64_t per_thread = r_entries / THREAD_COUNT;
  for (int i = 0; i < THREAD_COUNT; i++) {
    int64_t start = i * per_thread;
    int64_t end = (i + 1) * per_thread;
    if ((i == THREAD_COUNT - 1) && (end < r_entries)) end = r_entries;
    js.add_job(new HistogramJob(r, start, end, hvector[i], bits_pass1));
  }
  js.wait_all();

  // Fold the partial histograms into the main one and free them.
  for (int i = 0; i < THREAD_COUNT; i++) {
    Histogram& histogram_i = *hvector[i];
    for (int64_t j = 0; j < partitions; j++) {
      (*hist)[j] += histogram_i[j];
    }
    delete hvector[i];
  }

  // psum[p] is used below as the first output slot of partition p.
  const int64_t* psum = hist->generatePsum();

  // Work on a copy so that we know where the next item needs to go;
  // psum_copy is mutated while the histogram's own psum stays intact.
  int64_t* psum_copy = new int64_t[hist->getSize()];
  std::memmove(psum_copy, psum, hist->getSize() * sizeof(int64_t));

  // Reordered r'.
  relation r2(new tuple[r_entries], r_entries);
  for (int64_t t = 0; t < r_entries; t++) {
    int64_t index = hash1(r[t].getKey(), bits_pass1);
    int64_t insertTo = psum_copy[index];
    r2[insertTo] = r[t];
    // Move this partition's write position forward by one tuple.
    psum_copy[index]++;
  }

  delete[] psum_copy;
  return r2;
}
/*
 * Second partitioning pass.
 *
 * Rebuilds the histogram over 2^bits_pass2 partitions
 * (partition id = hash1(key, bits_pass2)) and copies the tuples of r2 into a
 * new relation in which tuples sharing a partition id are contiguous.
 *
 * r2          relation to partition; its tuples are copied out
 * bits_pass2  number of low key bits that select the partition
 *
 * Returns the reordered copy of r2.
 *
 * Side effects: sets partitioningLevel to 2, replaces hist with the histogram
 * of this pass and frees the histogram that hist pointed to on entry.
 */
relation Partitioner::partition2(relation& r2, int bits_pass2) {
  partitioningLevel = 2;
  int64_t total = r2.getAmount();

  // Keep hold of the histogram from the earlier pass; it is released at the
  // end, once the new one is fully built.
  Histogram* oldHist = hist;

  int bucketCount = 1 << bits_pass2;
  hist = new Histogram(bucketCount);

  // One partial histogram per job.
  // NOTE(review): slots are uninitialized; each HistogramJob presumably fills
  // its slot through a reference — confirm against HistogramJob.
  Histogram* partials[THREAD_COUNT];

  // Hand every job an equal share of [0, total); the final job also picks up
  // the remainder of the integer division.
  int64_t chunk = total / THREAD_COUNT;
  for (int worker = 0; worker < THREAD_COUNT; ++worker) {
    int64_t first = worker * chunk;
    int64_t last = first + chunk;
    const bool isLastWorker = (worker == THREAD_COUNT - 1);
    if (isLastWorker && last < total) {
      last = total;
    }
    js.add_job(new HistogramJob(r2, first, last, partials[worker], bits_pass2));
  }
  js.wait_all();

  // Accumulate the partial counts into the main histogram, freeing each
  // partial histogram once it has been merged.
  for (int worker = 0; worker < THREAD_COUNT; ++worker) {
    Histogram* partial = partials[worker];
    for (int64_t bucket = 0; bucket < bucketCount; ++bucket) {
      (*hist)[bucket] += (*partial)[bucket];
    }
    delete partial;
  }

  // offsets[p] serves as the first output slot of partition p; cursor is a
  // private, mutable copy that tracks the next free slot per partition.
  const int64_t* offsets = hist->generatePsum();
  int64_t* cursor = new int64_t[hist->getSize()];
  std::memmove(cursor, offsets, hist->getSize() * sizeof(int64_t));

  relation reordered(new tuple[total], total);
  for (int64_t t = 0; t < total; ++t) {
    const int64_t bucket = hash1(r2[t].getKey(), bits_pass2);
    reordered[cursor[bucket]] = r2[t];
    ++cursor[bucket];  // next tuple of this partition goes one slot further
  }

  // The histogram of the earlier pass is no longer needed.
  delete oldHist;
  delete[] cursor;

  return reordered;
}
/*
 * Entry point: partitions r zero, one or two times.
 *
 * r                      relation to partition
 * force_partition_depth  0 -> return r unchanged,
 *                        1 -> run partition1 only,
 *                        2 -> run partition2 only (directly on r),
 *                        any other value -> decide from the data size
 * bits_pass1             low key bits used by partition1
 * bits_pass2             low key bits used by partition2
 *
 * In the automatic mode a relation whose byte size is below L2_SIZE is
 * returned as-is. Otherwise partition1 runs, and if any resulting partition
 * is larger than L2_SIZE bytes the first-pass output is partitioned again
 * with partition2.
 */
relation Partitioner::partition(relation& r, int force_partition_depth,
                                int bits_pass1, int bits_pass2) {
  // Explicitly requested depth wins over the size heuristics.
  switch (force_partition_depth) {
    case 0:
      return r;
    case 1:
      return partition1(r, bits_pass1);
    case 2:
      return partition2(r, bits_pass2);
    default:
      break;
  }

  // Small enough already: no partitioning needed.
  const bool belowLimit = ((r.getAmount() * sizeof(tuple)) < L2_SIZE);
  if (belowLimit) {
    return r;
  }

  relation firstPass = partition1(r, bits_pass1);

  // Look for any partition that still exceeds the size limit.
  int bucketCount = hist->getSize();
  for (int bucket = 0; bucket < bucketCount; ++bucket) {
    const bool tooLarge = (((*hist)[bucket] * sizeof(tuple)) > L2_SIZE);
    if (tooLarge) {
      return partition2(firstPass, bits_pass2);
    }
  }

  // Every partition fits.
  return firstPass;
}
// Creates a partitioner that holds no histogram yet (hist is null) and whose
// partitioningLevel is value-initialized.
Partitioner::Partitioner()
    : hist(nullptr),
      partitioningLevel() {
}
// Frees the histogram currently held in hist (nothing happens when it is
// null).
// NOTE(review): copy/move operations are not visible in this file; if they
// are not deleted in the header, a copied Partitioner would free hist twice —
// confirm.
Partitioner::~Partitioner() {
  delete hist;
}
// Returns partitioningLevel: partition1 sets it to 1 and partition2 sets it
// to 2.
int64_t Partitioner::getPartitioningLevel() const {
  return partitioningLevel;
}
// Returns the histogram of the most recent partitioning pass, or null when no
// pass has run. The Partitioner keeps ownership: its destructor deletes this
// pointer, so callers must not.
Histogram* Partitioner::getHistogram() const {
  return hist;
}