Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ void GlueCatalog::createTable(const String & namespace_name, const String & tabl
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not create metadata in glue catalog: {}", response.GetError().GetMessage());
}

bool GlueCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*new_snapshot*/) const
CommitOutcome GlueCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*new_snapshot*/) const
{
Aws::Glue::Model::UpdateTableRequest request;
request.SetDatabaseName(namespace_name);
Expand Down Expand Up @@ -725,7 +725,7 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t
if (!response.IsSuccess())
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not update metadata in glue catalog {}", response.GetError().GetMessage());

return true;
return CommitOutcome::Committed;
}

void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DataLake/GlueCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext

void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override;

bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override;
CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override;
void dropTable(const String & namespace_name, const String & table_name) const override;

/// Returns a callback that re-vends fresh AWS credentials from the configured
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ void ICatalog::createTable(const String & /*namespace_name*/, const String & /*t
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "createTable is not implemented");
}

bool ICatalog::updateMetadata(const String & /*namespace_name*/, const String & /*table_name*/, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr /*new_snapshot*/) const
CommitOutcome ICatalog::updateMetadata(const String & /*namespace_name*/, const String & /*table_name*/, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr /*new_snapshot*/) const
{
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "updateMetadata is not implemented");
}
Expand Down
18 changes: 16 additions & 2 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ struct CatalogSettings
DB::SettingsChanges allChanged() const;
};

enum class CommitOutcome
{
/// The commit is confirmed live in the catalog. The caller must NOT delete any files.
Committed,
/// The commit was cleanly rejected and never became visible. The files we wrote are
/// orphans and the caller may safely delete them.
RejectedCleanly,
/// Could not determine whether the commit landed (e.g. the post-failure re-read
/// also failed, or the catalog response lacked snapshot state). The caller must
/// preserve all files: a recoverable leak is preferable to unrecoverable corruption.
Unknown,
};

/// Base class for catalog implementation.
/// Used for communication with the catalog.
class ICatalog
Expand Down Expand Up @@ -202,8 +215,9 @@ class ICatalog
/// Creates new table in catalog.
virtual void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const;

/// Updates metadata in catalog.
virtual bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const;
/// Returns a `CommitOutcome` describing whether the commit became live, was cleanly
/// rejected, or is of unknown status. Callers must only delete written files on`RejectedCleanly`.
virtual CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const;

/// Drop table from catalog.
virtual void dropTable(const String & namespace_name, const String & table_name) const;
Expand Down
135 changes: 132 additions & 3 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#include <Poco/JSON/Stringifier.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/CurrentThread.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
#include <mutex>
#include <chrono>
Expand Down Expand Up @@ -41,6 +43,7 @@
#include <Poco/JSON/Parser.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/HTTPSClientSession.h>
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>
Expand Down Expand Up @@ -1138,7 +1141,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl
}


bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
CommitOutcome RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

Expand Down Expand Up @@ -1200,9 +1203,135 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
}
catch (const DB::HTTPException &)
{
return false;
return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot);
}
return true;
catch (const DB::NetException &)
{
return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot);
}
catch (const Poco::Net::NetException &)
{
return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot);
}
return CommitOutcome::Committed;
}

Poco::JSON::Object::Ptr RestCatalog::getRawTableMetadataObject(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr /*context_*/) const
{
const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name;

String json_str;
{
/// Always a fresh network GET: this read path consults no metadata cache, so it
/// reflects the catalog's current server-side state (required to classify a commit).
auto buf = createReadBuffer(config.prefix / endpoint);
if (buf->eof())
return nullptr;
readJSONObjectPossiblyInvalid(json_str, *buf);
}

Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
if (!object || !object->has("metadata"))
return nullptr;
return object->get("metadata").extract<Poco::JSON::Object::Ptr>();
}

