Skip to content

Commit 50dfe89

Browse files
EnricoMi and rok
authored
apacheGH-26818: [C++][Python] Preserve order when writing dataset multi-threaded (apache#44470) (#5)
### Rationale for this change The order of rows in a dataset might be important for users and should be preserved when writing to a filesystem. With multi-threaded write, the order is currently not guaranteed, ### What changes are included in this PR? Preserving the dataset order of rows requires the `SourceNode` to sequence the fragments output (this keeps exec batches in the order of fragments), to provide an `ImplicitOrdering` (this gives exec batches an index), and the `ConsumingSinkNode` to sequence exec batches (finally preserve order of batches according to their index). User-facing changes: - Add option `preserve_order` to `FileSystemDatasetWriteOptions` (C++) and `arrow.dataset.write_dataset` (Python). Default behaviour is current behaviour. ### Are these changes tested? Unit tests have been added, ### Are there any user-facing changes? Users can set `FileSystemDatasetWriteOptions.preserve_order = true` (C++) / `arrow.dataset.write_dataset(..., preserve_order=True)` (Python). * GitHub Issue: apache#26818 Lead-authored-by: Enrico Minack <github@enrico.minack.dev> Signed-off-by: Rok Mihevc <rok@mihevc.org> Co-authored-by: Rok Mihevc <rok@mihevc.org>
1 parent c4331f5 commit 50dfe89

7 files changed

Lines changed: 214 additions & 8 deletions

File tree

cpp/src/arrow/dataset/file_base.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,14 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
472472

473473
WriteNodeOptions write_node_options(write_options);
474474
write_node_options.custom_schema = custom_schema;
475+
// preserve existing order across fragments by setting require_sequenced_output=true
476+
bool require_sequenced_output = write_node_options.write_options.preserve_order;
477+
// preserve existing order of sequenced scan output by setting implicit_order=true
478+
bool implicit_ordering = write_node_options.write_options.preserve_order;
475479

476480
acero::Declaration plan = acero::Declaration::Sequence({
477-
{"scan", ScanNodeOptions{dataset, scanner->options()}},
481+
{"scan", ScanNodeOptions{dataset, scanner->options(), require_sequenced_output,
482+
implicit_ordering}},
478483
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
479484
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
480485
{"write", std::move(write_node_options)},
@@ -540,8 +545,13 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
540545

541546
ARROW_ASSIGN_OR_RAISE(
542547
auto node,
548+
// to preserve order explicitly, sequence the exec batches
549+
// this requires exec batch index to be set upstream (e.g. by SourceNode)
543550
acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
544-
acero::ConsumingSinkNodeOptions{std::move(consumer)}));
551+
acero::ConsumingSinkNodeOptions{
552+
std::move(consumer),
553+
{},
554+
/*sequence_output=*/write_options.preserve_order}));
545555

546556
return node;
547557
}

cpp/src/arrow/dataset/file_base.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
399399
/// Partitioning used to generate fragment paths.
400400
std::shared_ptr<Partitioning> partitioning;
401401

402+
/// If true the order of rows in the dataset is preserved when writing with
403+
/// multiple threads. This may cause notable performance degradation.
404+
bool preserve_order = false;
405+
402406
/// Maximum number of partitions any batch may be written into, default is 1K.
403407
int max_partitions = 1024;
404408

cpp/src/arrow/dataset/file_test.cc

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,30 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <arrow/compute/function.h>
19+
#include <arrow/compute/registry.h>
1820
#include <cstdint>
1921
#include <memory>
2022
#include <string>
23+
#include <thread>
2124
#include <tuple>
2225
#include <vector>
2326

2427
#include <gmock/gmock.h>
2528
#include <gtest/gtest.h>
2629

30+
#include <arrow/dataset/dataset.h>
31+
#include <arrow/dataset/file_base.h>
32+
#include <arrow/record_batch.h>
33+
#include <arrow/util/async_generator.h>
2734
#include "arrow/acero/exec_plan.h"
2835
#include "arrow/acero/test_util_internal.h"
2936
#include "arrow/array/array_primitive.h"
3037
#include "arrow/compute/test_util_internal.h"
3138
#include "arrow/dataset/api.h"
3239
#include "arrow/dataset/partition.h"
3340
#include "arrow/dataset/plan.h"
41+
#include "arrow/dataset/projector.h"
3442
#include "arrow/dataset/test_util_internal.h"
3543
#include "arrow/filesystem/path_util.h"
3644
#include "arrow/filesystem/test_util.h"
@@ -353,6 +361,165 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
353361
}
354362
}
355363

