diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 4e28e5e4f505..a9b5d5a36153 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -175,6 +175,9 @@ struct ExportReplicatedMergeTreePartitionManifest bool write_full_path_in_iceberg_metadata = false; bool allow_lossy_cast = false; String iceberg_metadata_json; + String parquet_compression_method; + UInt64 output_format_compression_level = 3; + UInt64 parquet_row_group_size = 1000000; std::string toJsonString() const { @@ -208,6 +211,9 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("task_timeout_seconds", task_timeout_seconds); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); json.set("allow_lossy_cast", allow_lossy_cast); + json.set("parquet_compression_method", parquet_compression_method); + json.set("output_format_compression_level", output_format_compression_level); + json.set("parquet_row_group_size", parquet_row_group_size); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -266,6 +272,10 @@ struct ExportReplicatedMergeTreePartitionManifest /// on upgrade. New tasks always persist the initiator's actual choice. manifest.allow_lossy_cast = json->has("allow_lossy_cast") ? json->getValue("allow_lossy_cast") : true; + manifest.parquet_compression_method = json->has("parquet_compression_method") ? json->getValue("parquet_compression_method") : "zstd"; + manifest.output_format_compression_level = json->has("output_format_compression_level") ? json->getValue("output_format_compression_level") : 3; + manifest.parquet_row_group_size = json->has("parquet_row_group_size") ? json->getValue("parquet_row_group_size") : 1000000; + return manifest; } }; diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 679e07d0b132..f9385dfda767 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -81,6 +81,9 @@ namespace ExportPartitionUtils context_copy->setCurrentQueryId(manifest.query_id); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); + context_copy->setSetting("output_format_parquet_compression_method", manifest.parquet_compression_method); + context_copy->setSetting("output_format_compression_level", manifest.output_format_compression_level); + context_copy->setSetting("output_format_parquet_row_group_size", manifest.parquet_row_group_size); context_copy->setSetting("max_threads", manifest.max_threads); context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy))); context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 046399f0bdfe..766bcefe5100 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -218,6 +218,9 @@ namespace Setting extern const SettingsUInt64 export_merge_tree_partition_task_timeout_seconds; extern const SettingsBool output_format_parallel_formatting; extern const SettingsBool output_format_parquet_parallel_encoding; + extern const SettingsParquetCompression output_format_parquet_compression_method; + extern const SettingsUInt64 output_format_compression_level; + extern const SettingsUInt64 output_format_parquet_row_group_size; extern const SettingsMaxThreads max_threads; extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; @@ -8505,6 +8508,9 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; + manifest.parquet_compression_method = query_context->getSettingsRef()[Setting::output_format_parquet_compression_method].toString(); + manifest.output_format_compression_level = query_context->getSettingsRef()[Setting::output_format_compression_level]; + manifest.parquet_row_group_size = query_context->getSettingsRef()[Setting::output_format_parquet_row_group_size]; manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file]; manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];