Skip to content

Commit 7e4821f

Browse files
committed
progress on decimation method
1 parent 1a71b03 commit 7e4821f

8 files changed

Lines changed: 89 additions & 41 deletions

Brainstorm_and_TODOs.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@
8989

9090
- [ ] new `ParameterizedDecimation` class
9191
- members:
92+
- [x] `std::vector<size_t> stride_`
93+
- [x] `std::string interpolation_method_` (default to empty string)
94+
- [x] `double cost_per_element_`
95+
- [x] `std::vector<size_t> reduced_shape_`
96+
- [x] `std::unordered_map<sg4::ActorPtr, std::pair<std::vector<size_t>, std::vector<size_t>>> reduced_local_start_and_count_`
97+
- [x] `size_t element_size_`
9298
- methods and behavior:
9399

94100

include/dtlmod/CompressionReductionMethod.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class CompressionReductionMethod : public ReductionMethod {
6666
throw std::runtime_error("not implemented");
6767
// return;// std::make_pair(std::vector<size_t>(), std::vector<size_t>());
6868
}
69+
[[nodiscard]] double get_flop_amount_to_reduce_variable(std::shared_ptr<Variable> var) const override { return 0.0; }
6970
};
7071
/// \endcond
7172
} // namespace dtlmod

include/dtlmod/DTLException.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,16 @@ DECLARE_DTLMOD_EXCEPTION(UnknownOpenModeException, "Unknown open mode. Should be
4848

4949
DECLARE_DTLMOD_EXCEPTION(UnknownVariableException, "Unknown Variable");
5050
DECLARE_DTLMOD_EXCEPTION(MultipleVariableDefinitionException, "Multiple Variable Definition");
51-
DECLARE_DTLMOD_EXCEPTION(InconsistentVariableDefinitionException,
52-
"Insconsistent Variable Definition. The 'shape', 'start', and 'count' vectors must "
53-
"have the same size");
51+
DECLARE_DTLMOD_EXCEPTION(InconsistentVariableDefinitionException, "Insconsistent Variable Definition");
5452
DECLARE_DTLMOD_EXCEPTION(IncorrectPathDefinitionException, "Fullpath must be structured as follows: "
5553
"netzone_name:file_system_name:/path/to/file_name");
5654

5755
DECLARE_DTLMOD_EXCEPTION(GetWhenNoTransactionException, "Impossible to get. No transaction exists for variable");
5856

57+
DECLARE_DTLMOD_EXCEPTION(InconsistentDecimationStrideException, "Insconsistent Decimation Stride definition");
58+
DECLARE_DTLMOD_EXCEPTION(UnknownDecimationOptionException, "Unknown Decimation option");
59+
DECLARE_DTLMOD_EXCEPTION(UnknownDecimationInterpolationException, "Unknown Decimation interpolation method");
60+
5961
} // namespace dtlmod
6062

6163
#endif // DTLMOD_EXCEPTION_HPP

include/dtlmod/DecimationReductionMethod.hpp

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
#include <boost/algorithm/string.hpp>
1010
#include <boost/algorithm/string/split.hpp>
1111
#include <cmath>
12+
#include <numeric> // for std::accumulate
1213

14+
#include "dtlmod/DTLException.hpp"
1315
#include "dtlmod/ReductionMethod.hpp"
1416
#include "dtlmod/Variable.hpp"
1517

