Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/en/antalya/partition_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,17 @@ TO TABLE [destination_database.]destination_table
- **Default**: `false`
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition that was already exported to the same destination. **IMPORTANT:** this is dangerous because it can lead to duplicated data; use it with caution.

#### `export_merge_tree_partition_max_retries` (Optional)
#### `export_merge_tree_partition_retry_initial_backoff_ms` (Optional)

- **Type**: `UInt64`
- **Default**: `3`
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If the number of retries exceeds this limit, the entire task fails.
- **Default**: `5000`
- **Description**: Initial delay (in milliseconds) before retrying a failed part export. The delay grows exponentially with the per-replica retry count (`delay = min(initial << (attempts - 1), max)`). The back-off is per-replica in-memory state: it only spaces this replica's retries out in time and never prevents another replica from attempting the same part. Retryable failures (transient memory/network/object-storage/Keeper errors) are retried until the task succeeds or `export_merge_tree_partition_task_timeout_seconds` elapses, while non-retryable failures (e.g. schema/type incompatibilities) fail the task immediately.

#### `export_merge_tree_partition_retry_max_backoff_ms` (Optional)

- **Type**: `UInt64`
- **Default**: `60000`
- **Description**: Maximum delay (in milliseconds) between retries of a failed part export. Caps the exponential growth controlled by `export_merge_tree_partition_retry_initial_backoff_ms`.

#### `export_merge_tree_part_file_already_exists_policy` (Optional)

Expand Down
2 changes: 2 additions & 0 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ static struct InitFiu
ONCE(iceberg_export_after_commit_before_zk_completed) \
REGULAR(export_partition_commit_always_throw) \
ONCE(export_partition_status_change_throw) \
REGULAR(export_part_non_retryable_throw) \
REGULAR(export_part_retryable_throw) \
ONCE(backup_add_empty_memory_table) \
PAUSEABLE_ONCE(backup_pause_on_start) \
PAUSEABLE_ONCE(restore_pause_on_start) \
Expand Down
12 changes: 10 additions & 2 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7555,8 +7555,16 @@ Overwrite file if it already exists when exporting a merge tree part
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
DECLARE(UInt64, export_merge_tree_partition_retry_initial_backoff_ms, 5000, R"(
Initial delay (in milliseconds) before retrying a failed part export in an export partition task.
The delay grows exponentially with the per-replica retry count (capped doubling): `delay = min(initial << (attempts - 1), max)`, where `max` is `export_merge_tree_partition_retry_max_backoff_ms`.
The back-off is per-replica in-memory state: it only spaces this replica's retries out in time and never prevents another replica from attempting the same part. Retryable failures are retried until the task succeeds or `export_merge_tree_partition_task_timeout_seconds` elapses.
To survive a long transient outage (e.g. object storage downtime), raise `export_merge_tree_partition_task_timeout_seconds`.

Note: the effective resolution is bounded by the export select-task tick (~5s), so back-off delays shorter than the tick interval are rounded up to it in practice.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_retry_max_backoff_ms, 60000, R"(
Maximum delay (in milliseconds) between retries of a failed part export in an export partition task. Caps the exponential growth controlled by `export_merge_tree_partition_retry_initial_backoff_ms`.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_task_timeout_seconds, 86400, R"(
Maximum wall-clock duration (in seconds) an export partition task is allowed to remain in the PENDING state before it is auto-killed by the background cleanup loop.
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"},
{"export_merge_tree_part_allow_lossy_cast", false, false, "New setting to gate lossy casts in EXPORT PART/PARTITION behind explicit acknowledgment"},
{"export_merge_tree_partition_retry_initial_backoff_ms", 5000, 5000, "New setting for exponential back-off between failed part export retries in an export partition task"},
{"export_merge_tree_partition_retry_max_backoff_ms", 60000, 60000, "New setting capping the exponential back-off between failed part export retries in an export partition task"},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down Expand Up @@ -329,7 +331,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"export_merge_tree_partition_force_export", false, false, "New setting."},
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
Expand Down
13 changes: 7 additions & 6 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry

String part_name;
Status status;
size_t retry_count;
String finished_by;

std::string toJsonString() const
Expand All @@ -32,7 +31,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry

json.set("part_name", part_name);
json.set("status", String(magic_enum::enum_name(status)));
json.set("retry_count", retry_count);
json.set("finished_by", finished_by);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Expand All @@ -51,7 +49,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry

