Skip to content

Commit 42c0ece

Browse files
Added Chance to Finish for GrowLocal (#84)
* Chance to Finish GrowLocal SSP * moved sort to the end of the superstep as the first currently ready is already sorted for the first superstep
1 parent fe8a5bd commit 42c0ece

1 file changed

Lines changed: 174 additions & 68 deletions

File tree

include/osp/bsp/scheduler/GreedySchedulers/GrowLocalMaxBsp.hpp

Lines changed: 174 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,117 @@ class GrowLocalSSP : public MaxBspScheduler<GraphT> {
5050
static constexpr unsigned staleness{2U};
5151
GrowLocalSSPParams<VertexIdxT<GraphT>, VWorkwT<GraphT>> params_;
5252

53-
inline typename std::deque<VertexType>::difference_type maxAllReadyUsage(const std::deque<VertexType> &currentlyReady,
53+
/*! Vertices ready in current superstep */
54+
std::deque<VertexType> currentlyReady_;
55+
56+
/*! For i = 1,2,..,staleness, the vertices in futureReady_[(superstep + i) % staleness] become ready globally in superstep + i */
57+
std::array<std::deque<VertexType>, staleness> futureReady_;
58+
/*! Vertices to be added to futureReady_[superstep % staleness] which become ready globally in superstep + staleness */
59+
std::deque<VertexType> bestFutureReady_;
60+
61+
/*! Local to processor ready vertices in current superstep in a heap */
62+
std::vector<std::vector<std::pair<VertexType, unsigned>>> currentProcReadyHeaps_;
63+
/*! Leftover local to processor ready vertices in current superstep in a heap */
64+
std::vector<std::vector<std::pair<VertexType, unsigned>>> bestCurrentProcReadyHeaps_;
65+
66+
/*! For i = 0,1,2,..,staleness-1 and p processor, the vertices in procReady_[(superstep + i) % staleness][p] are ready locally
67+
* in superstep + i on processor p */
68+
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> procReady_;
69+
/*! Additions to procReady_ in current superstep attempt */
70+
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> procReadyAdditions_;
71+
/*! Additions to procReady_ from best superstep attempt */
72+
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> bestProcReadyAdditions_;
73+
74+
void Init(const unsigned numProcs);
75+
void ReleaseMemory();
76+
77+
inline typename std::deque<VertexType>::difference_type MaxAllReadyUsage(const std::deque<VertexType> &currentlyReady,
5478
const std::deque<VertexType> &nextSuperstepReady) const;
5579

80+
bool ChanceToFinish(const unsigned superStep) const;
81+
5682
public:
5783
ReturnStatus ComputeSchedule(BspSchedule<GraphT> &schedule) override;
5884
ReturnStatus ComputeSchedule(MaxBspSchedule<GraphT> &schedule) override;
5985

86+
inline GrowLocalSSPParams<VertexIdxT<GraphT>, VWorkwT<GraphT>> &GetParameters();
87+
inline const GrowLocalSSPParams<VertexIdxT<GraphT>, VWorkwT<GraphT>> &GetParameters() const;
88+
6089
std::string GetScheduleName() const override { return "GrowLocalSSP"; }
6190
};
6291

6392
template <typename GraphT>
64-
inline typename std::deque<VertexIdxT<GraphT>>::difference_type GrowLocalSSP<GraphT>::maxAllReadyUsage(
93+
/*! Mutable access to the scheduler parameters: returns a reference to the params_ member, so callers can modify it in place. */
inline GrowLocalSSPParams<VertexIdxT<GraphT>, VWorkwT<GraphT>> &GrowLocalSSP<GraphT>::GetParameters() {
    return this->params_;
}
96+
97+
template <typename GraphT>
98+
inline const GrowLocalSSPParams<VertexIdxT<GraphT>, VWorkwT<GraphT>> &GrowLocalSSP<GraphT>::GetParameters() const {
99+
return params_;
100+
}
101+
102+
template <typename GraphT>
103+
void GrowLocalSSP<GraphT>::Init(const unsigned numProcs) {
104+
currentlyReady_.clear();
105+
106+
for (auto &stepFutureReady : futureReady_) {
107+
stepFutureReady.clear();
108+
}
109+
110+
bestFutureReady_.clear();
111+
112+
currentProcReadyHeaps_ = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
113+
bestCurrentProcReadyHeaps_ = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
114+
115+
for (auto &stepProcReady : procReady_) {
116+
stepProcReady = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
117+
}
118+
119+
for (auto &stepProcReadyAdditions : procReadyAdditions_) {
120+
stepProcReadyAdditions = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
121+
}
122+
123+
for (auto &stepBestProcReadyAdditions : bestProcReadyAdditions_) {
124+
stepBestProcReadyAdditions = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
125+
}
126+
}
127+
128+
template <typename GraphT>
129+
void GrowLocalSSP<GraphT>::ReleaseMemory() {
130+
currentlyReady_.clear();
131+
currentlyReady_.shrink_to_fit();
132+
133+
for (auto &stepFutureReady : futureReady_) {
134+
stepFutureReady.clear();
135+
stepFutureReady.shrink_to_fit();
136+
}
137+
138+
bestFutureReady_.clear();
139+
140+
currentProcReadyHeaps_.clear();
141+
currentProcReadyHeaps_.shrink_to_fit();
142+
143+
bestCurrentProcReadyHeaps_.clear();
144+
bestCurrentProcReadyHeaps_.shrink_to_fit();
145+
146+
for (auto &stepProcReady : procReady_) {
147+
stepProcReady.clear();
148+
stepProcReady.shrink_to_fit();
149+
}
150+
151+
for (auto &stepProcReadyAdditions : procReadyAdditions_) {
152+
stepProcReadyAdditions.clear();
153+
stepProcReadyAdditions.shrink_to_fit();
154+
}
155+
156+
for (auto &stepBestProcReadyAdditions : bestProcReadyAdditions_) {
157+
stepBestProcReadyAdditions.clear();
158+
stepBestProcReadyAdditions.shrink_to_fit();
159+
}
160+
}
161+
162+
template <typename GraphT>
163+
inline typename std::deque<VertexIdxT<GraphT>>::difference_type GrowLocalSSP<GraphT>::MaxAllReadyUsage(
65164
const std::deque<VertexIdxT<GraphT>> &currentlyReady, const std::deque<VertexIdxT<GraphT>> &nextSuperstepReady) const {
66165
if constexpr (staleness == 1U) {
67166
return std::distance(currentlyReady.cbegin(), currentlyReady.cend());
@@ -77,6 +176,34 @@ inline typename std::deque<VertexIdxT<GraphT>>::difference_type GrowLocalSSP<Gra
77176
}
78177
}
79178

179+
template <typename GraphT>
180+
bool GrowLocalSSP<GraphT>::ChanceToFinish(const unsigned superStep) const {
181+
bool ans = std::all_of(futureReady_.cbegin(), futureReady_.cend(), [](const auto &deq) { return deq.empty(); });
182+
183+
if (ans) {
184+
for (unsigned i = 1U; i < staleness; ++i) {
185+
const auto &stepProcReady = procReady_[(i + superStep) % staleness];
186+
ans = std::all_of(stepProcReady.cbegin(), stepProcReady.cend(), [](const auto &vec) { return vec.empty(); });
187+
if (not ans) {
188+
break;
189+
}
190+
}
191+
}
192+
193+
if (ans) {
194+
for (unsigned i = 1U; i < staleness; ++i) {
195+
const auto &stepProcReadyAdditions = procReadyAdditions_[(i + superStep) % staleness];
196+
ans = std::all_of(
197+
stepProcReadyAdditions.cbegin(), stepProcReadyAdditions.cend(), [](const auto &vec) { return vec.empty(); });
198+
if (not ans) {
199+
break;
200+
}
201+
}
202+
}
203+
204+
return ans;
205+
}
206+
80207
template <typename GraphT>
81208
ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(BspSchedule<GraphT> &schedule) {
82209
return MaxBspScheduler<GraphT>::ComputeSchedule(schedule);
@@ -89,42 +216,15 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
89216
const VertexType numVertices = graph.NumVertices();
90217
const unsigned numProcs = instance.NumberOfProcessors();
91218

92-
std::deque<VertexType> currentlyReady; // vertices ready in current superstep
93-
94-
std::array<std::deque<VertexType>, staleness> futureReady;
95-
// For i = 1,2,..,staleness, the vertices in futureReady[(superstep + i) % staleness] becomes ready globally in superstep + i
96-
std::deque<VertexType> bestFutureReady;
97-
// vertices to be added to futureReady[superstep % staleness] which become ready globally in superstep + staleness
98-
99-
std::vector<std::vector<std::pair<VertexType, unsigned>>> currentProcReadyHeaps(numProcs);
100-
std::vector<std::vector<std::pair<VertexType, unsigned>>> bestCurrentProcReadyHeaps(numProcs);
101-
102-
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> procReady;
103-
// For i = 0,1,2,..,staleness-1 and p processor, the vertices in procReady[(superstep + i) % staleness][p] are ready locally
104-
// in superstep + i on processor p
105-
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> procReadyAdditions;
106-
std::array<std::vector<std::vector<std::pair<VertexType, unsigned>>>, staleness> bestProcReadyAdditions;
107-
108-
for (auto &arrVal : procReady) {
109-
arrVal = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
110-
}
111-
for (auto &arrVal : procReadyAdditions) {
112-
arrVal = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
113-
}
114-
for (auto &arrVal : bestProcReadyAdditions) {
115-
arrVal = std::vector<std::vector<std::pair<VertexType, unsigned>>>(numProcs);
116-
}
219+
Init(numProcs);
117220

118221
std::vector<VertexType> predec(numVertices);
119222
for (const auto vert : graph.Vertices()) {
120223
predec[vert] = graph.InDegree(vert);
121224
if (predec[vert] == 0U) {
122-
currentlyReady.emplace_back(vert);
225+
currentlyReady_.emplace_back(vert);
123226
}
124227
}
125-
if constexpr (not hasVerticesInTopOrderV<GraphT>) {
126-
std::sort(currentlyReady.begin(), currentlyReady.end(), std::less<>{});
127-
}
128228

129229
std::vector<std::vector<VertexType>> newAssignments(numProcs);
130230
std::vector<std::vector<VertexType>> bestNewAssignments(numProcs);
@@ -140,20 +240,14 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
140240
while (totalAssigned < numVertices) {
141241
const unsigned reducedSuperStep = superStep % staleness;
142242

143-
std::deque<VertexType> &stepFutureReady = futureReady[reducedSuperStep];
144-
std::sort(stepFutureReady.begin(), stepFutureReady.end(), std::less<>{});
145-
const typename std::deque<VertexType>::difference_type lengthCurrentlyReady
146-
= std::distance(currentlyReady.begin(), currentlyReady.end());
147-
currentlyReady.insert(currentlyReady.end(), stepFutureReady.begin(), stepFutureReady.end());
148-
std::inplace_merge(
149-
currentlyReady.begin(), std::next(currentlyReady.begin(), lengthCurrentlyReady), currentlyReady.end(), std::less<>{});
243+
std::deque<VertexType> &stepFutureReady = futureReady_[reducedSuperStep];
150244

151245
const typename std::deque<VertexType>::difference_type maxCurrentlyReadyUsage
152246
= std::max(static_cast<typename std::deque<VertexType>::difference_type>(
153247
static_cast<double>(params_.minSuperstepSize_) * desiredParallelism),
154-
maxAllReadyUsage(currentlyReady, futureReady[(superStep + 1U) % staleness]));
248+
MaxAllReadyUsage(currentlyReady_, futureReady_[(superStep + 1U) % staleness]));
155249

156-
std::vector<std::vector<std::pair<VertexType, unsigned>>> &stepProcReady = procReady[reducedSuperStep];
250+
std::vector<std::vector<std::pair<VertexType, unsigned>>> &stepProcReady = procReady_[reducedSuperStep];
157251
for (auto &procHeap : stepProcReady) {
158252
std::make_heap(procHeap.begin(), procHeap.end(), std::greater<>{}); // min heap
159253
}
@@ -172,11 +266,11 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
172266
procAssignments.clear();
173267
}
174268
stepFutureReady.clear();
175-
currentProcReadyHeaps = stepProcReady;
269+
currentProcReadyHeaps_ = stepProcReady;
176270

177-
currentlyReadyIter = currentlyReady.cbegin();
271+
currentlyReadyIter = currentlyReady_.cbegin();
178272

179-
for (auto &stepProcReadyAdditions : procReadyAdditions) {
273+
for (auto &stepProcReadyAdditions : procReadyAdditions_) {
180274
for (auto &localStepProcReadyAdditions : stepProcReadyAdditions) {
181275
localStepProcReadyAdditions.clear();
182276
}
@@ -189,14 +283,14 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
189283
// Processor 0
190284
constexpr unsigned proc0{0U};
191285
while (newAssignments[proc0].size() < limit) {
192-
std::vector<std::pair<VertexType, unsigned>> &proc0Heap = currentProcReadyHeaps[proc0];
286+
std::vector<std::pair<VertexType, unsigned>> &proc0Heap = currentProcReadyHeaps_[proc0];
193287
VertexType chosenNode = std::numeric_limits<VertexType>::max();
194288
{
195289
if (proc0Heap.size() != 0U) {
196290
std::pop_heap(proc0Heap.begin(), proc0Heap.end(), std::greater<>{});
197291
chosenNode = proc0Heap.back().first;
198292
proc0Heap.pop_back();
199-
} else if (currentlyReadyIter != currentlyReady.cend()) {
293+
} else if (currentlyReadyIter != currentlyReady_.cend()) {
200294
chosenNode = *currentlyReadyIter;
201295
++currentlyReadyIter;
202296
} else {
@@ -223,7 +317,7 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
223317
proc0Heap.emplace_back(succ, superStep + staleness);
224318
std::push_heap(proc0Heap.begin(), proc0Heap.end(), std::greater<>{});
225319
} else if (earliest < superStep + staleness) {
226-
procReadyAdditions[earliest % staleness][proc0].emplace_back(succ, superStep + staleness);
320+
procReadyAdditions_[earliest % staleness][proc0].emplace_back(succ, superStep + staleness);
227321
} else {
228322
stepFutureReady.emplace_back(succ);
229323
}
@@ -237,14 +331,14 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
237331
for (unsigned proc = 1U; proc < numProcs; ++proc) {
238332
VWorkwT<GraphT> currentWeightAssigned = 0;
239333
while (currentWeightAssigned < weightLimit) {
240-
std::vector<std::pair<VertexType, unsigned>> &procHeap = currentProcReadyHeaps[proc];
334+
std::vector<std::pair<VertexType, unsigned>> &procHeap = currentProcReadyHeaps_[proc];
241335
VertexType chosenNode = std::numeric_limits<VertexType>::max();
242336
{
243337
if (procHeap.size() != 0U) {
244338
std::pop_heap(procHeap.begin(), procHeap.end(), std::greater<>{});
245339
chosenNode = procHeap.back().first;
246340
procHeap.pop_back();
247-
} else if (currentlyReadyIter != currentlyReady.cend()) {
341+
} else if (currentlyReadyIter != currentlyReady_.cend()) {
248342
chosenNode = *currentlyReadyIter;
249343
++currentlyReadyIter;
250344
} else {
@@ -271,7 +365,7 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
271365
procHeap.emplace_back(succ, superStep + staleness);
272366
std::push_heap(procHeap.begin(), procHeap.end(), std::greater<>{});
273367
} else if (earliest < superStep + staleness) {
274-
procReadyAdditions[earliest % staleness][proc].emplace_back(succ, superStep + staleness);
368+
procReadyAdditions_[earliest % staleness][proc].emplace_back(succ, superStep + staleness);
275369
} else {
276370
stepFutureReady.emplace_back(succ);
277371
}
@@ -314,16 +408,16 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
314408
}
315409
}
316410

317-
if (currentlyReadyIter == currentlyReady.cend()) {
411+
if (currentlyReadyIter == currentlyReady_.cend()) {
318412
continueSuperstepAttemps = false;
319413
}
320414

321-
if (std::distance(currentlyReady.cbegin(), currentlyReadyIter) > maxCurrentlyReadyUsage) {
322-
continueSuperstepAttemps = false;
323-
}
324-
325-
if (totalAssigned + newTotalAssigned == numVertices) {
326-
continueSuperstepAttemps = false;
415+
if (continueSuperstepAttemps) {
416+
if (std::distance(currentlyReady_.cbegin(), currentlyReadyIter) > maxCurrentlyReadyUsage) {
417+
if (not((totalAssigned + newTotalAssigned >= (numVertices / 4) * 3) && ChanceToFinish(superStep))) {
418+
continueSuperstepAttemps = false;
419+
}
420+
}
327421
}
328422

329423
// Undo predec decreases
@@ -336,41 +430,41 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
336430
}
337431

338432
if (acceptStep) {
339-
std::swap(bestFutureReady, stepFutureReady);
340-
std::swap(bestProcReadyAdditions, procReadyAdditions);
433+
std::swap(bestFutureReady_, stepFutureReady);
434+
std::swap(bestProcReadyAdditions_, procReadyAdditions_);
341435
std::swap(bestcurrentlyReadyIter, currentlyReadyIter);
342436
std::swap(bestNewAssignments, newAssignments);
343-
std::swap(bestCurrentProcReadyHeaps, currentProcReadyHeaps);
437+
std::swap(bestCurrentProcReadyHeaps_, currentProcReadyHeaps_);
344438
}
345439

346440
limit++;
347441
limit += (limit / 2);
348442
}
349443

350444
// apply best iteration
351-
currentlyReady.erase(currentlyReady.begin(), bestcurrentlyReadyIter);
352-
std::swap(futureReady[reducedSuperStep], bestFutureReady);
445+
currentlyReady_.erase(currentlyReady_.begin(), bestcurrentlyReadyIter);
446+
std::swap(futureReady_[reducedSuperStep], bestFutureReady_);
353447

354-
for (auto &localProcReady : procReady[reducedSuperStep]) {
448+
for (auto &localProcReady : procReady_[reducedSuperStep]) {
355449
localProcReady.clear();
356450
}
357451

358452
const unsigned nextSuperStep = superStep + 1U;
359453
for (unsigned proc = 0U; proc < numProcs; ++proc) {
360-
for (const auto &vertStepPair : bestCurrentProcReadyHeaps[proc]) {
454+
for (const auto &vertStepPair : bestCurrentProcReadyHeaps_[proc]) {
361455
if (vertStepPair.second <= nextSuperStep) {
362-
futureReady[nextSuperStep % staleness].emplace_back(vertStepPair.first);
456+
futureReady_[nextSuperStep % staleness].emplace_back(vertStepPair.first);
363457
} else {
364-
procReady[nextSuperStep % staleness][proc].emplace_back(vertStepPair);
458+
procReady_[nextSuperStep % staleness][proc].emplace_back(vertStepPair);
365459
}
366460
}
367461
}
368462

369463
for (std::size_t stepInd = 0U; stepInd < staleness; ++stepInd) {
370464
for (unsigned proc = 0U; proc < numProcs; ++proc) {
371-
procReady[stepInd][proc].insert(procReady[stepInd][proc].end(),
372-
bestProcReadyAdditions[stepInd][proc].begin(),
373-
bestProcReadyAdditions[stepInd][proc].end());
465+
procReady_[stepInd][proc].insert(procReady_[stepInd][proc].end(),
466+
bestProcReadyAdditions_[stepInd][proc].begin(),
467+
bestProcReadyAdditions_[stepInd][proc].end());
374468
}
375469
}
376470

@@ -385,12 +479,24 @@ ReturnStatus GrowLocalSSP<GraphT>::ComputeSchedule(MaxBspSchedule<GraphT> &sched
385479
}
386480
}
387481

482+
std::deque<VertexType> &nextStepFutureReady = futureReady_[nextSuperStep % staleness];
483+
std::sort(nextStepFutureReady.begin(), nextStepFutureReady.end(), std::less<>{});
484+
const typename std::deque<VertexType>::difference_type lengthCurrentlyReady
485+
= std::distance(currentlyReady_.begin(), currentlyReady_.end());
486+
currentlyReady_.insert(currentlyReady_.end(), nextStepFutureReady.begin(), nextStepFutureReady.end());
487+
std::inplace_merge(currentlyReady_.begin(),
488+
std::next(currentlyReady_.begin(), lengthCurrentlyReady),
489+
currentlyReady_.end(),
490+
std::less<>{});
491+
nextStepFutureReady.clear();
492+
388493
++superStep;
389494
desiredParallelism = (0.3 * desiredParallelism) + (0.6 * bestParallelism)
390495
+ (0.1 * static_cast<double>(numProcs)); // weights should sum up to one
391496
}
392497

393498
schedule.SetNumberOfSupersteps(superStep);
499+
ReleaseMemory();
394500

395501
return ReturnStatus::OSP_SUCCESS;
396502
}

0 commit comments

Comments
 (0)