Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ limitations under the License.

#include <omp.h>

#include <algorithm>
#include <climits>
#include <deque>
#include <list>
#include <map>
#include <set>
Expand Down Expand Up @@ -98,13 +100,13 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
const VertexType n = endNode - startNode;
const unsigned p = instance.NumberOfProcessors();

std::set<VertexType> ready;
std::deque<VertexType> ready;

std::vector<VertexType> futureReady;
std::vector<VertexType> bestFutureReady;

std::vector<std::set<VertexType>> procReady(p);
std::vector<std::set<VertexType>> bestProcReady(p);
std::vector<std::vector<VertexType>> procReady(p);
std::vector<std::vector<VertexType>> bestProcReady(p);

std::vector<VertexType> predec(n, 0);

Expand Down Expand Up @@ -143,12 +145,15 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
VertexType index = nodePos - startNode;
if (predec[index] == 0) {
if constexpr (hasVerticesInTopOrderV<GraphT>) {
ready.insert(nodePos);
ready.emplace_back(nodePos);
} else {
ready.insert(topOrder[nodePos]);
ready.emplace_back(topOrder[nodePos]);
}
}
}
if constexpr (not hasVerticesInTopOrderV<GraphT>) {
std::sort(ready.begin(), ready.end(), std::less<>{});
}

std::vector<std::vector<VertexType>> newAssignments(p);
std::vector<std::vector<VertexType>> bestNewAssignments(p);
Expand All @@ -159,19 +164,21 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
double desiredParallelism = static_cast<double>(p);

VertexType totalAssigned = 0;
unsigned totalAttempts = 1U;
supstep = 0;

