Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ TEST(FlightIntegration, AuthBasicProto) { ASSERT_OK(RunScenario("auth:basic_prot

// Runs the "middleware" integration scenario and expects it to succeed.
TEST(FlightIntegration, Middleware) {
  ASSERT_OK(RunScenario("middleware"));
}

// Runs the "alignment" integration scenario and expects it to succeed.
TEST(FlightIntegration, Alignment) {
  ASSERT_OK(RunScenario("alignment"));
}

// Runs the "ordered" integration scenario and expects it to succeed.
TEST(FlightIntegration, Ordered) {
  ASSERT_OK(RunScenario("ordered"));
}

TEST(FlightIntegration, ExpirationTimeDoGet) {
Expand Down
183 changes: 145 additions & 38 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "arrow/table.h"
#include "arrow/table_builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/align_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"
Expand Down Expand Up @@ -281,6 +282,137 @@ class MiddlewareScenario : public Scenario {
std::shared_ptr<TestClientMiddlewareFactory> client_middleware_;
};

/// \brief The server used for testing FlightClient data alignment.
///
/// Every DoGet for the "align-data" ticket yields the same three-row batch
/// with an int32, an int64 and a boolean column, i.e. values of various
/// byte widths.
///
/// The client is expected to hand out data aligned as requested through
/// FlightCallOptions.read_options.ensure_alignment; the scenario only
/// passes when it does.
class AlignmentServer : public FlightServerBase {
  /// Describe the single flight served here: one endpoint carrying the
  /// "align-data" ticket and no locations.
  Status GetFlightInfo(const ServerCallContext& context,
                       const FlightDescriptor& descriptor,
                       std::unique_ptr<FlightInfo>* result) override {
    std::vector<FlightEndpoint> endpoints{
        FlightEndpoint{{"align-data"}, {}, std::nullopt, ""}};
    auto flight_schema = BuildSchema();
    ARROW_ASSIGN_OR_RAISE(
        auto flight_info,
        FlightInfo::Make(*flight_schema, descriptor, endpoints, -1, -1, false));
    *result = std::make_unique<FlightInfo>(flight_info);
    return Status::OK();
  }

  /// Stream the fixed record batch; any ticket other than "align-data" is
  /// rejected with a KeyError.
  Status DoGet(const ServerCallContext& context, const Ticket& request,
               std::unique_ptr<FlightDataStream>* stream) override {
    if (request.ticket != "align-data") {
      return Status::KeyError("Could not find flight: ", request.ticket);
    }
    std::vector<std::shared_ptr<RecordBatch>> batches;
    batches.push_back(RecordBatchFromJSON(BuildSchema(), R"([
[1, 1, false],
[2, 2, true],
[3, 3, false]
])"));
    ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchReader::Make(batches));
    *stream = std::make_unique<RecordBatchStream>(batch_reader);
    return Status::OK();
  }

 private:
  /// Schema shared by GetFlightInfo and DoGet.
  std::shared_ptr<Schema> BuildSchema() {
    auto int32_field = arrow::field("int32", arrow::int32(), /*nullable=*/false);
    auto int64_field = arrow::field("int64", arrow::int64(), /*nullable=*/false);
    auto bool_field = arrow::field("bool", arrow::boolean(), /*nullable=*/false);
    return arrow::schema({int32_field, int64_field, bool_field});
  }
};

/// \brief The alignment scenario.
///
/// This tests that the client provides aligned data if requested.
///
/// The client fetches the same flight once for every ipc::Alignment value,
/// checks the shape of the returned table and then verifies the alignment
/// of its buffers against the requested setting.
class AlignmentScenario : public Scenario {
  /// Install an AlignmentServer; no server options are changed.
  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
                    FlightServerOptions* options) override {
    *server = std::make_unique<AlignmentServer>();
    return Status::OK();
  }

  /// The scenario needs no special client options.
  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

  /// Fetch all endpoints of the "alignment" flight and concatenate them.
  ///
  /// \param client the client connected to the server under test
  /// \param call_options options passed to every DoGet call
  /// \return the concatenated table, or Invalid if an endpoint advertises
  /// locations (the original service must be used), or any error raised by
  /// the Flight calls
  arrow::Result<std::shared_ptr<Table>> GetTable(FlightClient* client,
                                                 const FlightCallOptions& call_options) {
    ARROW_ASSIGN_OR_RAISE(auto info,
                          client->GetFlightInfo(FlightDescriptor::Command("alignment")));
    std::vector<std::shared_ptr<arrow::Table>> tables;
    for (const auto& endpoint : info->endpoints()) {
      if (!endpoint.locations.empty()) {
        // Render the unexpected locations as "[a, b, ...]" for the error.
        std::stringstream ss;
        ss << "[";
        bool is_first = true;
        for (const auto& location : endpoint.locations) {
          if (!is_first) {
            ss << ", ";
          }
          is_first = false;
          ss << location.ToString();
        }
        ss << "]";
        return Status::Invalid(
            "Expected to receive empty locations to use the original service: ",
            ss.str());
      }
      ARROW_ASSIGN_OR_RAISE(auto reader, client->DoGet(call_options, endpoint.ticket));
      ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
      tables.push_back(table);
    }
    return ConcatenateTables(tables);
  }

  /// Read the flight with each alignment setting and validate the result.
  Status RunClient(std::unique_ptr<FlightClient> client) override {
    for (ipc::Alignment ensure_alignment :
         {ipc::Alignment::kAnyAlignment, ipc::Alignment::kDataTypeSpecificAlignment,
          ipc::Alignment::k64ByteAlignment}) {
      auto call_options = FlightCallOptions();
      call_options.read_options.ensure_alignment = ensure_alignment;
      ARROW_ASSIGN_OR_RAISE(auto table, GetTable(client.get(), call_options));

      // Check read data
      const int64_t expected_row_count = 3;
      if (table->num_rows() != expected_row_count) {
        return Status::Invalid("Read table size isn't expected\n", "Expected rows:\n",
                               expected_row_count, "\nActual rows:\n",
                               table->num_rows());
      }
      const int expected_column_count = 3;
      if (table->num_columns() != expected_column_count) {
        return Status::Invalid("Read table size isn't expected\n", "Expected columns:\n",
                               expected_column_count, "\nActual columns:\n",
                               table->num_columns());
      }

      // Check data alignment
      std::vector<bool> needs_alignment;
      if (ensure_alignment == ipc::Alignment::kAnyAlignment) {
        // this is not a requirement but merely an observation:
        // with ensure_alignment == kAnyAlignment, flight client returns
        // mis-aligned data
        // if this is not the case any more, feel free to remove this assertion
        if (util::CheckAlignment(*table, arrow::util::kValueAlignment,
                                 &needs_alignment)) {
          return Status::Invalid(
              "Read table has aligned data, which is good, but unprecedented");
        }
      } else {
        // with ensure_alignment != kAnyAlignment, we require data to be aligned
        // the value of the Alignment enum provides us with the byte alignment value
        if (!util::CheckAlignment(*table, static_cast<int64_t>(ensure_alignment),
                                  &needs_alignment)) {
          return Status::Invalid("Read table has unaligned data");
        }
      }
    }

    return Status::OK();
  }
};

/// \brief The server used for testing FlightInfo.ordered.
///
/// If the given command is "ordered", the server sets
Expand Down Expand Up @@ -316,25 +448,16 @@ class OrderedServer : public FlightServerBase {

Status DoGet(const ServerCallContext& context, const Ticket& request,
std::unique_ptr<FlightDataStream>* stream) override {
ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
BuildSchema(), arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
std::shared_ptr<RecordBatch> record_batch;
if (request.ticket == "1") {
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_RETURN_NOT_OK(number_builder->Append(3));
record_batch = RecordBatchFromJSON(BuildSchema(), "[[1], [2], [3]]");
} else if (request.ticket == "2") {
ARROW_RETURN_NOT_OK(number_builder->Append(10));
ARROW_RETURN_NOT_OK(number_builder->Append(20));
ARROW_RETURN_NOT_OK(number_builder->Append(30));
record_batch = RecordBatchFromJSON(BuildSchema(), "[[10], [20], [30]]");
} else if (request.ticket == "3") {
ARROW_RETURN_NOT_OK(number_builder->Append(100));
ARROW_RETURN_NOT_OK(number_builder->Append(200));
ARROW_RETURN_NOT_OK(number_builder->Append(300));
record_batch = RecordBatchFromJSON(BuildSchema(), "[[100], [200], [300]]");
} else {
return Status::KeyError("Could not find flight: ", request.ticket);
}
ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
RecordBatchReader::Make(record_batches));
Expand Down Expand Up @@ -390,19 +513,9 @@ class OrderedScenario : public Scenario {

// Build expected table
auto schema = arrow::schema({arrow::field("number", arrow::int32(), false)});
ARROW_ASSIGN_OR_RAISE(auto builder,
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<Int32Builder>(0);
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_RETURN_NOT_OK(number_builder->Append(3));
ARROW_RETURN_NOT_OK(number_builder->Append(10));
ARROW_RETURN_NOT_OK(number_builder->Append(20));
ARROW_RETURN_NOT_OK(number_builder->Append(30));
ARROW_RETURN_NOT_OK(number_builder->Append(100));
ARROW_RETURN_NOT_OK(number_builder->Append(200));
ARROW_RETURN_NOT_OK(number_builder->Append(300));
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
auto expected_record_batch = RecordBatchFromJSON(schema, R"([
[1], [2], [3], [10], [20], [30], [100], [200], [300]
])");
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
expected_record_batch};
ARROW_ASSIGN_OR_RAISE(auto expected_table,
Expand Down Expand Up @@ -490,11 +603,8 @@ class ExpirationTimeServer : public FlightServerBase {
}
}
status.num_gets++;
ARROW_ASSIGN_OR_RAISE(auto builder, RecordBatchBuilder::Make(
BuildSchema(), arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
ARROW_RETURN_NOT_OK(number_builder->Append(index));
ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
auto record_batch =
RecordBatchFromJSON(BuildSchema(), "[[" + std::to_string(index) + "]]");
std::vector<std::shared_ptr<RecordBatch>> record_batches{record_batch};
ARROW_ASSIGN_OR_RAISE(auto record_batch_reader,
RecordBatchReader::Make(record_batches));
Expand Down Expand Up @@ -621,13 +731,7 @@ class ExpirationTimeDoGetScenario : public Scenario {

// Build expected table
auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
ARROW_ASSIGN_OR_RAISE(auto builder,
RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
auto number_builder = builder->GetFieldAs<UInt32Builder>(0);
ARROW_RETURN_NOT_OK(number_builder->Append(0));
ARROW_RETURN_NOT_OK(number_builder->Append(1));
ARROW_RETURN_NOT_OK(number_builder->Append(2));
ARROW_ASSIGN_OR_RAISE(auto expected_record_batch, builder->Flush());
auto expected_record_batch = RecordBatchFromJSON(schema, "[[0], [1], [2]]");
std::vector<std::shared_ptr<RecordBatch>> expected_record_batches{
expected_record_batch};
ARROW_ASSIGN_OR_RAISE(auto expected_table,
Expand Down Expand Up @@ -2382,6 +2486,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "middleware") {
*out = std::make_shared<MiddlewareScenario>();
return Status::OK();
} else if (scenario_name == "alignment") {
*out = std::make_shared<AlignmentScenario>();
return Status::OK();
} else if (scenario_name == "ordered") {
*out = std::make_shared<OrderedScenario>();
return Status::OK();
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/align_util.h"
#include "arrow/util/compression.h"
#include "arrow/util/visibility.h"

Expand Down Expand Up @@ -128,6 +129,18 @@ struct ARROW_EXPORT IpcWriteOptions {
static IpcWriteOptions Defaults();
};

/// \brief Alignment of data in memory
///
/// Enumerator values larger than 0 are used verbatim as the byte alignment,
/// see util::EnsureAlignment(..., int64_t alignment, ...)
enum class Alignment : int64_t {
  /// \brief no particular alignment enforced
  kAnyAlignment = 0,
  /// \brief data is aligned depending on the actual data type
  kDataTypeSpecificAlignment = util::kValueAlignment,
  /// \brief data is aligned to 64-byte boundary
  k64ByteAlignment = 64
};

/// \brief Options for reading Arrow IPC messages
struct ARROW_EXPORT IpcReadOptions {
/// \brief The maximum permitted schema nesting depth.
Expand Down Expand Up @@ -161,6 +174,16 @@ struct ARROW_EXPORT IpcReadOptions {
/// RecordBatchStreamReader and StreamDecoder classes.
bool ensure_native_endian = true;

/// \brief How to align data if mis-aligned
///
/// Data is copied to aligned memory locations allocated via the
/// MemoryPool configured as \ref arrow::ipc::IpcReadOptions::memory_pool.
/// Some use cases might require data to have a specific alignment, for example,
/// for the data buffer of an Int32 array to be aligned on a 4-byte boundary.
///
/// Default (kAnyAlignment) keeps the alignment as is, so no copy of data occurs.
Alignment ensure_alignment = Alignment::kAnyAlignment;

/// \brief Options to control caching behavior when pre-buffering is requested
///
/// The lazy property will always be reset to true to deliver the expected behavior
Expand Down
14 changes: 12 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/align_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
Expand Down Expand Up @@ -636,8 +637,17 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
arrow::internal::SwapEndianArrayData(filtered_column));
}
}
return RecordBatch::Make(std::move(filtered_schema), metadata->length(),
std::move(filtered_columns));
auto batch = RecordBatch::Make(std::move(filtered_schema), metadata->length(),
std::move(filtered_columns));

if (ARROW_PREDICT_FALSE(context.options.ensure_alignment != Alignment::kAnyAlignment)) {
return util::EnsureAlignment(batch,
// the numerical value of ensure_alignment enum is taken
// literally as byte alignment
static_cast<int64_t>(context.options.ensure_alignment),
context.options.memory_pool);
}
return batch;
}

Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
Expand Down
13 changes: 10 additions & 3 deletions cpp/src/arrow/util/align_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "arrow/array.h"
#include "arrow/chunked_array.h"
#include "arrow/extension_type.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type_fwd.h"
Expand All @@ -28,6 +29,8 @@

namespace arrow {

using internal::checked_cast;

namespace util {

bool CheckAlignment(const Buffer& buffer, int64_t alignment) {
Expand All @@ -44,9 +47,13 @@ namespace {
/// Return the type id that describes the buffers of `array`.
///
/// For everything but dictionary-encoded data this is the storage type id.
/// For dictionary-encoded data it is the id of the dictionary's index type.
Type::type GetTypeForBuffers(const ArrayData& array) {
  const Type::type type_id = array.type->storage_id();
  if (type_id != Type::DICTIONARY) {
    return type_id;
  }
  // The storage id is DICTIONARY either because array.type is itself a
  // DictionaryType or because it is an ExtensionType whose storage type is
  // one. Casting array.type straight to DictionaryType would be wrong in the
  // latter case, so unwrap the extension's storage type first.
  const DataType* dict_type = array.type.get();
  if (array.type->id() == Type::EXTENSION) {
    dict_type = checked_cast<const ExtensionType*>(dict_type)->storage_type().get();
  }
  return checked_cast<const DictionaryType*>(dict_type)->index_type()->id();
}
Expand Down
13 changes: 11 additions & 2 deletions docs/source/cpp/flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,17 @@ Memory management
-----------------

Flight tries to reuse allocations made by gRPC to avoid redundant
data copies. However, experience shows that such data is frequently
misaligned. Some use cases might require data to have data type-specific
alignment (for example, for the data buffer of an Int32 array to be aligned
on a 4-byte boundary), which can be enforced
by setting :member:`arrow::ipc::IpcReadOptions::ensure_alignment`
to :member:`arrow::ipc::Alignment::kDataTypeSpecificAlignment`.
This uses the :member:`arrow::ipc::IpcReadOptions::memory_pool`
to allocate memory with aligned addresses, but only for misaligned data.
Note that this copies the affected data received via Flight.

Unless gRPC data is copied as described above, allocations made by gRPC may not
be tracked by the Arrow memory pool, and memory usage behavior,
such as whether free memory is returned to the system, depends
on the allocator that gRPC uses (usually the system allocator).
Expand Down Expand Up @@ -361,5 +371,4 @@ Closing unresponsive connections
.. _ARROW-15764: https://issues.apache.org/jira/browse/ARROW-15764
.. _ARROW-16697: https://issues.apache.org/jira/browse/ARROW-16697
.. _ARROW-6062: https://issues.apache.org/jira/browse/ARROW-6062

.. _gRPC: https://grpc.io/
6 changes: 6 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1874,12 +1874,18 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
@staticmethod
CIpcWriteOptions Defaults()

# Binding for the C++ enum arrow::ipc::Alignment, the type of
# CIpcReadOptions.ensure_alignment. Each member maps to the C++ enumerator
# named in its string.
ctypedef enum CAlignment" arrow::ipc::Alignment":
CAlignment_Any" arrow::ipc::Alignment::kAnyAlignment"
CAlignment_DataTypeSpecific" arrow::ipc::Alignment::kDataTypeSpecificAlignment"
CAlignment_64Byte" arrow::ipc::Alignment::k64ByteAlignment"

cdef cppclass CIpcReadOptions" arrow::ipc::IpcReadOptions":
int max_recursion_depth
CMemoryPool* memory_pool
vector[int] included_fields
c_bool use_threads
c_bool ensure_native_endian
CAlignment ensure_alignment

@staticmethod
CIpcReadOptions Defaults()
Expand Down
Loading
Loading