364+
// This kernel delays execution for some specific scalar values,
365+
// which guarantees the writing phase sees out-of-order exec batches
366+
Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
367+
compute::ExecResult* out) {
368+
const ArraySpan& input = batch[0].array;
369+
const auto* input_values = input.GetValues<uint32_t>(1);
370+
uint8_t* output_values = out->array_span()->buffers[1].data;
371+
372+
// Boolean data is stored in 1 bit per value
373+
for (int64_t i = 0; i < input.length; ++i) {
374+
if (input_values[i] % 16 == 0) {
375+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
376+
}
377+
bit_util::SetBitTo(output_values, i, true);
378+
}
379+
380+
return Status::OK();
381+
}
382+
383+
// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator
384+
// This guarantees a dataset of multiple fragments could produce out-of-order batches
385+
class MockFragment : public Fragment {
386+
public:
387+
explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches,
388+
const std::shared_ptr<Schema>& schema)
389+
: Fragment(compute::literal(true), schema),
390+
start_(start),
391+
rows_per_batch_(rows_per_batch),
392+
num_batches_(num_batches) {}
393+
394+
Result<RecordBatchGenerator> ScanBatchesAsync(
395+
const std::shared_ptr<ScanOptions>& options) override {
396+
// Fragment with start_=0 defers returning the generator
397+
if (start_ == 0) {
398+
std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
399+
}
400+
401+
auto vec = gen::Gen({gen::Step(start_)})
402+
->FailOnError()
403+
->RecordBatches(rows_per_batch_, num_batches_);
404+
auto it = MakeVectorIterator(vec);
405+
return MakeBackgroundGenerator(std::move(it), io::default_io_context().executor());
406+
}
407+
408+
std::string type_name() const override { return "mock"; }
409+
410+
protected:
411+
Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
412+
return given_physical_schema_;
413+
};
414+
415+
private:
416+
uint32_t start_;
417+
int64_t rows_per_batch_;
418+
int num_batches_;
419+
};
420+
421+
// This dataset consists of multiple fragments with incrementing values across the
422+
// fragments
423+
class MockDataset : public Dataset {
424+
public:
425+
explicit MockDataset(const std::shared_ptr<Schema>& schema) : Dataset(schema) {}
426+
427+
MockDataset(const std::shared_ptr<Schema>& schema,
428+
const compute::Expression& partition_expression)
429+
: Dataset(schema, partition_expression) {}
430+
431+
std::string type_name() const override { return "mock"; }
432+
Result<std::shared_ptr<Dataset>> ReplaceSchema(
433+
std::shared_ptr<Schema> schema) const override {
434+
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
435+
return std::make_shared<MockDataset>(std::move(schema));
436+
}
437+
438+
protected:
439+
Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override {
440+
FragmentVector fragments;
441+
fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_));
442+
fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, schema_));
443+
return MakeVectorIterator(std::move(fragments));
444+
};
445+
};
446+
447+
TEST_F(TestFileSystemDataset, MultiThreadedWritePersistsOrder) {
448+
// Test for GH-26818
449+
//
450+
// This test uses std::this_thread::sleep_for to increase chances for batches
451+
// to get written out-of-order in multi-threaded environment.
452+
// With preserve_order = false, the existence of out-of-order is asserted to
453+
// verify that the test setup reliably writes out-of-order sequences, and
454+
// that write_options.preserve_order = preserve_order can recreate order.
455+
//
456+
// Estimates for out_of_order == false and preserve_order == false to occur
457+
// are 10^-62 https://github.com/apache/arrow/pull/44470#discussion_r2079049038
458+
//
459+
// If this test starts to reliably fail with preserve_order == false, the test setup
460+
// has to be revised to again reliably produce out-of-order sequences.
461+
auto format = std::make_shared<IpcFileFormat>();
462+
FileSystemDatasetWriteOptions write_options;
463+
write_options.file_write_options = format->DefaultWriteOptions();
464+
write_options.base_dir = "root";
465+
write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
466+
write_options.basename_template = "{i}.feather";
467+
468+
// The Mock dataset delays emitting the first fragment, which test sequenced output of
469+
// scan node
470+
auto dataset = std::make_shared<MockDataset>(schema({field("f0", int32())}));
471+
472+
// The delay scalar function delays some batches of all fragments, which tests implicit
473+
// ordering
474+
auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1),
475+
compute::FunctionDoc());
476+
compute::ScalarKernel delay_kernel;
477+
delay_kernel.exec = delay;
478+
delay_kernel.signature = compute::KernelSignature::Make({int32()}, boolean());
479+
ASSERT_OK(delay_func->AddKernel(delay_kernel));
480+
ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
481+
482+
for (bool preserve_order : {true, false}) {
483+
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
484+
ASSERT_OK(scanner_builder->UseThreads(true));
485+
ASSERT_OK(
486+
scanner_builder->Filter(compute::call("delay", {compute::field_ref("f0")})));
487+
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
488+
489+
auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
490+
write_options.filesystem = fs;
491+
write_options.preserve_order = preserve_order;
492+
493+
ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
494+
495+
// Read the file back out and verify the order
496+
ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
497+
fs, {"root/0.feather"}, format, {}));
498+
ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
499+
ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
500+
ASSERT_OK(scanner_builder->UseThreads(false));
501+
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
502+
ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
503+
TableBatchReader reader(*actual);
504+
std::shared_ptr<RecordBatch> batch;
505+
ASSERT_OK(reader.ReadNext(&batch));
506+
int32_t prev = -1;
507+
auto out_of_order = false;
508+
while (batch != nullptr) {
509+
const auto* values = batch->column(0)->data()->GetValues<int32_t>(1);
510+
for (int row = 0; row < batch->num_rows(); ++row) {
511+
int32_t value = values[row];
512+
if (value <= prev) {
513+
out_of_order = true;
514+
}
515+
prev = value;
516+
}
517+
ASSERT_OK(reader.ReadNext(&batch));
518+
}
519+
ASSERT_EQ(!out_of_order, preserve_order);
520+
}
521+
}
522+
356523
class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
357524
using PlanFactory = std::function<std::vector<acero::Declaration>(
358525
const FileSystemDatasetWriteOptions&,

python/pyarrow/_dataset.pyx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4092,6 +4092,7 @@ def _filesystemdataset_write(
40924092
str basename_template not None,
40934093
FileSystem filesystem not None,
40944094
Partitioning partitioning not None,
4095+
bool preserve_order,
40954096
FileWriteOptions file_options not None,
40964097
int max_partitions,
40974098
object file_visitor,
@@ -4114,6 +4115,7 @@ def _filesystemdataset_write(
41144115
c_options.filesystem = filesystem.unwrap()
41154116
c_options.base_dir = tobytes(_stringify_path(base_dir))
41164117
c_options.partitioning = partitioning.unwrap()
4118+
c_options.preserve_order = preserve_order
41174119
c_options.max_partitions = max_partitions
41184120
c_options.max_open_files = max_open_files
41194121
c_options.max_rows_per_file = max_rows_per_file

python/pyarrow/dataset.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -848,9 +848,9 @@ def _ensure_write_partitioning(part, schema, flavor):
848848

849849

850850
def write_dataset(data, base_dir, *, basename_template=None, format=None,
851-
partitioning=None, partitioning_flavor=None, schema=None,
852-
filesystem=None, file_options=None, use_threads=True,
853-
max_partitions=None, max_open_files=None,
851+
partitioning=None, partitioning_flavor=None,
852+
schema=None, filesystem=None, file_options=None, use_threads=True,
853+
preserve_order=False, max_partitions=None, max_open_files=None,
854854
max_rows_per_file=None, min_rows_per_group=None,
855855
max_rows_per_group=None, file_visitor=None,
856856
existing_data_behavior='error', create_dir=True):
@@ -893,7 +893,13 @@ def write_dataset(data, base_dir, *, basename_template=None, format=None,
893893
``FileFormat.make_write_options()`` function.
894894
use_threads : bool, default True
895895
Write files in parallel. If enabled, then maximum parallelism will be
896-
used determined by the number of available CPU cores.
896+
used determined by the number of available CPU cores. Using multiple
897+
threads may change the order of rows in the written dataset if
898+
preserve_order is set to False.
899+
preserve_order : bool, default False
900+
Preserve the order of rows. If enabled, order of rows in the dataset are
901+
guaranteed to be preserved even if use_threads is set to True. This may
902+
cause notable performance degradation.
897903
max_partitions : int, default 1024
898904
Maximum number of partitions any batch may be written into.
899905
max_open_files : int, default 1024
@@ -1033,7 +1039,7 @@ def file_visitor(written_file):
10331039

10341040
_filesystemdataset_write(
10351041
scanner, base_dir, basename_template, filesystem, partitioning,
1036-
file_options, max_partitions, file_visitor, existing_data_behavior,
1037-
max_open_files, max_rows_per_file,
1042+
preserve_order, file_options, max_partitions, file_visitor,
1043+
existing_data_behavior, max_open_files, max_rows_per_file,
10381044
min_rows_per_group, max_rows_per_group, create_dir
10391045
)

python/pyarrow/includes/libarrow_dataset.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
221221
shared_ptr[CFileSystem] filesystem
222222
c_string base_dir
223223
shared_ptr[CPartitioning] partitioning
224+
c_bool preserve_order
224225
int max_partitions
225226
c_string basename_template
226227
function[cb_writer_finish_internal] writer_pre_finish

python/pyarrow/tests/test_dataset.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4591,6 +4591,22 @@ def file_visitor(written_file):
45914591
assert result1.to_table().equals(result2.to_table())
45924592

45934593

4594+
@pytest.mark.parquet
4595+
@pytest.mark.pandas
4596+
def test_write_dataset_use_threads_preserve_order(tempdir):
4597+
# see GH-26818
4598+
table = pa.table({"a": range(1024)})
4599+
batches = table.to_batches(max_chunksize=2)
4600+
ds.write_dataset(batches, tempdir, format="parquet",
4601+
use_threads=True, preserve_order=True)
4602+
seq = ds.dataset(tempdir).to_table(use_threads=False)['a'].to_numpy()
4603+
prev = -1
4604+
for item in seq:
4605+
curr = int(item)
4606+
assert curr > prev, f"Sequence expected to be ordered: {seq}"
4607+
prev = curr
4608+
4609+
45944610
def test_write_table(tempdir):
45954611
table = pa.table([
45964612
pa.array(range(20)), pa.array(random.random() for _ in range(20)),

0 commit comments

Comments
 (0)