diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 143f915b3dea..dc9aedbee495 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -154,6 +154,11 @@ include(CMakeParseArguments) include(ExternalProject) include(FindPackageHandleStandardArgs) +option(ENABLE_MEM_POOL_STATS "Enable Memory Pool Statistics" ON) +if(ENABLE_MEM_POOL_STATS) + add_definitions(-DENABLE_MEMORY_POOL_STATS) +endif() + include(GNUInstallDirs) if(IS_ABSOLUTE "${CMAKE_INSTALL_BINDIR}") set(ARROW_PKG_CONFIG_BINDIR "${CMAKE_INSTALL_BINDIR}") diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index 0759a1ab34c0..82ddbccb0c00 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1761,12 +1761,15 @@ TEST(ExecPlanExecution, UnalignedInput) { Declaration plan = Declaration::Sequence({ {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema, data.batches)}, }); - +#ifdef ENABLE_MEMORY_POOL_STATS int64_t initial_bytes_allocated = default_memory_pool()->total_bytes_allocated(); +#endif // By default we should warn and so the plan should finish ok ASSERT_OK(DeclarationToStatus(plan)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +#endif QueryOptions query_options; @@ -1774,18 +1777,24 @@ TEST(ExecPlanExecution, UnalignedInput) { // Nothing should happen if we ignore alignment query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore; ASSERT_OK(DeclarationToStatus(plan, query_options)); +# ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +# endif #endif query_options.unaligned_buffer_handling = UnalignedBufferHandling::kError; ASSERT_THAT(DeclarationToStatus(plan, query_options), Raises(StatusCode::Invalid, testing::HasSubstr("An input buffer was poorly aligned"))); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(initial_bytes_allocated, 
default_memory_pool()->total_bytes_allocated()); +#endif query_options.unaligned_buffer_handling = UnalignedBufferHandling::kReallocate; ASSERT_OK(DeclarationToStatus(plan, query_options)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(initial_bytes_allocated, default_memory_pool()->total_bytes_allocated()); +#endif } } // namespace acero diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc index 4dd210076ed1..f45a93ce2126 100644 --- a/cpp/src/arrow/buffer_test.cc +++ b/cpp/src/arrow/buffer_test.cc @@ -676,7 +676,9 @@ TEST(TestAllocateResizableBuffer, ZeroSize) { TEST(TestAllocateResizableBuffer, ZeroResize) { MemoryPool* pool = default_memory_pool(); +#ifdef ENABLE_MEMORY_POOL_STATS auto allocated_bytes = pool->bytes_allocated(); +#endif { std::shared_ptr buffer; @@ -684,17 +686,22 @@ TEST(TestAllocateResizableBuffer, ZeroResize) { ASSERT_EQ(buffer->size(), 1000); ASSERT_NE(buffer->data(), nullptr); ASSERT_EQ(buffer->mutable_data(), buffer->data()); - +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GE(pool->bytes_allocated(), allocated_bytes + 1000); +#endif ASSERT_OK(buffer->Resize(0)); ASSERT_NE(buffer->data(), nullptr); ASSERT_EQ(buffer->mutable_data(), buffer->data()); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GE(pool->bytes_allocated(), allocated_bytes); ASSERT_LT(pool->bytes_allocated(), allocated_bytes + 1000); +#endif } +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool->bytes_allocated(), allocated_bytes); +#endif } TEST(TestBufferBuilder, ResizeReserve) { diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index cb204806f90c..dbf6e0f53e0a 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -312,21 +312,25 @@ class TestSchemaExport : public ::testing::Test { SchemaExportChecker checker(std::move(flattened_formats), std::move(flattened_names), std::move(flattened_flags), std::move(flattened_metadata)); - +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); - +#endif struct 
ArrowSchema c_export; ASSERT_OK(ExportTraits::ExportFunc(*schema_like, &c_export)); SchemaExportGuard guard(&c_export); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); ASSERT_GT(new_bytes, orig_bytes); +#endif checker(&c_export); // Release the ArrowSchema, underlying data should be destroyed guard.Release(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -681,7 +685,9 @@ class TestArrayExport : public ::testing::Test { template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif std::shared_ptr arr; ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); @@ -692,17 +698,23 @@ class TestArrayExport : public ::testing::Test { ASSERT_OK(ExportArray(*arr, &c_export)); ArrayExportGuard guard(&c_export); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); ASSERT_GT(new_bytes, orig_bytes); +#endif // Release the shared_ptr, underlying data should be held alive arr.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif check_func(&c_export, data); // Release the ArrowArray, underlying data should be destroyed guard.Release(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -726,7 +738,9 @@ class TestArrayExport : public ::testing::Test { template void TestMoveWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif std::shared_ptr arr; ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); @@ -739,18 +753,24 @@ class TestArrayExport : public ::testing::Test { ASSERT_TRUE(ArrowArrayIsReleased(&c_export_temp)); ArrayExportGuard guard(&c_export_final); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); ASSERT_GT(new_bytes, orig_bytes); +#endif 
check_func(&c_export_final, data); // Release the shared_ptr, underlying data should be held alive arr.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif check_func(&c_export_final, data); // Release the ArrowArray, underlying data should be destroyed guard.Release(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -771,15 +791,18 @@ class TestArrayExport : public ::testing::Test { template void TestMoveChildWithArrayFactory(ArrayFactory&& factory, int64_t child_id, ExportCheckFunc&& check_func) { +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif std::shared_ptr arr; ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); struct ArrowArray c_export_parent, c_export_child; ASSERT_OK(ExportArray(*arr, &c_export_parent)); - +#ifdef ENABLE_MEMORY_POOL_STATS auto bytes_with_parent = pool_->bytes_allocated(); ASSERT_GT(bytes_with_parent, orig_bytes); +#endif // Move the child ArrowArray to its final location { @@ -791,22 +814,28 @@ class TestArrayExport : public ::testing::Test { // Now parent is released ASSERT_TRUE(ArrowArrayIsReleased(&c_export_parent)); +#ifdef ENABLE_MEMORY_POOL_STATS auto bytes_with_child = pool_->bytes_allocated(); ASSERT_LT(bytes_with_child, bytes_with_parent); ASSERT_GT(bytes_with_child, orig_bytes); +#endif const ArrayData& data = *arr->data()->child_data[child_id]; // non-owning reference check_func(&c_export_child, data); // Release the shared_ptr, some underlying data should be held alive arr.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(pool_->bytes_allocated(), bytes_with_child); ASSERT_GT(pool_->bytes_allocated(), orig_bytes); +#endif check_func(&c_export_child, data); // Release the ArrowArray, underlying data should be destroyed child_guard.Release(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -825,15 +854,19 @@ class TestArrayExport : public 
::testing::Test { void TestMoveChildrenWithArrayFactory(ArrayFactory&& factory, const std::vector children_ids, ExportCheckFunc&& check_func) { +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif std::shared_ptr arr; ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); struct ArrowArray c_export_parent; ASSERT_OK(ExportArray(*arr, &c_export_parent)); +#ifdef ENABLE_MEMORY_POOL_STATS auto bytes_with_parent = pool_->bytes_allocated(); ASSERT_GT(bytes_with_parent, orig_bytes); +#endif // Move the children ArrowArrays to their final locations std::vector c_export_children(children_ids.size()); @@ -853,17 +886,21 @@ class TestArrayExport : public ::testing::Test { // Now parent is released ASSERT_TRUE(ArrowArrayIsReleased(&c_export_parent)); +#ifdef ENABLE_MEMORY_POOL_STATS auto bytes_with_child = pool_->bytes_allocated(); ASSERT_LT(bytes_with_child, bytes_with_parent); ASSERT_GT(bytes_with_child, orig_bytes); +#endif for (size_t i = 0; i < children_ids.size(); ++i) { check_func(&c_export_children[i], *child_data[i]); } // Release the shared_ptr, the children data should be held alive arr.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(pool_->bytes_allocated(), bytes_with_child); ASSERT_GT(pool_->bytes_allocated(), orig_bytes); +#endif for (size_t i = 0; i < children_ids.size(); ++i) { check_func(&c_export_children[i], *child_data[i]); } @@ -872,7 +909,9 @@ class TestArrayExport : public ::testing::Test { for (auto& child_guard : child_guards) { child_guard.Release(); } +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -1446,7 +1485,9 @@ class TestDeviceArrayExport : public ::testing::Test { template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif std::shared_ptr arr; ASSERT_OK_AND_ASSIGN(arr, ToResult(factory())); @@ -1458,17 +1499,23 @@ class TestDeviceArrayExport : 
public ::testing::Test { ASSERT_OK(ExportDeviceArray(*arr, sync, &c_export)); ArrayExportGuard guard(&c_export.array); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); ASSERT_GT(new_bytes, orig_bytes); +#endif // Release the shared_ptr, underlying data should be held alive arr.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif check_func(&c_export, data, kMyDeviceType, 1, nullptr); // Release the ArrowArray, underlying data should be destroyed guard.Release(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -3623,12 +3670,16 @@ class TestSchemaRoundtrip : public ::testing::Test { struct ArrowSchema c_schema {}; // zeroed SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif type = factory(); auto type_use_count = type.use_count(); ASSERT_OK(ExportType(*type, &c_schema)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GT(pool_->bytes_allocated(), orig_bytes); +#endif // Export stores no reference to the type ASSERT_EQ(type_use_count, type.use_count()); type.reset(); @@ -3639,8 +3690,9 @@ class TestSchemaRoundtrip : public ::testing::Test { AssertTypeEqual(*type, *actual, /*check_metadata=*/true); type.reset(); actual.reset(); - +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -3654,12 +3706,16 @@ class TestSchemaRoundtrip : public ::testing::Test { struct ArrowSchema c_schema {}; // zeroed SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif schema = factory(); auto schema_use_count = schema.use_count(); ASSERT_OK(ExportSchema(*schema, &c_schema)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GT(pool_->bytes_allocated(), orig_bytes); +#endif // Export stores no reference to the schema ASSERT_EQ(schema_use_count, schema.use_count()); schema.reset(); 
@@ -3670,8 +3726,9 @@ class TestSchemaRoundtrip : public ::testing::Test { AssertSchemaEqual(*schema, *actual, /*check_metadata=*/true); schema.reset(); actual.reset(); - +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } protected: @@ -3871,19 +3928,25 @@ class TestArrayRoundtrip : public ::testing::Test { ArrayExportGuard array_guard(&c_array); SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif ASSERT_OK_AND_ASSIGN(array, ToResult(factory())); ASSERT_OK(ExportType(*array->type(), &c_schema)); ASSERT_OK(ExportArray(*array, &c_array)); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); if (array->type_id() != Type::NA) { ASSERT_GT(new_bytes, orig_bytes); } +#endif array.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif ASSERT_OK_AND_ASSIGN(array, ImportArray(&c_array, &c_schema)); ASSERT_OK(array->ValidateFull()); ASSERT_TRUE(ArrowSchemaIsReleased(&c_schema)); @@ -3905,7 +3968,9 @@ class TestArrayRoundtrip : public ::testing::Test { AssertArraysEqual(*expected, *array, true); } array.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -3916,14 +3981,20 @@ class TestArrayRoundtrip : public ::testing::Test { ArrayExportGuard array_guard(&c_array); SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif ASSERT_OK_AND_ASSIGN(batch, ToResult(factory())); ASSERT_OK(ExportSchema(*batch->schema(), &c_schema)); ASSERT_OK(ExportRecordBatch(*batch, &c_array)); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); +#endif batch.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array, &c_schema)); ASSERT_OK(batch->ValidateFull()); 
ASSERT_TRUE(ArrowSchemaIsReleased(&c_schema)); @@ -3945,7 +4016,9 @@ class TestArrayRoundtrip : public ::testing::Test { AssertBatchesEqual(*expected, *batch); } batch.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } void TestWithJSON(std::shared_ptr type, const char* json) { @@ -4311,20 +4384,26 @@ class TestDeviceArrayRoundtrip : public ::testing::Test { ArrayExportGuard array_guard(&c_array.array); SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif ASSERT_OK_AND_ASSIGN(array, ToResult(factory())); ASSERT_OK(ExportType(*array->type(), &c_schema)); std::shared_ptr sync{nullptr}; ASSERT_OK(ExportDeviceArray(*array, sync, &c_array)); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); if (array->type_id() != Type::NA) { ASSERT_GT(new_bytes, orig_bytes); } +#endif array.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif ASSERT_OK_AND_ASSIGN(array, ImportDeviceArray(&c_array, &c_schema, DeviceMapper)); ASSERT_OK(array->ValidateFull()); ASSERT_TRUE(ArrowSchemaIsReleased(&c_schema)); @@ -4346,7 +4425,9 @@ class TestDeviceArrayRoundtrip : public ::testing::Test { AssertArraysEqual(*expected, *array, true); } array.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } template @@ -4360,15 +4441,21 @@ class TestDeviceArrayRoundtrip : public ::testing::Test { ArrayExportGuard array_guard(&c_array.array); SchemaExportGuard schema_guard(&c_schema); +#ifdef ENABLE_MEMORY_POOL_STATS auto orig_bytes = pool_->bytes_allocated(); +#endif ASSERT_OK_AND_ASSIGN(batch, ToResult(factory())); ASSERT_OK(ExportSchema(*batch->schema(), &c_schema)); ASSERT_OK_AND_ASSIGN(auto sync, mm->MakeDeviceSyncEvent()); ASSERT_OK(ExportDeviceRecordBatch(*batch, sync, &c_array)); +#ifdef ENABLE_MEMORY_POOL_STATS auto new_bytes = pool_->bytes_allocated(); +#endif 
batch.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), new_bytes); +#endif ASSERT_OK_AND_ASSIGN(batch, ImportDeviceRecordBatch(&c_array, &c_schema, DeviceMapper)); ASSERT_OK(batch->ValidateFull()); @@ -4392,7 +4479,9 @@ class TestDeviceArrayRoundtrip : public ::testing::Test { AssertBatchesEqual(*expected, *batch); } batch.reset(); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(pool_->bytes_allocated(), orig_bytes); +#endif } void TestWithJSON(const std::shared_ptr& mm, @@ -4450,7 +4539,11 @@ class BaseArrayStreamTest : public ::testing::Test { orig_allocated_ = pool_->bytes_allocated(); } - void TearDown() override { ASSERT_EQ(pool_->bytes_allocated(), orig_allocated_); } + void TearDown() override { +#ifdef ENABLE_MEMORY_POOL_STATS + ASSERT_EQ(pool_->bytes_allocated(), orig_allocated_); +#endif + } RecordBatchVector MakeBatches(std::shared_ptr schema, ArrayVector arrays) { DCHECK_EQ(schema->num_fields(), 1); @@ -4573,7 +4666,9 @@ TEST_F(TestArrayStreamExport, ArrayLifetime) { AssertSchemaEqual(*schema, *got_schema, /*check_metadata=*/true); } +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); +#endif ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array1, schema)); AssertBatchesEqual(*batches[1], *batch); ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array0, schema)); @@ -4656,7 +4751,9 @@ TEST_F(TestArrayStreamExport, ChunkedArrayExport) { AssertTypeEqual(*chunked_array->type(), *got_type); } +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); +#endif ASSERT_OK_AND_ASSIGN(auto array, ImportArray(&c_array0, chunked_array->type())); AssertArraysEqual(*chunked_array->chunk(0), *array); ASSERT_OK_AND_ASSIGN(array, ImportArray(&c_array1, chunked_array->type())); @@ -5012,7 +5109,9 @@ TEST_F(TestArrayDeviceStreamExport, ArrayLifetime) { ASSERT_EQ(kMyDeviceType, c_array0.device_type); ASSERT_EQ(kMyDeviceType, c_array1.device_type); +#ifdef ENABLE_MEMORY_POOL_STATS 
ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); +#endif ASSERT_OK_AND_ASSIGN( auto batch, ImportDeviceRecordBatch(&c_array1, schema, TestDeviceArrayRoundtrip::DeviceMapper)); @@ -5115,7 +5214,9 @@ TEST_F(TestArrayDeviceStreamExport, ChunkedArrayExport) { ASSERT_EQ(kMyDeviceType, c_array0.device_type); ASSERT_EQ(kMyDeviceType, c_array1.device_type); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); +#endif ASSERT_OK_AND_ASSIGN(auto array, ImportDeviceArray(&c_array0, chunked_array->type(), TestDeviceArrayRoundtrip::DeviceMapper)); diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 98a1ab8b7aca..b54055442f5e 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -291,8 +291,10 @@ TEST(ResizableArrayData, Basic) { std::unique_ptr pool = MemoryPool::CreateDefault(); for (const auto& type : kSampleFixedDataTypes) { ARROW_SCOPED_TRACE("Type: ", type->ToString()); +#ifdef ENABLE_MEMORY_POOL_STATS int byte_width = arrow::internal::checked_pointer_cast(type)->bit_width() / 8; +#endif { ResizableArrayData array; ASSERT_OK(array.Init(type, pool.get(), /*log_num_rows_min=*/16)); @@ -302,21 +304,27 @@ TEST(ResizableArrayData, Basic) { // Even though we are only asking for 2 rows we specified a rather high // log_num_rows_min so it should allocate at least that many rows. Padding // and rounding up to a power of 2 will make the allocations larger. 
+#ifdef ENABLE_MEMORY_POOL_STATS int min_bytes_needed_for_values = byte_width * (1 << 16); int min_bytes_needed_for_validity = (1 << 16) / 8; int min_bytes_needed = min_bytes_needed_for_values + min_bytes_needed_for_validity; ASSERT_LT(min_bytes_needed, pool->bytes_allocated()); ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated()); +#endif ASSERT_OK(array.ResizeFixedLengthBuffers(1 << 17)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated()); ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated()); +#endif ASSERT_EQ(1 << 17, array.num_rows()); // Shrinking array won't shrink allocated RAM ASSERT_OK(array.ResizeFixedLengthBuffers(2)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated()); ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated()); +#endif ASSERT_EQ(2, array.num_rows()); } // After array is destroyed buffers should be freed @@ -357,8 +365,10 @@ TEST(ResizableArrayData, Binary) { ASSERT_OK(array.ResizeVaryingLengthBuffer()); // Each string is 1000 bytes. The offsets, padding, etc. 
should be less than 1000 // bytes +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_LT(2000, pool->bytes_allocated()); ASSERT_GT(3000, pool->bytes_allocated()); +#endif } // After array is destroyed buffers should be freed ASSERT_EQ(0, pool->bytes_allocated()); @@ -376,7 +386,9 @@ TEST(ExecBatchBuilder, AppendNullsBeyondLimit) { builder.AppendNulls(pool, {int64(), boolean()}, num_rows_max + 1 - 10)); ExecBatch built = builder.Flush(); ASSERT_EQ(10, built.length); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } @@ -403,7 +415,9 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) { /*num_cols=*/1)); ExecBatch built = builder.Flush(); ASSERT_EQ(trimmed_batch, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } @@ -466,7 +480,9 @@ TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) { ASSERT_EQ(array->GetString(i), str_8mb); } ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); @@ -510,7 +526,9 @@ TEST(ExecBatchBuilder, AppendBatches) { ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/2)); ExecBatch built = builder.Flush(); ASSERT_EQ(combined, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } @@ -531,7 +549,9 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) { ASSERT_OK(builder.AppendSelected(pool, batch_two, 2, row_ids, /*num_cols=*/2)); ExecBatch built = builder.Flush(); ASSERT_EQ(combined, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } @@ -560,7 +580,9 @@ TEST(ExecBatchBuilder, AppendBatchDupRows) { ExecBatch batch_string_appended = JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])"); 
ASSERT_EQ(batch_string_appended, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } { @@ -597,7 +619,9 @@ TEST(ExecBatchBuilder, AppendBatchDupRows) { ExecBatch batch_fsb_appended = JSONToExecBatch( {fixed_size_binary(3)}, R"([["123"], ["123"], ["123"], ["123"]])"); ASSERT_EQ(batch_fsb_appended, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } { @@ -619,7 +643,9 @@ TEST(ExecBatchBuilder, AppendBatchDupRows) { ExecBatch batch_fsb_appended = JSONToExecBatch({fixed_size_binary(9)}, R"([["123456789"], ["123456789"]])"); ASSERT_EQ(batch_fsb_appended, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); @@ -646,7 +672,9 @@ TEST(ExecBatchBuilder, AppendBatchesSomeCols) { first_col_ids)); ExecBatch built = builder.Flush(); ASSERT_EQ(first_col_only, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } { ExecBatchBuilder builder; @@ -656,7 +684,9 @@ TEST(ExecBatchBuilder, AppendBatchesSomeCols) { ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1)); ExecBatch built = builder.Flush(); ASSERT_EQ(first_col_only, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } { ExecBatchBuilder builder; @@ -668,7 +698,9 @@ TEST(ExecBatchBuilder, AppendBatchesSomeCols) { last_col_ids)); ExecBatch built = builder.Flush(); ASSERT_EQ(last_col_only, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } @@ -690,14 +722,18 @@ TEST(ExecBatchBuilder, AppendNulls) { ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); ExecBatch built = builder.Flush(); ASSERT_EQ(combined, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } { ExecBatchBuilder builder; ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2)); ExecBatch 
built = builder.Flush(); ASSERT_EQ(just_nulls, built); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_NE(0, pool->bytes_allocated()); +#endif } ASSERT_EQ(0, pool->bytes_allocated()); } diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 07d17e530b10..8b66f7cd10e1 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -36,6 +36,8 @@ namespace internal { /////////////////////////////////////////////////////////////////////// // Helper tracking memory statistics +#ifdef ENABLE_MEMORY_POOL_STATS + /// \brief Memory pool statistics /// /// 64-byte aligned so that all atomic values are on the same cache line. @@ -99,6 +101,22 @@ class alignas(64) MemoryPoolStats { bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel); } }; +#else +/// \brief Memory pool statistics (no-op stub) +/// +/// Stub used when statistics are compiled out: every accessor returns 0. +/// To enable stats, configure CMake with -DENABLE_MEM_POOL_STATS=ON (the default). +class alignas(64) MemoryPoolStats { + public: + int64_t max_memory() const { return 0; } + int64_t bytes_allocated() const { return 0; } + int64_t total_bytes_allocated() const { return 0; } + int64_t num_allocations() const { return 0; } + inline void DidAllocateBytes(int64_t size) { (void)0; } + inline void DidReallocateBytes(int64_t old_size, int64_t new_size) { (void)0; } + inline void DidFreeBytes(int64_t size) { (void)0; } +}; +#endif } // namespace internal diff --git a/cpp/src/arrow/memory_pool_test.cc b/cpp/src/arrow/memory_pool_test.cc index 0af1ed2d9eca..acb30deca3a7 100644 --- a/cpp/src/arrow/memory_pool_test.cc +++ b/cpp/src/arrow/memory_pool_test.cc @@ -107,6 +107,7 @@ TEST(DefaultMemoryPool, Identity) { specific_pools.end()); } +#ifdef ENABLE_MEMORY_POOL_STATS TEST(DefaultMemoryPoolDeathTest, Statistics) { MemoryPool* pool = default_memory_pool(); uint8_t* data1; @@ -181,6 +182,7 @@ TEST(ProxyMemoryPool, Logging) { ASSERT_EQ(0, pool->bytes_allocated()); ASSERT_EQ(0, pp.bytes_allocated()); } +#endif TEST(Jemalloc,
SetDirtyPageDecayMillis) { // ARROW-6910 @@ -319,6 +321,7 @@ TEST_F(TestCappedMemoryPool, Reallocate) { this->TestReallocate(); } TEST_F(TestCappedMemoryPool, Alignment) { this->TestAlignment(); } +#ifdef ENABLE_MEMORY_POOL_STATS TEST_F(TestCappedMemoryPool, AllocateLimit) { auto pool = InitPool(/*limit=*/1000); @@ -379,5 +382,5 @@ TEST_F(TestCappedMemoryPool, ReallocateLimit) { pool->Free(data1, 600); pool->Free(data2, 300); } - +#endif } // namespace arrow diff --git a/cpp/src/arrow/memory_pool_test.h b/cpp/src/arrow/memory_pool_test.h index 32f1cc5d1d31..0ab05b4abd19 100644 --- a/cpp/src/arrow/memory_pool_test.h +++ b/cpp/src/arrow/memory_pool_test.h @@ -38,20 +38,30 @@ class TestMemoryPoolBase : public ::testing::Test { auto pool = memory_pool(); uint8_t* data; +#ifdef ENABLE_MEMORY_POOL_STATS const auto old_bytes_allocated = pool->bytes_allocated(); +#endif ASSERT_OK(pool->Allocate(100, &data)); EXPECT_EQ(static_cast(0), reinterpret_cast(data) % 64); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(old_bytes_allocated + 100, pool->bytes_allocated()); +#endif uint8_t* data2; ASSERT_OK(pool->Allocate(27, &data2)); EXPECT_EQ(static_cast(0), reinterpret_cast(data2) % 64); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(old_bytes_allocated + 127, pool->bytes_allocated()); +#endif pool->Free(data, 100); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(old_bytes_allocated + 27, pool->bytes_allocated()); +#endif pool->Free(data2, 27); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(old_bytes_allocated, pool->bytes_allocated()); +#endif } void TestOOM() { @@ -71,19 +81,25 @@ class TestMemoryPoolBase : public ::testing::Test { uint8_t* data; ASSERT_OK(pool->Allocate(10, &data)); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(10, pool->bytes_allocated()); +#endif data[0] = 35; data[9] = 12; // Expand ASSERT_OK(pool->Reallocate(10, 20, &data)); ASSERT_EQ(data[9], 12); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(20, pool->bytes_allocated()); +#endif // Shrink ASSERT_OK(pool->Reallocate(20, 5, 
&data)); ASSERT_EQ(data[0], 35); +#ifdef ENABLE_MEMORY_POOL_STATS ASSERT_EQ(5, pool->bytes_allocated()); +#endif // Free pool->Free(data, 5); diff --git a/cpp/src/arrow/stl_test.cc b/cpp/src/arrow/stl_test.cc index ce5adf0c0e26..754e8a57f998 100644 --- a/cpp/src/arrow/stl_test.cc +++ b/cpp/src/arrow/stl_test.cc @@ -527,6 +527,7 @@ TEST(TestTupleVectorFromTable, CastingNeeded) { ASSERT_EQ(rows, expected_rows); } +#ifdef ENABLE_MEMORY_POOL_STATS TEST(STLMemoryPool, Base) { std::allocator allocator; STLMemoryPool> pool(allocator); @@ -557,6 +558,7 @@ TEST(allocator, MemoryTracking) { alloc.deallocate(data, 100); ASSERT_EQ(0, pool->bytes_allocated()); } +#endif #if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || defined(ARROW_JEMALLOC)) @@ -567,6 +569,7 @@ TEST(allocator, TestOOM) { ASSERT_THROW(alloc.allocate(max_alloc), std::bad_alloc); } +# ifdef ENABLE_MEMORY_POOL_STATS TEST(stl_allocator, MaxMemory) { auto pool = MemoryPool::CreateDefault(); @@ -579,6 +582,7 @@ TEST(stl_allocator, MaxMemory) { ASSERT_EQ(2000, pool->max_memory()); } +# endif #endif // !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) // || defined(ARROW_JEMALLOC)) diff --git a/cpp/src/arrow/util/hashing_test.cc b/cpp/src/arrow/util/hashing_test.cc index 6e4c59a1ebd2..3d90c3d6590a 100644 --- a/cpp/src/arrow/util/hashing_test.cc +++ b/cpp/src/arrow/util/hashing_test.cc @@ -377,6 +377,7 @@ TEST(ScalarMemoTable, StressInt64) { ASSERT_EQ(table.size(), map.size()); } +#ifdef ENABLE_MEMORY_POOL_STATS TEST(ScalarMemoTable, MergeTablePropagatesInsertError) { int64_t bytes_allocated_limit = 0; { @@ -402,6 +403,7 @@ TEST(ScalarMemoTable, MergeTablePropagatesInsertError) { ASSERT_RAISES(OutOfMemory, target.MergeTable(source)); } +#endif TEST(BinaryMemoTable, Basics) { std::string A = "", B = "a", C = "foo", D = "bar", E, F; @@ -507,6 +509,7 @@ TEST(BinaryMemoTable, Stress) { ASSERT_EQ(table.size(), map.size()); } +#ifdef ENABLE_MEMORY_POOL_STATS TEST(BinaryMemoTable, 
MergeTablePropagatesInsertError) { const std::vector initial_values = {"a", "bb", "ccc", "dddd"}; const std::string extra_value(4096, 'x'); @@ -535,7 +538,7 @@ TEST(BinaryMemoTable, MergeTablePropagatesInsertError) { ASSERT_RAISES(OutOfMemory, target.MergeTable(source)); } - +#endif TEST(BinaryMemoTable, Empty) { BinaryMemoTable table(default_memory_pool()); ASSERT_EQ(table.size(), 0);