Skip to content
16 changes: 16 additions & 0 deletions core/connectors/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,4 +425,20 @@ pub enum Error {
/// failures so that circuit breakers are not tripped by bad data.
#[error("Permanent HTTP error: {0}")]
PermanentHttpError(String),
/// The source schema could not be mapped to the destination schema.
/// Indicates a table definition or configuration problem
#[error("Schema mismatch: {0}")]
SchemaMismatch(String),
/// An I/O failure while writing data (e.g. Parquet serialization, file
/// writer close). Distinct from record-level validation errors.
#[error("Write failure: {0}")]
WriteFailure(String),
/// In-memory transaction preparation failed (e.g. invalid partition spec,
/// schema validation). These failures are deterministic and not retryable.
#[error("Transaction apply error: {0}")]
TransactionApplyError(String),
/// A catalog commit failed over the network (e.g. REST catalog I/O,
/// conflict). Transient failures may be retried.
#[error("Catalog commit error: {0}")]
CatalogCommitError(String),
}
10 changes: 5 additions & 5 deletions core/connectors/sinks/iceberg_sink/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async fn write_data(
table.metadata().uuid(),
err
);
Error::InvalidRecord
Error::SchemaMismatch(err.to_string())
})?,
))
.build(cursor)
Expand All @@ -197,13 +197,13 @@ async fn write_data(
})?;
writer.write(batch_data).await.map_err(|err| {
error!("Error while writing record batch: {}", err);
Error::InvalidRecord
Error::WriteFailure(err.to_string())
})?;
}

let data_files = writer.close().await.map_err(|err| {
error!("Error while writing data records to Parquet file: {}", err);
Error::InvalidRecord
Error::WriteFailure(err.to_string())
})?;

let table_commit = Transaction::new(table);
Expand All @@ -216,7 +216,7 @@ async fn write_data(
table.metadata().uuid(),
err
);
Error::InvalidRecord
Error::TransactionApplyError(err.to_string())
})?;

let _table = tx.commit(catalog).await.map_err(|err| {
Expand All @@ -225,7 +225,7 @@ async fn write_data(
table.metadata().uuid(),
err
);
Error::InvalidRecord
Error::CatalogCommitError(err.to_string())
})?;
Ok(())
}
Expand Down
Loading