@@ -319,9 +319,11 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
319319 auto factory = WriteResultFactory (
320320 [stub = stub_, cq = cq_, retry = std::move (retry),
321321 // NOLINTNEXTLINE(bugprone-lambda-function-name)
322- backoff = std::move (backoff), current, function_name = __func__](
322+ backoff = std::move (backoff), current, function_name = __func__,
323+ // Use shared_ptr to propagate RoutingHeaderOptions across retries.
324+ current_routing_options = std::make_shared<RoutingHeaderOptions>()](
323325 google::storage::v2::BidiWriteObjectRequest req) {
324- auto call = [stub, request = std::move (req)](
326+ auto call = [stub, request = std::move (req), current_routing_options ](
325327 CompletionQueue& cq,
326328 std::shared_ptr<grpc::ClientContext> context,
327329 google::cloud::internal::ImmutableOptions options,
@@ -336,9 +338,11 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
336338
337339 // Apply the routing header
338340 if (request.has_write_object_spec ())
339- ApplyRoutingHeaders (*context, request.write_object_spec ());
341+ ApplyRoutingHeaders (*context, request.write_object_spec (),
342+ *current_routing_options);
340343 else
341- ApplyRoutingHeaders (*context, request.append_object_spec ());
344+ ApplyRoutingHeaders (*context, request.append_object_spec (),
345+ *current_routing_options);
342346
343347 auto rpc = stub->AsyncBidiWriteObject (cq, std::move (context),
344348 std::move (options));
@@ -347,18 +351,23 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
347351 std::move (rpc));
348352 request.set_state_lookup (true );
349353 auto open = std::make_shared<WriteObject>(std::move (rpc), request);
350- return open->Call ().then ([open, &request](auto f) mutable {
351- open.reset ();
352- auto response = f.get ();
353- if (!response) {
354- google::rpc::Status grpc_status =
355- ExtractGrpcStatus (response.status ());
356- EnsureFirstMessageAppendObjectSpec (request, grpc_status);
357- ApplyWriteRedirectErrors (*request.mutable_append_object_spec (),
358- grpc_status);
359- }
360- return response;
361- });
354+ return open->Call ().then (
355+ [open, &request, current_routing_options](auto f) mutable {
356+ open.reset ();
357+ auto response = f.get ();
358+ if (!response) {
359+ google::rpc::Status grpc_status =
360+ ExtractGrpcStatus (response.status ());
361+ // Handle redirect and get info for updating routing options.
362+ BidiWriteRedirectInfo redirect_info =
363+ HandleBidiWriteRedirect (request, grpc_status);
364+
365+ // Update RoutingHeaderOptions for the next attempt.
366+ current_routing_options->routing_token =
367+ redirect_info.routing_token ;
368+ }
369+ return response;
370+ });
362371 };
363372
364373 return google::cloud::internal::AsyncRetryLoop (
@@ -371,7 +380,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
371380 return pending.then (
372381 [current, request = std::move (p.request ), persisted_size,
373382 hash = std::move (hash_function), fa = std::move (factory)](auto f) mutable
374- -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
383+ -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
375384 auto rpc = f.get ();
376385 if (!rpc) return std::move (rpc).status ();
377386 std::unique_ptr<AsyncWriterConnectionImpl> impl;
@@ -427,7 +436,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) {
427436 return StartUnbufferedUpload (std::move (p))
428437 .then ([current = std::move (current),
429438 async_write_object = std::move (async_write_object)](auto f) mutable
430- -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
439+ -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
431440 auto w = f.get ();
432441 if (!w) return std::move (w).status ();
433442 auto factory = [upload_id = (*w)->UploadId (),
@@ -461,14 +470,15 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) {
461470 };
462471
463472 auto f = make_unbuffered ();
464- return f.then ([current = std::move (current),
465- make_unbuffered = std::move (make_unbuffered)](auto f) mutable
466- -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
467- auto w = f.get ();
468- if (!w) return std::move (w).status ();
469- return MakeWriterConnectionBuffered (std::move (make_unbuffered),
470- *std::move (w), *current);
471- });
473+ return f.then (
474+ [current = std::move (current),
475+ make_unbuffered = std::move (make_unbuffered)](auto f) mutable
476+ -> StatusOr<std::unique_ptr<storage::AsyncWriterConnection>> {
477+ auto w = f.get ();
478+ if (!w) return std::move (w).status ();
479+ return MakeWriterConnectionBuffered (std::move (make_unbuffered),
480+ *std::move (w), *current);
481+ });
472482}
473483
474484future<StatusOr<google::storage::v2::Object>>
0 commit comments