Skip to content

Commit cc2a272

Browse files
committed
feat(rest): Add comprehensive error handling system for REST catalog
1 parent 2ed0a6f commit cc2a272

File tree

7 files changed

+909
-139
lines changed

7 files changed

+909
-139
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 18 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ use reqwest::{Client, Method, StatusCode, Url};
3737
use tokio::sync::OnceCell;
3838
use typed_builder::TypedBuilder;
3939

40-
use crate::client::{
41-
HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error,
40+
use crate::client::{HttpClient, deserialize_catalog_response, handle_error_response};
41+
use crate::error_handlers::{
42+
DefaultErrorHandler, DropNamespaceErrorHandler, NamespaceErrorHandler, TableCommitHandler,
43+
TableErrorHandler,
4244
};
4345
use crate::types::{
4446
CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest,
@@ -378,7 +380,7 @@ impl RestCatalog {
378380

379381
match http_response.status() {
380382
StatusCode::OK => deserialize_catalog_response(http_response).await,
381-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
383+
_ => Err(handle_error_response(http_response, &DefaultErrorHandler).await),
382384
}
383385
}
384386

@@ -473,13 +475,7 @@ impl Catalog for RestCatalog {
473475
None => break,
474476
}
475477
}
476-
StatusCode::NOT_FOUND => {
477-
return Err(Error::new(
478-
ErrorKind::Unexpected,
479-
"The parent parameter of the namespace provided does not exist",
480-
));
481-
}
482-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
478+
_ => return Err(handle_error_response(http_response, &NamespaceErrorHandler).await),
483479
}
484480
}
485481

@@ -510,11 +506,7 @@ impl Catalog for RestCatalog {
510506
deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
511507
Ok(Namespace::from(response))
512508
}
513-
StatusCode::CONFLICT => Err(Error::new(
514-
ErrorKind::Unexpected,
515-
"Tried to create a namespace that already exists",
516-
)),
517-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
509+
_ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await),
518510
}
519511
}
520512

@@ -534,11 +526,7 @@ impl Catalog for RestCatalog {
534526
deserialize_catalog_response::<NamespaceResponse>(http_response).await?;
535527
Ok(Namespace::from(response))
536528
}
537-
StatusCode::NOT_FOUND => Err(Error::new(
538-
ErrorKind::Unexpected,
539-
"Tried to get a namespace that does not exist",
540-
)),
541-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
529+
_ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await),
542530
}
543531
}
544532

@@ -555,7 +543,7 @@ impl Catalog for RestCatalog {
555543
match http_response.status() {
556544
StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
557545
StatusCode::NOT_FOUND => Ok(false),
558-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
546+
_ => Err(handle_error_response(http_response, &NamespaceErrorHandler).await),
559547
}
560548
}
561549

@@ -582,11 +570,7 @@ impl Catalog for RestCatalog {
582570

583571
match http_response.status() {
584572
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
585-
StatusCode::NOT_FOUND => Err(Error::new(
586-
ErrorKind::Unexpected,
587-
"Tried to drop a namespace that does not exist",
588-
)),
589-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
573+
_ => Err(handle_error_response(http_response, &DropNamespaceErrorHandler).await),
590574
}
591575
}
592576

@@ -617,13 +601,7 @@ impl Catalog for RestCatalog {
617601
None => break,
618602
}
619603
}
620-
StatusCode::NOT_FOUND => {
621-
return Err(Error::new(
622-
ErrorKind::Unexpected,
623-
"Tried to list tables of a namespace that does not exist",
624-
));
625-
}
626-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
604+
_ => return Err(handle_error_response(http_response, &TableErrorHandler).await),
627605
}
628606
}
629607

@@ -665,19 +643,7 @@ impl Catalog for RestCatalog {
665643
StatusCode::OK => {
666644
deserialize_catalog_response::<LoadTableResult>(http_response).await?
667645
}
668-
StatusCode::NOT_FOUND => {
669-
return Err(Error::new(
670-
ErrorKind::Unexpected,
671-
"Tried to create a table under a namespace that does not exist",
672-
));
673-
}
674-
StatusCode::CONFLICT => {
675-
return Err(Error::new(
676-
ErrorKind::Unexpected,
677-
"The table already exists",
678-
));
679-
}
680-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
646+
_ => return Err(handle_error_response(http_response, &TableErrorHandler).await),
681647
};
682648

