diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 2289aa9503..aee7d8c78b 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -425,4 +425,20 @@ pub enum Error { /// failures so that circuit breakers are not tripped by bad data. #[error("Permanent HTTP error: {0}")] PermanentHttpError(String), + /// The source schema could not be mapped to the destination schema. + /// Indicates a table definition or configuration problem + #[error("Schema mismatch: {0}")] + SchemaMismatch(String), + /// An I/O failure while writing data (e.g. Parquet serialization, file + /// writer close). Distinct from record-level validation errors. + #[error("Write failure: {0}")] + WriteFailure(String), + /// In-memory transaction preparation failed (e.g. invalid partition spec, + /// schema validation). These failures are deterministic and not retryable. + #[error("Transaction apply error: {0}")] + TransactionApplyError(String), + /// A catalog commit failed over the network (e.g. REST catalog I/O, + /// conflict). Transient failures may be retried. + #[error("Catalog commit error: {0}")] + CatalogCommitError(String), } diff --git a/core/connectors/sinks/iceberg_sink/src/router/mod.rs b/core/connectors/sinks/iceberg_sink/src/router/mod.rs index 8e60beedfc..ed5ca2bb3d 100644 --- a/core/connectors/sinks/iceberg_sink/src/router/mod.rs +++ b/core/connectors/sinks/iceberg_sink/src/router/mod.rs @@ -178,7 +178,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::SchemaMismatch(err.to_string()) })?, )) .build(cursor) @@ -197,13 +197,13 @@ async fn write_data( })?; writer.write(batch_data).await.map_err(|err| { error!("Error while writing record batch: {}", err); - Error::InvalidRecord + Error::WriteFailure(err.to_string()) })?; } let data_files = writer.close().await.map_err(|err| { error!("Error while writing data records to Parquet file: {}", err); - Error::InvalidRecord + Error::WriteFailure(err.to_string()) })?; let table_commit = Transaction::new(table); @@ -216,7 +216,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::TransactionApplyError(err.to_string()) })?; let _table = tx.commit(catalog).await.map_err(|err| { @@ -225,7 +225,7 @@ async fn write_data( table.metadata().uuid(), err ); - Error::InvalidRecord + Error::CatalogCommitError(err.to_string()) })?; Ok(()) }