diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 9d883d222462..6f7e1f91ce84 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -864,6 +864,14 @@ BucketSplitter FormatFactory::getSplitter(const String & format) return creator.bucket_splitter_creator(); } +bool FormatFactory::checkFormatHasSplitter(const String & format) const +{ + auto it = dict.find(boost::to_lower_copy(format)); + if (it == dict.end()) + return false; + return static_cast(it->second.bucket_splitter_creator); +} + void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator) { chassert(input_creator); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 76a04bb98798..2aac56dd2d87 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -373,6 +373,9 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info); void registerSplitter(const String & format, BucketSplitterCreator splitter); BucketSplitter getSplitter(const String & format); + /// Returns true if `format` is registered and has a bucket splitter + /// (e.g. Parquet). Used to decide whether to attempt single-file parallel splitting. + bool checkFormatHasSplitter(const String & format) const; private: FormatsDictionary dict; diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index 3ce1e6d1fbe1..768d1fea7163 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -67,6 +67,11 @@ struct IBucketSplitter /// Returns information about the resulting buckets (see the structure above for details). virtual std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + /// Splits a file into approximately `target_count` buckets, each covering a roughly + /// equal slice of the file. Useful for parallelising one large file across N readers. + /// The result has at most `target_count` buckets and never drops any data. + virtual std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + virtual ~IBucketSplitter() = default; }; using BucketSplitter = std::shared_ptr; diff --git a/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h b/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h new file mode 100644 index 000000000000..a7ebf10da3c4 --- /dev/null +++ b/src/Processors/Formats/Impl/ArrowFieldIndexUtil.h @@ -0,0 +1,276 @@ +#pragma once + +#include "config.h" + +#if USE_PARQUET || USE_ORC + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace arrow +{ + class Schema; + class DataType; + class Field; +} + + +namespace DB +{ + +class Block; + +namespace ErrorCodes +{ + extern const int THERE_IS_NO_COLUMN; +} + +// This is only used for parquet now. +class ArrowFieldIndexUtil +{ +public: + explicit ArrowFieldIndexUtil(bool ignore_case_, bool allow_missing_columns_) + : ignore_case(ignore_case_) + , allow_missing_columns(allow_missing_columns_) + { + } + + /// Recursively count every field indices. Return a map + /// - key: field name with full path. eg. a struct field's name is like a.x.i + /// - value: a pair, first value refers to this field's start index, second value refers to how many + /// indices this field take. eg. + /// For a parquet schema {x: int, y: {i: int, j: int}}, the return will be + /// - x: (0, 1) + /// - y: (1, 2) + /// - y.i: (1, 1) + /// - y.j: (2, 1) + std::unordered_map> + calculateFieldIndices(const arrow::Schema & schema) + { + std::unordered_map> result; + int index_start = 0; + for (int i = 0; i < schema.num_fields(); ++i) + { + const auto & field = schema.field(i); + calculateFieldIndices(*field, field->name(), index_start, result); + } + return result; + } + + // For a parquet schema {x: {i: int, j: int}}, this should be populated as follows + // clickhouse_index = 0, parquet_indexes = {0, 1} + struct ClickHouseIndexToParquetIndex + { + std::size_t clickhouse_index; + std::vector parquet_indexes; + }; + + /// Only collect the required fields' indices. Eg. when just read a field of a struct, + /// don't need to collect the whole indices in this struct. + std::vector findRequiredIndices( + const Block & header, + const arrow::Schema & schema, + const parquet::FileMetaData & file, + const std::optional> & clickhouse_to_parquet_names) + { + std::vector required_indices; + std::unordered_set added_indices; + /// Flat all named fields' index information into a map. + auto fields_indices = calculateFieldIndices(schema); + for (size_t i = 0, n = header.columns(); i < n; ++i) + { + const auto & named_col = header.getByPosition(i); + std::string col_name = named_col.name; + String transformed_name = col_name; + if (clickhouse_to_parquet_names) + { + if (auto it = clickhouse_to_parquet_names->find(col_name); it != clickhouse_to_parquet_names->end()) + transformed_name = it->second; + } + if (ignore_case) + { + boost::to_lower(col_name); + boost::to_lower(transformed_name); + } + findRequiredIndices(col_name, transformed_name, i, named_col.type, fields_indices, added_indices, required_indices, file, clickhouse_to_parquet_names); + } + return required_indices; + } + + /// Count the number of indices for types. + size_t countIndicesForType(std::shared_ptr type) + { + if (type->id() == arrow::Type::LIST) + { + return countIndicesForType(static_cast(type.get())->value_type()); + } + + if (type->id() == arrow::Type::STRUCT) + { + int indices = 0; + auto * struct_type = static_cast(type.get()); + for (int i = 0; i != struct_type->num_fields(); ++i) + indices += countIndicesForType(struct_type->field(i)->type()); + return indices; + } + + if (type->id() == arrow::Type::MAP) + { + auto * map_type = static_cast(type.get()); + return countIndicesForType(map_type->key_type()) + countIndicesForType(map_type->item_type()) ; + } + + return 1; + } + +private: + bool ignore_case; + bool allow_missing_columns; + void calculateFieldIndices(const arrow::Field & field, + std::string field_name, + int & current_start_index, + std::unordered_map> & result, const std::string & name_prefix = "") + { + const auto & field_type = field.type(); + if (field_name.empty()) + { + current_start_index += countIndicesForType(field_type); + return; + } + if (ignore_case) + { + boost::to_lower(field_name); + } + + std::string full_path_name = name_prefix.empty() ? field_name : name_prefix + "." + field_name; + auto & index_info = result[full_path_name]; + index_info.first = current_start_index; + if (field_type->id() == arrow::Type::STRUCT) + { + auto * struct_type = static_cast(field_type.get()); + for (int i = 0, n = struct_type->num_fields(); i < n; ++i) + { + const auto & sub_field = struct_type->field(i); + calculateFieldIndices(*sub_field, sub_field->name(), current_start_index, result, full_path_name); + } + } + else if (field_type->id() == arrow::Type::LIST) + { + // It is a nested table. + const auto * list_type = static_cast(field_type.get()); + const auto value_field = list_type->value_field(); + auto index_snapshot = current_start_index; + calculateFieldIndices(*value_field, field_name, current_start_index, result, name_prefix); + // The nested struct field has the same name as this list field. + // rewrite it back to the original value. + index_info.first = index_snapshot; + } + else if (field_type->id() == arrow::Type::MAP) + { + const auto * map_type = static_cast(field_type.get()); + auto index_snapshot = current_start_index; + current_start_index += countIndicesForType(map_type->key_type()); + calculateFieldIndices(*map_type->item_field(), field_name, current_start_index, result, name_prefix); + index_info.first = index_snapshot; + } + else + { + current_start_index += countIndicesForType(field_type); + } + index_info.second = current_start_index - index_info.first; + } + + void findRequiredIndices( + const String & name, + const String & transformed_name, + std::size_t header_index, + DataTypePtr data_type, + const std::unordered_map> & field_indices, + std::unordered_set & added_indices, + std::vector & required_indices, + const parquet::FileMetaData & file, + const std::optional> & clickhouse_to_parquet_names) + { + auto nested_type = removeNullable(data_type); + if (const DB::DataTypeTuple * type_tuple = typeid_cast(nested_type.get())) + { + if (type_tuple->hasExplicitNames()) + { + auto field_names = type_tuple->getElementNames(); + auto field_types = type_tuple->getElements(); + for (size_t i = 0, n = field_names.size(); i < n; ++i) + { + auto field_name = field_names[i]; + if (ignore_case) + boost::to_lower(field_name); + const auto & field_type = field_types[i]; + auto full_name = Nested::concatenateName(name, field_name); + if (clickhouse_to_parquet_names) + { + if (auto it = clickhouse_to_parquet_names->find(full_name); it != clickhouse_to_parquet_names->end()) + { + full_name = it->second; + } + } + + findRequiredIndices(Nested::concatenateName(name, field_name), full_name, header_index, field_type, field_indices, added_indices, required_indices, file, clickhouse_to_parquet_names); + } + return; + } + } + else if (const auto * type_array = typeid_cast(nested_type.get())) + { + String element_name = name; + String element_transformed_name = transformed_name; + if (clickhouse_to_parquet_names) + { + element_name = Nested::concatenateName(name, "element"); + element_transformed_name = Nested::concatenateName(transformed_name, "element"); + if (auto it = clickhouse_to_parquet_names->find(element_name); it != clickhouse_to_parquet_names->end()) + element_transformed_name = it->second; + } + findRequiredIndices(element_name, element_transformed_name, header_index, type_array->getNestedType(), field_indices, added_indices, required_indices, file, clickhouse_to_parquet_names); + return; + } + else if (const auto * type_map = typeid_cast(nested_type.get())) + { + findRequiredIndices(name, transformed_name, header_index, type_map->getKeyType(), field_indices, added_indices, required_indices, file, clickhouse_to_parquet_names); + findRequiredIndices(Nested::concatenateName(name, "value"), Nested::concatenateName(transformed_name, "value"), header_index, type_map->getValueType(), field_indices, added_indices, required_indices, file, clickhouse_to_parquet_names); + return; + } + auto it = field_indices.find(transformed_name); + if (it == field_indices.end()) + { + if (!allow_missing_columns) + throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Not found field ({})", name); + } + else + { + ClickHouseIndexToParquetIndex index_mapping; + index_mapping.clickhouse_index = header_index; + for (int j = 0; j < it->second.second; ++j) + { + auto index = it->second.first + j; + if (added_indices.insert(index).second) + { + index_mapping.parquet_indexes.emplace_back(index); + } + } + + required_indices.emplace_back(index_mapping); + } + } +}; + +} + +#endif diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index af13d9766afe..2d07b5786168 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -64,8 +64,26 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, reader.prefilterAndInitRowGroups(row_groups_to_read); reader.preparePrewhere(); - ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); - ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, reader.file_metadata.row_groups.size() - reader.row_groups.size()); + /// Profile events must reflect only the row groups that belong to this bucket, otherwise + /// every bucket of a single-file split would report the file's totals and the events would + /// be multiplied by the number of buckets. + size_t read_count; + size_t total_in_partition; + if (row_groups_to_read.has_value()) + { + read_count = 0; + for (const auto & rg : reader.row_groups) + if (rg.need_to_process) + ++read_count; + total_in_partition = row_groups_to_read->size(); + } + else + { + read_count = reader.row_groups.size(); + total_in_partition = reader.file_metadata.row_groups.size(); + } + ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, read_count); + ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, total_in_partition - read_count); size_t num_row_groups = reader.row_groups.size(); for (size_t i = size_t(ReadStage::NotStarted) + 1; i < size_t(ReadStage::Deliver); ++i) diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index e2ae38831029..a5d67cb5ed16 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -132,8 +132,23 @@ Chunk ParquetV3BlockInputFormat::read() temp_prefetcher.init(in, read_options, parser_shared_resources); parquet::format::FileMetaData file_metadata = getFileMetadata(temp_prefetcher); + size_t num_rows = 0; + if (buckets_to_read) + { + /// Only count rows in the assigned row groups. Otherwise multiple sources + /// reading buckets of the same file would each report the file's total. + for (size_t rg : buckets_to_read->row_group_ids) + { + if (rg < file_metadata.row_groups.size()) + num_rows += size_t(file_metadata.row_groups[rg].num_rows); + } + } + else + { + num_rows = size_t(file_metadata.num_rows); + } - auto chunk = getChunkForCount(size_t(file_metadata.num_rows)); + auto chunk = getChunkForCount(num_rows); chunk.getChunkInfos().add(std::make_shared(0)); reported_count = true; @@ -323,6 +338,32 @@ std::vector ParquetBucketSplitter::splitToBuckets(size_t buck return result; } +std::vector ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + const size_t num_row_groups = metadata->num_row_groups(); + + if (target_count == 0 || num_row_groups == 0) + return {}; + + const size_t num_chunks = std::min(target_count, num_row_groups); + std::vector result; + result.reserve(num_chunks); + for (size_t g = 0; g < num_chunks; ++g) + { + size_t lo = g * num_row_groups / num_chunks; + size_t hi = (g + 1) * num_row_groups / num_chunks; + std::vector ids; + ids.reserve(hi - lo); + for (size_t k = lo; k < hi; ++k) + ids.push_back(k); + result.push_back(std::make_shared(ids)); + } + return result; +} + void registerInputFormatParquet(FormatFactory & factory) { factory.registerFileBucketInfo( diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index 12859a22d16f..425ea3626e34 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -32,6 +32,7 @@ struct ParquetBucketSplitter : public IBucketSplitter { ParquetBucketSplitter() = default; std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; + std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override; }; class ParquetV3BlockInputFormat : public IInputFormat diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index c847b246e7a1..ea1561c65166 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1551,6 +1551,15 @@ Chunk StorageFileSource::generate() progress_callback(FileProgress(0, tryGetFileSizeFromReadBuffer(*read_buf).value_or(0))); } } + else if (fixed_file_path.has_value()) + { + /// This source was assigned to one specific (file, bucket) pair. + /// Consume it exactly once. + if (fixed_file_consumed) + return {}; + fixed_file_consumed = true; + current_path = *fixed_file_path; + } else { current_path = files_iterator->next(); @@ -1594,7 +1603,11 @@ Chunk StorageFileSource::generate() if (getContext()->getSettingsRef()[Setting::engine_file_skip_empty_files] && file_stat.st_size == 0) continue; - if (need_only_count && tryGetCountFromCache(file_stat)) + /// The count cache stores the file's total row count. When this source + /// only reads a subset of the file (file_bucket_info is set), the cache + /// is inapplicable — using it would have every source report the full + /// total and produce a count that's multiplied by the number of buckets. + if (need_only_count && !file_bucket_info && tryGetCountFromCache(file_stat)) continue; read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, getContext()); @@ -1665,6 +1678,12 @@ Chunk StorageFileSource::generate() input_format->setSerializationHints(serialization_hints); + /// If this source was assigned to read only a subset of the file's buckets + /// (used to read one large file with multiple parallel sources), pass the + /// bucket assignment to the format before it starts reading. + if (file_bucket_info) + input_format->setBucketsToRead(file_bucket_info); + if (need_only_count) input_format->needOnlyCount(); @@ -1733,7 +1752,8 @@ Chunk StorageFileSource::generate() finished_generate = true; if (input_format && storage->format_name != "Distributed" && getContext()->getSettingsRef()[Setting::use_cache_for_count_from_files] - && (!format_filter_info || !format_filter_info->hasFilter())) + && (!format_filter_info || !format_filter_info->hasFilter()) + && !file_bucket_info) addNumRowsToCache(current_path, total_rows_in_file); total_rows_in_file = 0; @@ -1936,11 +1956,55 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui if (max_num_streams > files_to_read) num_streams = files_to_read; + auto ctx = getContext(); + + /// If we are reading exactly one local file in a splittable format (e.g. Parquet), + /// we can split it into multiple buckets (row group ranges) and create one source + /// per bucket. This recovers the parallelism we'd otherwise have only when reading + /// many files at once. Without this, a single big Parquet file feeds the whole + /// downstream pipeline through a single source/Resize(1->N) — leaving most of the + /// CPU idle on machines with many cores. + /// + /// We use the file list from `files_iterator` rather than `storage->paths`: the + /// iterator has already pruned files by `_path`/`_file` virtual-column predicates + /// (`createPathAndFileFilterDAG`), so the optimization respects that pruning. If + /// the predicate excludes the only path the file is not read at all. It also + /// means a query against many paths whose predicate prunes down to a single file + /// still benefits from the split. + std::vector per_source_buckets; + String single_file_path; + if (max_num_streams > 1 + && !storage->archive_info + && !storage->use_table_fd + && !storage->has_peekable_read_buffer_from_fd.load() + && !storage->distributed_processing + && storage->compression_method == "auto" + && FormatFactory::instance().checkFormatHasSplitter(storage->format_name) + && FormatFactory::instance().checkParallelizeOutputAfterReading(storage->format_name, ctx) + && files_iterator->getFiles().size() == 1) + { + auto splitter = FormatFactory::instance().getSplitter(storage->format_name); + single_file_path = files_iterator->getFiles().front(); + struct stat file_stat = getFileStat(single_file_path, false, -1, storage->getName()); + if (file_stat.st_size > 0) + { + auto buf = createReadBuffer( + single_file_path, file_stat, false, -1, storage->compression_method, ctx); + auto buckets = splitter->splitToBucketsByCount( + max_num_streams, *buf, + storage->format_settings.value_or(getFormatSettings(ctx))); + + if (buckets.size() >= 2) + { + per_source_buckets = std::move(buckets); + num_streams = per_source_buckets.size(); + } + } + } + Pipes pipes; pipes.reserve(num_streams); - auto ctx = getContext(); - /// Set total number of bytes to process. For progress bar. auto progress_callback = ctx->getFileProgressCallback(); @@ -1971,13 +2035,19 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui parser_shared_resources, format_filter_info); + if (i < per_source_buckets.size()) + { + source->fixed_file_path = single_file_path; + source->file_bucket_info = per_source_buckets[i]; + } + pipes.emplace_back(std::move(source)); } auto pipe = Pipe::unitePipes(std::move(pipes)); size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = ctx->getSettingsRef()[Setting::parallelize_output_from_storages]; - if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports < max_num_streams) + if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports != max_num_streams) pipe.resize(max_num_streams); if (pipe.empty()) diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 3bac0dd5af70..8370cd4c5418 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -25,6 +25,9 @@ using OutputFormatPtr = std::shared_ptr; class IInputFormat; using InputFormatPtr = std::shared_ptr; +struct FileBucketInfo; +using FileBucketInfoPtr = std::shared_ptr; + class PullingPipelineExecutor; class StorageFile final : public IStorage @@ -248,6 +251,11 @@ class StorageFileSource : public ISource, WithContext } const String & getFileNameInArchive(); + + /// Returns the (possibly virtual-column-filtered) list of files this iterator + /// will produce. Only meaningful when not reading from an archive and not + /// using distributed_processing. + const std::vector & getFiles() const { return files; } private: std::vector files; @@ -318,6 +326,17 @@ class StorageFileSource : public ISource, WithContext std::shared_ptr archive_reader; std::unique_ptr file_enumerator; + /// Optional subset-of-file assignment. When set, the input format only reads + /// these buckets (e.g. for Parquet — only the listed row groups). This is how + /// a single big file is processed in parallel by multiple sources. + FileBucketInfoPtr file_bucket_info; + + /// When this source has been assigned a specific (file, bucket) pair, it + /// reads only that one file (once) and ignores the shared FilesIterator. + /// Set together with `file_bucket_info`. + std::optional fixed_file_path; + bool fixed_file_consumed = false; + ColumnsDescription columns_description; NamesAndTypesList requested_columns; NamesAndTypesList requested_virtual_columns; diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.reference b/tests/queries/0_stateless/02725_parquet_preserve_order.reference index 3f410c13ec44..ce91c5aed9ea 100644 --- a/tests/queries/0_stateless/02725_parquet_preserve_order.reference +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.reference @@ -8,5 +8,4 @@ ExpressionTransform (Expression) ExpressionTransform × 2 (ReadFromFile) - Resize 1 → 2 - File 0 → 1 + File × 2 0 → 1