CommitOutcome RestCatalog::classifyCommitOutcomeAfterFailure(
const std::string & namespace_name,
const std::string & table_name,
Poco::JSON::Object::Ptr new_snapshot) const
{
/// Snapshot-expiration commits pass no snapshot (nullptr); there is no id to confirm,
/// so we cannot prove the commit landed or was rejected -> preserve files.
if (!new_snapshot || !new_snapshot->has(DB::Iceberg::f_metadata_snapshot_id))
return CommitOutcome::Unknown;

const Int64 our_snapshot_id = new_snapshot->getValue<Int64>(DB::Iceberg::f_metadata_snapshot_id);

Poco::JSON::Object::Ptr metadata_object;
try
{
metadata_object = getRawTableMetadataObject(namespace_name, table_name, getContext());
}
catch (...)
{
LOG_WARNING(
log,
"Commit response lost and post-failure re-read of table {}.{} also failed; "
"classifying snapshot {} as Unknown to preserve files: {}",
namespace_name, table_name, our_snapshot_id, DB::getCurrentExceptionMessage(false));
return CommitOutcome::Unknown;
}

if (!metadata_object)
{
LOG_WARNING(
log,
"Post-failure re-read of table {}.{} returned no metadata; "
"classifying snapshot {} as Unknown",
namespace_name, table_name, our_snapshot_id);
return CommitOutcome::Unknown;
}

/// Our snapshot is the current ref target: the commit landed and only its response was lost.
bool had_current_snapshot_id = metadata_object->has(DB::Iceberg::f_current_snapshot_id)
&& !metadata_object->get(DB::Iceberg::f_current_snapshot_id).isEmpty();
if (had_current_snapshot_id)
{
const Int64 current_snapshot_id = metadata_object->getValue<Int64>(DB::Iceberg::f_current_snapshot_id);
if (current_snapshot_id == our_snapshot_id)
{
LOG_DEBUG(
log, "Commit confirmed: snapshot {} is current in catalog for {}.{}",
our_snapshot_id, namespace_name, table_name);
return CommitOutcome::Committed;
}
}

/// Our snapshot is in the history but no longer current: we landed, then a concurrent
/// writer superseded us. We test membership rather than equality with current, because
/// otherwise a later concurrent commit would make our real commit look rejected.
bool had_snapshots_array = false;
if (auto snapshots = metadata_object->getArray(DB::Iceberg::f_snapshots))
{
had_snapshots_array = true;
for (UInt32 i = 0; i < snapshots->size(); ++i)
{
auto snapshot = snapshots->getObject(i);
if (snapshot
&& snapshot->has(DB::Iceberg::f_metadata_snapshot_id)
&& snapshot->getValue<Int64>(DB::Iceberg::f_metadata_snapshot_id) == our_snapshot_id)
{
LOG_DEBUG(
log,
"Commit confirmed: snapshot {} present in catalog history for {}.{} "
"(superseded by a later snapshot)",
our_snapshot_id, namespace_name, table_name);
return CommitOutcome::Committed;
}
}
}

if (had_current_snapshot_id || had_snapshots_array)
{
LOG_DEBUG(
log, "Commit cleanly rejected: snapshot {} absent from catalog state for {}.{}",
our_snapshot_id, namespace_name, table_name);
return CommitOutcome::RejectedCleanly;
}

/// The response carried no snapshot state at all, so we cannot prove anything: preserve files.
LOG_WARNING(
log,
"Post-failure re-read of table {}.{} carried no snapshot state; "
"classifying snapshot {} as Unknown",
namespace_name, table_name, our_snapshot_id);
return CommitOutcome::Unknown;
}

void RestCatalog::dropTable(const String & namespace_name, const String & table_name) const
Expand Down
16 changes: 15 additions & 1 deletion src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class RestCatalog : public ICatalog, public DB::WithContext

void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override;

bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override;
CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override;

bool isTransactional() const override { return true; }

Expand Down Expand Up @@ -179,6 +179,20 @@ class RestCatalog : public ICatalog, public DB::WithContext
DB::ContextPtr context_,
TableMetadata & result) const;

Poco::JSON::Object::Ptr getRawTableMetadataObject(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_) const;

/// After a failed commit POST, re-reads the catalog and classifies whether our snapshot
/// nonetheless became live (lost-response case) or was cleanly rejected. See the four
/// classification arms in the implementation. Never returns `RejectedCleanly` unless the
/// re-read succeeded and our snapshot id is provably absent from populated snapshot state.
CommitOutcome classifyCommitOutcomeAfterFailure(
const std::string & namespace_name,
const std::string & table_name,
Poco::JSON::Object::Ptr new_snapshot) const;

