From b3c7ed1a8d3840943826aa297e93b17ff36bbe5b Mon Sep 17 00:00:00 2001 From: Joe Prosser Date: Thu, 26 Feb 2026 21:13:06 +0000 Subject: [PATCH] feat(api): resume on every error --- CHANGELOG.md | 3 ++ api/src/lib.rs | 55 +++++++++++------------------ cli/src/commands/create/comments.rs | 24 ++++++------- cli/src/commands/create/emails.rs | 8 +++-- cli/src/commands/package/upload.rs | 10 ++---- cli/src/commands/parse/pst.rs | 5 ++- 6 files changed, 43 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a46d764..7b5a8563 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Unreleased +- Resume on every error + # v0.38.18 - Split retries on bad serde json response diff --git a/api/src/lib.rs b/api/src/lib.rs index 484ba748..c6e55a22 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -574,13 +574,15 @@ impl Client { source_name: &SourceFullName, comments: Vec, no_charge: bool, - ) -> Result> { + ) -> SplitableRequestResponse { // Retrying here despite the potential for 409's in order to increase reliability when // working with poor connection self.splitable_request( Method::PUT, - self.endpoints.put_comments(source_name)?, + self.endpoints + .put_comments(source_name) + .expect("Could not get put_comments endpoint"), PutCommentsRequest { comments }, Some(NoChargeQuery { no_charge }), Retry::Yes, @@ -698,10 +700,12 @@ impl Client { source_name: &SourceFullName, comments: Vec, no_charge: bool, - ) -> Result> { + ) -> SplitableRequestResponse { self.splitable_request( Method::POST, - self.endpoints.sync_comments(source_name)?, + self.endpoints + .sync_comments(source_name) + .expect("Could not get sync_comments endpoint"), SyncCommentsRequest { comments }, Some(NoChargeQuery { no_charge }), Retry::Yes, @@ -734,10 +738,12 @@ impl Client { bucket_name: &BucketFullName, emails: Vec, no_charge: bool, - ) -> Result> { + ) -> SplitableRequestResponse { self.splitable_request( Method::PUT, - self.endpoints.put_emails(bucket_name)?, + self.endpoints + .put_emails(bucket_name) + .expect("Could not get put_emails endpoint"), PutEmailsRequest { emails }, Some(NoChargeQuery { no_charge }), Retry::Yes, @@ -1557,7 +1563,7 @@ impl Client { body: RequestT, query: Option, retry: Retry, - ) -> Result> + ) -> SplitableRequestResponse where LocationT: IntoUrl + Display + Clone, RequestT: Serialize + SplittableRequest + Clone, @@ -1568,40 +1574,20 @@ impl Client { let result: Result = self.request(&method, &url, &Some(body.clone()), &query, &retry); - fn should_split(error: &Error) -> bool { - if let Error::Api { status_code, .. } = error { - *status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY - || *status_code == reqwest::StatusCode::BAD_REQUEST - || *status_code == reqwest::StatusCode::CONFLICT - } else if let Error::BadJsonResponse(_) = error { - // This is for the case where some sort of network config (e.g. cloudflare) blocks - // the request and returns invalid content - true - } else if let Error::BadSerdeJsonResponse(_) = error { - // This is for the case where some sort of network config (e.g. cloudflare) blocks - // the request and returns invalid content - true - } else if let Error::ReqwestError { source, .. } = error { - // Should split timeouts - source.is_timeout() - } else { - false - } - } - match result { - Ok(response) => Ok(SplitableRequestResponse { + Ok(response) => SplitableRequestResponse { response, num_failed: 0, - }), - Err(error) if should_split(&error) => { + }, + Err(_) => { let mut num_failed = 0; let response = body .split() .filter_map(|request| { match self.request(&method, &url, &Some(request), &query, &retry) { Ok(response) => Some(response), - Err(_) => { + Err(err) => { + debug!("{err}"); num_failed += 1; None } @@ -1611,12 +1597,11 @@ impl Client { merged.merge(next) }); - Ok(SplitableRequestResponse { + SplitableRequestResponse { num_failed, response, - }) + } } - Err(error) => Err(error), } } diff --git a/cli/src/commands/create/comments.rs b/cli/src/commands/create/comments.rs index e52f3275..ebc088d8 100644 --- a/cli/src/commands/create/comments.rs +++ b/cli/src/commands/create/comments.rs @@ -374,13 +374,11 @@ fn upload_batch_of_comments( } if resume_on_error { - let result = client - .put_comments_split_on_failure( - &source.full_name(), - comments_to_put.to_vec(), - no_charge, - ) - .context("Could not put batch of comments")?; + let result = client.put_comments_split_on_failure( + &source.full_name(), + comments_to_put.to_vec(), + no_charge, + ); failed += result.num_failed; } else { client @@ -402,13 +400,11 @@ fn upload_batch_of_comments( )?; } let result = if resume_on_error { - let result = client - .sync_comments_split_on_failure( - &source.full_name(), - comments_to_sync.to_vec(), - no_charge, - ) - .context("Could not sync batch of comments")?; + let result = client.sync_comments_split_on_failure( + &source.full_name(), + comments_to_sync.to_vec(), + no_charge, + ); failed += result.num_failed; result.response } else { diff --git a/cli/src/commands/create/emails.rs b/cli/src/commands/create/emails.rs index b66b4b2d..a6af2094 100644 --- a/cli/src/commands/create/emails.rs +++ b/cli/src/commands/create/emails.rs @@ -156,9 +156,11 @@ fn upload_emails_from_reader( // Upload emails if resume_on_error { - let result = client - .put_emails_split_on_failure(&bucket.full_name(), batch.to_vec(), no_charge) - .context("Could not upload batch of emails")?; + let result = client.put_emails_split_on_failure( + &bucket.full_name(), + batch.to_vec(), + no_charge, + ); statistics.add_emails(StatisticsUpdate { uploaded: batch.len() - result.num_failed, failed: result.num_failed, diff --git a/cli/src/commands/package/upload.rs b/cli/src/commands/package/upload.rs index a72c0796..93cd1f2e 100644 --- a/cli/src/commands/package/upload.rs +++ b/cli/src/commands/package/upload.rs @@ -429,8 +429,7 @@ fn unpack_cm_bucket( .context("Could not get email batch")?; if *resume_on_error { - let result = - client.put_emails_split_on_failure(&bucket.full_name(), batch, no_charge)?; + let result = client.put_emails_split_on_failure(&bucket.full_name(), batch, no_charge); statistics.add_email_batch_uploads(1); statistics.add_failed_email_uploads(result.num_failed); @@ -487,11 +486,8 @@ fn unpack_cm_source( let new_comments: Vec = batch.into_iter().map(|c| c.comment).collect(); if *resume_on_error { - let result = client.sync_comments_split_on_failure( - &source.full_name(), - new_comments, - no_charge, - )?; + let result = + client.sync_comments_split_on_failure(&source.full_name(), new_comments, no_charge); statistics.add_comment_batch_upload(1); statistics.add_failed_comment_uploads(result.num_failed); diff --git a/cli/src/commands/parse/pst.rs b/cli/src/commands/parse/pst.rs index 02d175b8..59c2502a 100644 --- a/cli/src/commands/parse/pst.rs +++ b/cli/src/commands/parse/pst.rs @@ -212,9 +212,8 @@ pub fn parse(client: &Client, args: &ParsePstArgs) -> Result<()> { if !args.dry_run { let bucket = client.get_bucket(args.bucket.clone())?; if args.resume_on_error { - let result = client - .put_emails_split_on_failure(&bucket.full_name(), emails, args.no_charge) - .context("Could not upload batch of emails")?; + let result = + client.put_emails_split_on_failure(&bucket.full_name(), emails, args.no_charge); statistics.add_uploaded(batch_len - result.num_failed); statistics.add_failed_to_upload(result.num_failed); } else {