diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index ddbf6a4e01..f3a2e7e43f 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -37,8 +37,10 @@ use reqwest::{Client, Method, StatusCode, Url}; use tokio::sync::OnceCell; use typed_builder::TypedBuilder; -use crate::client::{ - HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, +use crate::client::{HttpClient, deserialize_catalog_response, handle_error_response}; +use crate::error_handlers::{ + DefaultErrorHandler, DropNamespaceErrorHandler, NamespaceErrorHandler, TableCommitHandler, + TableErrorHandler, }; use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, @@ -378,7 +380,7 @@ impl RestCatalog { match http_response.status() { StatusCode::OK => deserialize_catalog_response(http_response).await, - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &DefaultErrorHandler).await), } } @@ -473,13 +475,7 @@ impl Catalog for RestCatalog { None => break, } } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "The parent parameter of the namespace provided does not exist", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &NamespaceErrorHandler).await), } } @@ -510,11 +506,7 @@ impl Catalog for RestCatalog { deserialize_catalog_response::(http_response).await?; Ok(Namespace::from(response)) } - StatusCode::CONFLICT => Err(Error::new( - ErrorKind::Unexpected, - "Tried to create a namespace that already exists", - )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await), } } @@ -534,11 +526,7 @@ impl Catalog for RestCatalog { deserialize_catalog_response::(http_response).await?; Ok(Namespace::from(response)) } - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::Unexpected, - "Tried to get a namespace that does not exist", - )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await), } } @@ -555,7 +543,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await), } } @@ -582,11 +570,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::Unexpected, - "Tried to drop a namespace that does not exist", - )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &DropNamespaceErrorHandler).await), } } @@ -617,13 +601,7 @@ impl Catalog for RestCatalog { None => break, } } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "Tried to list tables of a namespace that does not exist", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &TableErrorHandler).await), } } @@ -665,19 +643,7 @@ impl Catalog for RestCatalog { StatusCode::OK => { deserialize_catalog_response::(http_response).await? } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "Tried to create a table under a namespace that does not exist", - )); - } - StatusCode::CONFLICT => { - return Err(Error::new( - ErrorKind::Unexpected, - "The table already exists", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &TableErrorHandler).await), }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -726,13 +692,7 @@ impl Catalog for RestCatalog { StatusCode::OK | StatusCode::NOT_MODIFIED => { deserialize_catalog_response::(http_response).await? } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::Unexpected, - "Tried to load a table that does not exist", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &TableErrorHandler).await), }; let config = response @@ -770,11 +730,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::Unexpected, - "Tried to drop a table that does not exist", - )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &TableErrorHandler).await), } } @@ -792,7 +748,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(true), StatusCode::NOT_FOUND => Ok(false), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &TableErrorHandler).await), } } @@ -813,15 +769,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::Unexpected, - "Tried to rename a table that does not exist (is the namespace correct?)", - )), - StatusCode::CONFLICT => Err(Error::new( - ErrorKind::Unexpected, - "Tried to rename a table to a name that already exists", - )), - _ => Err(deserialize_unexpected_catalog_error(http_response).await), + _ => Err(handle_error_response(http_response, &TableErrorHandler).await), } } @@ -853,19 +801,7 @@ impl Catalog for RestCatalog { StatusCode::OK => { deserialize_catalog_response::(http_response).await? } - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::NamespaceNotFound, - "The namespace specified does not exist.", - )); - } - StatusCode::CONFLICT => { - return Err(Error::new( - ErrorKind::TableAlreadyExists, - "The given table already exists.", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &TableErrorHandler).await), }; let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( @@ -903,38 +839,7 @@ impl Catalog for RestCatalog { let response: CommitTableResponse = match http_response.status() { StatusCode::OK => deserialize_catalog_response(http_response).await?, - StatusCode::NOT_FOUND => { - return Err(Error::new( - ErrorKind::TableNotFound, - "Tried to update a table that does not exist", - )); - } - StatusCode::CONFLICT => { - return Err(Error::new( - ErrorKind::CatalogCommitConflicts, - "CatalogCommitConflicts, one or more requirements failed. The client may retry.", - ) - .with_retryable(true)); - } - StatusCode::INTERNAL_SERVER_ERROR => { - return Err(Error::new( - ErrorKind::Unexpected, - "An unknown server-side problem occurred; the commit state is unknown.", - )); - } - StatusCode::BAD_GATEWAY => { - return Err(Error::new( - ErrorKind::Unexpected, - "A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.", - )); - } - StatusCode::GATEWAY_TIMEOUT => { - return Err(Error::new( - ErrorKind::Unexpected, - "A server-side gateway timeout occurred; the commit state is unknown.", - )); - } - _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + _ => return Err(handle_error_response(http_response, &TableCommitHandler).await), }; let file_io = self diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 361c036bb6..14fcd05aec 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -26,7 +26,8 @@ use serde::de::DeserializeOwned; use tokio::sync::Mutex; use crate::RestCatalogConfig; -use crate::types::{ErrorResponse, TokenResponse}; +use crate::error_handlers::{ErrorHandler, OAuthErrorHandler}; +use crate::types::{ErrorModel, ErrorResponse, OAuthError, TokenResponse}; pub(crate) struct HttpClient { client: Client, @@ -156,20 +157,39 @@ impl HttpClient { .with_source(e) })?) } else { - let code = auth_resp.status(); + let status = auth_resp.status(); let text = auth_resp .bytes() .await .map_err(|err| err.with_url(auth_url.clone()))?; - let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| { - Error::new(ErrorKind::Unexpected, "Received unexpected response") - .with_context("code", code.to_string()) - .with_context("operation", "auth") - .with_context("url", auth_url.to_string()) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) - })?; - Err(Error::from(e)) + let error_response = if text.is_empty() { + // use default error when response body is empty + ErrorResponse::build_default_response(status) + } else { + match serde_json::from_slice::(&text) { + Ok(oauth_error) => { + // Convert OAuth error format to ErrorResponse format + // OAuth "error" field becomes ErrorResponse "type" + // OAuth "error_description" becomes ErrorResponse "message" + ErrorResponse { + error: ErrorModel { + message: oauth_error + .error_description + .clone() + .unwrap_or_else(|| oauth_error.error.clone()), + r#type: oauth_error.error, // OAuth error type + code: status.as_u16(), + stack: None, + }, + } + } + Err(_parse_err) => { + // use default error when parsing failed + ErrorResponse::build_default_response(status) + } + } + }; + Err(OAuthErrorHandler.handle(status, &error_response)) }?; Ok(auth_res.access_token) } @@ -278,22 +298,39 @@ pub(crate) async fn deserialize_catalog_response( }) } -/// Deserializes a unexpected catalog response into an error. -pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error { - let err = Error::new( - ErrorKind::Unexpected, - "Received response with unexpected status code", - ) - .with_context("status", response.status().to_string()) - .with_context("headers", format!("{:?}", response.headers())); +/// Handle an error response using the provided error handler. +/// +/// Returns an `iceberg::Error` with appropriate error kind and context. +pub(crate) async fn handle_error_response(response: Response, handler: &dyn ErrorHandler) -> Error { + let status = response.status(); let bytes = match response.bytes().await { Ok(bytes) => bytes, - Err(err) => return err.into(), + Err(err) => { + return Error::new( + ErrorKind::Unexpected, + format!( + "Failed to read error response body for HTTP {}", + status.as_u16() + ), + ) + .with_context("status", status.to_string()) + .with_source(err); + } }; - if bytes.is_empty() { - return err; - } - err.with_context("json", String::from_utf8_lossy(&bytes)) + let error_response = if bytes.is_empty() { + // use default error when response body is empty + ErrorResponse::build_default_response(status) + } else { + match serde_json::from_slice::(&bytes) { + Ok(response) => response, + Err(_parse_err) => { + // use default error when parsing failed + ErrorResponse::build_default_response(status) + } + } + }; + + handler.handle(status, &error_response) } diff --git a/crates/catalog/rest/src/error_handlers.rs b/crates/catalog/rest/src/error_handlers.rs new file mode 100644 index 0000000000..a7a24260b7 --- /dev/null +++ b/crates/catalog/rest/src/error_handlers.rs @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Error handlers for REST catalog operations. +//! +//! This module provides error handlers that convert HTTP error responses into +//! semantic Iceberg errors. Each handler interprets HTTP status codes and error +//! response types in the context of specific catalog operations (namespace, table, +//! view, commit, OAuth). +//! +//! Error handlers follow a strategy pattern with delegation to provide operation-specific +//! error semantics while reusing common error handling logic. + +use iceberg::{Error, ErrorKind}; +use reqwest::StatusCode; + +use crate::types::ErrorResponse; + +/// Trait for handling REST API error responses and converting to semantic errors. +pub trait ErrorHandler: Send + Sync { + /// Process an error response and convert to iceberg::Error. + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error; +} + +/// Default error handler for common HTTP error responses. +#[derive(Debug, Default, Clone, Copy)] +pub struct DefaultErrorHandler; + +impl ErrorHandler for DefaultErrorHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + let error_type = &response.error.r#type; + let message = &response.error.message; + + match code { + StatusCode::BAD_REQUEST => { + // Check error.type for special cases + if error_type.contains("IllegalArgumentException") { + Error::new(ErrorKind::DataInvalid, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } else { + Error::new(ErrorKind::BadRequest, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } + } + StatusCode::UNAUTHORIZED => Error::new(ErrorKind::NotAuthorized, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::FORBIDDEN => Error::new(ErrorKind::Forbidden, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::INTERNAL_SERVER_ERROR => Error::new(ErrorKind::Unexpected, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::NOT_IMPLEMENTED => Error::new(ErrorKind::FeatureUnsupported, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::SERVICE_UNAVAILABLE => Error::new(ErrorKind::ServiceUnavailable, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + _ => { + // Generic REST error for unhandled status codes + Error::new(ErrorKind::Unexpected, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } + } + } +} + +/// Error handler for namespace operations (list, create, get, update). +#[derive(Debug, Default, Clone, Copy)] +pub struct NamespaceErrorHandler; + +impl ErrorHandler for NamespaceErrorHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + let error_type = &response.error.r#type; + let message = &response.error.message; + + match code { + StatusCode::BAD_REQUEST => { + // Check for NamespaceNotEmptyException + if error_type.contains("NamespaceNotEmptyException") { + Error::new(ErrorKind::NamespaceNotEmpty, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } else { + // Delegate to default handler + DefaultErrorHandler.handle(code, response) + } + } + StatusCode::NOT_FOUND => Error::new(ErrorKind::NamespaceNotFound, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::CONFLICT => Error::new(ErrorKind::NamespaceAlreadyExists, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::UNPROCESSABLE_ENTITY => Error::new(ErrorKind::Unexpected, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + _ => DefaultErrorHandler.handle(code, response), + } + } +} + +/// Error handler for drop namespace operations. +#[derive(Debug, Default, Clone, Copy)] +pub struct DropNamespaceErrorHandler; + +impl ErrorHandler for DropNamespaceErrorHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + match code { + StatusCode::CONFLICT => { + // For drop operations, 409 specifically means namespace not empty + Error::new(ErrorKind::NamespaceNotEmpty, &response.error.message) + .with_context("type", response.error.r#type.clone()) + .with_context("code", code.as_u16().to_string()) + } + _ => NamespaceErrorHandler.handle(code, response), + } + } +} + +/// Error handler for table operations (list, create, load, rename). +#[derive(Debug, Default, Clone, Copy)] +pub struct TableErrorHandler; + +impl ErrorHandler for TableErrorHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + let error_type = &response.error.r#type; + let message = &response.error.message; + + match code { + StatusCode::NOT_FOUND => { + // Disambiguate based on error type + if error_type.contains("NoSuchNamespaceException") { + Error::new(ErrorKind::NamespaceNotFound, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } else { + Error::new(ErrorKind::TableNotFound, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + } + } + StatusCode::CONFLICT => Error::new(ErrorKind::TableAlreadyExists, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + _ => DefaultErrorHandler.handle(code, response), + } + } +} + +/// Error handler for table commit operations (update_table, commit_table). +#[derive(Debug, Default, Clone, Copy)] +pub struct TableCommitHandler; + +impl ErrorHandler for TableCommitHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + let error_type = &response.error.r#type; + let message = &response.error.message; + + match code { + StatusCode::NOT_FOUND => Error::new(ErrorKind::TableNotFound, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + StatusCode::CONFLICT => { + // Optimistic locking failure - can retry + Error::new(ErrorKind::CatalogCommitConflicts, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()) + .with_retryable(true) + } + // Special handling for commit operations - 5xx means unknown state + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => Error::new( + ErrorKind::CommitStateUnknown, + format!( + "Service failed during commit ({}): {}. \ + Commit state is unknown. Manual verification may be needed.", + code.as_u16(), + message + ), + ) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + // Note: NOT marked as retryable - requires manual verification + _ => DefaultErrorHandler.handle(code, response), + } + } +} + +/// Error handler for OAuth token exchange operations. +#[derive(Debug, Default, Clone, Copy)] +pub struct OAuthErrorHandler; + +impl ErrorHandler for OAuthErrorHandler { + fn handle(&self, code: StatusCode, response: &ErrorResponse) -> Error { + let error_type = &response.error.r#type; + let message = &response.error.message; + + // OAuth errors use the error.type field for error classification + match error_type.as_str() { + "invalid_client" => Error::new(ErrorKind::NotAuthorized, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + "invalid_request" + | "invalid_grant" + | "unauthorized_client" + | "unsupported_grant_type" + | "invalid_scope" => Error::new(ErrorKind::BadRequest, message) + .with_context("type", error_type.clone()) + .with_context("code", code.as_u16().to_string()), + _ => DefaultErrorHandler.handle(code, response), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::ErrorModel; + + fn create_error_response(code: u16, error_type: &str, message: &str) -> ErrorResponse { + ErrorResponse { + error: ErrorModel { + message: message.to_string(), + r#type: error_type.to_string(), + code, + stack: None, + }, + } + } + + #[test] + fn test_default_handler_bad_request() { + let handler = DefaultErrorHandler; + let response = create_error_response(400, "BadRequestException", "Invalid request"); + let error = handler.handle(StatusCode::BAD_REQUEST, &response); + + assert_eq!(error.kind(), ErrorKind::BadRequest); + assert_eq!(error.message(), "Invalid request"); + } + + #[test] + fn test_default_handler_illegal_argument() { + let handler = DefaultErrorHandler; + let response = create_error_response(400, "IllegalArgumentException", "Illegal argument"); + let error = handler.handle(StatusCode::BAD_REQUEST, &response); + + assert_eq!(error.kind(), ErrorKind::DataInvalid); + assert_eq!(error.message(), "Illegal argument"); + } + + #[test] + fn test_default_handler_unauthorized() { + let handler = DefaultErrorHandler; + let response = create_error_response(401, "NotAuthorizedException", "Not authorized"); + let error = handler.handle(StatusCode::UNAUTHORIZED, &response); + + assert_eq!(error.kind(), ErrorKind::NotAuthorized); + } + + #[test] + fn test_default_handler_forbidden() { + let handler = DefaultErrorHandler; + let response = create_error_response(403, "ForbiddenException", "Forbidden"); + let error = handler.handle(StatusCode::FORBIDDEN, &response); + + assert_eq!(error.kind(), ErrorKind::Forbidden); + } + + #[test] + fn test_default_handler_service_unavailable() { + let handler = DefaultErrorHandler; + let response = create_error_response(503, "ServiceUnavailableException", "Unavailable"); + let error = handler.handle(StatusCode::SERVICE_UNAVAILABLE, &response); + + assert_eq!(error.kind(), ErrorKind::ServiceUnavailable); + } + + #[test] + fn test_namespace_handler_not_found() { + let handler = NamespaceErrorHandler; + let response = + create_error_response(404, "NoSuchNamespaceException", "Namespace not found"); + let error = handler.handle(StatusCode::NOT_FOUND, &response); + + assert_eq!(error.kind(), ErrorKind::NamespaceNotFound); + } + + #[test] + fn test_namespace_handler_conflict() { + let handler = NamespaceErrorHandler; + let response = create_error_response(409, "AlreadyExistsException", "Namespace exists"); + let error = handler.handle(StatusCode::CONFLICT, &response); + + assert_eq!(error.kind(), ErrorKind::NamespaceAlreadyExists); + } + + #[test] + fn test_drop_namespace_handler_conflict() { + let handler = DropNamespaceErrorHandler; + let response = + create_error_response(409, "NamespaceNotEmptyException", "Namespace not empty"); + let error = handler.handle(StatusCode::CONFLICT, &response); + + assert_eq!(error.kind(), ErrorKind::NamespaceNotEmpty); + } + + #[test] + fn test_table_handler_not_found_table() { + let handler = TableErrorHandler; + let response = create_error_response(404, "NoSuchTableException", "Table not found"); + let error = handler.handle(StatusCode::NOT_FOUND, &response); + + assert_eq!(error.kind(), ErrorKind::TableNotFound); + } + + #[test] + fn test_table_handler_not_found_namespace() { + let handler = TableErrorHandler; + let response = + create_error_response(404, "NoSuchNamespaceException", "Namespace not found"); + let error = handler.handle(StatusCode::NOT_FOUND, &response); + + assert_eq!(error.kind(), ErrorKind::NamespaceNotFound); + } + + #[test] + fn test_table_handler_conflict() { + let handler = TableErrorHandler; + let response = create_error_response(409, "AlreadyExistsException", "Table exists"); + let error = handler.handle(StatusCode::CONFLICT, &response); + + assert_eq!(error.kind(), ErrorKind::TableAlreadyExists); + } + + #[test] + fn test_table_commit_handler_conflict() { + let handler = TableCommitHandler; + let response = create_error_response(409, "CommitFailedException", "Commit conflict"); + let error = handler.handle(StatusCode::CONFLICT, &response); + + assert_eq!(error.kind(), ErrorKind::CatalogCommitConflicts); + assert!(error.retryable()); + } + + #[test] + fn test_table_commit_handler_500() { + let handler = TableCommitHandler; + let response = create_error_response(500, "InternalServerError", "Server error"); + let error = handler.handle(StatusCode::INTERNAL_SERVER_ERROR, &response); + + assert_eq!(error.kind(), ErrorKind::CommitStateUnknown); + assert!(!error.retryable()); + } + + #[test] + fn test_table_commit_handler_503() { + let handler = TableCommitHandler; + let response = + create_error_response(503, "ServiceUnavailableException", "Service unavailable"); + let error = handler.handle(StatusCode::SERVICE_UNAVAILABLE, &response); + + assert_eq!(error.kind(), ErrorKind::CommitStateUnknown); + } + + #[test] + fn test_oauth_handler_invalid_client() { + let handler = OAuthErrorHandler; + let response = create_error_response(401, "invalid_client", "Invalid client"); + let error = handler.handle(StatusCode::UNAUTHORIZED, &response); + + assert_eq!(error.kind(), ErrorKind::NotAuthorized); + } + + #[test] + fn test_oauth_handler_invalid_request() { + let handler = OAuthErrorHandler; + let response = create_error_response(400, "invalid_request", "Invalid request"); + let error = handler.handle(StatusCode::BAD_REQUEST, &response); + + assert_eq!(error.kind(), ErrorKind::BadRequest); + } +} diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 6bee950970..c645a0c920 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -53,6 +53,7 @@ mod catalog; mod client; +mod error_handlers; mod types; pub use catalog::*; diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index ab44c40ee3..488f45701a 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; +use http::StatusCode; use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; use iceberg::{ Error, ErrorKind, Namespace, NamespaceIdent, TableIdent, TableRequirement, TableUpdate, @@ -34,7 +35,27 @@ pub(super) struct CatalogConfig { #[derive(Debug, Serialize, Deserialize)] /// Wrapper for all non-2xx error responses from the REST API pub struct ErrorResponse { - error: ErrorModel, + /// Error model + pub error: ErrorModel, +} + +impl ErrorResponse { + /// Build a fallback default error response. + pub fn build_default_response(status: StatusCode) -> Self { + let message = status + .canonical_reason() + .unwrap_or("Unknown Error") + .to_string(); + + Self { + error: ErrorModel { + message, + r#type: "RESTException".to_string(), + code: status.as_u16(), + stack: None, + }, + } + } } impl From for Error { diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 59fea0b51f..cce38a0e0c 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -24,7 +24,9 @@ use std::sync::RwLock; use ctor::{ctor, dtor}; use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; -use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; +use iceberg::{ + Catalog, CatalogBuilder, ErrorKind, Namespace, NamespaceIdent, TableCreation, TableIdent, +}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalog, RestCatalogBuilder}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; @@ -449,3 +451,305 @@ async fn test_register_table() { table_registered.identifier().to_string() ); } + +#[tokio::test] +async fn test_create_namespace_already_exists() { + let catalog = get_catalog().await; + let ns_ident = NamespaceIdent::from_strs(["test_duplicate_ns"]).unwrap(); + + // Create once, should succeed + catalog + .create_namespace(&ns_ident, HashMap::new()) + .await + .unwrap(); + + // Create again, should fail with NamespaceAlreadyExists + let result = catalog.create_namespace(&ns_ident, HashMap::new()).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceAlreadyExists); + assert!(err.message().contains("already exists") || err.message().contains("Conflict")); +} + +#[tokio::test] +async fn test_drop_namespace_not_empty() { + let catalog = get_catalog().await; + + // Create namespace with a table + let ns = NamespaceIdent::from_strs(["test_drop_nonempty_ns"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + // Create a table in the namespace + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema) + .build(); + + catalog.create_table(&ns, table_creation).await.unwrap(); + + // Attempt to drop namespace, should fail with NamespaceNotEmpty + let result = catalog.drop_namespace(&ns).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceNotEmpty); +} + +#[tokio::test] +async fn test_drop_nonexistent_namespace() { + let catalog = get_catalog().await; + let ns = NamespaceIdent::from_strs(["test_drop_nonexistent_ns"]).unwrap(); + + let result = catalog.drop_namespace(&ns).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceNotFound); +} + +#[tokio::test] +async fn test_list_namespaces_under_nonexistent_parent() { + let catalog = get_catalog().await; + + // Try to list namespaces under a parent that doesn't exist + let parent = NamespaceIdent::from_strs(["test_nonexistent_parent", "sub"]).unwrap(); + + let result = catalog.list_namespaces(Some(&parent)).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceNotFound); +} + +#[tokio::test] +async fn test_load_table_not_found() { + let catalog = get_catalog().await; + + // Create namespace but no table + let ns = NamespaceIdent::from_strs(["test_load_table_nf"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let table_id = TableIdent::new(ns, "nonexistent_table".to_string()); + + // Try to load non-existent table + let result = catalog.load_table(&table_id).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableNotFound); +} + +#[tokio::test] +async fn test_create_table_namespace_not_found() { + let catalog = get_catalog().await; + + // Try to create table in non-existent namespace + let ns = NamespaceIdent::from_strs(["test_create_table_ns_nf"]).unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema) + .build(); + + let result = catalog.create_table(&ns, table_creation).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceNotFound); +} + +#[tokio::test] +async fn test_create_table_already_exists() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["test_create_dup_table"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + // Create table once, should succeed + catalog.create_table(&ns, table_creation).await.unwrap(); + + // Create same table again, should fail + let table_creation2 = TableCreation::builder() + .name("t1".to_string()) + .schema(schema) + .build(); + + let result = catalog.create_table(&ns, table_creation2).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableAlreadyExists); +} + +#[tokio::test] +async fn test_drop_table_not_found() { + let catalog = get_catalog().await; + + // Create namespace but no table + let ns = NamespaceIdent::from_strs(["test_drop_table_nf"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let table_id = TableIdent::new(ns, "nonexistent_table".to_string()); + let result = catalog.drop_table(&table_id).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableNotFound); +} + +#[tokio::test] +async fn test_table_exists_nonexistent() { + let catalog = get_catalog().await; + + // Create namespace but no table + let ns = NamespaceIdent::from_strs(["test_exists_table_nf"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let table_id = TableIdent::new(ns, "nonexistent_table".to_string()); + + // Check if non-existent table exists, should return false + let exists = catalog.table_exists(&table_id).await.unwrap(); + assert!(!exists); +} + +#[tokio::test] +async fn test_rename_table_not_found() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["test_rename_table_nf"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let from = TableIdent::new(ns.clone(), "nonexistent".to_string()); + let to = TableIdent::new(ns, "new_name".to_string()); + + // Try to rename non-existent table + let result = catalog.rename_table(&from, &to).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableNotFound); +} + +#[tokio::test] +async fn test_rename_table_to_existing_name() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["test_rename_table_dup"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + // Create two tables + let table1_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table2_creation = TableCreation::builder() + .name("t2".to_string()) + .schema(schema) + .build(); + + catalog.create_table(&ns, table1_creation).await.unwrap(); + catalog.create_table(&ns, table2_creation).await.unwrap(); + + let from = TableIdent::new(ns.clone(), "t1".to_string()); + let to = TableIdent::new(ns, "t2".to_string()); + + // Try to rename t1 to t2 (which already exists) + let result = catalog.rename_table(&from, &to).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableAlreadyExists); +} + +#[tokio::test] +async fn test_list_tables_nonexistent_namespace() { + let catalog = get_catalog().await; + + let ns = NamespaceIdent::from_strs(["test_list_tables_ns_nf"]).unwrap(); + + // Try to list tables in non-existent namespace + let result = catalog.list_tables(&ns).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::NamespaceNotFound); +} + +#[tokio::test] +async fn test_commit_to_nonexistent_table() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["test_commit_table_nf"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + // Create a table, then drop it + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema) + .build(); + + let table = catalog.create_table(&ns, table_creation).await.unwrap(); + let table_id = table.identifier().clone(); + + // Drop the table + catalog.drop_table(&table_id).await.unwrap(); + + // Try to commit to the dropped table + let tx = Transaction::new(&table); + let result = tx + .update_table_properties() + .set("prop1".to_string(), "v1".to_string()) + .apply(tx) + .unwrap() + .commit(&catalog) + .await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::TableNotFound); +} diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 6ab3a78c8b..088c6dea7c 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -63,6 +63,46 @@ pub enum ErrorKind { /// Catalog commit failed due to outdated metadata CatalogCommitConflicts, + + /// Catalog commit state is unknown after a server error. + /// + /// This error is returned when a commit operation receives a 5xx response. + CommitStateUnknown, + + /// Namespace is not empty and cannot be dropped. + /// + /// This error is returned when attempting to drop a namespace that still + /// contains tables or other resources. + NamespaceNotEmpty, + + /// Iceberg view does not exist. + ViewNotFound, + + /// Iceberg view already exists at creation. + ViewAlreadyExists, + + /// Service is temporarily unavailable. + /// + /// This error is returned when the server returns a 503 Service Unavailable status. + ServiceUnavailable, + + /// Bad request. + /// + /// This error is returned when the server returns a 400 Bad Request status, + /// indicating a malformed or invalid request. + BadRequest, + + /// Forbidden. + /// + /// This error is returned when the server returns a 403 Forbidden status, + /// indicating the client does not have permission to perform the operation. + Forbidden, + + /// Not authorized. + /// + /// This error is returned when the server returns a 401 Unauthorized status, + /// indicating the client is not authenticated. + NotAuthorized, } impl ErrorKind { @@ -84,6 +124,14 @@ impl From for &'static str { ErrorKind::NamespaceNotFound => "NamespaceNotFound", ErrorKind::PreconditionFailed => "PreconditionFailed", ErrorKind::CatalogCommitConflicts => "CatalogCommitConflicts", + ErrorKind::CommitStateUnknown => "CommitStateUnknown", + ErrorKind::NamespaceNotEmpty => "NamespaceNotEmpty", + ErrorKind::ViewNotFound => "ViewNotFound", + ErrorKind::ViewAlreadyExists => "ViewAlreadyExists", + ErrorKind::ServiceUnavailable => "ServiceUnavailable", + ErrorKind::BadRequest => "BadRequest", + ErrorKind::Forbidden => "Forbidden", + ErrorKind::NotAuthorized => "NotAuthorized", } } }