Config loadConfig();
virtual DB::HTTPHeaderEntries getAuthHeaders(
bool update_token,
Expand Down
19 changes: 16 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,9 +719,12 @@ void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICa
if (catalog)
{
const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
if (!catalog->updateMetadata(namespace_name, table_name, path_resolver.resolveForCatalog(metadata_info.path), new_snapshot))
/// Truncate performs no destructive file cleanup here, so anything other than a
/// confirmed commit is surfaced as a failure (preserving files).
const auto outcome = catalog->updateMetadata(namespace_name, table_name, path_resolver.resolveForCatalog(metadata_info.path), new_snapshot);
if (outcome != DataLake::CommitOutcome::Committed)
throw Exception(ErrorCodes::INCORRECT_DATA,
"Failed to commit Iceberg truncate update to catalog.");
"Failed to commit Iceberg truncate update to catalog (commit was not confirmed).");
}
}

Expand Down Expand Up @@ -1811,11 +1814,21 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl(
auto catalog_filename = resolver.resolveForCatalog(metadata_info.path);

const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName());
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot);
if (outcome == DataLake::CommitOutcome::RejectedCleanly)
{
cleanup(true);
return false;
}
if (outcome == DataLake::CommitOutcome::Unknown)
{
LOG_ERROR(
log,
"Iceberg commit for {}.{} is of unknown status after a lost response; "
"preserving written files to avoid corrupting a possibly-committed snapshot.",
namespace_name, table_name);
return false;
}

/// Catalog has accepted the commit - the new snapshot is now live and references
/// storage_manifest_entry_name / storage_manifest_list_name. From here on, any
Expand Down
31 changes: 30 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,11 @@ bool IcebergStorageSink::initializeMetadata()
}
};

/// Becomes true once the catalog has confirmed the commit (the snapshot is live and
/// references the manifest entry / manifest list we wrote). From that point any failure
/// must NOT delete those files, otherwise the live snapshot is corrupted.
bool published = false;

try
{
for (const auto & [partition_key, writer] : writer_per_partition_key)
Expand Down Expand Up @@ -1392,11 +1397,24 @@ bool IcebergStorageSink::initializeMetadata()
auto catalog_filename = resolver.resolveForCatalog(metadata_info.path);

const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName());
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot);
if (outcome == DataLake::CommitOutcome::RejectedCleanly)
{
cleanup(true);
return false;
}
if (outcome == DataLake::CommitOutcome::Unknown)
{
LOG_ERROR(
log,
"Iceberg commit for {}.{} is of unknown status after a lost response; "
"preserving written files to avoid corrupting a possibly-committed snapshot. "
"Orphaned files may need manual cleanup if the commit did not actually land.",
namespace_name, table_name);
return false;
}
/// Committed: the snapshot is now live.
published = true;
}
}

Expand All @@ -1412,6 +1430,17 @@ bool IcebergStorageSink::initializeMetadata()
}
catch (...)
{
if (published)
{
/// The commit is already live in the catalog. A failure in trailing post-publish
/// work (e.g. metadata-cache invalidation) must NOT delete the manifest files the
/// live snapshot references.
tryLogCurrentException(
log,
"Post-publish work failed after Iceberg snapshot was committed; "
"skipping cleanup to preserve the published snapshot");
return true;
}
cleanup(false);
throw;
}
Expand Down
15 changes: 13 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,21 @@ static bool writeMetadataFiles(
{
auto catalog_filename = path_resolver.resolveForCatalog(metadata_info.path);
const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName());
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot);
if (outcome == DataLake::CommitOutcome::RejectedCleanly)
{
cleanup();
return false;
}
if (outcome == DataLake::CommitOutcome::Unknown)
{
LOG_ERROR(
getLogger("IcebergMutations"),
"Iceberg mutation commit for {}.{} is of unknown status after a lost response; "
"preserving written files to avoid corrupting a possibly-committed snapshot.",
namespace_name, table_name);
return false;
}
}
}
}
Expand Down Expand Up @@ -1417,7 +1427,8 @@ ExpireSnapshotsResult expireSnapshots(
{
auto catalog_filename = persistent_table_components.path_resolver.resolveForCatalog(metadata_info.path);
const auto & [namespace_name, parsed_table_name] = DataLake::parseTableName(table_name);
if (!catalog->updateMetadata(namespace_name, parsed_table_name, catalog_filename, nullptr))
const auto outcome = catalog->updateMetadata(namespace_name, parsed_table_name, catalog_filename, nullptr);
if (outcome != DataLake::CommitOutcome::Committed)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
Expand Down
Loading
Loading