Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ struct ExportReplicatedMergeTreePartitionManifest
bool write_full_path_in_iceberg_metadata = false;
bool allow_lossy_cast = false;
String iceberg_metadata_json;
String parquet_compression_method;
UInt64 output_format_compression_level = 3;
UInt64 parquet_row_group_size = 1000000;

std::string toJsonString() const
{
Expand Down Expand Up @@ -208,6 +211,9 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("allow_lossy_cast", allow_lossy_cast);
json.set("parquet_compression_method", parquet_compression_method);
json.set("output_format_compression_level", output_format_compression_level);
json.set("parquet_row_group_size", parquet_row_group_size);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -266,6 +272,10 @@ struct ExportReplicatedMergeTreePartitionManifest
/// on upgrade. New tasks always persist the initiator's actual choice.
manifest.allow_lossy_cast = json->has("allow_lossy_cast") ? json->getValue<bool>("allow_lossy_cast") : true;

manifest.parquet_compression_method = json->has("parquet_compression_method") ? json->getValue<String>("parquet_compression_method") : "zstd";
manifest.output_format_compression_level = json->has("output_format_compression_level") ? json->getValue<UInt64>("output_format_compression_level") : 3;
manifest.parquet_row_group_size = json->has("parquet_row_group_size") ? json->getValue<UInt64>("parquet_row_group_size") : 1000000;

return manifest;
}
};
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ namespace ExportPartitionUtils
context_copy->setCurrentQueryId(manifest.query_id);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("output_format_parquet_compression_method", manifest.parquet_compression_method);
context_copy->setSetting("output_format_compression_level", manifest.output_format_compression_level);
context_copy->setSetting("output_format_parquet_row_group_size", manifest.parquet_row_group_size);
context_copy->setSetting("max_threads", manifest.max_threads);
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ namespace Setting
extern const SettingsUInt64 export_merge_tree_partition_task_timeout_seconds;
extern const SettingsBool output_format_parallel_formatting;
extern const SettingsBool output_format_parquet_parallel_encoding;
extern const SettingsParquetCompression output_format_parquet_compression_method;
extern const SettingsUInt64 output_format_compression_level;
extern const SettingsUInt64 output_format_parquet_row_group_size;
extern const SettingsMaxThreads max_threads;
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
Expand Down Expand Up @@ -8505,6 +8508,9 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads];
manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting];
manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding];
manifest.parquet_compression_method = query_context->getSettingsRef()[Setting::output_format_parquet_compression_method].toString();
manifest.output_format_compression_level = query_context->getSettingsRef()[Setting::output_format_compression_level];
manifest.parquet_row_group_size = query_context->getSettingsRef()[Setting::output_format_parquet_row_group_size];

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Persist the Parquet row-group byte limit too

When users schedule EXPORT PARTITION with output_format_parquet_row_group_size_bytes (for example to avoid the default 512 MiB cap), this manifest only captures output_format_parquet_row_group_size; the background export context is rebuilt from the manifest, and getFormatSettings consumes both output_format_parquet_row_group_size and output_format_parquet_row_group_size_bytes (src/Formats/FormatFactory.cpp:202-203). Any worker replica will therefore fall back to its default byte limit and produce different row groups than the initiating query. Please persist and restore output_format_parquet_row_group_size_bytes alongside this field.

Useful? React with 👍 / 👎.

manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file];
manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];

Expand Down
Loading