@@ -21,17 +23,16 @@ namespace dtlmod {
2123

2224
class ParameterizedDecimation {
2325
friend class DecimationReductionMethod;
26+
std::shared_ptr<Variable> var_; // The variable to which this parameterized decimation is applied
27+
2428
std::vector<size_t> stride_;
2529
std::string interpolation_method_ = "";
2630
double cost_per_element_;
2731

2832
std::vector<size_t> reduced_shape_;
2933
std::unordered_map<sg4::ActorPtr, std::pair<std::vector<size_t>, std::vector<size_t>>> reduced_local_start_and_count_;
30-
size_t element_size_;
3134

3235
protected:
33-
[[nodiscard]] const std::vector<size_t>& get_stride() const { return stride_; }
34-
3536
void set_reduced_shape(const std::vector<size_t>& reduced_shape) { reduced_shape_ = reduced_shape; }
3637
void set_reduced_local_start_and_count(
3738
std::unordered_map<sg4::ActorPtr, std::pair<std::vector<size_t>, std::vector<size_t>>>
@@ -40,36 +41,48 @@ class ParameterizedDecimation {
4041
reduced_local_start_and_count_ = reduced_local_start_and_count;
4142
}
4243

44+
[[nodiscard]] const std::vector<size_t>& get_stride() const { return stride_; }
45+
46+
[[nodiscard]] const std::vector<size_t>& get_reduced_shape() const { return reduced_shape_; }
47+
4348
[[nodiscard]] size_t get_global_reduced_size() const
4449
{
45-
size_t total_size = element_size_;
46-
for (const auto& s : reduced_shape_)
47-
total_size *= s;
48-
return total_size;
50+
return std::accumulate(reduced_shape_.begin(), reduced_shape_.end(), var_->get_element_size(), std::multiplies<>{});
4951
}
5052

5153
[[nodiscard]] size_t get_local_reduced_size() const
5254
{
53-
size_t total_size = element_size_;
54-
auto issuer = sg4::Actor::self();
55-
for (const auto& c : reduced_local_start_and_count_.at(issuer).second)
56-
total_size *= c;
57-
return total_size;
55+
auto start_and_count = reduced_local_start_and_count_.at(sg4::Actor::self()).second;
56+
return std::accumulate(start_and_count.begin(), start_and_count.end(), var_->get_element_size(),
57+
std::multiplies<>{});
5858
}
59-
[[nodiscard]] const std::vector<size_t>& get_reduced_shape() const { return reduced_shape_; }
59+
6060
[[nodiscard]] const std::pair<std::vector<size_t>, std::vector<size_t>>&
6161
get_reduced_start_and_count_for(sg4::ActorPtr publisher) const
6262
{
6363
return reduced_local_start_and_count_.at(publisher);
6464
}
6565

66+
[[nodiscard]] double get_flop_amount_to_decimate() const
67+
{
68+
double amount = cost_per_element_;
69+
if (interpolation_method_.empty()) {
70+
amount *= var_->get_local_size();
71+
} else if (interpolation_method_ == "linear") {
72+
amount = 2 * amount * var_->get_local_size();
73+
} else if (interpolation_method_ == "quadratic") {
74+
amount = 4 * amount * var_->get_local_size();
75+
} else if (interpolation_method_ == "cubic") {
76+
amount = 8 * amount * var_->get_local_size();
77+
} else
78+
throw UnknownDecimationInterpolationException(XBT_THROW_POINT, interpolation_method_.c_str());
79+
return amount;
80+
}
81+
6682
public:
67-
ParameterizedDecimation(const std::vector<size_t> stride, const std::string interpolation_method,
68-
double cost_per_element, size_t element_size)
69-
: stride_(stride)
70-
, interpolation_method_(interpolation_method)
71-
, cost_per_element_(cost_per_element)
72-
, element_size_(element_size)
83+
ParameterizedDecimation(std::shared_ptr<Variable> var, const std::vector<size_t> stride,
84+
const std::string interpolation_method, double cost_per_element)
85+
: var_(var), stride_(stride), interpolation_method_(interpolation_method), cost_per_element_(cost_per_element)
7386
{
7487
}
7588
};
@@ -89,31 +102,34 @@ class DecimationReductionMethod : public ReductionMethod {
89102
if (key == "stride") {
90103
std::vector<std::string> tokens;
91104
boost::split(tokens, value, boost::is_any_of(","), boost::token_compress_on);
92-
// TODO Add Sanity check that the number of tokens equals the number of dimension of var
93-
for (const auto t : tokens)
105+
if (var->get_shape().size() != tokens.size())
106+
throw InconsistentDecimationStrideException(
107+
XBT_THROW_POINT, "Decimation Stride and Variable Shape vectors must have the same size. Stride: " +
108+
std::to_string(tokens.size()) +
109+
", Shape: " + std::to_string(var->get_shape().size()));
110+
for (const auto& t : tokens)
94111
stride.push_back(std::stoul(t));
95112
} else if (key == "interpolation") {
96113
interpolation_method = value;
97114
} else if (key == "cost_per_element") {
98115
cost_per_element = std::stod(value);
99-
} // else
100-
// TODO handle invalid key
116+
} else
117+
throw UnknownDecimationOptionException(XBT_THROW_POINT, key.c_str());
101118
}
102119

103120
per_variable_parameterizations_.try_emplace(
104-
var, std::make_shared<ParameterizedDecimation>(stride, interpolation_method, cost_per_element,
105-
var->get_element_size()));
121+
var, std::make_shared<ParameterizedDecimation>(var, stride, interpolation_method, cost_per_element));
106122
}
107123

