From 2d5e1487c70cd62f3368a8482f19d13958d81825 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 18 May 2026 22:09:50 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1804 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115 Antalya 26.3: Query condition cache for iceberg tables # Conflicts: # src/Databases/DataLake/DatabaseDataLake.cpp # src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp # src/Processors/Formats/Impl/ParquetBlockInputFormat.h # src/Storages/ObjectStorage/StorageObjectStorageSource.cpp --- src/Databases/DataLake/DatabaseDataLake.cpp | 8 + .../Formats/Impl/ParquetBlockInputFormat.cpp | 1563 +++++++++++++++++ .../Formats/Impl/ParquetBlockInputFormat.h | 386 ++++ .../StorageObjectStorageSource.cpp | 67 + 4 files changed, 2024 insertions(+) create mode 100644 src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp create mode 100644 src/Processors/Formats/Impl/ParquetBlockInputFormat.h diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 54bb3ccd23a6..1baacebb7ccd 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -669,6 +669,11 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con const auto catalog_uuid = table_metadata.getTableUUID(); const UUID table_uuid = catalog_uuid ? parseFromString(*catalog_uuid) : UUIDHelpers::Nil; +<<<<<<< HEAD +======= + + std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : ""; +>>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (can_use_parallel_replicas && !is_secondary_query) { @@ -697,7 +702,10 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con return std::make_shared( configuration, configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name, table_uuid))), +<<<<<<< HEAD context_copy, +======= +>>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) StorageID(getDatabaseName(), name, table_uuid), /* columns */columns, /* constraints */ConstraintsDescription{}, diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp new file mode 100644 index 000000000000..28c4221839c8 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -0,0 +1,1563 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if USE_PARQUET + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace ProfileEvents +{ + extern const Event ParquetFetchWaitTimeMicroseconds; + extern const Event ParquetReadRowGroups; + extern const Event ParquetPrunedRowGroups; +} + +namespace CurrentMetrics +{ + extern const Metric FormatParsingThreads; + extern const Metric FormatParsingThreadsActive; + extern const Metric FormatParsingThreadsScheduled; + + extern const Metric IOThreads; + extern const Metric IOThreadsActive; + extern const Metric IOThreadsScheduled; +} + +namespace DB +{ + +namespace Setting +{ + extern const SettingsBool use_parquet_metadata_cache; +} + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; + extern const int MEMORY_LIMIT_EXCEEDED; + extern const int CANNOT_READ_ALL_DATA; + extern const int CANNOT_PARSE_NUMBER; + extern const int LOGICAL_ERROR; +} + +namespace +{ +String removeListElement(const String & value) +{ + const String pattern = ".list.element"; + String result = value; + size_t pos; + while ((pos = result.find(pattern)) != std::string::npos) + result.erase(pos, pattern.size()); + return result; +} + + +void traverseAllFields(const parquet::schema::NodePtr & node, std::unordered_map & fields_mapping, const String & current_path = "") +{ + if (node->is_group()) + { + auto group = std::static_pointer_cast(node); + for (int i = 0; i < group->field_count(); ++i) + traverseAllFields(group->field(i), fields_mapping, Nested::concatenateName(current_path, group->name())); + } + int field_id = node->field_id(); + fields_mapping[field_id] = removeListElement(Nested::concatenateName(current_path, node->name())); +} + +} + +#define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (::arrow::Status _s = (status); !_s.ok()) \ + { \ + throw Exception::createDeprecated(_s.ToString(), \ + _s.IsOutOfMemory() ? ErrorCodes::MEMORY_LIMIT_EXCEEDED : ErrorCodes::INCORRECT_DATA); \ + } \ + } while (false) + +/// Decode min/max value from column chunk statistics. Returns Null if missing or unsupported. +/// +/// There are two questionable decisions in this implementation: +/// * We parse the value from the encoded byte string instead of casting the parquet::Statistics +/// to parquet::TypedStatistics and taking the value from there. +/// * We dispatch based on the parquet logical+converted+physical type instead of the ClickHouse type. +/// The idea is that this is similar to what we'll have to do when reimplementing Parquet parsing in +/// ClickHouse instead of using Arrow (for speed). So, this is an exercise in parsing Parquet manually. +static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type::type physical_type, const parquet::ColumnDescriptor & descr, TypeIndex type_hint) +{ + using namespace parquet; + + auto decode_integer = [&](bool signed_) -> UInt64 { + size_t size; + switch (physical_type) + { + case parquet::Type::type::BOOLEAN: size = 1; break; + case parquet::Type::type::INT32: size = 4; break; + case parquet::Type::type::INT64: size = 8; break; + default: throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected physical type for number"); + } + if (data.size() != size) + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected size: {}", data.size()); + + UInt64 val = 0; + memcpy(&val, data.data(), size); + + /// Sign-extend. + if (signed_ && size < 8 && (val >> (size * 8 - 1)) != 0) + val |= 0 - (1ul << (size * 8)); + + return val; + }; + + /// Decimal. + do // while (false) + { + Int32 scale; + if (descr.logical_type() && descr.logical_type()->is_decimal()) + scale = assert_cast(*descr.logical_type()).scale(); + else if (descr.converted_type() == ConvertedType::type::DECIMAL) + scale = descr.type_scale(); + else + break; + + size_t size; + bool big_endian = false; + switch (physical_type) + { + case Type::type::BOOLEAN: size = 1; break; + case Type::type::INT32: size = 4; break; + case Type::type::INT64: size = 8; break; + + case Type::type::FIXED_LEN_BYTE_ARRAY: + big_endian = true; + size = data.size(); + break; + default: throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected decimal physical type"); + } + /// Note that size is not necessarily a power of two. + /// E.g. spark turns 8-byte unsigned integers into 9-byte signed decimals. + if (data.size() != size || size < 1 || size > 32) + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected decimal size: {} (actual {})", size, data.size()); + + Int256 val = 0; + memcpy(&val, data.data(), size); + if (big_endian) + std::reverse(reinterpret_cast(&val), reinterpret_cast(&val) + size); + /// Sign-extend. + if (size < 32 && (val >> (size * 8 - 1)) != 0) + val |= ~((Int256(1) << (size * 8)) - 1); + + auto narrow = [&](auto x) -> Field + { + memcpy(&x, &val, sizeof(x)); + return Field(DecimalField(x, static_cast(scale))); + }; + if (size <= 4) + return narrow(Decimal32(0)); + if (size <= 8) + return narrow(Decimal64(0)); + if (size <= 16) + return narrow(Decimal128(0)); + return narrow(Decimal256(0)); + } + while (false); + + /// Timestamp (decimal). + { + Int32 scale = -1; + bool is_timestamp = true; + if (descr.logical_type() && (descr.logical_type()->is_time() || descr.logical_type()->is_timestamp())) + { + LogicalType::TimeUnit::unit unit = descr.logical_type()->is_time() + ? assert_cast(*descr.logical_type()).time_unit() + : assert_cast(*descr.logical_type()).time_unit(); + switch (unit) + { + case LogicalType::TimeUnit::unit::MILLIS: scale = 3; break; + case LogicalType::TimeUnit::unit::MICROS: scale = 6; break; + case LogicalType::TimeUnit::unit::NANOS: scale = 9; break; + default: throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unknown time unit"); + } + } + else switch (descr.converted_type()) + { + case ConvertedType::type::TIME_MILLIS: scale = 3; break; + case ConvertedType::type::TIME_MICROS: scale = 6; break; + case ConvertedType::type::TIMESTAMP_MILLIS: scale = 3; break; + case ConvertedType::type::TIMESTAMP_MICROS: scale = 6; break; + default: is_timestamp = false; + } + + if (is_timestamp) + { + Int64 val = static_cast(decode_integer(/* signed */ true)); + return Field(DecimalField(Decimal64(val), scale)); + } + } + + /// Floats. + + if (physical_type == Type::type::FLOAT) + { + if (data.size() != 4) + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected float size"); + Float32 val; + memcpy(&val, data.data(), data.size()); + return Field(val); + } + + if (physical_type == Type::type::DOUBLE) + { + if (data.size() != 8) + throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpected float size"); + Float64 val; + memcpy(&val, data.data(), data.size()); + return Field(val); + } + + if (physical_type == Type::type::BYTE_ARRAY || physical_type == Type::type::FIXED_LEN_BYTE_ARRAY) + { + /// Arrow's parquet decoder handles missing min/max values slightly incorrectly. + /// In a parquet file, min and max have separate is_set flags, i.e. one may be missing even + /// if the other is set. Arrow decoder ORs (!) these two flags together into one: HasMinMax(). + /// So, if exactly one of {min, max} is missing, Arrow reports it as empty string, with no + /// indication that it's actually missing. + /// + /// How can exactly one of {min, max} be missing? This happens if one of the two strings + /// exceeds the length limit for stats. Repro: + /// + /// insert into function file('t.parquet') select arrayStringConcat(range(number*1000000)) from numbers(2) settings output_format_parquet_use_custom_encoder=0 + /// select tupleElement(tupleElement(row_groups[1], 'columns')[1], 'statistics') from file('t.parquet', ParquetMetadata) + /// + /// Here the row group contains two strings: one empty, one very long. But the statistics + /// reported by arrow are indistinguishable from statistics if all strings were empty. + /// (Min and max are the last two tuple elements in the output of the second query. Notice + /// how they're empty strings instead of NULLs.) + /// + /// So we have to be conservative and treat empty string as unknown. + /// This is unfortunate because it's probably common for string columns to have lots of empty + /// values, and filter pushdown would probably often be useful in that case. + /// + /// TODO: Remove this workaround either when we implement our own Parquet decoder that + /// doesn't have this bug, or if it's fixed in Arrow. + if (data.empty()) + return Field(); + + /// Long integers, encoded either as text or as little-endian bytes. + /// The parquet file doesn't know that it's numbers, so the min/max are produced by comparing + /// strings lexicographically. So these min and max are mostly useless to us. + /// There's one case where they're not useless: min == max; currently we don't make use of this. + switch (type_hint) + { + case TypeIndex::UInt128: + case TypeIndex::UInt256: + case TypeIndex::Int128: + case TypeIndex::Int256: + case TypeIndex::IPv6: + return Field(); + default: break; + } + + /// Strings. + return Field(data); + } + + /// This type is deprecated in Parquet. + /// TODO: But turns out it's still used in practice, we should support it. + if (physical_type == Type::type::INT96) + return Field(); + + /// Integers. + + bool signed_ = true; + if (descr.logical_type() && descr.logical_type()->is_int()) + signed_ = assert_cast(*descr.logical_type()).is_signed(); + else + signed_ = descr.converted_type() != ConvertedType::type::UINT_8 && + descr.converted_type() != ConvertedType::type::UINT_16 && + descr.converted_type() != ConvertedType::type::UINT_32 && + descr.converted_type() != ConvertedType::type::UINT_64; + + UInt64 val = decode_integer(signed_); + Field field = signed_ ? Field(static_cast(val)) : Field(val); + return field; +} + +struct ParquetBloomFilter final : public KeyCondition::BloomFilter +{ + explicit ParquetBloomFilter(std::unique_ptr && parquet_bf_) + : parquet_bf(std::move(parquet_bf_)) {} + + bool findAnyHash(const std::vector & hashes) override + { + for (const auto hash : hashes) + { + if (parquet_bf->FindHash(hash)) + { + return true; + } + } + + return false; + } + +private: + std::unique_ptr parquet_bf; +}; + +static KeyCondition::ColumnIndexToBloomFilter buildColumnIndexToBF( + parquet::BloomFilterReader & bf_reader, + int row_group, + const std::vector & clickhouse_column_index_to_parquet_index, + const std::unordered_set & filtering_columns +) +{ + auto rg_bf = bf_reader.RowGroup(row_group); + + if (!rg_bf) + { + return {}; + } + + KeyCondition::ColumnIndexToBloomFilter index_to_column_bf; + + for (const auto & [clickhouse_index, parquet_indexes] : clickhouse_column_index_to_parquet_index) + { + if (!filtering_columns.contains(clickhouse_index)) + { + continue; + } + + // Complex / nested types contain more than one index. We don't support those. + if (parquet_indexes.size() != 1) + { + continue; + } + + auto parquet_index = parquet_indexes[0]; + + auto parquet_bf = rg_bf->GetColumnBloomFilter(parquet_index); + + if (!parquet_bf) + { + continue; + } + + index_to_column_bf[clickhouse_index] = std::make_unique(std::move(parquet_bf)); + } + + return index_to_column_bf; +} + +/// Range of values for each column, based on statistics in the Parquet metadata. +/// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just +/// missing in the metadata. +static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings) +{ + auto column_name_for_lookup = [&](std::string column_name) -> std::string + { + if (format_settings.parquet.case_insensitive_column_matching) + boost::to_lower(column_name); + return column_name; + }; + + std::unique_ptr row_group = file.RowGroup(row_group_idx); + + std::unordered_map> name_to_statistics; + for (int i = 0; i < row_group->num_columns(); ++i) + { + auto c = row_group->ColumnChunk(i); + auto s = c->statistics(); + if (!s) + continue; + + if (s->descr()->schema_node()->is_repeated()) + continue; + + auto path = c->path_in_schema()->ToDotVector(); + if (path.size() != 1) + continue; // compound types not supported + + name_to_statistics.emplace(column_name_for_lookup(path[0]), s); + } + + /// +-----+ + /// / /| + /// +-----+ | + /// | | + + /// | |/ + /// +-----+ + std::vector hyperrectangle(header.columns(), Range::createWholeUniverse()); + + for (size_t idx = 0; idx < header.columns(); ++idx) + { + const std::string & name = header.getByPosition(idx).name; + auto it = name_to_statistics.find(column_name_for_lookup(name)); + if (it == name_to_statistics.end()) + continue; + auto stats = it->second; + + DataTypePtr type = header.getByPosition(idx).type; + if (type->lowCardinality()) + type = assert_cast(*type).getDictionaryType(); + if (type->isNullable()) + type = assert_cast(*type).getNestedType(); + Field default_value = type->getDefault(); + TypeIndex type_index = type->getTypeId(); + + /// Only primitive fields are supported, not arrays, maps, tuples, or Nested. + /// Arrays, maps, and Nested can't be meaningfully supported because Parquet only has min/max + /// across all *elements* of the array, not min/max array itself. + /// Same limitation for tuples, but maybe it would make sense to have some kind of tuple + /// expansion in KeyCondition to accept ranges per element instead of whole tuple. + + Field min; + Field max; + if (stats->HasMinMax()) + { + try + { + min = decodePlainParquetValueSlow(stats->EncodeMin(), stats->physical_type(), *stats->descr(), type_index); + max = decodePlainParquetValueSlow(stats->EncodeMax(), stats->physical_type(), *stats->descr(), type_index); + + /// If the data type in parquet file substantially differs from the requested data type, + /// it's sometimes correct to just typecast the min/max values. + /// Other times it's incorrect, e.g.: + /// INSERT INTO FUNCTION file('t.parquet', Parquet, 'x String') VALUES ('1'), ('100'), ('2'); + /// SELECT * FROM file('t.parquet', Parquet, 'x Int64') WHERE x >= 3; + /// If we just typecast min/max from string to integer, this query will incorrectly return empty result. + /// Allow conversion in some simple cases, otherwise ignore the min/max values. + auto min_type = min.getType(); + auto max_type = max.getType(); + min = tryConvertFieldToType(min, *type); + max = tryConvertFieldToType(max, *type); + auto ok_cast = [&](Field::Types::Which from, Field::Types::Which to) -> bool + { + if (from == to) + return true; + /// Decimal -> wider decimal. + if (Field::isDecimal(from) || Field::isDecimal(to)) + return Field::isDecimal(from) && Field::isDecimal(to) && to >= from; + /// Integer -> IP. + if (to == Field::Types::IPv4) + return from == Field::Types::UInt64; + /// Disable index for everything else, especially string <-> number. + return false; + }; + if (!(ok_cast(min_type, min.getType()) && ok_cast(max_type, max.getType())) && + !(min == max) && + !(min_type == Field::Types::Int64 && min.getType() == Field::Types::UInt64 && min.safeGet() >= 0) && + !(max_type == Field::Types::UInt64 && max.getType() == Field::Types::Int64 && max.safeGet() <= UInt64(INT64_MAX))) + { + min = Field(); + max = Field(); + } + } + catch (Exception & e) + { + e.addMessage(" (When parsing Parquet statistics for column {}, physical type {}, {}. Please report an issue and use input_format_parquet_filter_push_down = false to work around.)", name, static_cast(stats->physical_type()), stats->descr()->ToString()); + throw; + } + } + + /// In Range, NULL is represented as positive or negative infinity (represented by a special + /// kind of Field, different from floating-point infinities). + + bool always_null = stats->descr()->max_definition_level() != 0 && + stats->HasNullCount() && stats->num_values() == 0; + bool can_be_null = stats->descr()->max_definition_level() != 0 && + (!stats->HasNullCount() || stats->null_count() != 0); + bool null_as_default = format_settings.null_as_default && !isNullableOrLowCardinalityNullable(header.getByPosition(idx).type); + + if (always_null) + { + /// Single-point range containing either the default value or one of the infinities. + if (null_as_default) + hyperrectangle[idx].right = hyperrectangle[idx].left = default_value; + else + hyperrectangle[idx].right = hyperrectangle[idx].left; + continue; + } + + if (can_be_null) + { + if (null_as_default) + { + /// Make sure the range contains the default value. + if (!min.isNull() && accurateLess(default_value, min)) + min = default_value; + if (!max.isNull() && accurateLess(max, default_value)) + max = default_value; + } + else + { + /// Make sure the range reaches infinity on at least one side. + if (!min.isNull() && !max.isNull()) + min = Field(); + } + } + else + { + /// If the column doesn't have nulls, exclude both infinities. + if (min.isNull()) + hyperrectangle[idx].left_included = false; + if (max.isNull()) + hyperrectangle[idx].right_included = false; + } + + if (!min.isNull()) + hyperrectangle[idx].left = std::move(min); + if (!max.isNull()) + hyperrectangle[idx].right = std::move(max); + } + + return hyperrectangle; +} + +std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn) +{ + std::unordered_set column_keys; + + for (const auto & element : rpn) + { + if (auto bf_data = element.bloom_filter_data) + { + for (const auto index : bf_data->key_columns) + { + column_keys.insert(index); + } + } + } + + return column_keys; +} + +const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent( + const std::unique_ptr & parquet_rg_metadata, + const std::vector & clickhouse_column_index_to_parquet_index, + std::size_t clickhouse_column_index) +{ + if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index) + { + return nullptr; + } + + const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes; + + /// Complex types like structs, tuples and maps will have more than one index; we don't support those for now. + /// Empty tuples are also not supported. + if (parquet_indexes.size() != 1) + { + return nullptr; + } + + auto parquet_column_index = parquet_indexes[0]; + + const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index); + + bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value(); + if (!column_has_bloom_filter) + { + return nullptr; + } + + return parquet_column_descriptor; +} + +void ParquetFileBucketInfo::serialize(WriteBuffer & buffer) +{ + writeVarUInt(row_group_ids.size(), buffer); + for (auto chunk : row_group_ids) + writeVarUInt(chunk, buffer); +} + +void ParquetFileBucketInfo::deserialize(ReadBuffer & buffer) +{ + size_t size_chunks; + readVarUInt(size_chunks, buffer); + row_group_ids = std::vector{}; + row_group_ids.resize(size_chunks); + size_t bucket; + for (size_t i = 0; i < size_chunks; ++i) + { + readVarUInt(bucket, buffer); + row_group_ids[i] = bucket; + } +} + +String ParquetFileBucketInfo::getIdentifier() const +{ + String result; + for (auto chunk : row_group_ids) + result += "_" + std::to_string(chunk); + return result; +} + +ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector & row_group_ids_) + : row_group_ids(row_group_ids_) +{ +} + +std::shared_ptr ParquetFileBucketInfo::filterByMatchingRowGroups(const std::vector & matching_row_groups) const +{ + if (matching_row_groups.empty()) + return nullptr; + if (row_group_ids.empty()) + return std::make_shared(matching_row_groups); + std::unordered_set matching_set(matching_row_groups.begin(), matching_row_groups.end()); + std::vector filtered; + for (size_t rg : row_group_ids) + if (matching_set.contains(rg)) + filtered.push_back(rg); + if (filtered.empty()) + return nullptr; + return std::make_shared(std::move(filtered)); +} + +void registerParquetFileBucketInfo(std::unordered_map & instances) +{ + instances.emplace("Parquet", std::make_shared()); +} + +ParquetBlockInputFormat::ParquetBlockInputFormat( + ReadBuffer & buf, + SharedHeader header_, + const FormatSettings & format_settings_, + FormatParserSharedResourcesPtr parser_shared_resources_, + FormatFilterInfoPtr format_filter_info_, + size_t min_bytes_for_seek_) + : IInputFormat(header_, &buf) + , format_settings(format_settings_) + , skip_row_groups(format_settings.parquet.skip_row_groups) + , parser_shared_resources(std::move(parser_shared_resources_)) + , format_filter_info(std::move(format_filter_info_)) + , min_bytes_for_seek(min_bytes_for_seek_) + , pending_chunks(PendingChunk::Compare{.row_group_first = format_settings_.parquet.preserve_order}) + , previous_block_missing_values(getPort().getHeader().columns()) +{ + if (parser_shared_resources->max_parsing_threads > 1) + pool = std::make_unique( + CurrentMetrics::FormatParsingThreads, + CurrentMetrics::FormatParsingThreadsActive, + CurrentMetrics::FormatParsingThreadsScheduled, + parser_shared_resources->max_parsing_threads); + + bool row_group_prefetch = !pool && parser_shared_resources->max_io_threads > 0 && format_settings.parquet.enable_row_group_prefetch; + if (row_group_prefetch) + io_pool = std::make_shared( + CurrentMetrics::IOThreads, + CurrentMetrics::IOThreadsActive, + CurrentMetrics::IOThreadsScheduled, + parser_shared_resources->getIOThreadsPerReader()); +} + +ParquetBlockInputFormat::~ParquetBlockInputFormat() +{ + is_stopped = true; + if (pool) + pool->wait(); + if (io_pool) + io_pool->wait(); +} + +void ParquetBlockInputFormat::initializeIfNeeded() +{ + if (std::exchange(is_initialized, true)) + return; + + if (format_filter_info) + { + format_filter_info->initKeyConditionOnce(getPort().getHeader()); + } + + // Create arrow file adapter. + arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, io_pool); + + if (is_stopped) + return; + + metadata = parquet::ReadMetaData(arrow_file); + if (buckets_to_read) + { + std::unordered_set set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end()); + for (int i = 0; i < metadata->num_row_groups(); ++i) + { + if (!set_to_read.contains(i)) + skip_row_groups.insert(i); + } + } + + const bool prefetch_group = io_pool != nullptr; + + std::shared_ptr schema; + THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema)); + + ArrowFieldIndexUtil field_util( + format_settings.parquet.case_insensitive_column_matching, + format_settings.parquet.allow_missing_columns); + + if (format_filter_info && format_filter_info->column_mapper) + { + const auto & group_node = metadata->schema()->group_node(); + + std::unordered_map parquet_field_ids; + parquet_names_to_clickhouse = std::unordered_map{}; + for (int i = 0; i < group_node->field_count(); ++i) + traverseAllFields(group_node->field(i), parquet_field_ids); + + auto result = format_filter_info->column_mapper->makeMapping(parquet_field_ids); + clickhouse_names_to_parquet = std::move(result.first); + parquet_names_to_clickhouse = std::move(result.second); + } + auto index_mapping = field_util.findRequiredIndices(getPort().getHeader(), *schema, *metadata, clickhouse_names_to_parquet); + + for (const auto & [clickhouse_header_index, parquet_indexes] : index_mapping) + { + for (auto parquet_index : parquet_indexes) + { + column_indices.push_back(parquet_index); + } + } + + int num_row_groups = metadata->num_row_groups(); + if (num_row_groups == 0) + { + return; + } + + const auto bf_reader_properties = parquet::default_reader_properties(); + std::unique_ptr bf_reader; + + prefetch_group ? row_group_batches.reserve(1) : row_group_batches.reserve(num_row_groups); + + auto adaptive_chunk_size = [&](int row_group_idx) -> size_t + { + size_t total_size = 0; + auto row_group_meta = metadata->RowGroup(row_group_idx); + for (int column_index : column_indices) + { + total_size += row_group_meta->ColumnChunk(column_index)->total_uncompressed_size(); + } + if (!total_size || !format_settings.parquet.prefer_block_bytes) return 0; + auto average_row_bytes = floor(static_cast(total_size) / static_cast(row_group_meta->num_rows())); + // avoid inf preferred_num_rows; + if (average_row_bytes < 1) return 0; + const size_t preferred_num_rows = static_cast(floor(static_cast(format_settings.parquet.prefer_block_bytes) / average_row_bytes)); + const size_t MIN_ROW_NUM = 128; + // size_t != UInt64 in darwin + return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast(format_settings.parquet.max_block_size)); + }; + + std::unordered_set filtering_columns; + + std::unique_ptr key_condition_with_bloom_filter_data; + + if (format_filter_info && format_filter_info->key_condition) + { + key_condition_with_bloom_filter_data = std::make_unique(*format_filter_info->key_condition); + + if (format_settings.parquet.bloom_filter_push_down) + { + bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr); + + auto hash_one = [&](size_t column_idx, const Field & f) -> std::optional + { + const auto * parquet_column_descriptor + = getColumnDescriptorIfBloomFilterIsPresent(metadata->RowGroup(0), index_mapping, column_idx); + + if (!parquet_column_descriptor) + { + return std::nullopt; + } + + return parquetTryHashField(f, parquet_column_descriptor); + }; + + auto hash_many = [&](size_t column_idx, const ColumnPtr & column) -> std::optional> + { + const auto * parquet_column_descriptor + = getColumnDescriptorIfBloomFilterIsPresent(metadata->RowGroup(0), index_mapping, column_idx); + + if (!parquet_column_descriptor) + { + return std::nullopt; + } + + return parquetTryHashColumn(column.get(), parquet_column_descriptor); + }; + + key_condition_with_bloom_filter_data->prepareBloomFilterData(hash_one, hash_many); + + filtering_columns = getBloomFilterFilteringColumnKeys(key_condition_with_bloom_filter_data->getRPN()); + } + } + + auto skip_row_group_based_on_filters = [&](int row_group) + { + if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down) + { + return false; + } + + KeyCondition::ColumnIndexToBloomFilter column_index_to_bloom_filter; + + std::vector hyperrectangle(getPort().getHeader().columns(), Range::createWholeUniverse()); + + if (format_settings.parquet.filter_push_down) + { + hyperrectangle = getHyperrectangleForRowGroup(*metadata, row_group, getPort().getHeader(), format_settings); + } + + if (format_settings.parquet.bloom_filter_push_down) + { + column_index_to_bloom_filter = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns); + } + + bool maybe_exists = key_condition_with_bloom_filter_data->checkInHyperrectangle(hyperrectangle, getPort().getHeader().getDataTypes(), column_index_to_bloom_filter).can_be_true; + + return !maybe_exists; + }; + + // The first one stores the skipped rows for all the skipped row groups before the first row group batch. + row_group_batches_skipped_rows.push_back(0); + for (int row_group = 0; row_group < num_row_groups; ++row_group) + { + if (skip_row_groups.contains(row_group)) + { + row_group_batches_skipped_rows.back() += metadata->RowGroup(row_group)->num_rows(); + continue; + } + + if (key_condition_with_bloom_filter_data && skip_row_group_based_on_filters(row_group)) + { + ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups); + row_group_batches_skipped_rows.back() += metadata->RowGroup(row_group)->num_rows(); + continue; + } + + // When single-threaded parsing, can prefetch row groups, so need to put all row groups in the same row_group_batch + if (row_group_batches.empty() || (!prefetch_group && row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek)) + { + row_group_batches.emplace_back(); + row_group_batches_skipped_rows.push_back(0); + } + + ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups); + row_group_batches.back().row_groups_idxs.push_back(row_group); + row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows(); + auto row_group_size = metadata->RowGroup(row_group)->total_compressed_size(); + row_group_batches.back().row_group_sizes.push_back(row_group_size); + row_group_batches.back().total_bytes_compressed += row_group_size; + auto rows = adaptive_chunk_size(row_group); + row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size; + } +} + +void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx) +{ + bool row_group_prefetch = io_pool != nullptr; + auto & row_group_batch = row_group_batches[row_group_batch_idx]; + + parquet::ArrowReaderProperties arrow_properties; + parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance()); + arrow_properties.set_use_threads(false); + arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size); + reader_properties.set_page_checksum_verification(format_settings.parquet.verify_checksums); + + // When reading a row group, arrow will: + // 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one + // per requested column in the row group). + // 2. Coalesce ranges that are close together, trading off seeks vs read amplification. + // This is controlled by CacheOptions. + // 3. Process the columns one by one, issuing the corresponding (coalesced) range reads as + // needed. Each range gets its own memory buffer allocated. These buffers stay in memory + // (in arrow::io::internal::ReadRangeCache) until the whole row group reading is done. + // So the memory usage of a "SELECT *" will be at least the compressed size of a row group + // (typically hundreds of MB). + // + // With this coalescing, we don't need any readahead on our side, hence avoid_buffering in + // asArrowFile(). + // + // This adds one unnecessary copy. We should probably do coalescing and prefetch scheduling on + // our side instead. + arrow::io::CacheOptions cache_options; + + if (row_group_prefetch) + { + // Manual prefetch via RowGroupPrefetchIterator + arrow_properties.set_pre_buffer(false); + cache_options = arrow::io::CacheOptions::Defaults(); + } + else + { + arrow_properties.set_pre_buffer(true); + cache_options = arrow::io::CacheOptions::LazyDefaults(); + } + cache_options.hole_size_limit = min_bytes_for_seek; + cache_options.range_size_limit = 1l << 40; // reading the whole row group at once is fine + arrow_properties.set_cache_options(cache_options); + + // Workaround for a workaround in the parquet library. + // + // From ComputeColumnChunkRange() in contrib/arrow/cpp/src/parquet/file_reader.cc: + // > The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the + // > dictionary page header size in total_compressed_size and total_uncompressed_size + // > (see IMPALA-694). We add padding to compensate. + // + // That padding breaks the pre-buffered mode because the padded read ranges may overlap each + // other, failing an assert. So we disable pre-buffering in this case. + // That version is >10 years old, so this is not very important. + if (metadata->writer_version().VersionLt(parquet::ApplicationVersion::PARQUET_816_FIXED_VERSION())) + { + arrow_properties.set_pre_buffer(false); + row_group_prefetch = false; + } + + parquet::arrow::FileReaderBuilder builder; + THROW_ARROW_NOT_OK(builder.Open(arrow_file, reader_properties, metadata)); + builder.properties(arrow_properties); + builder.memory_pool(ArrowMemoryPool::instance()); + // should get raw reader before build, raw_reader will set null after build + auto * parquet_file_reader = builder.raw_reader(); + THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader)); + if (row_group_prefetch) + { + row_group_batch.prefetch_iterator = std::make_unique(parquet_file_reader, row_group_batch, column_indices, min_bytes_for_seek); + row_group_batch.record_batch_reader = row_group_batch.prefetch_iterator->nextRowGroupReader(); + } + else + { + Stopwatch fetch_wait_time; + THROW_ARROW_NOT_OK( + row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader)); + increment(ProfileEvents::ParquetFetchWaitTimeMicroseconds, fetch_wait_time.elapsedMicroseconds()); + } + row_group_batch.arrow_column_to_ch_column = std::make_unique( + getPort().getHeader(), + "Parquet", + format_settings, + parquet_names_to_clickhouse, + clickhouse_names_to_parquet, + format_settings.parquet.allow_missing_columns, + format_settings.null_as_default, + format_settings.date_time_overflow_behavior, + format_settings.parquet.allow_geoparquet_parser, + format_settings.parquet.case_insensitive_column_matching, + false, /* is_stream_ */ + format_settings.parquet.enable_json_parsing); +} + +void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_batch_idx) +{ + chassert(!mutex.try_lock()); + + auto & status = row_group_batches[row_group_batch_idx].status; + chassert(status == RowGroupBatchState::Status::NotStarted || status == RowGroupBatchState::Status::Paused); + + status = RowGroupBatchState::Status::Running; + + pool->scheduleOrThrowOnError( + [this, row_group_batch_idx, thread_group = CurrentThread::getGroup()]() + { + try + { + ThreadGroupSwitcher switcher(thread_group, ThreadName::PARQUET_DECODER); + threadFunction(row_group_batch_idx); + } + catch (...) + { + std::lock_guard lock(mutex); + background_exception = std::current_exception(); + condvar.notify_all(); + } + }); +} + +void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx) +{ + std::unique_lock lock(mutex); + + auto & row_group_batch = row_group_batches[row_group_batch_idx]; + chassert(row_group_batch.status == RowGroupBatchState::Status::Running); + + while (true) + { + if (is_stopped || row_group_batch.num_pending_chunks >= max_pending_chunks_per_row_group_batch) + { + row_group_batch.status = RowGroupBatchState::Status::Paused; + return; + } + + decodeOneChunk(row_group_batch_idx, lock); + + if (row_group_batch.status == RowGroupBatchState::Status::Done) + return; + + CurrentThread::updatePerformanceCountersIfNeeded(); + } +} +std::shared_ptr ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader() +{ + if (prefetched_row_groups.empty()) return nullptr; + std::shared_ptr reader; + Stopwatch fetch_wait_time; + // GetRecordBatchReader will block until the data is ready. + // Only the corresponding objects will be created, and no data parsing will be performed. + THROW_ARROW_NOT_OK(row_group_batch.file_reader->GetRecordBatchReader(prefetched_row_groups, column_indices, &reader)); + prefetched_row_groups.clear(); + // Start to prefetch next row groups + prefetchNextRowGroups(); + increment(ProfileEvents::ParquetFetchWaitTimeMicroseconds, fetch_wait_time.elapsedMicroseconds()); + return reader; +} + +void ParquetBlockInputFormat::RowGroupPrefetchIterator::prefetchNextRowGroups() +{ + if (next_row_group_idx < row_group_batch.row_groups_idxs.size()) + { + size_t total_bytes_compressed = 0; + // Merge small row groups, but always prefetch at least one row group + while (next_row_group_idx < row_group_batch.row_groups_idxs.size() && + (total_bytes_compressed < min_bytes_for_seek || prefetched_row_groups.empty())) + { + total_bytes_compressed += row_group_batch.row_group_sizes[next_row_group_idx]; + prefetched_row_groups.emplace_back(row_group_batch.row_groups_idxs[next_row_group_idx]); + ++next_row_group_idx; + } + file_reader->PreBuffer(prefetched_row_groups, column_indices, + row_group_batch.file_reader->properties().io_context(), row_group_batch.file_reader->properties().cache_options()); + } +} + +void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::unique_lock & lock) +{ + auto & row_group_batch = row_group_batches[row_group_batch_idx]; + chassert(row_group_batch.status != RowGroupBatchState::Status::Done); + chassert(lock.owns_lock()); + SCOPE_EXIT({ chassert(lock.owns_lock() || std::uncaught_exceptions()); }); + + lock.unlock(); + + auto end_of_row_group = [&] { + row_group_batch.arrow_column_to_ch_column.reset(); + row_group_batch.record_batch_reader.reset(); + row_group_batch.file_reader.reset(); + + lock.lock(); + row_group_batch.status = RowGroupBatchState::Status::Done; + + // We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right + // here because we're running on the same thread pool, so it'll deadlock if thread limit is + // reached. Wake up read() instead. + condvar.notify_all(); + }; + + auto get_approx_original_chunk_size = [&](size_t num_rows) + { + return static_cast(std::ceil(static_cast(row_group_batch.total_bytes_compressed) / static_cast(row_group_batch.total_rows) * static_cast(num_rows))); + }; + + if (!row_group_batch.record_batch_reader) + initializeRowGroupBatchReader(row_group_batch_idx); + + PendingChunk res(getPort().getHeader().columns()); + res.chunk_idx = row_group_batch.next_chunk_idx; + res.row_group_batch_idx = row_group_batch_idx; + + auto fetch_batch = [&] + { + chassert(row_group_batch.record_batch_reader); + auto batch = row_group_batch.record_batch_reader->Next(); + if (!batch.ok()) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString()); + return batch; + }; + + // If record_batch_reader is null, try to get the next row group reader from prefetch iterator + if (!row_group_batch.record_batch_reader && row_group_batch.prefetch_iterator) + { + row_group_batch.record_batch_reader = row_group_batch.prefetch_iterator->nextRowGroupReader(); + } + + // If we still don't have a reader, we're done with this row group + if (!row_group_batch.record_batch_reader) + { + end_of_row_group(); + return; + } + + auto batch = fetch_batch(); + if (!*batch && row_group_batch.prefetch_iterator) + { + row_group_batch.record_batch_reader = row_group_batch.prefetch_iterator->nextRowGroupReader(); + if (row_group_batch.record_batch_reader) + { + batch = fetch_batch(); + } + } + + if (!*batch || !row_group_batch.record_batch_reader) + { + end_of_row_group(); + return; + } + + auto tmp_table = arrow::Table::FromRecordBatches({*batch}); + + /// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields. + /// Otherwise fill the missing columns with zero values of its type. + BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr; + res.approx_original_chunk_size = get_approx_original_chunk_size((*tmp_table)->num_rows()); + res.chunk = row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(*tmp_table, (*tmp_table)->num_rows(), metadata->key_value_metadata(), block_missing_values_ptr); + + lock.lock(); + + row_group_batch.chunk_sizes.push_back(res.chunk.getNumRows()); + ++row_group_batch.next_chunk_idx; + ++row_group_batch.num_pending_chunks; + pending_chunks.push(std::move(res)); + condvar.notify_all(); +} + +void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional row_group_batch_touched) +{ + while (row_group_batches_completed < row_group_batches.size()) + { + auto & row_group = row_group_batches[row_group_batches_completed]; + if (row_group.status != RowGroupBatchState::Status::Done || row_group.num_pending_chunks != 0) + break; + ++row_group_batches_completed; + } + + if (pool) + { + size_t max_decoding_threads = parser_shared_resources->getParsingThreadsPerReader(); + while (row_group_batches_started - row_group_batches_completed < max_decoding_threads && + row_group_batches_started < row_group_batches.size()) + scheduleRowGroup(row_group_batches_started++); + if (row_group_batch_touched) + { + auto & row_group = row_group_batches[*row_group_batch_touched]; + if (row_group.status == RowGroupBatchState::Status::Paused && + row_group.num_pending_chunks < max_pending_chunks_per_row_group_batch) + scheduleRowGroup(*row_group_batch_touched); + } + } +} + +Chunk ParquetBlockInputFormat::read() +{ + initializeIfNeeded(); + + if (is_stopped || row_group_batches_completed == row_group_batches.size()) + return {}; + + if (need_only_count) + { + auto chunk = getChunkForCount(row_group_batches[row_group_batches_completed].total_rows); + size_t total_rows_before = std::accumulate( + row_group_batches_skipped_rows.begin(), + row_group_batches_skipped_rows.begin() + row_group_batches_completed + 1, + 0ull) + + std::accumulate(row_group_batches.begin(), + row_group_batches.begin() + row_group_batches_completed, + 0ull, + [](size_t sum, const RowGroupBatchState & batch) { return sum + batch.total_rows; }); + + row_group_batches_completed++; + chunk.getChunkInfos().add(std::make_shared(total_rows_before)); + return chunk; + } + + std::unique_lock lock(mutex); + + while (true) + { + if (background_exception) + { + is_stopped = true; + /// This exception can be mutated (addMessage()) in IInputFormat::generate(), + /// so we need to copy it (copyMutableException()) to avoid sharing mutable exception between multiple threads + std::rethrow_exception(copyMutableException(background_exception)); + } + if (is_stopped) + return {}; + + scheduleMoreWorkIfNeeded(); + + if (!pending_chunks.empty() && + (!format_settings.parquet.preserve_order || + pending_chunks.top().row_group_batch_idx == row_group_batches_completed)) + { + PendingChunk chunk = std::move(const_cast(pending_chunks.top())); + pending_chunks.pop(); + + auto & row_group = row_group_batches[chunk.row_group_batch_idx]; + chassert(row_group.num_pending_chunks != 0); + chassert(chunk.chunk_idx == row_group.next_chunk_idx - row_group.num_pending_chunks); + --row_group.num_pending_chunks; + + scheduleMoreWorkIfNeeded(chunk.row_group_batch_idx); + + previous_block_missing_values = std::move(chunk.block_missing_values); + previous_approx_bytes_read_for_chunk = chunk.approx_original_chunk_size; + + + size_t total_rows_before = std::accumulate( + row_group_batches_skipped_rows.begin(), + row_group_batches_skipped_rows.begin() + chunk.row_group_batch_idx + 1, + 0ull) + + std::accumulate(row_group_batches.begin(), + row_group_batches.begin() + chunk.row_group_batch_idx, + 0ull, + [](size_t sum, const RowGroupBatchState & batch) { return sum + batch.total_rows; }) + + + std::accumulate(row_group.chunk_sizes.begin(), row_group.chunk_sizes.begin() + chunk.chunk_idx, 0ull); + + + chunk.chunk.getChunkInfos().add(std::make_shared(total_rows_before)); + + return std::move(chunk.chunk); + } + + if (row_group_batches_completed == row_group_batches.size()) + return {}; + + if (pool) + condvar.wait(lock); + else + decodeOneChunk(row_group_batches_completed, lock); + } +} + +void ParquetBlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) +{ + buckets_to_read = std::static_pointer_cast(buckets_to_read_); +} + +void ParquetBlockInputFormat::resetParser() +{ + is_stopped = true; + if (pool) + pool->wait(); + + arrow_file.reset(); + metadata.reset(); + column_indices.clear(); + row_group_batches.clear(); + row_group_batches_skipped_rows.clear(); + while (!pending_chunks.empty()) + pending_chunks.pop(); + row_group_batches_completed = 0; + previous_block_missing_values.clear(); + row_group_batches_started = 0; + background_exception = nullptr; + + is_stopped = false; + is_initialized = false; + + IInputFormat::resetParser(); +} + +const BlockMissingValues * ParquetBlockInputFormat::getMissingValues() const +{ + return &previous_block_missing_values; +} + +ArrowParquetSchemaReader::ArrowParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) + : ISchemaReader(in_), format_settings(format_settings_) +{ +} + +void ArrowParquetSchemaReader::initializeIfNeeded() +{ + if (arrow_file) + return; + + std::atomic is_stopped{0}; + arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + metadata = parquet::ReadMetaData(arrow_file); +} + +NamesAndTypesList ArrowParquetSchemaReader::readSchema() +{ + initializeIfNeeded(); + + std::shared_ptr schema; + THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema)); + + /// When Parquet's schema is converted to Arrow's schema, logical types are lost (at least in + /// the currently used Arrow 11 version). This is only a problem for JSON columns, which are + /// string columns with a piece of metadata saying that they should be converted to JSON. + /// Here's a hack to propagate logical types in simple cases. + std::vector> new_fields; + new_fields.reserve(schema->num_fields()); + + ArrowFieldIndexUtil field_util( + format_settings.parquet.case_insensitive_column_matching, + format_settings.parquet.allow_missing_columns); + const auto field_indices = field_util.calculateFieldIndices(*schema); + + for (int i = 0; i < schema->num_fields(); ++i) + { + auto field = schema->field(i); + auto it = field_indices.find(field->name()); + if (it != field_indices.end() && it->second.second == 1) + { + const auto * parquet_node = metadata->schema()->Column(it->second.first); + const auto * lt = parquet_node->logical_type().get(); + + if (lt && !lt->is_invalid()) + { + std::shared_ptr kv = field->HasMetadata() ? field->metadata()->Copy() : arrow::key_value_metadata({}, {}); + THROW_ARROW_NOT_OK(kv->Set("PARQUET:logical_type", lt->ToString())); + + field = field->WithMetadata(std::move(kv)); + } + } + new_fields.emplace_back(std::move(field)); + } + + schema = arrow::schema(std::move(new_fields)); + + auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader( + *schema, + metadata->key_value_metadata(), + "Parquet", + format_settings, + format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference, + format_settings.schema_inference_make_columns_nullable != 0, + format_settings.parquet.case_insensitive_column_matching, + format_settings.parquet.allow_geoparquet_parser, + format_settings.parquet.enable_json_parsing); + if (format_settings.schema_inference_make_columns_nullable == 1) + return getNamesAndRecursivelyNullableTypes(header, format_settings); + return header.getNamesAndTypesList(); +} + +std::optional ArrowParquetSchemaReader::readNumberOrRows() +{ + initializeIfNeeded(); + return metadata->num_rows(); +} + +std::vector ParquetBucketSplitter::splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + std::vector bucket_sizes; + for (int i = 0; i < metadata->num_row_groups(); ++i) + bucket_sizes.push_back(metadata->RowGroup(i)->total_byte_size()); + + std::vector> buckets; + size_t current_weight = 0; + for (size_t i = 0; i < bucket_sizes.size(); ++i) + { + if (current_weight + bucket_sizes[i] <= bucket_size) + { + if (buckets.empty()) + buckets.emplace_back(); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + else + { + current_weight = 0; + buckets.push_back({}); + buckets.back().push_back(i); + current_weight += bucket_sizes[i]; + } + } + + std::vector result; + for (const auto & bucket : buckets) + { + result.push_back(std::make_shared(bucket)); + } + return result; +} + +void registerInputFormatParquet(FormatFactory & factory) +{ + factory.registerFileBucketInfo( + "Parquet", + [] + { + return std::make_shared(); + } + ); + factory.registerRandomAccessInputFormatWithMetadata( + "Parquet", + [](ReadBuffer & buf, + const Block & sample, + const FormatSettings & settings, + const ReadSettings & read_settings, + bool is_remote_fs, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info, + const std::optional & object_with_metadata, + const ContextPtr & context) -> InputFormatPtr + { + auto lambda_logger = getLogger("ParquetMetadataCache"); + size_t min_bytes_for_seek + = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; + if (settings.parquet.use_native_reader_v3) + { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetBlockInputFormat with metadata cache"); + ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache(); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek, + metadata_cache, + object_with_metadata + ); + } + else + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Implementation of ParquetBlockInputFormat using arrow reader didn't require blob metadata for initialization"); + } + }); + factory.registerRandomAccessInputFormat( + "Parquet", + [](ReadBuffer & buf, + const Block & sample, + const FormatSettings & settings, + const ReadSettings & read_settings, + bool is_remote_fs, + FormatParserSharedResourcesPtr parser_shared_resources, + FormatFilterInfoPtr format_filter_info) -> InputFormatPtr + { + auto lambda_logger = getLogger("ParquetMetadataCache"); + size_t min_bytes_for_seek + = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; + if (settings.parquet.use_native_reader_v3) + { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetBlockInputFormat with no metadata cache"); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek, + nullptr, + std::nullopt + ); + } + else + { + LOG_TRACE(lambda_logger, "using arrow reader in ParquetBlockInputFormat without metadata cache"); + return std::make_shared( + buf, + std::make_shared(sample), + settings, + std::move(parser_shared_resources), + std::move(format_filter_info), + min_bytes_for_seek + ); + } + }); + factory.markFormatSupportsSubsetOfColumns("Parquet"); + factory.registerPrewhereSupportChecker("Parquet", [](const FormatSettings & settings) + { + return settings.parquet.use_native_reader_v3; + }); +} + +void registerParquetSchemaReader(FormatFactory & factory) +{ + factory.registerSplitter("Parquet", [] + { + return std::make_shared(); + }); + factory.registerSchemaReader( + "Parquet", [](ReadBuffer & buf, const FormatSettings & settings) -> SchemaReaderPtr + { + auto lambda_logger = getLogger("ParquetMetadataCache"); + if (settings.parquet.use_native_reader_v3) + { + LOG_TRACE(lambda_logger, "using native reader v3 in ParquetSchemaReader"); + return std::make_shared(buf, settings); + } + else + { + LOG_TRACE(lambda_logger, "using arrow reader in ParquetSchemaReader"); + return std::make_shared(buf, settings); + } + } + ); + + factory.registerAdditionalInfoForSchemaCacheGetter( + "Parquet", + [](const FormatSettings & settings) + { + return fmt::format( + "schema_inference_make_columns_nullable={};enable_json_parsing={};use_native_reader_v3={}", + settings.schema_inference_make_columns_nullable, + settings.parquet.enable_json_parsing, + settings.parquet.use_native_reader_v3); + }); +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerInputFormatParquet(FormatFactory &) +{ +} + +void registerParquetSchemaReader(FormatFactory &) {} +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h new file mode 100644 index 000000000000..5ec3e69a88f1 --- /dev/null +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -0,0 +1,386 @@ +#pragma once +#include "config.h" +#if USE_PARQUET + +#include +#include +#include +#include +#include +#include +#include + +namespace parquet +{ +class ParquetFileReader; +class FileMetaData; +} +namespace parquet::arrow { class FileReader; } +namespace arrow { class Buffer; class RecordBatchReader;} +namespace arrow::io { class RandomAccessFile; } + +namespace DB +{ + +class ArrowColumnToCHColumn; + +// Parquet files contain a metadata block with the following information: +// * list of columns, +// * list of "row groups", +// * for each column in each row group: +// - byte range for the data, +// - min, max, count, +// - (note that we *don't* have a reliable estimate of the decompressed+decoded size; the +// metadata has decompressed size, but decoded size is sometimes much bigger because of +// dictionary encoding) +// +// This information could be used for: +// (1) Precise reads - only reading the byte ranges we need, instead of filling the whole +// arbitrarily-sized buffer inside ReadBuffer. We know in advance exactly what ranges we'll +// need to read. +// (2) Skipping row groups based on WHERE conditions. +// (3) Skipping decoding of individual pages based on PREWHERE. +// (4) Projections. I.e. for queries that only request min/max/count, we can report the +// min/max/count from metadata. This can be done per row group. I.e. for row groups that +// fully pass the WHERE conditions we'll use min/max/count from metadata, for row groups that +// only partially overlap with the WHERE conditions we'll read data. +// (4a) Before projections are implemented, we should at least be able to do `SELECT count(*)` +// without reading data. +// +// For (1), we need the IInputFormat to be in control of reading, with its own implementation of +// parallel reading+decoding, instead of using ParallelReadBuffer and ParallelParsingInputFormat. +// That's what RandomAccessInputCreator in FormatFactory is about. + +struct ParquetFileBucketInfo : public FileBucketInfo +{ + std::vector row_group_ids; + + ParquetFileBucketInfo() = default; + explicit ParquetFileBucketInfo(const std::vector & row_group_ids_); + void serialize(WriteBuffer & buffer) override; + void deserialize(ReadBuffer & buffer) override; + String getIdentifier() const override; + String getFormatName() const override + { + return "Parquet"; + } + std::shared_ptr filterByMatchingRowGroups(const std::vector & matching_row_groups) const override; +}; +using ParquetFileBucketInfoPtr = std::shared_ptr; + +struct ParquetBucketSplitter : public IBucketSplitter +{ + ParquetBucketSplitter() = default; + std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; +}; + +class ParquetBlockInputFormat : public IInputFormat +{ +public: + ParquetBlockInputFormat( + ReadBuffer & buf, + SharedHeader header, + const FormatSettings & format_settings_, + FormatParserSharedResourcesPtr parser_shared_resources_, + FormatFilterInfoPtr format_filter_info_, + size_t min_bytes_for_seek_); + + ~ParquetBlockInputFormat() override; + + void resetParser() override; + + String getName() const override { return "ParquetBlockInputFormat"; } + + const BlockMissingValues * getMissingValues() const override; + + size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; } + + void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override; + +private: + Chunk read() override; + + void onCancel() noexcept override + { + is_stopped = 1; + } + + void initializeIfNeeded(); + void initializeRowGroupBatchReader(size_t row_group_batch_idx); + + void decodeOneChunk(size_t row_group_batch_idx, std::unique_lock & lock); + + void scheduleMoreWorkIfNeeded(std::optional row_group_batch_touched = std::nullopt); + void scheduleRowGroup(size_t row_group_batch_idx); + + void threadFunction(size_t row_group_batch_idx); + + // Data layout in the file: + // + // row group 0 + // column 0 + // page 0, page 1, ... + // column 1 + // page 0, page 1, ... + // ... + // row group 1 + // column 0 + // ... + // ... + // ... + // + // All columns in one row group have the same number of rows. + // (Not necessarily the same number of *values* if there are arrays or nulls.) + // Pages have arbitrary sizes and numbers of rows, independent from each other, even if in the + // same column or row group. + // + // We can think of this as having lots of data streams, one for each column x row group. + // The main job of this class is to schedule read operations for these streams across threads. + // Also: reassembling the results into chunks, creating/destroying these streams, prefetching. + // + // Some considerations: + // * Row group size is typically hundreds of MB (compressed). Apache recommends 0.5 - 1 GB. + // * Compression ratio can be pretty extreme, especially with dictionary compression. + // We can afford to keep a compressed row group in memory, but not uncompressed. + // * For each pair , the data lives in one contiguous range in the + // file. We know all these ranges in advance, from metadata. + // * The byte range for a column in a row group is often very short, like a few KB. + // So we need to: + // - Avoid unnecessary readahead, e.g. don't read 1 MB when we only need 1 KB. + // - Coalesce nearby ranges into longer reads when needed. E.g. if we need to read 5 ranges, + // 1 KB each, with 1 KB gaps between them, it's better to do 1 x 9 KB read instead of + // 5 x 1 KB reads. + // - Have lots of parallelism for reading (not necessarily for parsing). E.g. if we're + // reading one small column, it may translate to hundreds of tiny reads with long gaps + // between them. If the data comes from an HTTP server, that's hundreds of tiny HTTP GET + // requests. To get good performance, we have to do tens or hundreds of them in parallel. + // So we should probably have separate parallelism control for IO vs parsing (since we + // don't want hundreds of worker threads oversubscribing the CPU cores). + // + // (Some motivating example access patterns: + // - 'SELECT small_column'. Bottlenecked on number of seeks. Need to do lots of file/network + // reads in parallel, for lots of row groups. + // - 'SELECT *' when row group size is big and there are many columns. Read the whole file. + // Need some moderate parallelism for IO and for parsing. Ideally read+parse columns of + // one row group in parallel to avoid having multiple row groups in memory at once. + // - 'SELECT big_column'. Have to read+parse multiple row groups in parallel. + // - 'SELECT big_column, many small columns'. This is a mix of the previous two scenarios. + // We have many columns, but still need to read+parse multiple row groups in parallel.) + + // With all that in mind, here's what we do. + // + // We treat each row group as a sequential single-threaded stream of blocks. + // + // We have a sliding window of active row groups. When a row group becomes active, we start + // reading its data (using RAM). Row group becomes inactive when we finish reading and + // delivering all its blocks and free the RAM. Size of the window is max_decoding_threads. + // + // Decoded blocks are placed in `pending_chunks` queue, then picked up by read(). + // If row group decoding runs too far ahead of delivery (by `max_pending_chunks_per_row_group` + // chunks), we pause the stream for the row group, to avoid using too much memory when decoded + // chunks are much bigger than the compressed data. + // + // Also: + // * If preserve_order = true, we deliver chunks strictly in order of increasing row group. + // Decoding may still proceed in later row groups. + // * If max_decoding_threads <= 1, we run all tasks inline in read(), without thread pool. + + // Potential improvements: + // * Plan all read ranges ahead of time, for the whole file, and do prefetching for them + // in background. Using max_download_threads, which can be made much greater than + // max_decoding_threads by default. + // * Can parse different columns within the same row group in parallel. This would let us have + // fewer row groups in memory at once, reducing memory usage when selecting many columns. + // Should probably do more than one column per task because columns are often very small. + // Maybe split each row group into, say, max_decoding_threads * 2 equal-sized column bunches? + // * Sliding window could take into account the (predicted) memory usage of row groups. + // If row groups are big and many columns are selected, we may use lots of memory when + // reading max_decoding_threads row groups at once. Can adjust the sliding window size based + // on row groups' data sizes from metadata. + // * The max_pending_chunks_per_row_group limit could be based on actual memory usage too. + // Useful for preserve_order. + + class RowGroupPrefetchIterator; + + struct RowGroupBatchState + { + // Transitions: + // + // NotStarted -> Running -> Complete + // Ʌ + // V + // Paused + // + // If max_decoding_threads <= 1: NotStarted -> Complete. + enum class Status : uint8_t + { + NotStarted, + Running, + // Paused decoding because too many chunks are pending. + Paused, + // Decoded everything. + Done, + }; + + Status status = Status::NotStarted; + + // Window of chunks that were decoded but not returned from read(): + // + // (delivered) next_chunk_idx + // v v v + // +---+---+---+---+---+---+---+---+---+---+ + // | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | <-- all chunks + // +---+---+---+---+---+---+---+---+---+---+ + // ^ ^ ^ ^ ^ + // num_pending_chunks + // (in pending_chunks) + // (at most max_pending_chunks_per_row_group) + + size_t next_chunk_idx = 0; + std::vector chunk_sizes; + size_t num_pending_chunks = 0; + + size_t total_rows = 0; + size_t total_bytes_compressed = 0; + std::vector row_group_sizes; + + size_t adaptive_chunk_size = 0; + + std::vector row_groups_idxs; + + // These are only used by the decoding thread, so don't require locking the mutex. + std::unique_ptr file_reader; + std::unique_ptr prefetch_iterator; + std::shared_ptr record_batch_reader; + std::unique_ptr arrow_column_to_ch_column; + }; + + // Chunk ready to be delivered by read(). + struct PendingChunk + { + explicit PendingChunk(size_t num_columns) : block_missing_values(num_columns) {} + + Chunk chunk; + BlockMissingValues block_missing_values; + size_t chunk_idx; // within row group + size_t row_group_batch_idx; + size_t approx_original_chunk_size; + + // For priority_queue. + // In ordered mode we deliver strictly in order of increasing row group idx, + // in unordered mode we prefer to interleave chunks from different row groups. + struct Compare + { + bool row_group_first = false; + + bool operator()(const PendingChunk & a, const PendingChunk & b) const + { + auto tuplificate = [this](const PendingChunk & c) + { return row_group_first ? std::tie(c.row_group_batch_idx, c.chunk_idx) + : std::tie(c.chunk_idx, c.row_group_batch_idx); }; + return tuplificate(a) > tuplificate(b); + } + }; + }; + + // The trigger for row group prefetching improves the overall parsing response time + // by hiding the IO overhead of the next row group in the processing time of the previous row group. + // +-------------------------------------------------------------------------------------------------------------------+ + // |io +-----------+ +-----------+ +-----------+ +-----------+ +-----------+ | + // | |fetch rg 0 |---->|fetch rg 1 |---->|fetch rg 2 |----->|fetch rg 3 |----->|fetch rg 4 | | + // | +-----------+ +-----------+ +-----------+ +-----------+ +-----------+ | + // +-------------------------------------------------------------------------------------------------------------------+ + // +-------------------------------------------------------------------------------------------------------------------+ + // |compute +-----------+ +-----------+ +-----------+ +-----------+ +-----------+ | + // | |parse rg 0 |---->|parse rg 1 |---->|parse rg 2 |----->|parse rg 3 |----->|parse rg 4 | | + // | +-----------+ +-----------+ +-----------+ +-----------+ +-----------+ | + // +-------------------------------------------------------------------------------------------------------------------+ + + class RowGroupPrefetchIterator + { + public: + RowGroupPrefetchIterator( + parquet::ParquetFileReader* file_reader_, RowGroupBatchState & row_group_batch_, const std::vector & column_indices_, size_t min_bytes_for_seek_) + : file_reader(file_reader_), row_group_batch(row_group_batch_), column_indices(column_indices_), min_bytes_for_seek(min_bytes_for_seek_) + { + prefetchNextRowGroups(); + } + std::shared_ptr nextRowGroupReader(); + private: + void prefetchNextRowGroups(); + size_t next_row_group_idx= 0; + std::vector prefetched_row_groups; + parquet::ParquetFileReader * file_reader; + RowGroupBatchState& row_group_batch; + const std::vector& column_indices; + const size_t min_bytes_for_seek; + }; + + const FormatSettings format_settings; + std::unordered_set skip_row_groups; + ParquetFileBucketInfoPtr buckets_to_read; + FormatParserSharedResourcesPtr parser_shared_resources; + FormatFilterInfoPtr format_filter_info; + size_t min_bytes_for_seek; + const size_t max_pending_chunks_per_row_group_batch = 2; + + /// RandomAccessFile is thread safe, so we share it among threads. + /// FileReader is not, so each thread creates its own. + std::shared_ptr arrow_file; + std::shared_ptr metadata; + /// Indices of columns to read from Parquet file. + std::vector column_indices; + + // Window of active row groups: + // + // row_groups_completed row_groups_started + // v v + // +---+---+---+---+---+---+---+---+---+---+ + // | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | <-- all row groups + // +---+---+---+---+---+---+---+---+---+---+ + // ^ ^ ^ ^ ^ + // Done NotStarted + + std::mutex mutex; + // Wakes up the read() call, if any. + std::condition_variable condvar; + + std::vector row_group_batches; + std::vector row_group_batches_skipped_rows; + std::priority_queue, PendingChunk::Compare> pending_chunks; + size_t row_group_batches_completed = 0; + + // These are only used when max_decoding_threads > 1. + size_t row_group_batches_started = 0; + std::unique_ptr pool; + std::shared_ptr io_pool; + + BlockMissingValues previous_block_missing_values; + size_t previous_approx_bytes_read_for_chunk = 0; + + std::exception_ptr background_exception = nullptr; + std::atomic is_stopped{0}; + bool is_initialized = false; + std::optional> parquet_names_to_clickhouse; + std::optional> clickhouse_names_to_parquet; +}; + +class ArrowParquetSchemaReader : public ISchemaReader +{ +public: + ArrowParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_); + + NamesAndTypesList readSchema() override; + std::optional readNumberOrRows() override; + +private: + void initializeIfNeeded(); + + const FormatSettings format_settings; + std::shared_ptr arrow_file; + std::shared_ptr metadata; +}; + +} + +#endif diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 5acfd7ef7a0c..001101e7be3a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -650,6 +650,28 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (!object_info || object_info->getPath().empty()) return {}; +<<<<<<< HEAD +======= + + if (object_info->relative_path_with_metadata.getCommand().isValid()) + { + auto retry_after_us = object_info->relative_path_with_metadata.getCommand().getRetryAfterUs(); + if (retry_after_us.has_value()) + { + /// TODO: Make asyncronous waiting without sleep in thread + /// Now this sleep is on executor node in worker thread + /// Does not block query initiator + auto wait_time = std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()); + ProfileEvents::increment(ProfileEvents::ObjectStorageClusterWaitingMicroseconds, wait_time); + sleepForMicroseconds(wait_time); + continue; + } + object_info->relative_path_with_metadata.setFileMetaInfo(object_info->relative_path_with_metadata.getCommand().getFileMetaInfo()); + } + + if (object_info->getPath().empty()) + return {}; +>>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (!object_info->getObjectMetadata()) { bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags"); @@ -666,6 +688,51 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags)); } +<<<<<<< HEAD +======= + + if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 + && object_info->getObjectMetadata()->is_size_known) + continue; + + if (query_condition_cache && !object_info->file_bucket_info) + { + auto matching_marks = query_condition_cache->read( + storage_id.uuid, object_info->getFileName(), *format_filter_info->condition_hash); + if (matching_marks.has_value()) + { + const auto & marks = *matching_marks; + size_t total_row_groups = marks.size(); + std::vector matching_row_groups; + for (size_t i = 0; i < total_row_groups; ++i) + if (marks[i]) + matching_row_groups.push_back(i); + + size_t dropped_row_groups = total_row_groups - matching_row_groups.size(); + LOG_DEBUG(log, + "Query condition cache has dropped {}/{} row groups for condition {} in file {}.", + dropped_row_groups, + total_row_groups, + format_filter_info->filter_actions_dag->dumpNames(), + object_info->getFileName()); + + if (matching_row_groups.empty()) + continue; + + auto file_bucket_info = FormatFactory::instance().getFileBucketInfo( + object_info->getFileFormat().value_or(configuration->getFormat())); + if (file_bucket_info) + { + auto filtered = file_bucket_info->filterByMatchingRowGroups(matching_row_groups); + if (!filtered) + continue; + object_info->file_bucket_info = std::move(filtered); + } + } + } + break; + } +>>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 && object_info->getObjectMetadata()->is_size_known) From 2fcaff138a2b4498d3671d0bd85b9a339dd68d8f Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Thu, 25 Jun 2026 04:26:03 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1804 --- src/Databases/DataLake/DatabaseDataLake.cpp | 8 --- .../StorageObjectStorageSource.cpp | 67 ------------------- 2 files changed, 75 deletions(-) diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 1baacebb7ccd..54bb3ccd23a6 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -669,11 +669,6 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con const auto catalog_uuid = table_metadata.getTableUUID(); const UUID table_uuid = catalog_uuid ? parseFromString(*catalog_uuid) : UUIDHelpers::Nil; -<<<<<<< HEAD -======= - - std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : ""; ->>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (can_use_parallel_replicas && !is_secondary_query) { @@ -702,10 +697,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con return std::make_shared( configuration, configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name, table_uuid))), -<<<<<<< HEAD context_copy, -======= ->>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) StorageID(getDatabaseName(), name, table_uuid), /* columns */columns, /* constraints */ConstraintsDescription{}, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 001101e7be3a..5acfd7ef7a0c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -650,28 +650,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (!object_info || object_info->getPath().empty()) return {}; -<<<<<<< HEAD -======= - - if (object_info->relative_path_with_metadata.getCommand().isValid()) - { - auto retry_after_us = object_info->relative_path_with_metadata.getCommand().getRetryAfterUs(); - if (retry_after_us.has_value()) - { - /// TODO: Make asyncronous waiting without sleep in thread - /// Now this sleep is on executor node in worker thread - /// Does not block query initiator - auto wait_time = std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()); - ProfileEvents::increment(ProfileEvents::ObjectStorageClusterWaitingMicroseconds, wait_time); - sleepForMicroseconds(wait_time); - continue; - } - object_info->relative_path_with_metadata.setFileMetaInfo(object_info->relative_path_with_metadata.getCommand().getFileMetaInfo()); - } - - if (object_info->getPath().empty()) - return {}; ->>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (!object_info->getObjectMetadata()) { bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags"); @@ -688,51 +666,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags)); } -<<<<<<< HEAD -======= - - if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 - && object_info->getObjectMetadata()->is_size_known) - continue; - - if (query_condition_cache && !object_info->file_bucket_info) - { - auto matching_marks = query_condition_cache->read( - storage_id.uuid, object_info->getFileName(), *format_filter_info->condition_hash); - if (matching_marks.has_value()) - { - const auto & marks = *matching_marks; - size_t total_row_groups = marks.size(); - std::vector matching_row_groups; - for (size_t i = 0; i < total_row_groups; ++i) - if (marks[i]) - matching_row_groups.push_back(i); - - size_t dropped_row_groups = total_row_groups - matching_row_groups.size(); - LOG_DEBUG(log, - "Query condition cache has dropped {}/{} row groups for condition {} in file {}.", - dropped_row_groups, - total_row_groups, - format_filter_info->filter_actions_dag->dumpNames(), - object_info->getFileName()); - - if (matching_row_groups.empty()) - continue; - - auto file_bucket_info = FormatFactory::instance().getFileBucketInfo( - object_info->getFileFormat().value_or(configuration->getFormat())); - if (file_bucket_info) - { - auto filtered = file_bucket_info->filterByMatchingRowGroups(matching_row_groups); - if (!filtered) - continue; - object_info->file_bucket_info = std::move(filtered); - } - } - } - break; - } ->>>>>>> 5779b86fb2b (Merge pull request #1804 from Altinity/feature/antalya-26.3/ClickHouse-ClickHouse-pr-102115) if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0 && object_info->getObjectMetadata()->is_size_known)