while (totalAssigned < n) {
VertexType limit = params_.minSuperstepSize_;
double bestScore = 0;
double bestParallelism = 0;

typename std::set<VertexType>::iterator readyIter;
typename std::set<VertexType>::iterator bestReadyIter;
typename std::deque<VertexType>::const_iterator readyIter;
typename std::deque<VertexType>::const_iterator bestReadyIter;

bool continueSuperstepAttempts = true;

while (continueSuperstepAttempts) {
assert(totalAttempts < (UINT_MAX / (p + 1U)));
for (unsigned proc = 0; proc < p; proc++) {
newAssignments[proc].clear();
}
Expand All @@ -181,7 +188,7 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
procReady[proc].clear();
}

readyIter = ready.begin();
readyIter = ready.cbegin();

VertexType newTotalAssigned = 0;
VWorkwT<GraphT> weightLimit = 0;
Expand All @@ -191,9 +198,10 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
while (newAssignments[0].size() < limit) {
VertexType chosenNode = std::numeric_limits<VertexType>::max();
if (!procReady[0].empty()) {
chosenNode = *procReady[0].begin();
procReady[0].erase(procReady[0].begin());
} else if (readyIter != ready.end()) {
std::pop_heap(procReady[0].begin(), procReady[0].end(), std::greater<>{});
chosenNode = procReady[0].back();
procReady[0].pop_back();
} else if (readyIter != ready.cend()) {
chosenNode = *readyIter;
readyIter++;
} else {
Expand Down Expand Up @@ -222,10 +230,22 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
}
}

if (schedule.AssignedProcessor(succ) == UINT_MAX) {
schedule.SetAssignedProcessor(succ, 0);
} else if (schedule.AssignedProcessor(succ) != 0) {
schedule.SetAssignedProcessor(succ, p);
bool canScheduleSameProc = false;
const unsigned pp1 = p + 1U;
const unsigned base = pp1 * totalAttempts;
const unsigned remainder = schedule.AssignedProcessor(succ) - base;
// Encoding into processor of children where they can be sheduled locally through
// iteration * (p + 1) + proc
// where proc is either the processor where is can be enqueued in the processor local queue
// or equal to p when it can't be enqueued in any local queue.
// The extra encoding of iteration ensures that previous superstep attempts do not affect the current superstep
if ((remainder < pp1) & (remainder != 0U)) { // The first condition implies that the iteration is
// the same as the current and the second checks if
// it has already been to a different processor
schedule.SetAssignedProcessor(succ, base + p);
} else {
schedule.SetAssignedProcessor(succ, base + 0U);
canScheduleSameProc = true;
}

VertexType succIndex;
Expand All @@ -237,8 +257,9 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {

--predec[succIndex];
if (predec[succIndex] == 0) {
if (schedule.AssignedProcessor(succ) == 0) {
procReady[0].insert(succ);
if (canScheduleSameProc) {
procReady[0].emplace_back(succ);
std::push_heap(procReady[0].begin(), procReady[0].end(), std::greater<>{});
} else {
futureReady.push_back(succ);
}
Expand All @@ -254,9 +275,10 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
while (currentWeightAssigned < weightLimit) {
VertexType chosenNode = std::numeric_limits<VertexType>::max();
if (!procReady[proc].empty()) {
chosenNode = *procReady[proc].begin();
procReady[proc].erase(procReady[proc].begin());
} else if (readyIter != ready.end()) {
std::pop_heap(procReady[proc].begin(), procReady[proc].end(), std::greater<>{});
chosenNode = procReady[proc].back();
procReady[proc].pop_back();
} else if (readyIter != ready.cend()) {
chosenNode = *readyIter;
readyIter++;
} else {
Expand Down Expand Up @@ -285,10 +307,22 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
}
}

if (schedule.AssignedProcessor(succ) == UINT_MAX) {
schedule.SetAssignedProcessor(succ, proc);
} else if (schedule.AssignedProcessor(succ) != proc) {
schedule.SetAssignedProcessor(succ, p);
bool canScheduleSameProc = false;
const unsigned pp1 = p + 1U;
const unsigned base = pp1 * totalAttempts;
const unsigned remainder = schedule.AssignedProcessor(succ) - base;
// Encoding into processor of children where they can be sheduled locally through
// iteration * (p + 1) + proc
// where proc is either the processor where is can be enqueued in the processor local queue
// or equal to p when it can't be enqueued in any local queue.
// The extra encoding of iteration ensures that previous superstep attempts do not affect the current superstep
if ((remainder < pp1) & (remainder != proc)) { // The first condition implies that the iteration is
// the same as the current and the second checks if
// it has already been to a different processor
schedule.SetAssignedProcessor(succ, base + p);
} else {
schedule.SetAssignedProcessor(succ, base + proc);
canScheduleSameProc = true;
}

VertexType succIndex;
Expand All @@ -300,8 +334,9 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {

--predec[succIndex];
if (predec[succIndex] == 0) {
if (schedule.AssignedProcessor(succ) == proc) {
procReady[proc].insert(succ);
if (canScheduleSameProc) {
procReady[proc].emplace_back(succ);
std::push_heap(procReady[proc].begin(), procReady[proc].end(), std::greater<>{});
} else {
futureReady.push_back(succ);
}
Expand Down Expand Up @@ -345,17 +380,15 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
}
}

if (totalAssigned + newTotalAssigned == n) {
if (readyIter == ready.cend()) {
continueSuperstepAttempts = false;
}

// undo proc assingments and predec increases in any case
for (unsigned proc = 0; proc < p; ++proc) {
for (const VertexType &node : newAssignments[proc]) {
schedule.SetAssignedProcessor(node, UINT_MAX);
}
if (totalAssigned + newTotalAssigned == n) {
continueSuperstepAttempts = false;
}

// undo predec increases in any case
for (unsigned proc = 0; proc < p; ++proc) {
for (const VertexType &node : newAssignments[proc]) {
for (const VertexType &succ : graph.Children(node)) {
Expand Down Expand Up @@ -387,30 +420,6 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {
}
}

for (unsigned proc = 0; proc < p; ++proc) {
for (const VertexType &node : newAssignments[proc]) {
for (const VertexType &succ : graph.Children(node)) {
if constexpr (hasVerticesInTopOrderV<GraphT>) {
if constexpr (hasChildrenInVertexOrderV<GraphT>) {
if (succ >= endNode) {
break;
}
} else {
if (succ >= endNode) {
continue;
}
}
} else {
if (posInTopOrder[succ] >= endNode) {
continue;
}
}

schedule.SetAssignedProcessor(succ, UINT_MAX);
}
}
}

if (acceptStep) {
bestNewAssignments.swap(newAssignments);
bestFutureReady.swap(futureReady);
Expand All @@ -420,14 +429,19 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {

limit++;
limit += (limit / 2);
++totalAttempts;
}

// apply best iteration
ready.erase(ready.begin(), bestReadyIter);
ready.insert(bestFutureReady.begin(), bestFutureReady.end());
const auto lengthLeftoverReady = std::distance(ready.begin(), ready.end());
ready.insert(ready.end(), bestFutureReady.begin(), bestFutureReady.end());
for (unsigned proc = 0; proc < p; proc++) {
ready.merge(bestProcReady[proc]);
ready.insert(ready.end(), bestProcReady[proc].begin(), bestProcReady[proc].end());
}
const auto middleIt = std::next(ready.begin(), lengthLeftoverReady);
std::sort(middleIt, ready.end(), std::less<>{});
std::inplace_merge(ready.begin(), middleIt, ready.end(), std::less<>{});

for (unsigned proc = 0; proc < p; ++proc) {
for (const VertexType &node : bestNewAssignments[proc]) {
Expand Down Expand Up @@ -508,9 +522,7 @@ class GrowLocalAutoCoresParallel : public Scheduler<GraphT> {

const VertexType n = instance.NumberOfVertices();

for (VertexType vert = 0; vert < n; ++vert) {
schedule.SetAssignedProcessor(vert, UINT_MAX);
}
schedule.SetAssignedProcessors(std::vector<unsigned>(n, 0U));

VertexType numNodesPerThread = n / numThreads;
std::vector<VertexType> startNodes;
Expand Down