108124
void reduce_variable(std::shared_ptr<Variable> var)
109125
{
110126
auto parameterization = per_variable_parameterizations_[var];
111-
auto shape = var->get_shape();
127+
auto original_shape = var->get_shape();
112128
auto stride = parameterization->get_stride();
113129

114130
std::vector<size_t> reduced_shape;
115131
size_t i = 0;
116-
for (auto dim_size : shape)
132+
for (auto dim_size : original_shape)
117133
reduced_shape.push_back(std::ceil(dim_size / (stride[i++] * 1.0)));
118134

119135
std::unordered_map<sg4::ActorPtr, std::pair<std::vector<size_t>, std::vector<size_t>>>
@@ -123,11 +139,11 @@ class DecimationReductionMethod : public ReductionMethod {
123139
std::vector<size_t> reduced_start;
124140
std::vector<size_t> reduced_count;
125141

126-
for (size_t i = 0; i < shape.size(); i++) {
142+
for (size_t i = 0; i < original_shape.size(); i++) {
127143
// Sanity checks that shape, start, and count have the same size have already been done
128144
size_t r_start = std::ceil(start[i] / (stride[i] * 1.0));
129145
size_t r_next_start =
130-
std::min(shape[i], static_cast<size_t>(std::ceil((start[i] + count[i]) / (stride[i] * 1.0))));
146+
std::min(original_shape[i], static_cast<size_t>(std::ceil((start[i] + count[i]) / (stride[i] * 1.0))));
131147
XBT_CDEBUG(dtlmod, "Dim %lu: stride = %lu, Start = %lu, r_start = %lu, Count = %lu, r_count = %lu", i,
132148
stride[i], start[i], r_start, count[i], r_next_start - r_start);
133149
reduced_start.push_back(r_start);
@@ -149,10 +165,17 @@ class DecimationReductionMethod : public ReductionMethod {
149165
{
150166
return per_variable_parameterizations_.at(var)->get_local_reduced_size();
151167
}
168+
169+
[[nodiscard]] double get_flop_amount_to_reduce_variable(std::shared_ptr<Variable> var) const override
170+
{
171+
return per_variable_parameterizations_.at(var)->get_flop_amount_to_decimate();
172+
}
173+
152174
[[nodiscard]] const std::vector<size_t>& get_reduced_variable_shape(std::shared_ptr<Variable> var) const override
153175
{
154176
return per_variable_parameterizations_.at(var)->get_reduced_shape();
155177
}
178+
156179
[[nodiscard]] const std::pair<std::vector<size_t>, std::vector<size_t>>&
157180
get_reduced_start_and_count_for(std::shared_ptr<Variable> var, sg4::ActorPtr publisher) const override
158181
{

include/dtlmod/ReductionMethod.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class ReductionMethod {
3535
virtual const std::vector<size_t>& get_reduced_variable_shape(std::shared_ptr<Variable> var) const = 0;
3636
virtual const std::pair<std::vector<size_t>, std::vector<size_t>>&
3737
get_reduced_start_and_count_for(std::shared_ptr<Variable> var, simgrid::s4u::ActorPtr publisher) const = 0;
38+
virtual double get_flop_amount_to_reduce_variable(std::shared_ptr<Variable> var) const = 0;
39+
3840
/// @brief Helper function to print out the name of the ReductionMethod.
3941
/// @return The corresponding string
4042
[[nodiscard]] const std::string& get_name() const { return name_; }

src/Engine.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@ void Engine::begin_transaction()
3838
void Engine::put(std::shared_ptr<Variable> var) const
3939
{
4040
if (var->is_reduced()) {
41-
XBT_INFO("Put of a reduced version of %s (initial size = %lu, reduced size = %lu)", var->get_cname(),
42-
var->get_local_size(), var->get_reduction_method()->get_reduced_variable_local_size(var));
41+
// Perform a Exec activity before putting the variable into the DTL to account for the time needed to reduce it.
42+
sg4::this_actor::execute(var->get_reduction_method()->get_flop_amount_to_reduce_variable(var));
43+
XBT_DEBUG("Variable %s has been reduced!", var->get_cname());
44+
// Now put the reduced version of the variable into the DTL, i.e., using its reduced local size.
45+
XBT_DEBUG("Put this reduced version of %s (initial size = %lu, reduced size = %lu)", var->get_cname(),
46+
var->get_local_size(), var->get_reduction_method()->get_reduced_variable_local_size(var));
4347
transport_->put(var, var->get_reduction_method()->get_reduced_variable_local_size(var));
4448
} else
4549
transport_->put(var, var->get_local_size());

src/FileEngine.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ void FileEngine::end_pub_transaction()
115115
XBT_DEBUG("Start the %d publish activities for the transaction", file_pub_transaction_[self].size());
116116
for (const auto& [file, size] : to_write) {
117117
auto write = file->write_async(size, true);
118-
write->on_this_completion_cb([this, self, write](sg4::Io const&) {
118+
write->on_this_completion_cb([this, self, write, size](sg4::Io const&) {
119+
XBT_DEBUG("%llu bytes have been written for Actor %s", size, self->get_cname());
119120
pub_activities_completed_->notify_all();
120121
file_pub_transaction_[self].erase(write);
121122
});

test/dtl_reduction.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ class DTLReductionTest : public ::testing::Test {
3232
void setup_platform()
3333
{
3434
auto* zone = sg4::Engine::get_instance()->get_netzone_root()->add_netzone_empty("zone");
35-
host_ = zone->add_host("host", "1Gf");
36-
disk_ = host_->add_disk("disk", "5.5GBps", "2.1GBps");
35+
host_ = zone->add_host("host", "6Gf");
36+
disk_ = host_->add_disk("disk", "560MBps", "510MBps");
3737
zone->seal();
3838

3939
auto my_fs = sgfs::FileSystem::create("my_fs");
@@ -49,6 +49,7 @@ class DTLReductionTest : public ::testing::Test {
4949
TEST_F(DTLReductionTest, SimpleDecimationFileEngine)
5050
{
5151
DO_TEST_WITH_FORK([this]() {
52+
xbt_log_control_set("dtlmod_engine.thresh:debug");
5253
this->setup_platform();
5354
host_->add_actor("TestActor", [this]() {
5455
std::shared_ptr<dtlmod::ReductionMethod> decimator;
@@ -64,10 +65,6 @@ TEST_F(DTLReductionTest, SimpleDecimationFileEngine)
6465
stream->define_variable("var3D", {640, 640, 640}, {0, 0, 0}, {640, 640, 640}, sizeof(double));
6566
XBT_INFO("Define a Decimation Reduction Method");
6667
ASSERT_NO_THROW(decimator = stream->define_reduction_method("decimation"));
67-
XBT_INFO("Assign the decimation method to 'var3D'");
68-
ASSERT_NO_THROW(var->set_reduction_operation(decimator, {{"stride", "1,2,4"}}));
69-
XBT_INFO("Check that the variable is marked as 'reduced'");
70-
ASSERT_TRUE(var->is_reduced());
7168
XBT_INFO("Open the stream in Pulish mode");
7269
auto engine = stream->open("zone:my_fs:/host/scratch/my-working-dir/my-output", dtlmod::Stream::Mode::Publish);
7370
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
@@ -77,6 +74,18 @@ TEST_F(DTLReductionTest, SimpleDecimationFileEngine)
7774
ASSERT_NO_THROW(engine->put(var));
7875
XBT_INFO("End a Transaction");
7976
engine->end_transaction();
77+
XBT_INFO("Sleep until t = 6s");
78+
sg4::this_actor::sleep_until(6);
79+
XBT_INFO("Assign the decimation method to 'var3D'");
80+
ASSERT_NO_THROW(var->set_reduction_operation(decimator, {{"stride", "1,2,4"}}));
81+
XBT_INFO("Check that the variable is marked as 'reduced'");
82+
ASSERT_TRUE(var->is_reduced());
83+
XBT_INFO("Start a Transaction");
84+
engine->begin_transaction();
85+
XBT_INFO("Put reduced Variable 'var' into the DTL");
86+
ASSERT_NO_THROW(engine->put(var));
87+
XBT_INFO("End a Transaction");
88+
engine->end_transaction();
8089
XBT_INFO("Close the engine");
8190
engine->close();
8291

0 commit comments

Comments
 (0)