entry.part_name = json->getValue<String>("part_name");
entry.status = magic_enum::enum_cast<Status>(json->getValue<String>("status")).value();
entry.retry_count = json->getValue<size_t>("retry_count");
if (json->has("finished_by"))
{
entry.finished_by = json->getValue<String>("finished_by");
Expand Down Expand Up @@ -163,7 +160,8 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t number_of_parts;
std::vector<String> parts;
time_t create_time;
size_t max_retries;
size_t retry_initial_backoff_ms;
size_t retry_max_backoff_ms;
size_t task_timeout_seconds;
size_t max_threads;
bool parallel_formatting;
Expand Down Expand Up @@ -204,7 +202,8 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("retry_initial_backoff_ms", retry_initial_backoff_ms);
json.set("retry_max_backoff_ms", retry_max_backoff_ms);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("allow_lossy_cast", allow_lossy_cast);
Expand All @@ -228,7 +227,9 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.destination_table = json->getValue<String>("destination_table");
manifest.source_replica = json->getValue<String>("source_replica");
manifest.number_of_parts = json->getValue<size_t>("number_of_parts");
manifest.max_retries = json->getValue<size_t>("max_retries");

manifest.retry_initial_backoff_ms = json->getValue<size_t>("retry_initial_backoff_ms");
manifest.retry_max_backoff_ms = json->getValue<size_t>("retry_max_backoff_ms");
Comment on lines +231 to +232

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Default backoff fields for old export manifests

When a node upgrades with an export created by the previous code still under /exports, that metadata.json does not contain these two new fields because older manifests only wrote max_retries. ExportReplicatedMergeTreePartitionManifest::fromJsonString is used by the manifest updater before it can clean up or kill tasks, and getValue throws on missing keys, so one old in-flight export makes poll fail repeatedly and prevents this table's export state from progressing until the ZooKeeper entry is removed manually. Please read these fields with defaults when absent, similar to the existing allow_lossy_cast compatibility handling.

Useful? React with 👍 / 👎.


if (json->has("iceberg_metadata_json"))
{
Expand Down
25 changes: 25 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include "Common/setThreadName.h"
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/ProfileEventsScope.h>
#include <Databases/DatabaseReplicated.h>
#include <Storages/MergeTree/ExportList.h>
Expand All @@ -43,6 +44,18 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
extern const int BAD_ARGUMENTS;
extern const int FAULT_INJECTED;
}

namespace FailPoints
{
/// Throw a non-retryable (denylisted) error from the part-export worker, so the whole
/// export task transitions to FAILED immediately regardless of any timeout.
extern const char export_part_non_retryable_throw[];
/// Throw a retryable error from the part-export worker, so the part is retried with the
/// per-replica back-off until the task succeeds or the absolute timeout fires.
extern const char export_part_retryable_throw[];
}

namespace Setting
Expand Down Expand Up @@ -234,6 +247,18 @@ bool ExportPartTask::executeStep()
{
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART);

fiu_do_on(FailPoints::export_part_non_retryable_throw,
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Failpoint: export_part_non_retryable_throw");
});

fiu_do_on(FailPoints::export_part_retryable_throw,
{
throw Exception(ErrorCodes::FAULT_INJECTED,
"Failpoint: export_part_retryable_throw");
});

const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

sink = destination_storage->import(
Expand Down
19 changes: 14 additions & 5 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
if (!model)
return {};

const auto backoff = storage.export_merge_tree_partition_task_scheduler->getLocalBackoffSnapshot();

std::vector<ReplicatedPartitionExportInfo> infos;
infos.reserve(model->size());

Expand Down Expand Up @@ -285,6 +287,13 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
}
info.exception_count = total_exception_count;

if (const auto it = backoff.find(entry.getTransactionId()); it != backoff.end())
{
info.backoff_per_part.reserve(it->second.size());
for (const auto & [part_name, state] : it->second)
info.backoff_per_part.push_back({part_name, state.attempts, state.next_retry_time});
}

infos.emplace_back(std::move(info));
}

Expand Down Expand Up @@ -490,20 +499,20 @@ void ExportPartitionManifestUpdatingTask::poll()
"Caught exception while committing export for {}: {}",
work.entry_path, e.message());

const bool exceeded_commimt_max_retries = ExportPartitionUtils::handleCommitFailure(
const bool became_failed = ExportPartitionUtils::handleCommitFailure(
zk,
work.entry_path,
work.metadata.max_retries,
e.code(),
storage.getReplicaName(),
e.message(),
log_ptr);

if (exceeded_commimt_max_retries)
if (became_failed)
{
LOG_WARNING(log_ptr,
"ExportPartition Manifest Updating Task: "
"Commit for {} transitioned to FAILED after exhausting max_retries={}",
work.entry_path, work.metadata.max_retries);
"Commit for {} transitioned to FAILED due to non-retryable error (code {})",
work.entry_path, e.code());
}
}
}
Expand Down
Loading
Loading