Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Unreleased
- Resume on every error

# v0.38.18
- Split retries on bad serde json response

Expand Down
55 changes: 20 additions & 35 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,15 @@ impl Client {
source_name: &SourceFullName,
comments: Vec<NewComment>,
no_charge: bool,
) -> Result<SplitableRequestResponse<PutCommentsResponse>> {
) -> SplitableRequestResponse<PutCommentsResponse> {
// Retrying here despite the potential for 409's in order to increase reliability when
// working with poor connection

self.splitable_request(
Method::PUT,
self.endpoints.put_comments(source_name)?,
self.endpoints
.put_comments(source_name)
.expect("Could not get put_comments endpoint"),
PutCommentsRequest { comments },
Some(NoChargeQuery { no_charge }),
Retry::Yes,
Expand Down Expand Up @@ -698,10 +700,12 @@ impl Client {
source_name: &SourceFullName,
comments: Vec<NewComment>,
no_charge: bool,
) -> Result<SplitableRequestResponse<SyncCommentsResponse>> {
) -> SplitableRequestResponse<SyncCommentsResponse> {
self.splitable_request(
Method::POST,
self.endpoints.sync_comments(source_name)?,
self.endpoints
.sync_comments(source_name)
.expect("Could not get sync_comments endpoint"),
SyncCommentsRequest { comments },
Some(NoChargeQuery { no_charge }),
Retry::Yes,
Expand Down Expand Up @@ -734,10 +738,12 @@ impl Client {
bucket_name: &BucketFullName,
emails: Vec<NewEmail>,
no_charge: bool,
) -> Result<SplitableRequestResponse<PutEmailsResponse>> {
) -> SplitableRequestResponse<PutEmailsResponse> {
self.splitable_request(
Method::PUT,
self.endpoints.put_emails(bucket_name)?,
self.endpoints
.put_emails(bucket_name)
.expect("Could not get put_emails endpoint"),
PutEmailsRequest { emails },
Some(NoChargeQuery { no_charge }),
Retry::Yes,
Expand Down Expand Up @@ -1557,7 +1563,7 @@ impl Client {
body: RequestT,
query: Option<QueryT>,
retry: Retry,
) -> Result<SplitableRequestResponse<SuccessT>>
) -> SplitableRequestResponse<SuccessT>
where
LocationT: IntoUrl + Display + Clone,
RequestT: Serialize + SplittableRequest + Clone,
Expand All @@ -1568,40 +1574,20 @@ impl Client {
let result: Result<SuccessT> =
self.request(&method, &url, &Some(body.clone()), &query, &retry);

fn should_split(error: &Error) -> bool {
if let Error::Api { status_code, .. } = error {
*status_code == reqwest::StatusCode::UNPROCESSABLE_ENTITY
|| *status_code == reqwest::StatusCode::BAD_REQUEST
|| *status_code == reqwest::StatusCode::CONFLICT
} else if let Error::BadJsonResponse(_) = error {
// This is for the case where some sort of network config (e.g. cloudflare) blocks
// the request and returns invalid content
true
} else if let Error::BadSerdeJsonResponse(_) = error {
// This is for the case where some sort of network config (e.g. cloudflare) blocks
// the request and returns invalid content
true
} else if let Error::ReqwestError { source, .. } = error {
// Should split timeouts
source.is_timeout()
} else {
false
}
}

match result {
Ok(response) => Ok(SplitableRequestResponse {
Ok(response) => SplitableRequestResponse {
response,
num_failed: 0,
}),
Err(error) if should_split(&error) => {
},
Err(_) => {
let mut num_failed = 0;
let response = body
.split()
.filter_map(|request| {
match self.request(&method, &url, &Some(request), &query, &retry) {
Ok(response) => Some(response),
Err(_) => {
Err(err) => {
debug!("{err}");
num_failed += 1;
None
}
Expand All @@ -1611,12 +1597,11 @@ impl Client {
merged.merge(next)
});

Ok(SplitableRequestResponse {
SplitableRequestResponse {
num_failed,
response,
})
}
}
Err(error) => Err(error),
}
}

Expand Down
24 changes: 10 additions & 14 deletions cli/src/commands/create/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,11 @@ fn upload_batch_of_comments(
}

if resume_on_error {
let result = client
.put_comments_split_on_failure(
&source.full_name(),
comments_to_put.to_vec(),
no_charge,
)
.context("Could not put batch of comments")?;
let result = client.put_comments_split_on_failure(
&source.full_name(),
comments_to_put.to_vec(),
no_charge,
);
failed += result.num_failed;
} else {
client
Expand All @@ -402,13 +400,11 @@ fn upload_batch_of_comments(
)?;
}
let result = if resume_on_error {
let result = client
.sync_comments_split_on_failure(
&source.full_name(),
comments_to_sync.to_vec(),
no_charge,
)
.context("Could not sync batch of comments")?;
let result = client.sync_comments_split_on_failure(
&source.full_name(),
comments_to_sync.to_vec(),
no_charge,
);
failed += result.num_failed;
result.response
} else {
Expand Down
8 changes: 5 additions & 3 deletions cli/src/commands/create/emails.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ fn upload_emails_from_reader(
// Upload emails

if resume_on_error {
let result = client
.put_emails_split_on_failure(&bucket.full_name(), batch.to_vec(), no_charge)
.context("Could not upload batch of emails")?;
let result = client.put_emails_split_on_failure(
&bucket.full_name(),
batch.to_vec(),
no_charge,
);
statistics.add_emails(StatisticsUpdate {
uploaded: batch.len() - result.num_failed,
failed: result.num_failed,
Expand Down
10 changes: 3 additions & 7 deletions cli/src/commands/package/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,7 @@ fn unpack_cm_bucket(
.context("Could not get email batch")?;

if *resume_on_error {
let result =
client.put_emails_split_on_failure(&bucket.full_name(), batch, no_charge)?;
let result = client.put_emails_split_on_failure(&bucket.full_name(), batch, no_charge);

statistics.add_email_batch_uploads(1);
statistics.add_failed_email_uploads(result.num_failed);
Expand Down Expand Up @@ -487,11 +486,8 @@ fn unpack_cm_source(
let new_comments: Vec<NewComment> = batch.into_iter().map(|c| c.comment).collect();

if *resume_on_error {
let result = client.sync_comments_split_on_failure(
&source.full_name(),
new_comments,
no_charge,
)?;
let result =
client.sync_comments_split_on_failure(&source.full_name(), new_comments, no_charge);

statistics.add_comment_batch_upload(1);
statistics.add_failed_comment_uploads(result.num_failed);
Expand Down
5 changes: 2 additions & 3 deletions cli/src/commands/parse/pst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ pub fn parse(client: &Client, args: &ParsePstArgs) -> Result<()> {
if !args.dry_run {
let bucket = client.get_bucket(args.bucket.clone())?;
if args.resume_on_error {
let result = client
.put_emails_split_on_failure(&bucket.full_name(), emails, args.no_charge)
.context("Could not upload batch of emails")?;
let result =
client.put_emails_split_on_failure(&bucket.full_name(), emails, args.no_charge);
statistics.add_uploaded(batch_len - result.num_failed);
statistics.add_failed_to_upload(result.num_failed);
} else {
Expand Down
Loading