683649
let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
@@ -726,13 +692,7 @@ impl Catalog for RestCatalog {
726692
StatusCode::OK | StatusCode::NOT_MODIFIED => {
727693
deserialize_catalog_response::<LoadTableResult>(http_response).await?
728694
}
729-
StatusCode::NOT_FOUND => {
730-
return Err(Error::new(
731-
ErrorKind::Unexpected,
732-
"Tried to load a table that does not exist",
733-
));
734-
}
735-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
695+
_ => return Err(handle_error_response(http_response, &TableErrorHandler).await),
736696
};
737697

738698
let config = response
@@ -770,11 +730,7 @@ impl Catalog for RestCatalog {
770730

771731
match http_response.status() {
772732
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
773-
StatusCode::NOT_FOUND => Err(Error::new(
774-
ErrorKind::Unexpected,
775-
"Tried to drop a table that does not exist",
776-
)),
777-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
733+
_ => Err(handle_error_response(http_response, &TableErrorHandler).await),
778734
}
779735
}
780736

@@ -792,7 +748,7 @@ impl Catalog for RestCatalog {
792748
match http_response.status() {
793749
StatusCode::NO_CONTENT | StatusCode::OK => Ok(true),
794750
StatusCode::NOT_FOUND => Ok(false),
795-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
751+
_ => Err(handle_error_response(http_response, &TableErrorHandler).await),
796752
}
797753
}
798754

@@ -813,15 +769,7 @@ impl Catalog for RestCatalog {
813769

814770
match http_response.status() {
815771
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
816-
StatusCode::NOT_FOUND => Err(Error::new(
817-
ErrorKind::Unexpected,
818-
"Tried to rename a table that does not exist (is the namespace correct?)",
819-
)),
820-
StatusCode::CONFLICT => Err(Error::new(
821-
ErrorKind::Unexpected,
822-
"Tried to rename a table to a name that already exists",
823-
)),
824-
_ => Err(deserialize_unexpected_catalog_error(http_response).await),
772+
_ => Err(handle_error_response(http_response, &TableErrorHandler).await),
825773
}
826774
}
827775

@@ -853,19 +801,7 @@ impl Catalog for RestCatalog {
853801
StatusCode::OK => {
854802
deserialize_catalog_response::<LoadTableResult>(http_response).await?
855803
}
856-
StatusCode::NOT_FOUND => {
857-
return Err(Error::new(
858-
ErrorKind::NamespaceNotFound,
859-
"The namespace specified does not exist.",
860-
));
861-
}
862-
StatusCode::CONFLICT => {
863-
return Err(Error::new(
864-
ErrorKind::TableAlreadyExists,
865-
"The given table already exists.",
866-
));
867-
}
868-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
804+
_ => return Err(handle_error_response(http_response, &TableErrorHandler).await),
869805
};
870806

