diff --git a/include/osp/bsp/model/BspInstance.hpp b/include/osp/bsp/model/BspInstance.hpp index 358947e9..0fcd8040 100644 --- a/include/osp/bsp/model/BspInstance.hpp +++ b/include/osp/bsp/model/BspInstance.hpp @@ -107,10 +107,20 @@ class BspInstance { * @param cdag The computational DAG for the instance. * @param architecture The BSP architecture for the instance. */ - BspInstance(const GraphT &cdag, + template + BspInstance(const OtherGraphT &cdag, const BspArchitecture &architecture, std::vector> nodeProcessorCompatibility = std::vector>({{true}})) - : cdag_(cdag), architecture_(architecture), nodeProcessorCompatibility_(nodeProcessorCompatibility) {} + : cdag_(cdag), architecture_(architecture), nodeProcessorCompatibility_(nodeProcessorCompatibility) { + static_assert(std::is_same_v, VMemwT>, + "BspInstance: GraphT and OtherGraphT must have the same memory weight type."); + + static_assert(std::is_same_v, VCommwT>, + "BspInstance: GraphT and OtherGraphT must have the same communication weight type."); + + static_assert(std::is_same_v, VTypeT>, + "BspInstance: GraphT and OtherGraphT must have the same processor type."); + } /** * @brief Constructs a BspInstance object with the specified computational DAG and BSP architecture. 
@@ -319,7 +329,7 @@ class BspInstance { bool HasAnyTypeRestrictions() const { for (VertexTypeTOrDefault node_type = 0; node_type < nodeProcessorCompatibility_.size(); ++node_type) { for (VertexTypeTOrDefault proc_type = 0; proc_type < nodeProcessorCompatibility_[node_type].size(); ++proc_type) { - if(!nodeProcessorCompatibility_[node_type][proc_type]) { + if (!nodeProcessorCompatibility_[node_type][proc_type]) { return true; } } diff --git a/include/osp/bsp/scheduler/GreedySchedulers/GrowLocalMaxBsp.hpp b/include/osp/bsp/scheduler/GreedySchedulers/GrowLocalMaxBsp.hpp new file mode 100644 index 00000000..8ce849c6 --- /dev/null +++ b/include/osp/bsp/scheduler/GreedySchedulers/GrowLocalMaxBsp.hpp @@ -0,0 +1,397 @@ +/* +Copyright 2026 Huawei Technologies Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +@author Toni Boehnlein, Christos Matzoros, Benjamin Lozes, Pal Andras Papp, Raphael S. 
Steiner +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "osp/bsp/scheduler/MaxBspScheduler.hpp" + +namespace osp { + +template +struct GrowLocalSSPParams { + VertT minSuperstepSize_ = 10; + WeightT syncCostMultiplierMinSuperstepWeight_ = 1; + WeightT syncCostMultiplierParallelCheck_ = 4; +}; + +template +class GrowLocalSSP : public MaxBspScheduler { + static_assert(isDirectedGraphV); + static_assert(hasVertexWeightsV); + + private: + using VertexType = VertexIdxT; + + static constexpr unsigned staleness{2U}; + GrowLocalSSPParams, VWorkwT> params_; + + typename std::deque::difference_type maxAllReadyUsage(const std::deque &currentlyReady, + const std::deque &nextSuperstepReady) const; + + public: + ReturnStatus ComputeSchedule(BspSchedule &schedule) override; + ReturnStatus ComputeSchedule(MaxBspSchedule &schedule) override; + + std::string GetScheduleName() const override { return "GrowLocalSSP"; } +}; + +template +typename std::deque>::difference_type GrowLocalSSP::maxAllReadyUsage( + const std::deque> &currentlyReady, const std::deque> &nextSuperstepReady) const { + if constexpr (staleness == 1U) { + return std::distance(currentlyReady.cbegin(), currentlyReady.cend()); + } else { + typename std::deque::difference_type lengthCurrently + = std::distance(currentlyReady.cbegin(), currentlyReady.cend()); + typename std::deque::difference_type lengthNext + = std::distance(nextSuperstepReady.cbegin(), nextSuperstepReady.cend()); + + typename std::deque::difference_type ans = ((lengthCurrently + lengthNext + 2) / 3) * 2; + + return ans; + } +} + +template +ReturnStatus GrowLocalSSP::ComputeSchedule(BspSchedule &schedule) { + return MaxBspScheduler::ComputeSchedule(schedule); +} + +template +ReturnStatus GrowLocalSSP::ComputeSchedule(MaxBspSchedule &schedule) { + const BspInstance &instance = schedule.GetInstance(); + const GraphT &graph = instance.GetComputationalDag(); + const VertexType numVertices = 
graph.NumVertices(); + const unsigned numProcs = instance.NumberOfProcessors(); + + std::deque currentlyReady; // vertices ready in current superstep + + std::array, staleness> futureReady; + // For i = 1,2,..,staleness, the vertices in futureReady[(superstep + i) % staleness] become ready globally in superstep + i + std::deque bestFutureReady; + // vertices to be added to futureReady[superstep % staleness] which become ready globally in superstep + staleness + + std::vector>> currentProcReadyHeaps(numProcs); + std::vector>> bestCurrentProcReadyHeaps(numProcs); + + std::array>>, staleness> procReady; + // For i = 0,1,2,..,staleness-1 and processor p, the vertices in procReady[(superstep + i) % staleness][p] are ready locally + // in superstep + i on processor p + std::array>>, staleness> procReadyAdditions; + std::array>>, staleness> bestProcReadyAdditions; + + for (auto &arrVal : procReady) { + arrVal = std::vector>>(numProcs); + } + for (auto &arrVal : procReadyAdditions) { + arrVal = std::vector>>(numProcs); + } + for (auto &arrVal : bestProcReadyAdditions) { + arrVal = std::vector>>(numProcs); + } + + std::vector predec(numVertices); + for (const auto vert : graph.Vertices()) { + predec[vert] = graph.InDegree(vert); + if (predec[vert] == 0U) { + currentlyReady.emplace_back(vert); + } + } + if constexpr (not hasVerticesInTopOrderV) { + std::sort(currentlyReady.begin(), currentlyReady.end(), std::less<>{}); + } + + std::vector> newAssignments(numProcs); + std::vector> bestNewAssignments(numProcs); + + const VWorkwT minWeightParallelCheck = params_.syncCostMultiplierParallelCheck_ * instance.SynchronisationCosts(); + const VWorkwT minSuperstepWeight = params_.syncCostMultiplierMinSuperstepWeight_ * instance.SynchronisationCosts(); + + double desiredParallelism = static_cast(numProcs); + + VertexType totalAssigned = 0; + unsigned superStep = 0U; + + while (totalAssigned < numVertices) { + const unsigned reducedSuperStep = superStep % staleness; + + std::deque 
&stepFutureReady = futureReady[reducedSuperStep]; + std::sort(stepFutureReady.begin(), stepFutureReady.end(), std::less<>{}); + const typename std::deque::difference_type lengthCurrentlyReady + = std::distance(currentlyReady.begin(), currentlyReady.end()); + currentlyReady.insert(currentlyReady.end(), stepFutureReady.begin(), stepFutureReady.end()); + std::inplace_merge(currentlyReady.begin(), std::next(currentlyReady.begin(), lengthCurrentlyReady), currentlyReady.end(), std::less<>{}); + + const typename std::deque::difference_type maxCurrentlyReadyUsage + = std::max(static_cast::difference_type>( + static_cast(params_.minSuperstepSize_) * desiredParallelism), + maxAllReadyUsage(currentlyReady, futureReady[(superStep + 1U) % staleness])); + + std::vector>> &stepProcReady = procReady[reducedSuperStep]; + for (auto &procHeap : stepProcReady) { + std::make_heap(procHeap.begin(), procHeap.end(), std::greater<>{}); // min heap + } + + VertexType limit = params_.minSuperstepSize_; + double bestScore = std::numeric_limits::lowest(); + double bestParallelism = 0.0; + + typename std::deque::const_iterator currentlyReadyIter; + typename std::deque::const_iterator bestcurrentlyReadyIter; + + bool continueSuperstepAttemps = true; + + while (continueSuperstepAttemps) { + for (auto &procAssignments : newAssignments) { + procAssignments.clear(); + } + stepFutureReady.clear(); + currentProcReadyHeaps = stepProcReady; + + currentlyReadyIter = currentlyReady.cbegin(); + + for (auto &stepProcReadyAdditions : procReadyAdditions) { + for (auto &localStepProcReadyAdditions : stepProcReadyAdditions) { + localStepProcReadyAdditions.clear(); + } + } + + VertexType newTotalAssigned = 0; + VWorkwT weightLimit = 0; + VWorkwT totalWeightAssigned = 0; + + // Processor 0 + constexpr unsigned proc0{0U}; + while (newAssignments[proc0].size() < limit) { + std::vector> &proc0Heap = currentProcReadyHeaps[proc0]; + VertexType chosenNode = std::numeric_limits::max(); + { + if (proc0Heap.size() != 0U) 
{ + std::pop_heap(proc0Heap.begin(), proc0Heap.end(), std::greater<>{}); + chosenNode = proc0Heap.back().first; + proc0Heap.pop_back(); + } else if (currentlyReadyIter != currentlyReady.cend()) { + chosenNode = *currentlyReadyIter; + ++currentlyReadyIter; + } else { + break; + } + } + + newAssignments[proc0].push_back(chosenNode); + schedule.SetAssignedProcessor(chosenNode, proc0); + schedule.SetAssignedSuperstepNoUpdateNumSuperstep(chosenNode, superStep); + ++newTotalAssigned; + weightLimit += graph.VertexWorkWeight(chosenNode); + + for (const VertexType &succ : graph.Children(chosenNode)) { + if (--predec[succ] == 0) { + unsigned earliest = superStep; + for (const VertexType &par : graph.Parents(succ)) { + const bool sameProc = (schedule.AssignedProcessor(par) == proc0); + const unsigned constraint = sameProc ? superStep : schedule.AssignedSuperstep(par) + staleness; + earliest = std::max(earliest, constraint); + } + + if (earliest <= superStep) { + proc0Heap.emplace_back(succ, superStep + staleness); + std::push_heap(proc0Heap.begin(), proc0Heap.end(), std::greater<>{}); + } else if (earliest < superStep + staleness) { + procReadyAdditions[earliest % staleness][proc0].emplace_back(succ, superStep + staleness); + } else { + stepFutureReady.emplace_back(succ); + } + } + } + } // end while assigning + + totalWeightAssigned += weightLimit; + + // Processors 1 through P-1 + for (unsigned proc = 1U; proc < numProcs; ++proc) { + VWorkwT currentWeightAssigned = 0; + while (currentWeightAssigned < weightLimit) { + std::vector> &procHeap = currentProcReadyHeaps[proc]; + VertexType chosenNode = std::numeric_limits::max(); + { + if (procHeap.size() != 0U) { + std::pop_heap(procHeap.begin(), procHeap.end(), std::greater<>{}); + chosenNode = procHeap.back().first; + procHeap.pop_back(); + } else if (currentlyReadyIter != currentlyReady.cend()) { + chosenNode = *currentlyReadyIter; + ++currentlyReadyIter; + } else { + break; + } + } + + 
newAssignments[proc].push_back(chosenNode); + schedule.SetAssignedProcessor(chosenNode, proc); + schedule.SetAssignedSuperstepNoUpdateNumSuperstep(chosenNode, superStep); + ++newTotalAssigned; + currentWeightAssigned += graph.VertexWorkWeight(chosenNode); + + for (const VertexType &succ : graph.Children(chosenNode)) { + if (--predec[succ] == 0) { + unsigned earliest = superStep; + for (const VertexType &par : graph.Parents(succ)) { + const bool sameProc = (schedule.AssignedProcessor(par) == proc); + const unsigned constraint = sameProc ? superStep : schedule.AssignedSuperstep(par) + staleness; + earliest = std::max(earliest, constraint); + } + + if (earliest <= superStep) { + procHeap.emplace_back(succ, superStep + staleness); + std::push_heap(procHeap.begin(), procHeap.end(), std::greater<>{}); + } else if (earliest < superStep + staleness) { + procReadyAdditions[earliest % staleness][proc].emplace_back(succ, superStep + staleness); + } else { + stepFutureReady.emplace_back(succ); + } + } + } + } // end while assigning + weightLimit = std::max(weightLimit, currentWeightAssigned); + totalWeightAssigned += currentWeightAssigned; + } // end processor loops + + bool acceptStep = false; + + double score + = static_cast(totalWeightAssigned) / static_cast(weightLimit + instance.SynchronisationCosts()); + double parallelism = 0.0; + if (weightLimit > 0) { + parallelism = static_cast(totalWeightAssigned) / static_cast(weightLimit); + } + + if (score > 0.99 * bestScore) { // It is possible to make this less strict, i.e. score > 0.98 * best_score. + // The purpose of this would be to encourage larger supersteps. 
+ bestScore = std::max(bestScore, score); + bestParallelism = parallelism; + acceptStep = true; + } else { + continueSuperstepAttemps = false; + } + + if (weightLimit >= minWeightParallelCheck) { + if (parallelism < std::max(2.0, 0.8 * desiredParallelism)) { + continueSuperstepAttemps = false; + } + } + + if (weightLimit <= minSuperstepWeight) { + continueSuperstepAttemps = true; + if (totalAssigned + newTotalAssigned == numVertices) { + acceptStep = true; + continueSuperstepAttemps = false; + } + } + + if (currentlyReadyIter == currentlyReady.cend()) { + continueSuperstepAttemps = false; + } + + if (std::distance(currentlyReady.cbegin(), currentlyReadyIter) > maxCurrentlyReadyUsage) { + continueSuperstepAttemps = false; + } + + if (totalAssigned + newTotalAssigned == numVertices) { + continueSuperstepAttemps = false; + } + + // Undo predec decreases + for (const auto &newLocalAssignments : newAssignments) { + for (const VertexType &node : newLocalAssignments) { + for (const VertexType &succ : graph.Children(node)) { + ++predec[succ]; + } + } + } + + if (acceptStep) { + std::swap(bestFutureReady, stepFutureReady); + std::swap(bestProcReadyAdditions, procReadyAdditions); + std::swap(bestcurrentlyReadyIter, currentlyReadyIter); + std::swap(bestNewAssignments, newAssignments); + std::swap(bestCurrentProcReadyHeaps, currentProcReadyHeaps); + } + + limit++; + limit += (limit / 2); + } + + // apply best iteration + currentlyReady.erase(currentlyReady.begin(), bestcurrentlyReadyIter); + std::swap(futureReady[reducedSuperStep], bestFutureReady); + + for (auto &localProcReady : procReady[reducedSuperStep]) { + localProcReady.clear(); + } + + const unsigned nextSuperStep = superStep + 1U; + for (unsigned proc = 0U; proc < numProcs; ++proc) { + for (const auto &vertStepPair : bestCurrentProcReadyHeaps[proc]) { + if (vertStepPair.second <= nextSuperStep) { + futureReady[nextSuperStep % staleness].emplace_back(vertStepPair.first); + } else { + procReady[nextSuperStep % 
staleness][proc].emplace_back(vertStepPair); + } + } + } + + for (std::size_t stepInd = 0U; stepInd < staleness; ++stepInd) { + for (unsigned proc = 0U; proc < numProcs; ++proc) { + procReady[stepInd][proc].insert(procReady[stepInd][proc].end(), + bestProcReadyAdditions[stepInd][proc].begin(), + bestProcReadyAdditions[stepInd][proc].end()); + } + } + + for (unsigned proc = 0U; proc < numProcs; ++proc) { + totalAssigned += bestNewAssignments[proc].size(); + for (const VertexType &node : bestNewAssignments[proc]) { + schedule.SetAssignedProcessor(node, proc); + + for (const VertexType &succ : graph.Children(node)) { + --predec[succ]; + } + } + } + + ++superStep; + desiredParallelism = (0.3 * desiredParallelism) + (0.6 * bestParallelism) + + (0.1 * static_cast(numProcs)); // weights should sum up to one + } + + schedule.SetNumberOfSupersteps(superStep); + + return ReturnStatus::OSP_SUCCESS; +} + +} // end namespace osp diff --git a/tests/max_bsp_schedulers.cpp b/tests/max_bsp_schedulers.cpp index 908b0493..46a53511 100644 --- a/tests/max_bsp_schedulers.cpp +++ b/tests/max_bsp_schedulers.cpp @@ -27,15 +27,38 @@ limitations under the License. 
#include "osp/auxiliary/io/hdag_graph_file_reader.hpp" #include "osp/bsp/scheduler/GreedySchedulers/GreedyBspScheduler.hpp" #include "osp/bsp/scheduler/GreedySchedulers/GreedyVarianceSspScheduler.hpp" +#include "osp/bsp/scheduler/GreedySchedulers/GrowLocalMaxBsp.hpp" #include "osp/bsp/scheduler/MaxBspScheduler.hpp" +#include "osp/graph_implementations/adj_list_impl/compact_sparse_graph.hpp" #include "osp/graph_implementations/adj_list_impl/computational_dag_edge_idx_vector_impl.hpp" #include "osp/graph_implementations/adj_list_impl/computational_dag_vector_impl.hpp" #include "test_graphs.hpp" using namespace osp; +using VImpl1 = CDagVertexImpl; +using VImpl2 = CDagVertexImpl; + std::vector TestArchitectures() { return {"data/machine_params/p3.arch"}; } +template +void checkPrecedenceContraints(const BspSchedule &schedule, const unsigned staleness) { + for (const auto &v : schedule.GetInstance().GetComputationalDag().Vertices()) { + BOOST_CHECK_LT(schedule.AssignedSuperstep(v), schedule.NumberOfSupersteps()); + + for (const auto &chld : schedule.GetInstance().GetComputationalDag().Children(v)) { + const unsigned sameProcessors = (schedule.AssignedProcessor(v) == schedule.AssignedProcessor(chld)) ? 
0U : staleness; + + BOOST_CHECK_LE(schedule.AssignedSuperstep(v) + sameProcessors, schedule.AssignedSuperstep(chld)); + if (schedule.AssignedSuperstep(v) + sameProcessors > schedule.AssignedSuperstep(chld)) { + std::cout << "Vertex: " << v << " (S:" << schedule.AssignedSuperstep(v) << " P:" << schedule.AssignedProcessor(v) + << ")" << " Child: " << chld << " (S:" << schedule.AssignedSuperstep(chld) + << " P:" << schedule.AssignedProcessor(chld) << ")" << '\n'; + } + } + } +} + template void RunTest(Scheduler *testScheduler) { // static_assert(std::is_base_of::value, "Class is not a scheduler!"); @@ -61,22 +84,25 @@ void RunTest(Scheduler *testScheduler) { std::cout << "Graph: " << nameGraph << std::endl; std::cout << "Architecture: " << nameMachine << std::endl; - BspInstance instance; + ComputationalDagVectorImpl graph; + BspArchitecture arch; - bool statusGraph = file_reader::ReadGraph((cwd / filenameGraph).string(), instance.GetComputationalDag()); - bool statusArchitecture - = file_reader::ReadBspArchitecture((cwd / "data/machine_params/p3.arch").string(), instance.GetArchitecture()); + bool statusGraph = file_reader::ReadGraph((cwd / filenameGraph).string(), graph); + bool statusArchitecture = file_reader::ReadBspArchitecture((cwd / filenameMachine).string(), arch); if (!statusGraph || !statusArchitecture) { std::cout << "Reading files failed." 
<< std::endl; BOOST_CHECK(false); } + BspInstance instance(graph, arch); + BspSchedule schedule(instance); const auto result = testScheduler->ComputeSchedule(schedule); BOOST_CHECK_EQUAL(ReturnStatus::OSP_SUCCESS, result); BOOST_CHECK(schedule.SatisfiesPrecedenceConstraints()); + checkPrecedenceContraints(schedule, 1U); } } } @@ -104,7 +130,7 @@ void RunTestMaxBsp(MaxBspScheduler *testScheduler) { << "Graph: " << nameGraph << std::endl << "Architecture: " << nameMachine << std::endl; - ComputationalDagEdgeIdxVectorImplDefIntT graph; + ComputationalDagVectorImpl graph; BspArchitecture arch; bool statusGraph = file_reader::ReadGraph((cwd / filenameGraph).string(), graph); @@ -121,24 +147,37 @@ void RunTestMaxBsp(MaxBspScheduler *testScheduler) { BOOST_CHECK_EQUAL(result, ReturnStatus::OSP_SUCCESS); BOOST_CHECK(schedule.SatisfiesPrecedenceConstraints()); + checkPrecedenceContraints(schedule, 2U); } } } // Tests ComputeSchedule(BspSchedule&) → staleness = 1 BOOST_AUTO_TEST_CASE(GreedyVarianceSspSchedulerTestVectorImpl) { - GreedyVarianceSspScheduler test; + GreedyVarianceSspScheduler> test; RunTest(&test); } // Tests ComputeSchedule(BspSchedule&) → staleness = 1 (different graph impl) BOOST_AUTO_TEST_CASE(GreedyVarianceSspSchedulerTestEdgeIdxImpl) { - GreedyVarianceSspScheduler test; + GreedyVarianceSspScheduler> test; RunTest(&test); } // Tests ComputeSchedule(MaxBspSchedule&) → staleness = 2 BOOST_AUTO_TEST_CASE(GreedyVarianceSspSchedulerMaxBspScheduleLargeTest) { - GreedyVarianceSspScheduler test; + GreedyVarianceSspScheduler> test; + RunTestMaxBsp(&test); +} + +// Tests ComputeSchedule(BspSchedule&) → staleness = 1 +BOOST_AUTO_TEST_CASE(GrowLocalSSPBspScheduleLargeTest) { + GrowLocalSSP> test; + RunTest(&test); +} + +// Tests ComputeSchedule(MaxBspSchedule&) → staleness = 2 +BOOST_AUTO_TEST_CASE(GrowLocalSSPMaxBspScheduleLargeTest) { + GrowLocalSSP> test; RunTestMaxBsp(&test); }