From eaf4c7e2e836afad1eac13c4580234924c097556 Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Tue, 28 Apr 2026 19:28:50 -0500 Subject: [PATCH 1/3] fix(connectors): replace overloaded InvalidRecord with distinct error variants --- core/connectors/sdk/src/lib.rs | 12 ++++++++++++ core/connectors/sinks/iceberg_sink/src/router/mod.rs | 10 +++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index a864407be6..84bb4566cb 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -416,4 +416,16 @@ pub enum Error { /// failures so that circuit breakers are not tripped by bad data. #[error("Permanent HTTP error: {0}")] PermanentHttpError(String), + /// The source schema could not be mapped to the destination schema. + /// Indicates a table definition or configuration problem — not a data issue. + #[error("Schema mismatch: {0}")] + SchemaMismatch(String), + /// An I/O failure while writing data (e.g. Parquet serialization, file + /// writer close). Distinct from record-level validation errors. + #[error("Write failure: {0}")] + WriteFailure(String), + /// A catalog or transaction-level failure (e.g. applying or committing an + /// Iceberg transaction). Callers may retry on transient catalog outages. + #[error("Catalog error: {0}")] + CatalogError(String), } diff --git a/core/connectors/sinks/iceberg_sink/src/router/mod.rs b/core/connectors/sinks/iceberg_sink/src/router/mod.rs index 8e60beedfc..a8f677237f 100644 --- a/core/connectors/sinks/iceberg_sink/src/router/mod.rs +++ b/core/connectors/sinks/iceberg_sink/src/router/mod.rs @@ -178,7 +178,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::SchemaMismatch(err.to_string()) })?, )) .build(cursor) @@ -197,13 +197,13 @@ async fn write_data( })?; writer.write(batch_data).await.map_err(|err| { error!("Error while writing record batch: {}", err); - Error::InvalidRecord + Error::WriteFailure(err.to_string()) })?; } let data_files = writer.close().await.map_err(|err| { error!("Error while writing data records to Parquet file: {}", err); - Error::InvalidRecord + Error::WriteFailure(err.to_string()) })?; let table_commit = Transaction::new(table); @@ -216,7 +216,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::CatalogError(err.to_string()) })?; let _table = tx.commit(catalog).await.map_err(|err| { @@ -225,7 +225,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::CatalogError(err.to_string()) })?; Ok(()) } From b28e323453ee21544d00d49149bf6500a2ac2179 Mon Sep 17 00:00:00 2001 From: Atharva Lade <92752921+atharvalade@users.noreply.github.com> Date: Sun, 3 May 2026 18:14:39 -0500 Subject: [PATCH 2/3] Refine SchemaMismatch error documentation Removed redundant phrasing from SchemaMismatch error documentation. --- core/connectors/sdk/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 84bb4566cb..696eaab0d7 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -417,7 +417,7 @@ pub enum Error { #[error("Permanent HTTP error: {0}")] PermanentHttpError(String), /// The source schema could not be mapped to the destination schema. - /// Indicates a table definition or configuration problem — not a data issue. + /// Indicates a table definition or configuration problem #[error("Schema mismatch: {0}")] SchemaMismatch(String), /// An I/O failure while writing data (e.g. Parquet serialization, file From a11fbc4ca379e0faf78da386af6a157a5cbf6eb4 Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Sun, 17 May 2026 01:51:54 -0500 Subject: [PATCH 3/3] fix(connectors): split CatalogError into TransactionApplyError and CatalogCommitError --- core/connectors/sdk/src/lib.rs | 12 ++++++++---- core/connectors/sinks/iceberg_sink/src/router/mod.rs | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 8aa68a090f..aee7d8c78b 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -433,8 +433,12 @@ pub enum Error { /// writer close). Distinct from record-level validation errors. #[error("Write failure: {0}")] WriteFailure(String), - /// A catalog or transaction-level failure (e.g. applying or committing an - /// Iceberg transaction). Callers may retry on transient catalog outages. - #[error("Catalog error: {0}")] - CatalogError(String), + /// In-memory transaction preparation failed (e.g. invalid partition spec, + /// schema validation). These failures are deterministic and not retryable. + #[error("Transaction apply error: {0}")] + TransactionApplyError(String), + /// A catalog commit failed over the network (e.g. REST catalog I/O, + /// conflict). Transient failures may be retried. + #[error("Catalog commit error: {0}")] + CatalogCommitError(String), } diff --git a/core/connectors/sinks/iceberg_sink/src/router/mod.rs b/core/connectors/sinks/iceberg_sink/src/router/mod.rs index a8f677237f..ed5ca2bb3d 100644 --- a/core/connectors/sinks/iceberg_sink/src/router/mod.rs +++ b/core/connectors/sinks/iceberg_sink/src/router/mod.rs @@ -216,7 +216,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::CatalogError(err.to_string()) + Error::TransactionApplyError(err.to_string()) })?; let _table = tx.commit(catalog).await.map_err(|err| { @@ -225,7 +225,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::CatalogError(err.to_string()) + Error::CatalogCommitError(err.to_string()) })?; Ok(()) }