diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 95a2cfe36fb4..7d3d8db9e02c 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -61,11 +61,17 @@ TO TABLE [destination_database.]destination_table - **Default**: `false` - **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition that was already exported to the same destination. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution. -#### `export_merge_tree_partition_max_retries` (Optional) +#### `export_merge_tree_partition_retry_initial_backoff_ms` (Optional) - **Type**: `UInt64` -- **Default**: `3` -- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails. +- **Default**: `5000` +- **Description**: Initial delay (in milliseconds) before retrying a failed part export. The delay grows exponentially with the per-replica retry count (`delay = min(initial << (attempts - 1), max)`). The back-off is per-replica in-memory state: it only spaces this replica's retries out in time and never prevents another replica from attempting the same part. Retryable failures (transient memory/network/object-storage/Keeper errors) are retried until the task succeeds or `export_merge_tree_partition_task_timeout_seconds` elapses, while non-retryable failures (e.g. schema/type incompatibilities) fail the task immediately. + +#### `export_merge_tree_partition_retry_max_backoff_ms` (Optional) + +- **Type**: `UInt64` +- **Default**: `60000` +- **Description**: Maximum delay (in milliseconds) between retries of a failed part export. Caps the exponential growth controlled by `export_merge_tree_partition_retry_initial_backoff_ms`. 
#### `export_merge_tree_part_file_already_exists_policy` (Optional) diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index d8ee99f58cb3..9966bd864f70 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -135,6 +135,8 @@ static struct InitFiu ONCE(iceberg_export_after_commit_before_zk_completed) \ REGULAR(export_partition_commit_always_throw) \ ONCE(export_partition_status_change_throw) \ + REGULAR(export_part_non_retryable_throw) \ + REGULAR(export_part_retryable_throw) \ ONCE(backup_add_empty_memory_table) \ PAUSEABLE_ONCE(backup_pause_on_start) \ PAUSEABLE_ONCE(restore_pause_on_start) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 7a9089880d96..604ec0e4b007 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7555,8 +7555,16 @@ Overwrite file if it already exists when exporting a merge tree part DECLARE(Bool, export_merge_tree_partition_force_export, false, R"( Ignore existing partition export and overwrite the zookeeper entry )", 0) \ - DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"( -Maximum number of retries for exporting a merge tree part in an export partition task + DECLARE(UInt64, export_merge_tree_partition_retry_initial_backoff_ms, 5000, R"( +Initial delay (in milliseconds) before retrying a failed part export in an export partition task. +The delay grows exponentially with the per-replica retry count (capped doubling): `delay = min(initial << (attempts - 1), max)`, where `max` is `export_merge_tree_partition_retry_max_backoff_ms`. +The back-off is per-replica in-memory state: it only spaces this replica's retries out in time and never prevents another replica from attempting the same part. Retryable failures are retried until the task succeeds or `export_merge_tree_partition_task_timeout_seconds` elapses. +To survive a long transient outage (e.g. object storage downtime), raise `export_merge_tree_partition_task_timeout_seconds`. 
+ +Note: the effective resolution is bounded by the export select-task tick (~5s), so back-off delays shorter than the tick interval are rounded up to it in practice. +)", 0) \ + DECLARE(UInt64, export_merge_tree_partition_retry_max_backoff_ms, 60000, R"( +Maximum delay (in milliseconds) between retries of a failed part export in an export partition task. Caps the exponential growth controlled by `export_merge_tree_partition_retry_initial_backoff_ms`. )", 0) \ DECLARE(UInt64, export_merge_tree_partition_task_timeout_seconds, 86400, R"( Maximum wall-clock duration (in seconds) an export partition task is allowed to remain in the PENDING state before it is auto-killed by the background cleanup loop. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3a72ec667415..a2cc6682a9a6 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -44,6 +44,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"}, {"export_merge_tree_part_allow_lossy_cast", false, false, "New setting to gate lossy casts in EXPORT PART/PARTITION behind explicit acknowledgment"}, + {"export_merge_tree_partition_retry_initial_backoff_ms", 5000, 5000, "New setting for exponential back-off between failed part export retries in an export partition task"}, + {"export_merge_tree_partition_retry_max_backoff_ms", 60000, 60000, "New setting capping the exponential back-off between failed part export retries in an export partition task"}, }); addSettingsChanges(settings_changes_history, "26.3", { @@ -329,7 +331,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, {"export_merge_tree_part_overwrite_file_if_exists", false, 
false, "New setting."}, {"export_merge_tree_partition_force_export", false, false, "New setting."}, - {"export_merge_tree_partition_max_retries", 3, 3, "New setting."}, {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."}, {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 4e28e5e4f505..dc80423a81a1 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -23,7 +23,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry String part_name; Status status; - size_t retry_count; String finished_by; std::string toJsonString() const @@ -32,7 +31,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry json.set("part_name", part_name); json.set("status", String(magic_enum::enum_name(status))); - json.set("retry_count", retry_count); json.set("finished_by", finished_by); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); @@ -51,7 +49,6 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry entry.part_name = json->getValue("part_name"); entry.status = magic_enum::enum_cast(json->getValue("status")).value(); - entry.retry_count = json->getValue("retry_count"); if (json->has("finished_by")) { entry.finished_by = json->getValue("finished_by"); @@ -163,7 +160,8 @@ struct ExportReplicatedMergeTreePartitionManifest size_t number_of_parts; std::vector parts; time_t create_time; - size_t max_retries; + size_t retry_initial_backoff_ms; + size_t retry_max_backoff_ms; size_t task_timeout_seconds; size_t max_threads; bool parallel_formatting; @@ -204,7 +202,8 @@ struct 
ExportReplicatedMergeTreePartitionManifest json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy))); json.set("filename_pattern", filename_pattern); json.set("create_time", create_time); - json.set("max_retries", max_retries); + json.set("retry_initial_backoff_ms", retry_initial_backoff_ms); + json.set("retry_max_backoff_ms", retry_max_backoff_ms); json.set("task_timeout_seconds", task_timeout_seconds); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); json.set("allow_lossy_cast", allow_lossy_cast); @@ -228,7 +227,9 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.destination_table = json->getValue("destination_table"); manifest.source_replica = json->getValue("source_replica"); manifest.number_of_parts = json->getValue("number_of_parts"); - manifest.max_retries = json->getValue("max_retries"); + /// Manifests persisted before the back-off settings existed carry `max_retries` and lack these keys; fall back to the setting defaults instead of throwing while parsing an in-flight task. + manifest.retry_initial_backoff_ms = json->has("retry_initial_backoff_ms") ? json->getValue<size_t>("retry_initial_backoff_ms") : 5000; + manifest.retry_max_backoff_ms = json->has("retry_max_backoff_ms") ? json->getValue<size_t>("retry_max_backoff_ms") : 60000; if (json->has("iceberg_metadata_json")) { diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index e626e7364eaa..2305757c895c 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -18,6 +18,7 @@ #include #include "Common/setThreadName.h" #include +#include #include #include #include @@ -43,6 +44,18 @@ namespace ErrorCodes extern const int FILE_ALREADY_EXISTS; extern const int LOGICAL_ERROR; extern const int QUERY_WAS_CANCELLED; + extern const int BAD_ARGUMENTS; + extern const int FAULT_INJECTED; +} + +namespace FailPoints +{ + /// Throw a non-retryable (denylisted) error from the part-export worker, so the whole + /// export task transitions to FAILED immediately regardless of any timeout. 
+ extern const char export_part_non_retryable_throw[]; + /// Throw a retryable error from the part-export worker, so the part is retried with the + /// per-replica back-off until the task succeeds or the absolute timeout fires. + extern const char export_part_retryable_throw[]; } namespace Setting @@ -234,6 +247,18 @@ bool ExportPartTask::executeStep() { ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART); + fiu_do_on(FailPoints::export_part_non_retryable_throw, + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Failpoint: export_part_non_retryable_throw"); + }); + + fiu_do_on(FailPoints::export_part_retryable_throw, + { + throw Exception(ErrorCodes::FAULT_INJECTED, + "Failpoint: export_part_retryable_throw"); + }); + const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context); sink = destination_storage->import( diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 6aa9a15cfe0f..64c2d470e74f 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -255,6 +255,8 @@ std::vector ExportPartitionManifestUpdatingTask:: if (!model) return {}; + const auto backoff = storage.export_merge_tree_partition_task_scheduler->getLocalBackoffSnapshot(); + std::vector infos; infos.reserve(model->size()); @@ -285,6 +287,13 @@ std::vector ExportPartitionManifestUpdatingTask:: } info.exception_count = total_exception_count; + if (const auto it = backoff.find(entry.getTransactionId()); it != backoff.end()) + { + info.backoff_per_part.reserve(it->second.size()); + for (const auto & [part_name, state] : it->second) + info.backoff_per_part.push_back({part_name, state.attempts, state.next_retry_time}); + } + infos.emplace_back(std::move(info)); } @@ -490,20 +499,20 @@ void ExportPartitionManifestUpdatingTask::poll() "Caught exception 
while committing export for {}: {}", work.entry_path, e.message()); - const bool exceeded_commimt_max_retries = ExportPartitionUtils::handleCommitFailure( + const bool became_failed = ExportPartitionUtils::handleCommitFailure( zk, work.entry_path, - work.metadata.max_retries, + e.code(), storage.getReplicaName(), e.message(), log_ptr); - if (exceeded_commimt_max_retries) + if (became_failed) { LOG_WARNING(log_ptr, "ExportPartition Manifest Updating Task: " - "Commit for {} transitioned to FAILED after exhausting max_retries={}", - work.entry_path, work.metadata.max_retries); + "Commit for {} transitioned to FAILED due to non-retryable error (code {})", + work.entry_path, e.code()); } } } diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 89b97460ec08..1e1f207f7510 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -12,6 +12,7 @@ #include "Storages/MergeTree/MergeTreePartExportManifest.h" #include "Formats/FormatFactory.h" #include +#include namespace ProfileEvents { @@ -40,20 +41,50 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace +{ + /// Capped exponential back-off, matching the standard ClickHouse convention + /// (see ZooKeeperRetriesControl): delay = min(initial << (retry_count - 1), max). + /// `retry_count` is the number of failures so far (>= 1 when a retry is pending). + /// The shift is guarded against overflow by saturating to `max_backoff_ms`. + size_t computeRetryBackoffMs(size_t retry_count, size_t initial_backoff_ms, size_t max_backoff_ms) + { + const size_t initial = std::min(initial_backoff_ms, max_backoff_ms); + + if (retry_count <= 1 || initial == 0) + return initial; + + const size_t shift = retry_count - 1; + + /// If shifting would overflow size_t, the result is certainly clamped to the cap. 
+ static constexpr size_t bits = sizeof(size_t) * 8; + if (shift >= bits) + return max_backoff_ms; + + const size_t headroom = std::numeric_limits::max() >> shift; + if (initial > headroom) + return max_backoff_ms; + + return std::min(initial << shift, max_backoff_ms); + } +} + ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMergeTree & storage_) : storage(storage_) { } -void ExportPartitionTaskScheduler::run() +std::optional ExportPartitionTaskScheduler::run() { + std::optional earliest_backoff_retry; + const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors(); /// this is subject to TOCTOU - but for now we choose to live with it. if (available_move_executors == 0) { LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping"); - return; + return earliest_backoff_retry; } /// Respect the background memory soft-limit: refuse to schedule new export-part tasks when @@ -67,7 +98,7 @@ void ExportPartitionTaskScheduler::run() "so won't select new parts to export. Current background tasks memory usage: {}.", formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()), formatReadableSizeWithBinarySuffix(background_memory_tracker.get())); - return; + return earliest_backoff_retry; } LOG_INFO(storage.log, "ExportPartition scheduler task: Available move executors: {}", available_move_executors); @@ -82,10 +113,12 @@ void ExportPartitionTaskScheduler::run() /// is a pure reader; status converges via the status watch -> handleStatusChanges and poll(). 
const auto model = storage.export_partition_manifests.get(); if (!model) - return; + return earliest_backoff_retry; auto zk = storage.getZooKeeper(); + pruneLocalBackoff(model->get()); + // Iterate sorted by create_time for (const auto & entry : model->get()) { @@ -172,6 +205,8 @@ void ExportPartitionTaskScheduler::run() std::unordered_set locked_parts_set(locked_parts.begin(), locked_parts.end()); + const auto now = time(nullptr); + for (const auto & zk_part_name : parts_in_processing_or_pending) { if (scheduled_exports_count >= available_move_executors) @@ -186,6 +221,11 @@ void ExportPartitionTaskScheduler::run() continue; } + if (shouldBackOff(entry.getTransactionId(), zk_part_name, now, earliest_backoff_retry)) + { + continue; + } + const auto part = storage.getPartIfExists(zk_part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); if (!part) { @@ -234,10 +274,99 @@ void ExportPartitionTaskScheduler::run() ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); - /// we should not increment retry_count because the node might just be full + /// Dispatch-time failure (e.g. Keeper node full). We do not arm the local + /// back-off here: the export never started, so the part stays immediately + /// eligible for this or another replica on the next tick. 
} } } + + return earliest_backoff_retry; +} + +bool ExportPartitionTaskScheduler::shouldBackOff( + const std::string & transaction_id, + const std::string & part_name, + time_t now, + std::optional & earliest_backoff_retry) const +{ + std::lock_guard lock(local_backoff_mutex); + const auto task_it = local_backoff.find(transaction_id); + if (task_it == local_backoff.end()) + return false; + + const auto part_it = task_it->second.find(part_name); + if (part_it == task_it->second.end() || now >= part_it->second.next_retry_time) + return false; + + const auto next_retry_time = part_it->second.next_retry_time; + LOG_TRACE(storage.log, "ExportPartition scheduler task: Part {} is backing off locally, next retry at {} (now {}), skipping", part_name, next_retry_time, now); + earliest_backoff_retry = earliest_backoff_retry + ? std::min(*earliest_backoff_retry, next_retry_time) : next_retry_time; + return true; +} + +time_t ExportPartitionTaskScheduler::registerLocalBackoff( + const std::string & transaction_id, + const std::string & part_name, + const ExportReplicatedMergeTreePartitionManifest & manifest) +{ + std::lock_guard lock(local_backoff_mutex); + + /// First retryable failure for (transaction_id, part_name): create the map entries. + auto & parts = local_backoff.try_emplace(transaction_id).first->second; + auto & backoff = parts.try_emplace(part_name).first->second; + + ++backoff.attempts; + const auto backoff_ms = computeRetryBackoffMs( + backoff.attempts, manifest.retry_initial_backoff_ms, manifest.retry_max_backoff_ms); + /// Round up to whole seconds so a sub-second initial back-off still defers at least one second. 
+ backoff.next_retry_time = time(nullptr) + static_cast<time_t>(backoff_ms / 1000 + (backoff_ms % 1000 != 0 ? 1 : 0)); /// divide before rounding up: `backoff_ms + 999` wraps when the cap is near SIZE_MAX and would collapse the delay to zero + return backoff.next_retry_time; +} + +void ExportPartitionTaskScheduler::clearLocalBackoff(const std::string & transaction_id, const std::string & part_name) +{ + std::lock_guard lock(local_backoff_mutex); + if (const auto task_it = local_backoff.find(transaction_id); task_it != local_backoff.end()) + { + task_it->second.erase(part_name); + if (task_it->second.empty()) + local_backoff.erase(task_it); + } +} + +void ExportPartitionTaskScheduler::pruneLocalBackoff(const ExportPartitionTaskEntriesContainer::index::type & model) +{ + std::lock_guard lock(local_backoff_mutex); + for (auto it = local_backoff.begin(); it != local_backoff.end();) + { + const auto found = model.find(it->first); + if (found != model.end() && found->status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + ++it; + continue; + } + + it = local_backoff.erase(it); + } +} + +ExportPartitionTaskScheduler::LocalBackoffMap ExportPartitionTaskScheduler::getLocalBackoffSnapshot() const +{ + LocalBackoffMap snapshot; + + std::lock_guard lock(local_backoff_mutex); + snapshot.reserve(local_backoff.size()); + for (const auto & [transaction_id, parts] : local_backoff) + { + auto & out_parts = snapshot[transaction_id]; + out_parts.reserve(parts.size()); + for (const auto & [part_name, backoff] : parts) + out_parts.emplace(part_name, LocalBackoff{backoff.attempts, backoff.next_retry_time}); + } + + return snapshot; } void ExportPartitionTaskScheduler::handlePartExportCompletion( @@ -261,7 +390,7 @@ void ExportPartitionTaskScheduler::handlePartExportCompletion( } else { - handlePartExportFailure(processing_parts_path, part_name, export_path, zk, result.exception, manifest.max_retries); + handlePartExportFailure(part_name, export_path, zk, result.exception, manifest); } } @@ -289,6 +418,9 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( return; } + /// Part is done on this replica; drop any 
local back-off state we held for it. + clearLocalBackoff(manifest.transaction_id, part_name); + LOG_INFO(storage.log, "ExportPartition scheduler task: Marked part export {} as completed", part_name); if (!areAllPartsProcessed(export_path, zk)) @@ -307,15 +439,16 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( { LOG_INFO(storage.log, "ExportPartition scheduler task: Caught exception while committing export partition, {}", e.message()); - /// Bump commit-attempts counter; transition to FAILED once the budget is exhausted. - /// Prevents the task from remaining stuck in PENDING if commit() fails persistently - /// (e.g. schema/spec mismatch, prolonged destination outage). + /// Classify the commit failure: a non-retryable error (e.g. schema/spec mismatch) + /// transitions the task to FAILED immediately; a retryable one (transient catalog or + /// destination outage) only records the exception and leaves the task PENDING so the + /// commit is retried until the absolute task timeout. /// The exception is recorded in /last_exception via appendExceptionOps - /// inside the same multi as the commit_attempts bump and the (possible) FAILED set. + /// inside the same multi as the (possible) FAILED set. 
const bool became_failed = ExportPartitionUtils::handleCommitFailure( zk, export_path, - manifest.max_retries, + e.code(), storage.replica_name, e.message(), storage.log.load()); @@ -323,19 +456,18 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( if (became_failed) { LOG_WARNING(storage.log, - "ExportPartition scheduler task: Commit for {} transitioned to FAILED after exhausting max_retries={}", - export_path.string(), manifest.max_retries); + "ExportPartition scheduler task: Commit for {} transitioned to FAILED due to non-retryable error (code {})", + export_path.string(), e.code()); } } } void ExportPartitionTaskScheduler::handlePartExportFailure( - const std::filesystem::path & processing_parts_path, const std::string & part_name, const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk, const std::optional & exception, - size_t max_retries + const ExportReplicatedMergeTreePartitionManifest & manifest ) { LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} export failed", part_name); @@ -415,42 +547,25 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( return; } - Coordination::Requests ops; - - const auto processing_part_path = processing_parts_path / part_name; - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - std::string processing_part_string; - - if (!zk->tryGet(processing_part_path, processing_part_string)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get processing part, will not increment error counts"); - return; - } - - /// todo arthur could this have been cached? 
- auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string); + const bool non_retryable = ExportPartitionUtils::isNonRetryableExportError(exception->code()); - processing_part_entry.retry_count++; + Coordination::Requests ops; ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1)); - - LOG_INFO(storage.log, "ExportPartition scheduler task: Updating processing part entry for part {}, retry count: {}, max retries: {}", part_name, processing_part_entry.retry_count, max_retries); - if (processing_part_entry.retry_count >= max_retries) + if (non_retryable) { - /// just set status in processing_part_path and finished_by - processing_part_entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::FAILED; - processing_part_entry.finished_by = storage.replica_name; - - ops.emplace_back(zkutil::makeSetRequest(status_path, String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), status_stat.version)); - LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name); + /// Deterministic failure (e.g. schema/type incompatibility): retrying cannot help, + /// so fail the whole task immediately instead of waiting for the absolute timeout. 
+ ops.emplace_back(zkutil::makeSetRequest( + status_path, + String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), + status_stat.version)); + LOG_WARNING(storage.log, "ExportPartition scheduler task: Part {} failed with non-retryable error (code {}), failing the entire task", part_name, exception->code()); } else { - LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name); + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} failed with retryable error (code {}), will back off and retry until the task timeout", part_name, exception->code()); } ExportPartitionUtils::appendExceptionOps( @@ -466,7 +581,15 @@ void ExportPartitionTaskScheduler::handlePartExportFailure( return; } - LOG_INFO(storage.log, "ExportPartition scheduler task: Successfully updated exception counters for part {}", part_name); + /// Only after the lock release + exception record committed do we arm the local back-off, + /// so a Keeper failure above does not leave this replica skipping the part for no reason. 
+ if (!non_retryable) + { + const auto next_retry_time = registerLocalBackoff(manifest.transaction_id, part_name, manifest); + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} backing off locally, next retry at {}", part_name, next_retry_time); + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Successfully recorded failure for part {}", part_name); } bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.h b/src/Storages/MergeTree/ExportPartitionTaskScheduler.h index 29a41fde1cb9..c281e06c06fa 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.h +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.h @@ -1,7 +1,14 @@ #pragma once #include +#include #include +#include +#include +#include +#include +#include +#include namespace DB { @@ -17,7 +24,10 @@ class ExportPartitionTaskScheduler public: ExportPartitionTaskScheduler(StorageReplicatedMergeTree & storage); - void run(); + /// Returns the earliest future back-off deadline (unix seconds) among parts that were skipped + /// this tick purely because they are still backing off, or nullopt if none. The caller can use + /// it to wake the select task sooner than the default tick interval. 
+ std::optional run(); private: StorageReplicatedMergeTree & storage; @@ -41,12 +51,11 @@ class ExportPartitionTaskScheduler ); void handlePartExportFailure( - const std::filesystem::path & processing_parts_path, const std::string & part_name, const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk, const std::optional & exception, - size_t max_retries); + const ExportReplicatedMergeTreePartitionManifest & manifest); bool tryToMovePartToProcessed( const std::filesystem::path & export_path, @@ -61,6 +70,49 @@ class ExportPartitionTaskScheduler const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk ); + + struct LocalBackoff + { + size_t attempts = 0; + time_t next_retry_time = 0; + }; + + /// transaction_id -> part name -> back-off state. Keyed by transaction_id (not composite + /// key) so a reused composite key does not inherit a prior instance's back-off. Guarded by + /// local_backoff_mutex because run() (schedule-pool thread) reads it while part-export + /// completion callbacks (background-executor threads) write it. + using PartNameToBackOffMap = std::unordered_map; + using TransactionID = std::string; + using LocalBackoffMap = std::unordered_map; + + mutable std::mutex local_backoff_mutex; + LocalBackoffMap local_backoff; + + bool shouldBackOff( + const std::string & transaction_id, + const std::string & part_name, + time_t now, + std::optional & earliest_backoff_retry) const; + + /// Record a retryable failure for (transaction_id, part_name): grow the attempt counter and + /// compute the next eligible time. Returns the new absolute deadline. + time_t registerLocalBackoff( + const std::string & transaction_id, + const std::string & part_name, + const ExportReplicatedMergeTreePartitionManifest & manifest); + + /// Drop any back-off state for parts of (transaction_id) once they succeed or the task ends. 
+ void clearLocalBackoff(const std::string & transaction_id, const std::string & part_name); + + /// Remove back-off state for tasks whose transaction_id is no longer PENDING in the published + /// model, bounding the map to the parts of currently-active tasks. + void pruneLocalBackoff(const ExportPartitionTaskEntriesContainer::index::type & model); + +public: + /// Snapshot of the local back-off map for system.replicated_partition_exports: + /// transaction_id -> part -> (attempts, next_retry_time). Briefly locks local_backoff_mutex; + /// never held across ZooKeeper I/O. + std::unordered_map getLocalBackoffSnapshot() const; }; } diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 622fa6af92df..8f31ea18be29 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,18 @@ namespace ErrorCodes extern const int NO_SUCH_DATA_PART; extern const int CORRUPTED_DATA; extern const int NETWORK_ERROR; + extern const int LOGICAL_ERROR; + extern const int NOT_IMPLEMENTED; + extern const int SUPPORT_IS_DISABLED; + extern const int TYPE_MISMATCH; + extern const int CANNOT_CONVERT_TYPE; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int ILLEGAL_COLUMN; + extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; + extern const int INCOMPATIBLE_COLUMNS; + extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int FILE_ALREADY_EXISTS; + extern const int METADATA_MISMATCH; } namespace Setting @@ -57,6 +70,33 @@ namespace fs = std::filesystem; namespace ExportPartitionUtils { + bool isNonRetryableExportError(int code) + { + /// Deterministic failures where retrying cannot possibly succeed (schema/type + /// incompatibilities, unsupported features, programming errors). Everything else + /// (memory limits, network/object-storage/Keeper transient errors, ...) 
is retryable. + /// `QUERY_WAS_CANCELLED` is handled separately by the caller and never reaches here. + /// + /// ErrorCodes values are runtime `extern const int`, not constant expressions, so they + /// cannot be used as `switch` labels; compare against a static set instead. + static const std::unordered_set non_retryable_codes = { + ErrorCodes::BAD_ARGUMENTS, /// Iceberg partition-transform / type / metadata-parsing rejections (Iceberg/Utils.cpp) + ErrorCodes::TYPE_MISMATCH, + ErrorCodes::CANNOT_CONVERT_TYPE, + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + ErrorCodes::ILLEGAL_COLUMN, + ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, + ErrorCodes::INCOMPATIBLE_COLUMNS, /// positional-cast schema guard (verifyExportSchemaCastable) + ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, + ErrorCodes::NOT_IMPLEMENTED, + ErrorCodes::SUPPORT_IS_DISABLED, + ErrorCodes::LOGICAL_ERROR, + ErrorCodes::FILE_ALREADY_EXISTS, /// file_already_exists_policy='error': retrying always hits the same file + ErrorCodes::METADATA_MISMATCH, /// schema / partition spec changed mid-export: files were built against the old spec, must restart + }; + return non_retryable_codes.contains(code); + } + Block getPartitionSourceBlockForIcebergCommit( MergeTreeData & storage, const String & partition_id) { @@ -263,7 +303,7 @@ namespace ExportPartitionUtils bool handleCommitFailure( const zkutil::ZooKeeperPtr & zk, const std::string & entry_path, - size_t max_attempts, + int exception_code, const std::string & replica_name, const std::string & exception_message, const LoggerPtr & log) @@ -306,51 +346,19 @@ namespace ExportPartitionUtils Coordination::Requests ops; - /// Record the exception in the same multi as the commit-attempts bump and the - /// (possible) FAILED transition, so the user-visible last_exception znode is - /// updated atomically with the state change that exposes it. 
+ /// Record the exception in the same multi as the (possible) FAILED transition, so the + /// user-visible last_exception znode is updated atomically with the state change that + /// exposes it. appendExceptionOps(ops, zk, fs::path(entry_path), replica_name, /*part_name=*/"", exception_message, log); - /// Bump the global commit_attempts counter (shared across replicas). - /// Non-atomic get+set(-1). Under a race, two replicas may see the same value - /// and write the same +1, under-counting by one. FAILED then fires one retry - /// later than the threshold, which is acceptable. - const std::string commit_attempts_path = fs::path(entry_path) / "commit_attempts"; - - size_t attempts = 0; - std::string attempts_string; - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (zk->tryGet(commit_attempts_path, attempts_string)) - { - try - { - attempts = parse(attempts_string); - } - catch (...) - { - LOG_WARNING(log, "ExportPartition: commit_attempts value '{}' at {} is not a valid integer, treating as 0", attempts_string, commit_attempts_path); - attempts = 0; - } - - attempts += 1; - ops.emplace_back(zkutil::makeSetRequest(commit_attempts_path, std::to_string(attempts), -1)); - } - else - { - attempts = 1; - ops.emplace_back(zkutil::makeCreateRequest(commit_attempts_path, "1", zkutil::CreateMode::Persistent)); - } - - /// Transition to FAILED if the commit budget is exhausted. - /// Uses the same setting as per-part retries (manifest.max_retries) per user decision. - /// Version-checked Set: if /status has changed since we read it (e.g. a peer's - /// commit() succeeded and wrote COMPLETED), the whole multi aborts with - /// ZBADVERSION and we safely do nothing — the winning terminal state stands. - const bool exhausted = attempts >= max_attempts; - if (exhausted) + /// A non-retryable error (schema/spec mismatch, ...) 
can never succeed, + /// so fail the task immediately + const bool non_retryable = isNonRetryableExportError(exception_code); + if (non_retryable) { + /// Version-checked Set: if /status has changed since we read it (e.g. a peer's + /// commit() succeeded and wrote COMPLETED), the whole multi aborts with + /// ZBADVERSION and we safely do nothing — the winning terminal state stands. ops.emplace_back(zkutil::makeSetRequest( status_path, String(magic_enum::enum_name(ExportReplicatedMergeTreePartitionTaskEntry::Status::FAILED)).data(), @@ -363,21 +371,16 @@ namespace ExportPartitionUtils const auto rc = zk->tryMulti(ops, responses); if (rc != Coordination::Error::ZOK) { - /// Any error here (ZBADVERSION on /status race or counter race, ZNODEEXISTS on - /// lazy-create race, ZNONODE if someone removed the task concurrently) is - /// non-fatal: the next attempt re-reads /status and either skips (terminal - /// state won) or retries the bookkeeping. Worst case we delay FAILED by one - /// poll cycle, which matches the best-effort property of the existing counters. LOG_INFO(log, "ExportPartition: Failed to persist commit failure bookkeeping for {}: {}", entry_path, rc); return false; } LOG_INFO(log, - "ExportPartition: Commit failure recorded for {} (attempt {}/{}){}", - entry_path, attempts, max_attempts, - exhausted ? ", task transitioned to FAILED" : ""); + "ExportPartition: Commit failure recorded for {} (code {}){}", + entry_path, exception_code, + non_retryable ? 
", task transitioned to FAILED (non-retryable)" : ", will retry until task timeout"); - return exhausted; + return non_retryable; } void appendExceptionOps( @@ -580,7 +583,7 @@ namespace ExportPartitionUtils const auto & source_column = source_columns[i]; const auto & destination_column = destination_columns[i]; if (!canBeSafelyCast(source_column.type, destination_column.type)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Cannot export to {}: column '{}' requires a lossy cast from {} to {}, " "which may change values. Set `export_merge_tree_part_allow_lossy_cast = 1` " "to allow lossy casts during export.", diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index 0434bc59a2cb..e42d02ab4ef5 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -23,6 +23,8 @@ struct ExportReplicatedMergeTreePartitionManifest; namespace ExportPartitionUtils { + bool isNonRetryableExportError(int code); + std::vector getExportedPaths(const LoggerPtr & log, const zkutil::ZooKeeperPtr & zk, const std::string & export_path); ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest); @@ -51,18 +53,19 @@ namespace ExportPartitionUtils /// Handles a commit-phase failure for a replicated partition export: /// - records the exception via appendExceptionOps in the same multi - /// - increments /commit_attempts (lazy-created) - /// - sets /status to FAILED once attempts >= max_attempts + /// - if `exception_code` is non-retryable (see isNonRetryableExportError), sets + /// /status to FAILED (version-checked against the PENDING read) + /// - otherwise leaves the task PENDING so the commit is retried (by the next + /// last-part success or deferred-commit recovery) until the absolute task timeout /// - /// The counter is a best-effort, non-atomic get+set(-1). 
Concurrent failing - /// commits may under-count by one (FAILED may fire one retry later than the - /// threshold), which is acceptable. + /// There is no per-task commit-attempt budget: retryable commit failures retry until + /// success or timeout, matching the per-part retry semantics. /// /// Returns true if this call transitioned the task to FAILED. bool handleCommitFailure( const zkutil::ZooKeeperPtr & zk, const std::string & entry_path, - size_t max_attempts, + int exception_code, const std::string & replica_name, const std::string & exception_message, const LoggerPtr & log); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 9819a8783516..d520fa0f910b 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -113,6 +113,8 @@ extern const int S3_ERROR; extern const int TABLE_ALREADY_EXISTS; extern const int SUPPORT_IS_DISABLED; extern const int INCORRECT_DATA; +extern const int METADATA_MISMATCH; +extern const int UNFINISHED; } namespace Setting @@ -1497,7 +1499,7 @@ namespace { /// Find the partition spec object with the given spec-id inside a metadata JSON document. -/// Throws BAD_ARGUMENTS if the spec is not found (indicates metadata/spec-id mismatch). +/// Throws METADATA_MISMATCH if the spec is not found (indicates metadata/spec-id mismatch). 
Poco::JSON::Object::Ptr lookupPartitionSpec(const Poco::JSON::Object::Ptr & meta, Int64 spec_id) { auto specs = meta->getArray(Iceberg::f_partition_specs); @@ -1507,7 +1509,7 @@ Poco::JSON::Object::Ptr lookupPartitionSpec(const Poco::JSON::Object::Ptr & meta if (spec->getValue(Iceberg::f_spec_id) == spec_id) return spec; } - throw Exception(ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Partition spec with id {} not found in table metadata", spec_id); } @@ -1521,7 +1523,7 @@ Poco::JSON::Object::Ptr lookupSchema(const Poco::JSON::Object::Ptr & meta, Int64 return schema; } - throw Exception(ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Schema with id {} not found in table metadata", schema_id); } @@ -1708,13 +1710,13 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( /// the caller has to restart the export from scratch. const auto new_schema_id = metadata->getValue(Iceberg::f_current_schema_id); if (new_schema_id != original_schema_id) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Table schema changed during export (expected schema {}, got {}). Restart the export operation.", original_schema_id, new_schema_id); const Int64 new_partition_spec_id = metadata->getValue(Iceberg::f_default_spec_id); if (new_partition_spec_id != partition_spec_id) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Partition spec changed during export (expected spec {}, got {}). 
Restart the export operation.", partition_spec_id, new_partition_spec_id); @@ -1922,14 +1924,14 @@ void IcebergMetadata::commitExportPartitionTransaction( /// The exported data files and partition values were produced against the original spec; const auto latest_schema_id = metadata->getValue(Iceberg::f_current_schema_id); if (latest_schema_id != original_schema_id) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Table schema changed before export could commit (expected schema {}, got {}). " "Restart the export operation.", original_schema_id, latest_schema_id); const auto latest_spec_id = metadata->getValue(Iceberg::f_default_spec_id); if (latest_spec_id != partition_spec_id) - throw Exception(ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::METADATA_MISMATCH, "Partition spec changed before export could commit (expected spec {}, got {}). " "Restart the export operation.", partition_spec_id, latest_spec_id); @@ -2004,7 +2006,7 @@ void IcebergMetadata::commitExportPartitionTransaction( ++attempt; } - throw Exception(ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::UNFINISHED, "Failed to commit export partition transaction after {} attempts due to repeated metadata conflicts.", attempt); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 4701e7c5d9a2..6f7bbfd3f854 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -214,7 +214,8 @@ namespace Setting extern const SettingsBool update_sequential_consistency; extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsBool export_merge_tree_partition_force_export; - extern const SettingsUInt64 export_merge_tree_partition_max_retries; + extern const SettingsUInt64 export_merge_tree_partition_retry_initial_backoff_ms; + extern const SettingsUInt64 export_merge_tree_partition_retry_max_backoff_ms; extern const 
SettingsUInt64 export_merge_tree_partition_task_timeout_seconds; extern const SettingsBool output_format_parallel_formatting; extern const SettingsBool output_format_parquet_parallel_encoding; @@ -4602,6 +4603,11 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask() void StorageReplicatedMergeTree::selectPartsToExport() { auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::selectPartsToExport"); + + /// Default tick interval; may be shortened below if a part's back-off expires sooner. + static constexpr int64_t default_reschedule_ms = 1000 * 5; + int64_t reschedule_ms = default_reschedule_ms; + try { if (parts_mover.moves_blocker.isCancelled()) @@ -4610,7 +4616,16 @@ void StorageReplicatedMergeTree::selectPartsToExport() } else { - export_merge_tree_partition_task_scheduler->run(); + const auto earliest_backoff_retry = export_merge_tree_partition_task_scheduler->run(); + + /// If a part is only waiting on its back-off deadline and that deadline is sooner than + /// the default tick, wake up earlier so the retry is not delayed by up to a full tick. + if (earliest_backoff_retry) + { + const auto now = time(nullptr); + const int64_t until_ms = (static_cast(*earliest_backoff_retry) - static_cast(now)) * 1000; + reschedule_ms = std::clamp(until_ms, 0, default_reschedule_ms); + } } } catch (...) 
@@ -4618,7 +4633,7 @@ void StorageReplicatedMergeTree::selectPartsToExport() tryLogCurrentException(log, __PRETTY_FUNCTION__); } - export_merge_tree_partition_select_task->scheduleAfter(1000 * 5); + export_merge_tree_partition_select_task->scheduleAfter(reschedule_ms); } void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask() @@ -8495,7 +8510,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.number_of_parts = part_names.size(); manifest.parts = part_names; manifest.create_time = time(nullptr); - manifest.max_retries = query_context->getSettingsRef()[Setting::export_merge_tree_partition_max_retries]; + manifest.retry_initial_backoff_ms = query_context->getSettingsRef()[Setting::export_merge_tree_partition_retry_initial_backoff_ms]; + manifest.retry_max_backoff_ms = query_context->getSettingsRef()[Setting::export_merge_tree_partition_retry_max_backoff_ms]; manifest.task_timeout_seconds = query_context->getSettingsRef()[Setting::export_merge_tree_partition_task_timeout_seconds]; manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; @@ -8578,7 +8594,6 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & ExportReplicatedMergeTreePartitionProcessingPartEntry entry; entry.status = ExportReplicatedMergeTreePartitionProcessingPartEntry::Status::PENDING; entry.part_name = part; - entry.retry_count = 0; ops.emplace_back(zkutil::makeCreateRequest( fs::path(partition_exports_path) / "processing" / part, diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index 434cfdc9b7b5..92e7a1ba2dc4 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -28,6 +28,14 @@ 
ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio }, Names{"replica", "message", "part", "time", "count"}); + auto backoff_tuple = std::make_shared( + DataTypes{ + std::make_shared(), + std::make_shared(), + std::make_shared(), + }, + Names{"part", "attempts", "next_retry_time"}); + return ColumnsDescription { {"source_database", std::make_shared(), "Name of the source database."}, @@ -47,6 +55,8 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. Empty array if no replica has reported an exception for this task."}, {"exception_count", std::make_shared(), "Sum of per-replica exception counts. Each replica owns its own count, so the sum is exact w.r.t. the in-memory snapshot; within-replica updates remain best-effort and may under-count by one under concurrent failures."}, + {"local_backoff_per_part", std::make_shared(backoff_tuple), + "Per-part retry back-off local to this replica: parts currently waiting before their next attempt, with attempt count and the next eligible time. 
Not shared across replicas; empty if no part is backing off."}, }; } @@ -152,6 +162,12 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu per_replica.push_back(Tuple{ex.replica, ex.message, ex.part, ex.time, ex.count}); res_columns[i++]->insert(per_replica); res_columns[i++]->insert(info.exception_count); + + Array backoff_array; + backoff_array.reserve(info.backoff_per_part.size()); + for (const auto & b : info.backoff_per_part) + backoff_array.push_back(Tuple{b.part, b.attempts, b.next_retry_time}); + res_columns[i++]->insert(backoff_array); } } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index a8666374a7f0..09d7d3eaf9a2 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -29,6 +29,15 @@ struct ReplicatedPartitionExportInfo /// single replica the count is best-effort (concurrent failing writers may under- /// count by one), matching the documented column semantics. size_t exception_count = 0; + + struct PartBackoffEntry + { + String part; + size_t attempts = 0; + time_t next_retry_time = 0; + }; + /// Parts of this task currently backing off (local to this replica). Empty if none. 
+ std::vector backoff_per_part; }; class StorageSystemReplicatedPartitionExports final : public IStorageSystemOneBlock diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 62a8a2196313..f2df54d50995 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -14,6 +14,7 @@ make_iceberg_s3, make_rmt, unique_suffix, + wait_for_exception_count, wait_for_export_status, wait_for_export_to_start, ) @@ -229,43 +230,30 @@ def test_export_partition_all_to_iceberg(cluster): def test_failure_is_logged_in_system_table(cluster): """ - When S3 is unreachable the export must be marked FAILED in - system.replicated_partition_exports with a non-zero exception_count. + When a part export fails with a non-retryable error the export must be marked + FAILED in system.replicated_partition_exports with a non-zero exception_count. + + Uses the export_part_non_retryable_throw failpoint (throws BAD_ARGUMENTS, a + denylisted code) so the task fails fast without consuming any timeout budget. 
""" node = cluster.instances["replica1"] - minio_ip = cluster.minio_ip - minio_port = cluster.minio_port uid = unique_suffix() mt_table = f"mt_{uid}" iceberg_table = f"iceberg_{uid}" - setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"], - s3_retry_attempts=1) - - node.query(f"SYSTEM STOP MOVES {mt_table}") - - node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} SETTINGS export_merge_tree_partition_max_retries = 1, allow_insert_into_iceberg = 1") - - with PartitionManager() as pm: - pm.add_rule({ - "instance": node, - "destination": node.ip_address, - "protocol": "tcp", - "source_port": minio_port, - "action": "REJECT --reject-with tcp-reset", - }) - pm.add_rule({ - "instance": node, - "destination": minio_ip, - "protocol": "tcp", - "destination_port": minio_port, - "action": "REJECT --reject-with tcp-reset", - }) + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) - node.query(f"SYSTEM START MOVES {mt_table}") + node.query("SYSTEM ENABLE FAILPOINT export_part_non_retryable_throw") + try: + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) wait_for_export_status(node, mt_table, iceberg_table, "2020", "FAILED", timeout=60) + finally: + node.query("SYSTEM DISABLE FAILPOINT export_part_non_retryable_throw") status = node.query( f""" @@ -287,6 +275,9 @@ def test_failure_is_logged_in_system_table(cluster): ).strip()) assert exception_count > 0, "Expected non-zero exception_count in system.replicated_partition_exports" + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 0, f"Expected 0 rows in Iceberg table after a failed export, got {count}" + def test_inject_short_living_failures(cluster): """ @@ -306,7 +297,7 @@ def test_inject_short_living_failures(cluster): node.query(f"SYSTEM STOP MOVES {mt_table}") - node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO 
TABLE {iceberg_table} SETTINGS export_merge_tree_partition_max_retries = 100, allow_insert_into_iceberg = 1") + node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} SETTINGS allow_insert_into_iceberg = 1") with PartitionManager() as pm: pm.add_rule({ @@ -355,6 +346,245 @@ def test_inject_short_living_failures(cluster): assert exception_count >= 1, "Expected at least one transient exception to be recorded" +def test_export_partition_non_retryable_error_fails_task_fast(cluster): + """ + A non-retryable part-export error (denylisted code, here BAD_ARGUMENTS injected + via the export_part_non_retryable_throw failpoint) must fail the whole export + task immediately, without waiting for the absolute task timeout. + + The task timeout is left at its large default, so reaching FAILED quickly proves + the transition is driven by error classification rather than by a timeout or a + retry budget. + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_{uid}" + iceberg_table = f"iceberg_{uid}" + + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) + + node.query("SYSTEM ENABLE FAILPOINT export_part_non_retryable_throw") + try: + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + + # No short timeout is set; FAILED within this window can only come from the + # non-retryable classification, not from the (default, ~1 day) task timeout. 
+ wait_for_export_status(node, mt_table, iceberg_table, "2020", "FAILED", timeout=60) + finally: + node.query("SYSTEM DISABLE FAILPOINT export_part_non_retryable_throw") + + exception_count = int(node.query( + f"SELECT any(exception_count) FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip()) + assert exception_count > 0, "Expected a non-zero exception_count for the failed export" + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 0, f"Expected 0 rows in Iceberg table after a failed export, got {count}" + + +def test_export_partition_retryable_error_killed_on_timeout(cluster): + """ + A retryable part-export error (here FAULT_INJECTED via export_part_retryable_throw) + must NOT fail the task on a retry budget: there is no retry budget anymore, so the + part keeps retrying until the absolute task timeout fires and the task is KILLED. + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_{uid}" + iceberg_table = f"iceberg_{uid}" + + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) + + node.query("SYSTEM ENABLE FAILPOINT export_part_retryable_throw") + try: + # Under the old budget model a small retry budget would fail the task after the + # first retry. With the new model there is no budget and only the 5s timeout fails it. + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" + f" SETTINGS export_merge_tree_partition_task_timeout_seconds = 5," + f" allow_insert_into_iceberg = 1" + ) + + # Give the scheduler time to attempt and fail the part several times. The old + # budget would already have transitioned the task to FAILED by now. 
+ time.sleep(15) + status = node.query( + f"SELECT status FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip() + assert status != "FAILED", ( + f"Retryable failures must not fail the task on a budget, got status {status!r}" + ) + + # The timeout (5s) is past; KILLED fires on the next manifest-updater poll cycle. + wait_for_export_status( + node, mt_table, iceberg_table, "2020", "KILLED", timeout=90 + ) + finally: + node.query("SYSTEM DISABLE FAILPOINT export_part_retryable_throw") + + exception_count = int(node.query( + f"SELECT any(exception_count) FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip()) + assert exception_count > 0, "Expected at least one retryable exception to be recorded" + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 0, f"Expected 0 rows in Iceberg table after a killed export, got {count}" + + +def test_export_partition_retryable_error_recovers_after_failpoint_cleared(cluster): + """ + A retryable part-export error must keep the task PENDING (not FAILED) while the + failure persists, applying a per-replica back-off between attempts. Once the + failure clears the export completes successfully — proving the back-off only + spaces retries out and never permanently blocks progress. 
+ """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_{uid}" + iceberg_table = f"iceberg_{uid}" + + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) + + node.query("SYSTEM ENABLE FAILPOINT export_part_retryable_throw") + try: + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" + f" SETTINGS export_merge_tree_partition_retry_initial_backoff_ms = 1000," + f" export_merge_tree_partition_retry_max_backoff_ms = 2000," + f" allow_insert_into_iceberg = 1" + ) + + # Wait until at least one retryable failure has been recorded; the task must + # still be PENDING (retrying), never FAILED. + wait_for_exception_count(node, mt_table, iceberg_table, "2020", + min_exception_count=1, timeout=60) + status = node.query( + f"SELECT status FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip() + assert status == "PENDING", ( + f"Retryable failures must keep the task PENDING, got status {status!r}" + ) + finally: + node.query("SYSTEM DISABLE FAILPOINT export_part_retryable_throw") + + # With the failpoint cleared the next retry succeeds and the export completes. + wait_for_export_status(node, mt_table, iceberg_table, "2020", "COMPLETED", timeout=90) + + count = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE year = 2020").strip()) + assert count == 3, f"Expected 3 rows after recovery, got {count}" + + +def test_export_partition_local_backoff_does_not_block_other_replica(cluster): + """ + Back-off is per-replica and in-memory: a part that one replica keeps failing on + (and therefore puts into its local back-off) must NOT be prevented from being + exported by another replica. This is the whole reason the back-off is local + rather than distributed in ZooKeeper. 
+ + replica1 is given a persistent *retryable* failure (export_part_retryable_throw) + and is the only replica scheduling at first (moves are stopped on replica2). Once + replica1 has recorded a failure and a local back-off entry, replica2's scheduler + is enabled. Because the failpoint stays active on replica1 the whole time, the + only way the export can reach COMPLETED is replica2 picking up the very part that + replica1 keeps failing — proving the back-off does not leak across replicas. + """ + replica1 = cluster.instances["replica1"] + replica2 = cluster.instances["replica2"] + + uid = unique_suffix() + mt_table = f"mt_{uid}" + iceberg_table = f"iceberg_{uid}" + + setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1", "replica2"]) + + # Phase 1: only replica1 schedules. Stop the export scheduler on replica2 so the + # part is guaranteed to be attempted (and fail) on replica1 first. + replica2.query(f"SYSTEM STOP MOVES {mt_table}") + + replica1.query("SYSTEM ENABLE FAILPOINT export_part_retryable_throw") + try: + replica1.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" + f" SETTINGS export_merge_tree_partition_retry_initial_backoff_ms = 1000," + f" export_merge_tree_partition_retry_max_backoff_ms = 2000," + f" allow_insert_into_iceberg = 1" + ) + + # replica1 attempts the part, fails (retryable), and enters local back-off. + # The task must stay PENDING — there is no retry budget to fail it. + wait_for_exception_count(replica1, mt_table, iceberg_table, "2020", + min_exception_count=1, timeout=60) + status = replica1.query( + f"SELECT status FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip() + assert status == "PENDING", ( + f"Retryable failures must keep the task PENDING, got status {status!r}" + ) + + # The back-off entry must be observable on replica1 (the failing replica). 
+ deadline = time.time() + 60 + backoff_replica1 = "0" + while time.time() < deadline: + backoff_replica1 = replica1.query( + f"SELECT length(local_backoff_per_part) FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip() + if backoff_replica1 not in ("", "0"): + break + time.sleep(0.5) + assert backoff_replica1 not in ("", "0"), ( + "Expected replica1 to carry a local back-off entry for the failing part, " + f"got {backoff_replica1!r}" + ) + + # ... and it must NOT have leaked to replica2, which never attempted the part. + # This is the core assertion: local back-off state is not shared across replicas. + backoff_replica2 = replica2.query( + f"SELECT length(local_backoff_per_part) FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{iceberg_table}'" + f" AND partition_id = '2020'" + ).strip() + assert backoff_replica2 in ("", "0"), ( + f"replica2 must not carry replica1's local back-off, got {backoff_replica2!r}" + ) + + # Phase 2: enable replica2's scheduler. replica1 keeps failing (the failpoint + # is still active), so completion can only come from replica2 exporting the + # part that replica1 is backing off on. 
+ replica2.query(f"SYSTEM START MOVES {mt_table}") + + wait_for_export_status(replica2, mt_table, iceberg_table, "2020", "COMPLETED", timeout=120) + finally: + replica1.query("SYSTEM DISABLE FAILPOINT export_part_retryable_throw") + + count = int(replica2.query(f"SELECT count() FROM {iceberg_table} WHERE year = 2020").strip()) + assert count == 3, f"Expected 3 rows after replica2 completed the export, got {count}" + + def test_export_partition_scheduler_skipped_when_moves_stopped(cluster): """ Verify that selectPartsToExport() skips the scheduler entirely when moves @@ -427,7 +657,7 @@ def test_export_partition_resumes_after_stop_moves(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" - f" SETTINGS export_merge_tree_partition_max_retries = 50, allow_insert_into_iceberg = 1" + f" SETTINGS allow_insert_into_iceberg = 1" ) wait_for_export_to_start(node, mt_table, iceberg_table, "2020") @@ -472,7 +702,7 @@ def test_export_partition_resumes_after_stop_moves_during_export(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" - f" SETTINGS export_merge_tree_partition_max_retries = 50, allow_insert_into_iceberg = 1") + f" SETTINGS allow_insert_into_iceberg = 1") wait_for_export_to_start(node, mt_table, iceberg_table, "2020") @@ -748,10 +978,17 @@ def test_partition_key_compatibility_check(cluster): def test_export_data_files_are_not_cleaned_up_on_commit_failure(cluster): """ - Verify that the data files are not cleaned up on commit failure and the export is retried. - This is to avoid data loss. - - If the data files were cleaned up, a retry would commit a new snapshot that points to dangling references. + Verify that a commit failure does not delete the already-written data files. + `cleanup` only removes the manifest entry / manifest list, never the data files + (a peer replica might still commit the same transaction). This guards against + data loss / dangling references. 
+ + The iceberg_writes_non_retry_cleanup failpoint throws BAD_ARGUMENTS while writing + the manifest entry, after the data files have been written. BAD_ARGUMENTS is a + non-retryable error code, so the task transitions to FAILED; we then confirm the + exported data files are still physically present in object storage by reading + them directly (the Iceberg manifests were removed by cleanup, so we glob the raw + parquet data files instead). """ node = cluster.instances["replica1"] uid = unique_suffix() @@ -760,15 +997,28 @@ def test_export_data_files_are_not_cleaned_up_on_commit_failure(cluster): setup_tables(cluster, mt_table, iceberg_table, nodes=["replica1"]) node.query("SYSTEM ENABLE FAILPOINT iceberg_writes_non_retry_cleanup") - - node.query( - f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", - settings={"allow_insert_into_iceberg": 1}, + try: + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + # BAD_ARGUMENTS from the commit phase is non-retryable -> the task fails fast. + wait_for_export_status(node, mt_table, iceberg_table, "2020", "FAILED", timeout=60) + finally: + node.query("SYSTEM DISABLE FAILPOINT iceberg_writes_non_retry_cleanup") + + # The data files were written before the commit failure; cleanup must have left + # them intact. Read them straight from object storage (bypassing the Iceberg + # metadata, which cleanup removed) and confirm all 3 exported rows survive. 
+ rows = int(node.query( + f"SELECT count() FROM s3(" + f"'http://minio1:9001/root/data/{iceberg_table}/**.parquet', " + f"'minio', 'ClickHouse_Minio_P@ssw0rd', 'Parquet')" + ).strip()) + assert rows == 3, ( + f"Expected the 3 exported rows to still exist as data files after a failed " + f"commit (data files must not be cleaned up), got {rows}" ) - wait_for_export_status(node, mt_table, iceberg_table, "2020", "COMPLETED") - - count = int(node.query(f"SELECT count() FROM {iceberg_table} WHERE year = 2020").strip()) - assert count == 3, f"Expected 3 rows after first export, got {count}" def test_post_publish_exception_preserves_snapshot(cluster): @@ -827,10 +1077,9 @@ def test_export_task_timeout_kills_stuck_pending_task(cluster): descriptive last_exception. The export_partition_commit_always_throw failpoint wedges the task in the - commit retry loop (REGULAR failpoint, fires on every commit attempt). A very - large max_retries budget prevents the commit-attempts path from transitioning - to FAILED before the timeout fires, so the timeout branch in tryCleanup is - the actual mechanism under test. + commit retry loop (REGULAR failpoint, fires on every commit attempt) with a + retryable error, so the task never fails on its own and the timeout branch in + tryCleanup is the actual mechanism under test. 
""" node = cluster.instances["replica1"] uid = unique_suffix() @@ -844,7 +1093,6 @@ def test_export_task_timeout_kills_stuck_pending_task(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}" f" SETTINGS export_merge_tree_partition_task_timeout_seconds = 5," - f" export_merge_tree_partition_max_retries = 1000000," f" allow_insert_into_iceberg = 1" ) @@ -1132,7 +1380,7 @@ def test_export_partition_with_castable_narrowing_values_fit(cluster): def test_export_partition_lossy_cast_rejected_without_optin(cluster): """A lossy narrowing (id Int64 -> Int32) is rejected synchronously with - BAD_ARGUMENTS unless export_merge_tree_part_allow_lossy_cast is set.""" + INCOMPATIBLE_COLUMNS unless export_merge_tree_part_allow_lossy_cast is set.""" node = cluster.instances["replica1"] uid = unique_suffix() @@ -1148,7 +1396,7 @@ def test_export_partition_lossy_cast_rejected_without_optin(cluster): f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} " f"SETTINGS allow_insert_into_iceberg = 1" ) - assert "BAD_ARGUMENTS" in error, f"Expected BAD_ARGUMENTS, got: {error!r}" + assert "INCOMPATIBLE_COLUMNS" in error, f"Expected INCOMPATIBLE_COLUMNS, got: {error!r}" assert "lossy cast" in error, f"Expected 'lossy cast' in error, got: {error!r}" count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) @@ -1158,8 +1406,9 @@ def test_export_partition_lossy_cast_rejected_without_optin(cluster): def test_export_partition_runtime_cast_failure_propagates_async(cluster): """A String value that cannot be parsed as the destination Int32 passes the synchronous lossy-cast gate (with export_merge_tree_part_allow_lossy_cast = 1) but - fails at runtime in the async worker, marking the export FAILED and leaving Iceberg - empty. + fails at runtime in the async worker. The parse error is retryable, so the task is + not failed on a budget; it retries until the absolute task timeout fires (KILLED), + leaving Iceberg empty. 
(Integer overflow is not used because the internal cast uses CastType::nonAccurate, which wraps rather than throwing.) @@ -1177,11 +1426,13 @@ def test_export_partition_runtime_cast_failure_propagates_async(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} " - f"SETTINGS export_merge_tree_partition_max_retries = 1, allow_insert_into_iceberg = 1, " - f"export_merge_tree_part_allow_lossy_cast = 1" + f"SETTINGS export_merge_tree_partition_task_timeout_seconds = 5, " + f"allow_insert_into_iceberg = 1, export_merge_tree_part_allow_lossy_cast = 1" ) - wait_for_export_status(node, mt_table, iceberg_table, "2020", "FAILED", timeout=60) + # The runtime parse error is retryable, so the task keeps retrying until the 5s + # timeout fires; KILLED is published on the next manifest-updater poll cycle. + wait_for_export_status(node, mt_table, iceberg_table, "2020", "KILLED", timeout=90) exception_count = int(node.query( f"SELECT any(exception_count) FROM system.replicated_partition_exports " @@ -1195,7 +1446,7 @@ def test_export_partition_runtime_cast_failure_propagates_async(cluster): count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) assert count == 0, ( - f"Expected 0 rows in Iceberg table after failed export, got {count}" + f"Expected 0 rows in Iceberg table after killed export, got {count}" ) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 8ad265a375ad..785e2a358671 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -196,11 +196,9 @@ def test_restart_nodes_during_export(cluster): export_queries = f""" ALTER TABLE {mt_table} - EXPORT PARTITION ID '2020' TO TABLE {s3_table} - SETTINGS export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION 
ID '2020' TO TABLE {s3_table}; ALTER TABLE {mt_table} - EXPORT PARTITION ID '2021' TO TABLE {s3_table} - SETTINGS export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION ID '2021' TO TABLE {s3_table}; """ node.query(export_queries) @@ -289,11 +287,9 @@ def test_kill_export(cluster): export_queries = f""" ALTER TABLE {mt_table} - EXPORT PARTITION ID '2020' TO TABLE {s3_table} - SETTINGS export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION ID '2020' TO TABLE {s3_table}; ALTER TABLE {mt_table} - EXPORT PARTITION ID '2021' TO TABLE {s3_table} - SETTINGS export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION ID '2021' TO TABLE {s3_table}; """ node.query(export_queries) @@ -356,7 +352,6 @@ def test_kill_export_resilient_to_status_handling_failure(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" - f" SETTINGS export_merge_tree_partition_max_retries = 50" ) node.query("SYSTEM ENABLE FAILPOINT export_partition_status_change_throw") @@ -425,9 +420,9 @@ def test_drop_source_table_during_export(cluster): export_queries = f""" ALTER TABLE {mt_table} - EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS s3_retry_attempts = 500, export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS s3_retry_attempts = 500; ALTER TABLE {mt_table} - EXPORT PARTITION ID '2021' TO TABLE {s3_table} SETTINGS s3_retry_attempts = 500, export_merge_tree_partition_max_retries = 50; + EXPORT PARTITION ID '2021' TO TABLE {s3_table} SETTINGS s3_retry_attempts = 500; """ node.query(export_queries) @@ -517,14 +512,22 @@ def test_failure_is_logged_in_system_table(cluster): } pm.add_rule(pm_rule_reject_requests) + # Blocked MinIO produces transient (retryable) S3 errors. There is no retry + # budget anymore, so the task keeps retrying and is only torn down once the + # absolute task timeout fires (transitioning to KILLED). 
Use a small timeout + # so the test does not wait for the default (a day). node.query( - f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_max_retries=1;" + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" + f" SETTINGS export_merge_tree_partition_task_timeout_seconds = 5;" ) - # Wait so that the export fails - wait_for_export_status(node, mt_table, s3_table, "2020", "FAILED", timeout=60) + # Wait for the timeout to kill the stuck task. The KILL is a Keeper operation + # (MinIO being blocked does not affect it); the status mirror needs roughly one + # manifest-updater poll cycle (~30s) plus watch propagation on top of the 5s + # timeout, so allow a generous budget. + wait_for_export_status(node, mt_table, s3_table, "2020", "KILLED", timeout=90) - # Network restored; verify the export is marked as FAILED in the system table + # Network restored; verify the export is marked as KILLED in the system table # Also verify we captured at least one exception and no commit file exists status = node.query( f""" @@ -535,7 +538,7 @@ def test_failure_is_logged_in_system_table(cluster): """ ) - assert status.strip() == "FAILED", f"Expected FAILED status, got: {status!r}" + assert status.strip() == "KILLED", f"Expected KILLED status, got: {status!r}" exception_count = node.query( f""" @@ -588,9 +591,10 @@ def test_inject_short_living_failures(cluster): } pm.add_rule(pm_rule_reject_requests) - # set big max_retries so that the export does not fail completely + # Transient (retryable) failures never fail the task on a budget; it keeps + # retrying until the network is restored and the export completes. node.query( - f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_max_retries=100;" + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table};" ) # wait for at least one exception to occur, but not enough to finish the export. 
@@ -627,6 +631,85 @@ def test_inject_short_living_failures(cluster): assert int(exception_count.strip()) >= 1, "Expected at least one exception" +def test_export_partition_retry_backoff(cluster): + """Verify the per-replica in-memory exponential back-off between failed part exports. + + The back-off is local in-memory state (no ZooKeeper retry_count / next_retry_time + anymore), so it is not directly observable; instead we observe its effect. With a + large back-off, a part that keeps failing (object storage blocked) is parked for the + back-off window after its first failure and must NOT be retried on every ~5s + scheduler tick. We assert that exception_count stays low across a window that spans + several ticks. Once the network is restored and the back-off elapses, the export + completes (there is no retry budget to exhaust).""" + skip_if_remote_database_disk_enabled(cluster) + node = cluster.instances["replica1"] + + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"retry_backoff_mt_table_{postfix}" + s3_table = f"retry_backoff_s3_table_{postfix}" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + # Large back-off so a single failed attempt parks the part well beyond the + # ~5s scheduler tick. Kept moderate so the export can still complete promptly + # once the network is restored. 
+ initial_backoff_ms = 30000 + max_backoff_ms = 30000 + + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + with PartitionManager() as pm: + # Block responses from MinIO (source_port matches MinIO service) + pm.add_rule({ + "instance": node, + "destination": node.ip_address, + "protocol": "tcp", + "source_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + }) + # Also block requests to MinIO to fail fast + pm.add_rule({ + "instance": node, + "destination": minio_ip, + "protocol": "tcp", + "destination_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + }) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_partition_retry_initial_backoff_ms = {initial_backoff_ms}, " + f"export_merge_tree_partition_retry_max_backoff_ms = {max_backoff_ms}" + ) + + # Wait until the first failure is recorded. + count_after_first = wait_for_exception_count( + node, mt_table, s3_table, "2020", min_exception_count=1, timeout=60 + ) + + # While the part is backing off (~30s) it must not be retried again. Observe + # across a window that spans several scheduler ticks: without back-off the + # ~5s tick would add roughly five more failures, so a small increase proves + # the back-off is pacing retries. + time.sleep(25) + count_during_backoff = int(node.query( + f"SELECT exception_count FROM system.replicated_partition_exports" + f" WHERE source_table = '{mt_table}'" + f" AND destination_table = '{s3_table}'" + f" AND partition_id = '2020'" + ).strip()) + assert count_during_backoff - count_after_first <= 2, ( + f"exception_count jumped during the back-off window: " + f"{count_after_first} -> {count_during_backoff}; back-off was not applied" + ) + + # Network restored; once the back-off elapses the export should complete because + # there is no retry budget to exhaust. 
+ wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED", timeout=120) + assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == "3\n", "Export did not succeed" + + def test_export_partition_file_already_exists_policy(cluster): node = cluster.instances["replica1"] @@ -694,10 +777,11 @@ def test_export_partition_file_already_exists_policy(cluster): """ ) == '1\n', "Expected the export to be marked as COMPLETED" - # last but not least, let's try with the error policy - # max retries = 1 so it fails fast + # last but not least, let's try with the error policy. FILE_ALREADY_EXISTS is a + # non-retryable error (retrying always hits the same existing file), so the task + # fails fast without needing a retry budget. node.query( - f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1, export_merge_tree_part_file_already_exists_policy='error', export_merge_tree_partition_max_retries=1", + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1, export_merge_tree_part_file_already_exists_policy='error'", ) # wait for the export to finish @@ -1376,7 +1460,6 @@ def test_export_partition_resumes_after_stop_moves(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" - f" SETTINGS export_merge_tree_partition_max_retries = 50" ) wait_for_export_to_start(node, mt_table, s3_table, "2020") @@ -1435,7 +1518,6 @@ def test_export_partition_resumes_after_stop_moves_during_export(cluster): node.query( f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" - f" SETTINGS export_merge_tree_partition_max_retries = 50" ) wait_for_export_to_start(node, mt_table, s3_table, "2020") @@ -1544,8 +1626,8 @@ def test_export_partition_partition_column_castable_type_mismatch(cluster): f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{partition_id}' " f"TO TABLE {s3_table}" ) - 
assert "BAD_ARGUMENTS" in error, ( - f"Expected BAD_ARGUMENTS for a lossy partition-column cast, " + assert "INCOMPATIBLE_COLUMNS" in error, ( + f"Expected INCOMPATIBLE_COLUMNS for a lossy partition-column cast, " f"got: {error!r}" ) assert "requires a lossy cast" in error and "'year'" in error, ( diff --git a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py index 5466fe543275..53e94d6e6771 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_export_partition_iceberg.py @@ -713,26 +713,24 @@ def test_idempotency_after_commit_crash(export_cluster): assert count == 3, f"Expected 3 rows (no duplicates), got {count}" -def test_commit_attempts_budget_transitions_to_failed(export_cluster): +def test_commit_failure_retries_until_timeout(export_cluster): """ - Verify that the commit-attempts budget transitions a stuck task to FAILED - instead of leaving it in PENDING forever. + Verify that a retryable commit failure does not fail the task on a budget but + keeps retrying until the absolute task timeout fires, transitioning the task to + KILLED (there is no longer a commit-attempts budget). Reproduction: - Parts export successfully. - A REGULAR failpoint (``export_partition_commit_always_throw``) makes every - ``ExportPartitionUtils::commit`` attempt throw before talking to Iceberg. - - ``ExportPartitionUtils::handleCommitFailure`` bumps ``/commit_attempts`` - on each failure and transitions ``/status`` to FAILED once the counter - reaches ``export_merge_tree_partition_max_retries``. + ``ExportPartitionUtils::commit`` attempt throw ``FAULT_INJECTED`` (a retryable, + non-denylisted code) before talking to Iceberg. 
+ - ``ExportPartitionUtils::handleCommitFailure`` records the exception and leaves + the task PENDING; the task is killed only once + ``export_merge_tree_partition_task_timeout_seconds`` elapses. Expected behaviour: - - The first attempt is made synchronously when the last part completes - (scheduler's ``handlePartExportSuccess``). - - Subsequent attempts come from the manifest-updating task's ``tryCleanup`` - path, polling every 30s. - - With max_retries=2, the task reaches FAILED within roughly one poll cycle. - - The ``commit_attempts`` znode reaches at least max_retries. + - There is no retry budget, so the task never reaches FAILED on a counter. + - The task reaches KILLED after the timeout, on a manifest-updating poll cycle. """ node = export_cluster.instances["node1"] spark = export_cluster.spark_session @@ -759,36 +757,35 @@ def test_commit_attempts_budget_transitions_to_failed(export_cluster): # try block exists so we can add a finally that disables the failpoint try: - # max_retries=2 bounds the test: one attempt from handlePartExportSuccess - # plus one from the manifest-updating task's next poll (~30s) is enough - # to exhaust the budget and flip the task to FAILED. + # Under the old budget model a small retry budget would fail the task after a + # couple of commit attempts. With the new model there is no budget and only the + # 5s timeout transitions the task (to KILLED). node.query( f"ALTER TABLE {source} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg}" - f" SETTINGS export_merge_tree_partition_max_retries = 2," + f" SETTINGS export_merge_tree_partition_task_timeout_seconds = 5," f" allow_insert_into_iceberg = 1" ) + # Give the commit path time to fail repeatedly; the old budget would already + # have flipped the task to FAILED by now. 
+ time.sleep(15) + status = node.query( + f"SELECT status FROM system.replicated_partition_exports" + f" WHERE source_table = '{source}'" + f" AND destination_table = '{iceberg}'" + f" AND partition_id = '{pid}'" + ).strip() + assert status != "FAILED", ( + f"Retryable commit failures must not fail the task on a budget, got {status!r}" + ) + # Timeout must cover: at least one manifest-updating poll cycle (30s) # plus slack for task scheduling and keeper RTT. wait_for_export_status( node, source, iceberg, pid, - expected_status="FAILED", + expected_status="KILLED", timeout=90, ) - - # The commit_attempts znode must have reached (at least) max_retries — the - # counter is the direct mechanism that drove the FAILED transition. - # Locate the export's ZK root via the RMT's zookeeper_path and the - # partition_id_destination_db.destination_table export key convention. - export_key = f"{pid}_default.{iceberg}" - commit_attempts = int(node.query( - f"SELECT value FROM system.zookeeper" - f" WHERE path = '/clickhouse/tables/{source}/exports/{export_key}'" - f" AND name = 'commit_attempts'" - ).strip()) - assert commit_attempts >= 2, ( - f"Expected commit_attempts >= 2 (two commit attempts), got {commit_attempts}" - ) finally: node.query("SYSTEM DISABLE FAILPOINT export_partition_commit_always_throw")