871807
let metadata_location = response.metadata_location.as_ref().ok_or(Error::new(
@@ -903,38 +839,7 @@ impl Catalog for RestCatalog {
903839

904840
let response: CommitTableResponse = match http_response.status() {
905841
StatusCode::OK => deserialize_catalog_response(http_response).await?,
906-
StatusCode::NOT_FOUND => {
907-
return Err(Error::new(
908-
ErrorKind::TableNotFound,
909-
"Tried to update a table that does not exist",
910-
));
911-
}
912-
StatusCode::CONFLICT => {
913-
return Err(Error::new(
914-
ErrorKind::CatalogCommitConflicts,
915-
"CatalogCommitConflicts, one or more requirements failed. The client may retry.",
916-
)
917-
.with_retryable(true));
918-
}
919-
StatusCode::INTERNAL_SERVER_ERROR => {
920-
return Err(Error::new(
921-
ErrorKind::Unexpected,
922-
"An unknown server-side problem occurred; the commit state is unknown.",
923-
));
924-
}
925-
StatusCode::BAD_GATEWAY => {
926-
return Err(Error::new(
927-
ErrorKind::Unexpected,
928-
"A gateway or proxy received an invalid response from the upstream server; the commit state is unknown.",
929-
));
930-
}
931-
StatusCode::GATEWAY_TIMEOUT => {
932-
return Err(Error::new(
933-
ErrorKind::Unexpected,
934-
"A server-side gateway timeout occurred; the commit state is unknown.",
935-
));
936-
}
937-
_ => return Err(deserialize_unexpected_catalog_error(http_response).await),
842+
_ => return Err(handle_error_response(http_response, &TableCommitHandler).await),
938843
};
939844

940845
let file_io = self

crates/catalog/rest/src/client.rs

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use serde::de::DeserializeOwned;
2626
use tokio::sync::Mutex;
2727

2828
use crate::RestCatalogConfig;
29-
use crate::types::{ErrorResponse, TokenResponse};
29+
use crate::error_handlers::{ErrorHandler, OAuthErrorHandler};
30+
use crate::types::{ErrorModel, ErrorResponse, OAuthError, TokenResponse};
3031

3132
pub(crate) struct HttpClient {
3233
client: Client,
@@ -156,20 +157,39 @@ impl HttpClient {
156157
.with_source(e)
157158
})?)
158159
} else {
159-
let code = auth_resp.status();
160+
let status = auth_resp.status();
160161
let text = auth_resp
161162
.bytes()
162163
.await
163164
.map_err(|err| err.with_url(auth_url.clone()))?;
164-
let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
165-
Error::new(ErrorKind::Unexpected, "Received unexpected response")
166-
.with_context("code", code.to_string())
167-
.with_context("operation", "auth")
168-
.with_context("url", auth_url.to_string())
169-
.with_context("json", String::from_utf8_lossy(&text))
170-
.with_source(e)
171-
})?;
172-
Err(Error::from(e))
165+
let error_response = if text.is_empty() {
166+
// use default error when response body is empty
167+
ErrorResponse::build_default_response(status)
168+
} else {
169+
match serde_json::from_slice::<OAuthError>(&text) {
170+
Ok(oauth_error) => {
171+
// Convert OAuth error format to ErrorResponse format
172+
// OAuth "error" field becomes ErrorResponse "type"
173+
// OAuth "error_description" becomes ErrorResponse "message"
174+
ErrorResponse {
175+
error: ErrorModel {
176+
message: oauth_error
177+
.error_description
178+
.clone()
179+
.unwrap_or_else(|| oauth_error.error.clone()),
180+
r#type: oauth_error.error, // OAuth error type
181+
code: status.as_u16(),
182+
stack: None,
183+
},
184+
}
185+
}
186+
Err(_parse_err) => {
187+
// use default error when parsing failed
188+
ErrorResponse::build_default_response(status)
189+
}
190+
}
191+
};
192+
Err(OAuthErrorHandler.handle(status, &error_response))
173193
}?;
174194
Ok(auth_res.access_token)
175195
}
@@ -278,22 +298,39 @@ pub(crate) async fn deserialize_catalog_response<R: DeserializeOwned>(
278298
})
279299
}
280300

281-
/// Deserializes a unexpected catalog response into an error.
282-
pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error {
283-
let err = Error::new(
284-
ErrorKind::Unexpected,
285-
"Received response with unexpected status code",
286-
)
287-
.with_context("status", response.status().to_string())
288-
.with_context("headers", format!("{:?}", response.headers()));
301+
/// Handle an error response using the provided error handler.
302+
///
303+
/// Returns an `iceberg::Error` with appropriate error kind and context.
304+
pub(crate) async fn handle_error_response(response: Response, handler: &dyn ErrorHandler) -> Error {
305+
let status = response.status();
289306

290307
let bytes = match response.bytes().await {
291308
Ok(bytes) => bytes,
292-
Err(err) => return err.into(),
309+
Err(err) => {
310+
return Error::new(
311+
ErrorKind::Unexpected,
312+
format!(
313+
"Failed to read error response body for HTTP {}",
314+
status.as_u16()
315+
),
316+
)
317+
.with_context("status", status.to_string())
318+
.with_source(err);
319+
}
293320
};
294321

295-
if bytes.is_empty() {
296-
return err;
297-
}
298-
err.with_context("json", String::from_utf8_lossy(&bytes))
322+
let error_response = if bytes.is_empty() {
323+
// use default error when response body is empty
324+
ErrorResponse::build_default_response(status)
325+
} else {
326+
match serde_json::from_slice::<ErrorResponse>(&bytes) {
327+
Ok(response) => response,
328+
Err(_parse_err) => {
329+
// use default error when parsing failed
330+
ErrorResponse::build_default_response(status)
331+
}
332+
}
333+
};
334+
335+
handler.handle(status, &error_response)
299336
}

0 commit comments

Comments
 (0)