From 7917b114f7cc56a8a0b57b3bf3bd38b5bfbbf44b Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Wed, 8 Apr 2026 10:27:37 +0000 Subject: [PATCH 1/3] fix(storage): Handle request transformation correctly after BidiWriteObject redirects This commit addresses an issue where BidiWriteObject operations, particularly in cross-region redirect scenarios, would fail with a NOT_FOUND error. The root cause was that bucket information was not propagated in the request headers after the client received a redirect error from the service. --- .../storage/internal/async/connection_impl.cc | 41 ++- .../connection_impl_appendable_upload_test.cc | 329 ++++++++++++++++-- .../internal/async/handle_redirect_error.cc | 74 ++-- .../internal/async/handle_redirect_error.h | 13 +- .../async/handle_redirect_error_test.cc | 132 +++++-- .../internal/async/writer_connection_impl.cc | 4 +- .../async/writer_connection_impl_test.cc | 45 +++ .../internal/grpc/configure_client_context.cc | 26 +- .../internal/grpc/configure_client_context.h | 11 +- .../grpc/configure_client_context_test.cc | 28 ++ 10 files changed, 605 insertions(+), 98 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index e67993317b6d0..c32b9d8ec60ce 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -334,9 +334,11 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto factory = WriteResultFactory( [stub = stub_, cq = cq_, retry = std::move(retry), // NOLINTNEXTLINE(bugprone-lambda-function-name) - backoff = std::move(backoff), current, function_name = __func__]( + backoff = std::move(backoff), current, function_name = __func__, + // Use shared_ptr to propagate RoutingHeaderOptions across retries. + current_routing_options = std::make_shared()]( google::storage::v2::BidiWriteObjectRequest req) { - auto call = [stub, request = std::move(req)]( + auto call = [stub, request = std::move(req), current_routing_options]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, @@ -351,9 +353,11 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { // Apply the routing header if (request.has_write_object_spec()) - ApplyRoutingHeaders(*context, request.write_object_spec()); + ApplyRoutingHeaders(*context, request.write_object_spec(), + *current_routing_options); else - ApplyRoutingHeaders(*context, request.append_object_spec()); + ApplyRoutingHeaders(*context, request.append_object_spec(), + *current_routing_options); auto rpc = stub->AsyncBidiWriteObject(cq, std::move(context), std::move(options)); @@ -362,18 +366,23 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { std::move(rpc)); request.set_state_lookup(true); auto open = std::make_shared(std::move(rpc), request); - return open->Call().then([open, &request](auto f) mutable { - open.reset(); - auto response = f.get(); - if (!response) { - google::rpc::Status grpc_status = - ExtractGrpcStatus(response.status()); - EnsureFirstMessageAppendObjectSpec(request, grpc_status); - ApplyWriteRedirectErrors(*request.mutable_append_object_spec(), - grpc_status); - } - return response; - }); + return open->Call().then( + [open, &request, current_routing_options](auto f) mutable { + open.reset(); + auto response = f.get(); + if (!response) { + google::rpc::Status grpc_status = + ExtractGrpcStatus(response.status()); + // Handle redirect and get info for updating routing options. + BidiWriteRedirectInfo redirect_info = + HandleBidiWriteRedirect(request, grpc_status); + + // Update RoutingHeaderOptions for the next attempt. + current_routing_options->routing_token = + redirect_info.routing_token; + } + return response; + }); }; return google::cloud::internal::AsyncRetryLoop( diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index 85ce51a10f31d..bb6833b1526a7 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -21,6 +21,8 @@ #include "google/cloud/common_options.h" #include "google/cloud/grpc_options.h" #include "google/cloud/internal/background_threads_impl.h" +#include "google/cloud/internal/status_payload_keys.h" +#include "google/cloud/status.h" #include "google/cloud/testing_util/async_sequencer.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" @@ -70,35 +72,14 @@ std::shared_ptr MakeTestConnection( TestOptions(std::move(options))); } -// Creates a mock bidirectional stream that simulates a successful append flow. -std::unique_ptr MakeSuccessfulAppendStream( +// Creates a mock bidirectional stream with common expectations for append +// flows. +std::unique_ptr MakeCommonAppendStream( AsyncSequencer& sequencer, std::int64_t persisted_size) { auto stream = std::make_unique(); EXPECT_CALL(*stream, Start).WillOnce([&] { return sequencer.PushBack("Start"); }); - // The first write is a "state lookup" write. It should not contain a payload. - // The server responds with the current persisted size of the object. - EXPECT_CALL(*stream, Write) - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.state_lookup()); - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(StateLookup)"); - }) - // Subsequent writes carry data. - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&, - grpc::WriteOptions wopt) { - EXPECT_FALSE(wopt.is_last_message()); - return sequencer.PushBack("Write(data)"); - }) - // The finalize write marks the end of the stream. - .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, - grpc::WriteOptions wopt) { - EXPECT_TRUE(request.finish_write()); - EXPECT_TRUE(wopt.is_last_message()); - return sequencer.PushBack("Write(Finalize)"); - }); // The first `Read()` call after the state lookup confirms the persisted size. EXPECT_CALL(*stream, Read) @@ -129,8 +110,111 @@ std::unique_ptr MakeSuccessfulAppendStream( EXPECT_CALL(*stream, Finish).WillOnce([&] { return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); }); + return stream; +} - return std::unique_ptr(std::move(stream)); +std::unique_ptr MakeRedirectAppendStream( + AsyncSequencer& sequencer, std::int64_t persisted_size, + absl::string_view expected_handle, std::int64_t expected_generation, + absl::string_view expected_routing_token) { + auto stream = MakeCommonAppendStream(sequencer, persisted_size); + // The first write is a "state lookup" write. It should not contain a payload. + // The server responds with the current persisted size of the object. + EXPECT_CALL(*stream, Write) + .WillOnce([&sequencer, expected_handle, expected_generation, + expected_routing_token]( + google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.state_lookup()); + EXPECT_FALSE(wopt.is_last_message()); + EXPECT_TRUE(request.has_append_object_spec()); + EXPECT_EQ(request.append_object_spec().write_handle().handle(), + expected_handle); + EXPECT_EQ(request.append_object_spec().generation(), + expected_generation); + EXPECT_EQ(request.append_object_spec().routing_token(), + expected_routing_token); + return sequencer.PushBack("Write(StateLookup)"); + }) + // Subsequent writes carry data. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&, + grpc::WriteOptions wopt) { + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(data)"); + }) + // The finalize write marks the end of the stream. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write(Finalize)"); + }); + return stream; +} + +std::unique_ptr MakeRedirectAppendStreamNoHandle( + AsyncSequencer& sequencer, std::int64_t persisted_size, + absl::string_view expected_bucket, absl::string_view expected_object_name) { + auto stream = MakeCommonAppendStream(sequencer, persisted_size); + // The first write is a "state lookup" write. It should not contain a payload. + // The server responds with the current persisted size of the object. + EXPECT_CALL(*stream, Write) + .WillOnce([&sequencer, expected_bucket, expected_object_name]( + google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.state_lookup()); + EXPECT_FALSE(wopt.is_last_message()); + EXPECT_FALSE(request.has_append_object_spec()); + EXPECT_TRUE(request.has_write_object_spec()); + EXPECT_EQ(request.write_object_spec().resource().name(), + expected_object_name); + EXPECT_EQ(request.write_object_spec().resource().bucket(), + expected_bucket); + return sequencer.PushBack("Write(StateLookup)"); + }) + // Subsequent writes carry data. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&, + grpc::WriteOptions wopt) { + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(data)"); + }) + // The finalize write marks the end of the stream. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write(Finalize)"); + }); + return stream; +} + +// Creates a mock bidirectional stream that simulates a successful append flow. +std::unique_ptr MakeSuccessfulAppendStream( + AsyncSequencer& sequencer, std::int64_t persisted_size) { + auto stream = MakeCommonAppendStream(sequencer, persisted_size); + // The first write is a "state lookup" write. It should not contain a payload. + // The server responds with the current persisted size of the object. + EXPECT_CALL(*stream, Write) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.state_lookup()); + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(StateLookup)"); + }) + // Subsequent writes carry data. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&, + grpc::WriteOptions wopt) { + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(data)"); + }) + // The finalize write marks the end of the stream. + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + return sequencer.PushBack("Write(Finalize)"); + }); + return stream; } // Creates a mock stream that returns an error. @@ -348,6 +432,201 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadPermanentError) { EXPECT_THAT(r, StatusIs(PermanentError().code())); } +TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirect) { + auto constexpr kRequestText = R"pb( + write_object_spec { + resource { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + content_type: "text/plain" + } + } + )pb"; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + + google::rpc::Status rpc_status; + rpc_status.set_code(static_cast(StatusCode::kAborted)); + rpc_status.set_message("redirect"); + google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.mutable_write_handle()->set_handle("redirect-handle"); + redirect.set_routing_token("redirect-token"); + redirect.set_generation(4321); + rpc_status.add_details()->PackFrom(redirect); + std::string rpc_status_payload; + ASSERT_TRUE(rpc_status.SerializeToString(&rpc_status_payload)); + Status status(StatusCode::kAborted, "redirect"); + internal::SetPayload(status, internal::StatusPayloadGrpcProto(), + rpc_status_payload); + + // Simulate one redirect failure, followed by a success. + EXPECT_CALL(*mock, AsyncBidiWriteObject) + .WillOnce([&] { return MakeErrorBidiWriteStream(sequencer, status); }) + .WillOnce([&] { + return MakeRedirectAppendStream(sequencer, 1024, "redirect-handle", + 4321, "redirect-token"); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = MakeTestConnection(pool.cq(), mock); + + auto request = google::storage::v2::BidiWriteObjectRequest{}; + ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + auto pending = connection->StartAppendableObjectUpload( + {std::move(request), connection->options()}); + + // First attempt fails with redirect. + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(false); // The stream fails to start. + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); + + // Retry attempt succeeds. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(StateLookup)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(PersistedSize)"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + EXPECT_EQ(absl::get(writer->PersistedState()), 1024); + + // Write some data. + auto w1 = writer->Write(storage::WritePayload("some data")); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(data)"); + next.first.set_value(true); + EXPECT_STATUS_OK(w1.get()); + + // Finalize the upload. + auto w2 = writer->Finalize({}); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(Finalize)"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(FinalObject)"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(response->name(), "test-object"); + EXPECT_EQ(response->size(), 1024 + 1024); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + +TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { + auto constexpr kRequestText = R"pb( + write_object_spec { + resource { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + content_type: "text/plain" + } + } + )pb"; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + + google::rpc::Status rpc_status; + rpc_status.set_code(static_cast(StatusCode::kAborted)); + rpc_status.set_message("redirect"); + google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.set_routing_token("redirect-token"); + redirect.set_generation(4321); + rpc_status.add_details()->PackFrom(redirect); + std::string rpc_status_payload; + ASSERT_TRUE(rpc_status.SerializeToString(&rpc_status_payload)); + Status status(StatusCode::kAborted, "redirect"); + internal::SetPayload(status, internal::StatusPayloadGrpcProto(), + rpc_status_payload); + + // Simulate one redirect failure, followed by a success. + EXPECT_CALL(*mock, AsyncBidiWriteObject) + .WillOnce([&] { return MakeErrorBidiWriteStream(sequencer, status); }) + .WillOnce([&] { + return MakeRedirectAppendStreamNoHandle( + sequencer, 1024, "projects/_/buckets/test-bucket", "test-object"); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = MakeTestConnection(pool.cq(), mock); + + auto request = google::storage::v2::BidiWriteObjectRequest{}; + ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + auto pending = connection->StartAppendableObjectUpload( + {std::move(request), connection->options()}); + + // First attempt fails with redirect. + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(false); // The stream fails to start. + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); + + // Retry attempt succeeds. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(StateLookup)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(PersistedSize)"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + EXPECT_EQ(absl::get(writer->PersistedState()), 1024); + + // Write some data. + auto w1 = writer->Write(storage::WritePayload("some data")); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(data)"); + next.first.set_value(true); + EXPECT_STATUS_OK(w1.get()); + + // Finalize the upload. + auto w2 = writer->Finalize({}); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(Finalize)"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(FinalObject)"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(response->name(), "test-object"); + EXPECT_EQ(response->size(), 1024 + 1024); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/handle_redirect_error.cc b/google/cloud/storage/internal/async/handle_redirect_error.cc index 1444a8bda6f19..d8fe1ad0684c4 100644 --- a/google/cloud/storage/internal/async/handle_redirect_error.cc +++ b/google/cloud/storage/internal/async/handle_redirect_error.cc @@ -20,27 +20,6 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -void EnsureFirstMessageAppendObjectSpec( - google::storage::v2::BidiWriteObjectRequest& request, - google::rpc::Status const& rpc_status) { - for (auto const& rpc_status_detail : rpc_status.details()) { - google::storage::v2::BidiWriteObjectRedirectedError error = - google::storage::v2::BidiWriteObjectRedirectedError{}; - if (!rpc_status_detail.UnpackTo(&error)) continue; - if (!error.has_write_handle()) continue; - if (request.has_write_object_spec()) { - auto spec = request.write_object_spec(); - auto& append_object_spec = *request.mutable_append_object_spec(); - append_object_spec.set_bucket(spec.resource().bucket()); - append_object_spec.set_object(spec.resource().name()); - append_object_spec.set_if_metageneration_match( - spec.if_metageneration_match()); - append_object_spec.set_if_metageneration_not_match( - spec.if_metageneration_not_match()); - } - } -} - google::rpc::Status ExtractGrpcStatus(Status const& status) { google::rpc::Status proto_status = google::rpc::Status{}; auto payload = google::cloud::internal::GetPayload( @@ -72,6 +51,59 @@ void ApplyWriteRedirectErrors(google::storage::v2::AppendObjectSpec& spec, } } +BidiWriteRedirectInfo HandleBidiWriteRedirect( + google::storage::v2::BidiWriteObjectRequest& request, + google::rpc::Status const& rpc_status) { + BidiWriteRedirectInfo info; + std::optional + redirect_error; + for (auto const& rpc_status_detail : rpc_status.details()) { + google::storage::v2::BidiWriteObjectRedirectedError error_proto; + if (rpc_status_detail.UnpackTo(&error_proto)) { + redirect_error = std::move(error_proto); + break; // Found the redirect error, no need to look further. + } + } + if (!redirect_error) { + return info; + } + + // We always extract the routing token if it's provided, as it's needed for + // the x-goog-request-params header in the next retry attempt. + if (!redirect_error->routing_token().empty()) { + info.routing_token = redirect_error->routing_token(); + } + if (!redirect_error->has_write_handle()) { + return info; + } + + // If we get back a write handle, we should use it. We can only use it + // on an append object spec. If we have a write object spec, we copy the + // relevant fields from write object spec to append object spec. + // If we have an append object spec, we copy the relevant fields from the + // error to the spec. + if (request.has_write_object_spec()) { + auto write_object_spec = request.write_object_spec(); + auto& append_object_spec = *request.mutable_append_object_spec(); + append_object_spec.set_bucket(write_object_spec.resource().bucket()); + append_object_spec.set_object(write_object_spec.resource().name()); + append_object_spec.set_if_metageneration_match( + write_object_spec.if_metageneration_match()); + append_object_spec.set_if_metageneration_not_match( + write_object_spec.if_metageneration_not_match()); + } + if (request.has_append_object_spec()) { + auto& append_object_spec = *request.mutable_append_object_spec(); + *append_object_spec.mutable_write_handle() = + std::move(*redirect_error->mutable_write_handle()); + *append_object_spec.mutable_routing_token() = + std::move(*redirect_error->mutable_routing_token()); + if (redirect_error->has_generation()) + append_object_spec.set_generation(redirect_error->generation()); + } + return info; +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/handle_redirect_error.h b/google/cloud/storage/internal/async/handle_redirect_error.h index 5cdf3b1b3d181..29d2a6025c3ec 100644 --- a/google/cloud/storage/internal/async/handle_redirect_error.h +++ b/google/cloud/storage/internal/async/handle_redirect_error.h @@ -25,9 +25,10 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN -void EnsureFirstMessageAppendObjectSpec( - google::storage::v2::BidiWriteObjectRequest& request, - google::rpc::Status const& rpc_status); +struct BidiWriteRedirectInfo { + // The routing token extracted from the redirect error, if available. + std::string routing_token; +}; google::rpc::Status ExtractGrpcStatus(Status const& status); @@ -37,6 +38,12 @@ void ApplyRedirectErrors(google::storage::v2::BidiReadObjectSpec& spec, void ApplyWriteRedirectErrors(google::storage::v2::AppendObjectSpec& spec, google::rpc::Status const& rpc_status); +// Handles BidiWriteObjectRedirectedError and modifies the request accordingly. +// Returns information needed for the next retry attempt. +BidiWriteRedirectInfo HandleBidiWriteRedirect( + google::storage::v2::BidiWriteObjectRequest& request, + google::rpc::Status const& rpc_status); + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud diff --git a/google/cloud/storage/internal/async/handle_redirect_error_test.cc b/google/cloud/storage/internal/async/handle_redirect_error_test.cc index d681cdf1e790c..b3ecca5bb5073 100644 --- a/google/cloud/storage/internal/async/handle_redirect_error_test.cc +++ b/google/cloud/storage/internal/async/handle_redirect_error_test.cc @@ -70,58 +70,150 @@ TEST(ApplyRedirectErrors, NoRedirect) { EXPECT_TRUE(spec.routing_token().empty()); } -TEST(EnsureFirstMessageAppendObjectSpec, Success) { +TEST(ApplyWriteRedirectErrors, NoRedirect) { + google::storage::v2::AppendObjectSpec spec; + spec.set_bucket("projects/_/buckets/test-bucket"); + spec.set_object("test-object"); + google::rpc::Status rpc_status; + rpc_status.set_code(static_cast(StatusCode::kNotFound)); + rpc_status.set_message("test-message"); + + ApplyWriteRedirectErrors(spec, rpc_status); + EXPECT_EQ(spec.bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(spec.object(), "test-object"); + EXPECT_FALSE(spec.has_write_handle()); + EXPECT_TRUE(spec.routing_token().empty()); + EXPECT_EQ(spec.generation(), 0); +} + +TEST(ApplyWriteRedirectErrors, Success) { + google::storage::v2::AppendObjectSpec spec; + spec.set_bucket("projects/_/buckets/test-bucket"); + spec.set_object("test-object"); + google::rpc::Status rpc_status; + google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.mutable_write_handle()->set_handle("test-handle"); + redirect.set_routing_token("test-token"); + redirect.set_generation(1234); + rpc_status.add_details()->PackFrom(redirect); + + ApplyWriteRedirectErrors(spec, rpc_status); + EXPECT_EQ(spec.bucket(), "projects/_/buckets/test-bucket"); + EXPECT_EQ(spec.object(), "test-object"); + EXPECT_EQ(spec.write_handle().handle(), "test-handle"); + EXPECT_EQ(spec.routing_token(), "test-token"); + EXPECT_EQ(spec.generation(), 1234); +} + +TEST(HandleBidiWriteRedirect, NoRedirect) { + google::storage::v2::BidiWriteObjectRequest request; + ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString( + R"pb( + write_object_spec { + resource { bucket: "projects/_/buckets/b", name: "o" } + } + )pb", + &request)); + + google::rpc::Status rpc_status; + rpc_status.set_code(static_cast(StatusCode::kNotFound)); + rpc_status.set_message("test-message"); + + auto info = HandleBidiWriteRedirect(request, rpc_status); + EXPECT_TRUE(info.routing_token.empty()); + EXPECT_TRUE(request.has_write_object_spec()); + EXPECT_FALSE(request.has_append_object_spec()); +} + +TEST(HandleBidiWriteRedirect, NoWriteHandle) { google::storage::v2::BidiWriteObjectRequest request; ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString( R"pb( write_object_spec { resource { bucket: "projects/_/buckets/b", name: "o" } - if_metageneration_match: 1 - if_metageneration_not_match: 1 } )pb", &request)); google::rpc::Status rpc_status; google::storage::v2::BidiWriteObjectRedirectedError redirect; - redirect.mutable_write_handle(); + redirect.set_routing_token("test-token"); rpc_status.add_details()->PackFrom(redirect); - EnsureFirstMessageAppendObjectSpec(request, rpc_status); + auto info = HandleBidiWriteRedirect(request, rpc_status); + EXPECT_EQ(info.routing_token, "test-token"); + EXPECT_TRUE(request.has_write_object_spec()); + EXPECT_FALSE(request.has_append_object_spec()); +} +TEST(HandleBidiWriteRedirect, WithWriteHandleForWriteObjectSpec) { + google::storage::v2::BidiWriteObjectRequest request; + ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString( + R"pb( + write_object_spec { + resource { bucket: "projects/_/buckets/b", name: "o" } + if_metageneration_match: 1 + if_metageneration_not_match: 2 + } + )pb", + &request)); + + google::rpc::Status rpc_status; + google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.mutable_write_handle()->set_handle("test-handle"); + redirect.set_routing_token("test-token"); + redirect.set_generation(1234); + rpc_status.add_details()->PackFrom(redirect); + + auto info = HandleBidiWriteRedirect(request, rpc_status); + EXPECT_EQ(info.routing_token, "test-token"); EXPECT_FALSE(request.has_write_object_spec()); EXPECT_TRUE(request.has_append_object_spec()); - auto const& append_spec = request.append_object_spec(); - EXPECT_EQ(append_spec.bucket(), "projects/_/buckets/b"); - EXPECT_EQ(append_spec.object(), "o"); - - EXPECT_FALSE(append_spec.has_write_handle()); - EXPECT_TRUE(append_spec.routing_token().empty()); - EXPECT_EQ(append_spec.if_metageneration_match(), 1); - EXPECT_EQ(append_spec.if_metageneration_not_match(), 1); - EXPECT_EQ(append_spec.generation(), 0); + auto const& spec = request.append_object_spec(); + EXPECT_EQ(spec.bucket(), "projects/_/buckets/b"); + EXPECT_EQ(spec.object(), "o"); + EXPECT_EQ(spec.if_metageneration_match(), 1); + EXPECT_EQ(spec.if_metageneration_not_match(), 2); + EXPECT_EQ(spec.write_handle().handle(), "test-handle"); + EXPECT_EQ(spec.routing_token(), "test-token"); + EXPECT_EQ(spec.generation(), 1234); } -TEST(EnsureFirstMessageAppendObjectSpec, WriteHandleIsNotSet) { +TEST(HandleBidiWriteRedirect, WithWriteHandleForAppendObjectSpec) { google::storage::v2::BidiWriteObjectRequest request; ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString( R"pb( - write_object_spec { - resource { bucket: "projects/_/buckets/b", name: "o" } + append_object_spec { + bucket: "projects/_/buckets/b" + object: "o" + generation: 123 + if_metageneration_match: 1 + if_metageneration_not_match: 2 } )pb", &request)); google::rpc::Status rpc_status; google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.mutable_write_handle()->set_handle("test-handle"); + redirect.set_routing_token("test-token"); redirect.set_generation(1234); rpc_status.add_details()->PackFrom(redirect); - EnsureFirstMessageAppendObjectSpec(request, rpc_status); + auto info = HandleBidiWriteRedirect(request, rpc_status); + EXPECT_EQ(info.routing_token, "test-token"); + EXPECT_FALSE(request.has_write_object_spec()); + EXPECT_TRUE(request.has_append_object_spec()); - EXPECT_TRUE(request.has_write_object_spec()); - EXPECT_FALSE(request.has_append_object_spec()); + auto const& spec = request.append_object_spec(); + EXPECT_EQ(spec.bucket(), "projects/_/buckets/b"); + EXPECT_EQ(spec.object(), "o"); + EXPECT_EQ(spec.if_metageneration_match(), 1); + EXPECT_EQ(spec.if_metageneration_not_match(), 2); + EXPECT_EQ(spec.write_handle().handle(), "test-handle"); + EXPECT_EQ(spec.routing_token(), "test-token"); + EXPECT_EQ(spec.generation(), 1234); } } // namespace diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index cbc4087169fbb..8e579cfe64cd7 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -237,9 +237,7 @@ future> AsyncWriterConnectionImpl::OnQuery( .then([this](auto g) { auto result = g.get(); google::rpc::Status grpc_status = ExtractGrpcStatus(result); - EnsureFirstMessageAppendObjectSpec(request_, grpc_status); - ApplyWriteRedirectErrors(*request_.mutable_append_object_spec(), - grpc_status); + HandleBidiWriteRedirect(request_, grpc_status); return StatusOr(std::move(result)); }); } diff --git a/google/cloud/storage/internal/async/writer_connection_impl_test.cc b/google/cloud/storage/internal/async/writer_connection_impl_test.cc index 037954d4a153a..caaa3d4ae8d1c 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl_test.cc @@ -21,6 +21,8 @@ #include "google/cloud/storage/options.h" #include "google/cloud/storage/testing/canonical_errors.h" #include "google/cloud/storage/testing/mock_hash_function.h" +#include "google/cloud/internal/status_payload_keys.h" +#include "google/cloud/status.h" #include "google/cloud/testing_util/async_sequencer.h" #include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/status_matchers.h" @@ -649,6 +651,49 @@ TEST(AsyncWriterConnectionTest, UnexpectedQueryFailsWithoutError) { EXPECT_THAT(query.get(), StatusIs(StatusCode::kInternal)); } +TEST(AsyncWriterConnectionTest, QueryFailsWithRedirect) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Cancel).Times(1); + EXPECT_CALL(*mock, Read).WillOnce([&]() { + return sequencer.PushBack("Read").then( + [](auto) { return absl::optional(); }); + }); + + google::rpc::Status rpc_status; + rpc_status.set_code(static_cast(StatusCode::kAborted)); + rpc_status.set_message("redirect"); + google::storage::v2::BidiWriteObjectRedirectedError redirect; + redirect.mutable_write_handle()->set_handle("redirect-handle"); + redirect.set_routing_token("redirect-token"); + redirect.set_generation(4321); + rpc_status.add_details()->PackFrom(redirect); + std::string rpc_status_payload; + ASSERT_TRUE(rpc_status.SerializeToString(&rpc_status_payload)); + Status status(StatusCode::kAborted, "redirect"); + internal::SetPayload(status, internal::StatusPayloadGrpcProto(), + rpc_status_payload); + + EXPECT_CALL(*mock, Finish).WillOnce([&, status] { + return sequencer.PushBack("Finish").then([s = status](auto f) -> Status { + if (f.get()) return Status{}; + return s; + }); + }); + auto hash = std::make_shared(); + + auto tested = std::make_unique( + TestOptions(), MakeRequest(), std::move(mock), hash, 1024); + auto query = tested->Query(); + auto next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Read"); + next.first.set_value(false); // Detect error from Read() + next = sequencer.PopFrontWithName(); + ASSERT_THAT(next.second, "Finish"); + next.first.set_value(false); // Return error from Finish() + EXPECT_THAT(query.get(), StatusIs(StatusCode::kAborted)); +} + TEST(AsyncWriterConnectionTest, FinalizeAppendableNoChecksum) { AsyncSequencer sequencer; auto mock = std::make_unique(); diff --git a/google/cloud/storage/internal/grpc/configure_client_context.cc b/google/cloud/storage/internal/grpc/configure_client_context.cc index ff716764e8ece..fd6163d30fc30 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context.cc +++ b/google/cloud/storage/internal/grpc/configure_client_context.cc @@ -44,17 +44,27 @@ void ApplyRoutingHeaders( } void ApplyRoutingHeaders(grpc::ClientContext& context, - google::storage::v2::WriteObjectSpec const& spec) { - context.AddMetadata( - "x-goog-request-params", - "bucket=" + google::cloud::internal::UrlEncode(spec.resource().bucket())); + google::storage::v2::WriteObjectSpec const& spec, + RoutingHeaderOptions const& options) { + std::string params = + "bucket=" + google::cloud::internal::UrlEncode(spec.resource().bucket()); + if (!options.routing_token.empty()) { + params += "&routing_token=" + + google::cloud::internal::UrlEncode(options.routing_token); + } + context.AddMetadata("x-goog-request-params", params); } void ApplyRoutingHeaders(grpc::ClientContext& context, - google::storage::v2::AppendObjectSpec const& spec) { - context.AddMetadata( - "x-goog-request-params", - "bucket=" + google::cloud::internal::UrlEncode(spec.bucket())); + google::storage::v2::AppendObjectSpec const& spec, + RoutingHeaderOptions const& options) { + std::string params = + "bucket=" + google::cloud::internal::UrlEncode(spec.bucket()); + if (!options.routing_token.empty()) { + params += "&routing_token=" + + google::cloud::internal::UrlEncode(options.routing_token); + } + context.AddMetadata("x-goog-request-params", params); } void ApplyRoutingHeaders(grpc::ClientContext& context, diff --git a/google/cloud/storage/internal/grpc/configure_client_context.h b/google/cloud/storage/internal/grpc/configure_client_context.h index 7c898fbe9bbd6..bbc4a4c80d226 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context.h +++ b/google/cloud/storage/internal/grpc/configure_client_context.h @@ -28,6 +28,11 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +struct RoutingHeaderOptions { + // The routing token to be included in x-goog-request-params. + std::string routing_token; +}; + /// Configures @p ctx using @p context. void AddIdempotencyToken(grpc::ClientContext& ctx, rest_internal::RestContext const& context); @@ -77,11 +82,13 @@ void ApplyRoutingHeaders( /// @copydoc ApplyRoutingHeaders(grpc::ClientContext&,) void ApplyRoutingHeaders(grpc::ClientContext& context, - google::storage::v2::WriteObjectSpec const& spec); + google::storage::v2::WriteObjectSpec const& spec, + RoutingHeaderOptions const& options = {}); /// @copydoc ApplyRoutingHeaders(grpc::ClientContext&,) void ApplyRoutingHeaders(grpc::ClientContext& context, - google::storage::v2::AppendObjectSpec const& spec); + google::storage::v2::AppendObjectSpec const& spec, + RoutingHeaderOptions const& options = {}); /** * The generated `StorageMetadata` stub can not handle dynamic routing headers diff --git a/google/cloud/storage/internal/grpc/configure_client_context_test.cc b/google/cloud/storage/internal/grpc/configure_client_context_test.cc index 550f956372e69..7e18790de74d3 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context_test.cc +++ b/google/cloud/storage/internal/grpc/configure_client_context_test.cc @@ -156,6 +156,20 @@ TEST_F(GrpcConfigureClientContext, ApplyRoutingHeadersInsertObject) { "bucket=projects%2F_%2Fbuckets%2Ftest-bucket"))); } +TEST_F(GrpcConfigureClientContext, + ApplyRoutingHeadersInsertObjectWithRoutingToken) { + auto spec = google::storage::v2::WriteObjectSpec{}; + spec.mutable_resource()->set_bucket("projects/_/buckets/test-bucket"); + + grpc::ClientContext context; + ApplyRoutingHeaders(context, spec, {"test-token"}); + auto metadata = GetMetadata(context); + EXPECT_THAT(metadata, + Contains(Pair("x-goog-request-params", + "bucket=projects%2F_%2Fbuckets%2Ftest-bucket&" + "routing_token=test-token"))); +} + TEST_F(GrpcConfigureClientContext, ApplyRoutingHeadersUploadChunkMatchSlash) { storage::internal::UploadChunkRequest req( "projects/_/buckets/test-bucket/blah/blah", 0, {}, @@ -222,6 +236,20 @@ TEST_F(GrpcConfigureClientContext, ApplyRoutingHeadersAppendObject) { "bucket=projects%2F_%2Fbuckets%2Ftest-bucket"))); } +TEST_F(GrpcConfigureClientContext, + ApplyRoutingHeadersAppendObjectWithRoutingToken) { + auto spec = google::storage::v2::AppendObjectSpec{}; + spec.set_bucket("projects/_/buckets/test-bucket"); + + grpc::ClientContext context; + ApplyRoutingHeaders(context, spec, {"test-token"}); + auto metadata = GetMetadata(context); + EXPECT_THAT(metadata, + Contains(Pair("x-goog-request-params", + "bucket=projects%2F_%2Fbuckets%2Ftest-bucket&" + "routing_token=test-token"))); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal From 0d5a3d0dff20eead0662694b2e2b7ff5f804d358 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Thu, 9 Apr 2026 04:13:28 +0000 Subject: [PATCH 2/3] fix(storage): Address feedback on BidiWriteObject error handling --- .../internal/grpc/configure_client_context.cc | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/google/cloud/storage/internal/grpc/configure_client_context.cc b/google/cloud/storage/internal/grpc/configure_client_context.cc index fd6163d30fc30..2d77f99dced76 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context.cc +++ b/google/cloud/storage/internal/grpc/configure_client_context.cc @@ -20,6 +20,19 @@ namespace google { namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { +void ApplyRoutingHeadersImpl(grpc::ClientContext& context, + std::string const& bucket_name, + RoutingHeaderOptions const& options) { + std::string params = + "bucket=" + google::cloud::internal::UrlEncode(bucket_name); + if (!options.routing_token.empty()) { + params += "&routing_token=" + + google::cloud::internal::UrlEncode(options.routing_token); + } + context.AddMetadata("x-goog-request-params", std::move(params)); +} +} // namespace auto constexpr kIdempotencyTokenHeader = "x-goog-gcs-idempotency-token"; @@ -46,25 +59,13 @@ void ApplyRoutingHeaders( void ApplyRoutingHeaders(grpc::ClientContext& context, google::storage::v2::WriteObjectSpec const& spec, RoutingHeaderOptions const& options) { - std::string params = - "bucket=" + google::cloud::internal::UrlEncode(spec.resource().bucket()); - if (!options.routing_token.empty()) { - params += "&routing_token=" + - google::cloud::internal::UrlEncode(options.routing_token); - } - context.AddMetadata("x-goog-request-params", params); + ApplyRoutingHeadersImpl(context, spec.resource().bucket(), options); } void ApplyRoutingHeaders(grpc::ClientContext& context, google::storage::v2::AppendObjectSpec const& spec, RoutingHeaderOptions const& options) { - std::string params = - "bucket=" + google::cloud::internal::UrlEncode(spec.bucket()); - if (!options.routing_token.empty()) { - params += "&routing_token=" + - google::cloud::internal::UrlEncode(options.routing_token); - } - context.AddMetadata("x-goog-request-params", params); + ApplyRoutingHeadersImpl(context, spec.bucket(), options); } void ApplyRoutingHeaders(grpc::ClientContext& context, From d504b389e45e229d1dbcbf7c9ece753d09388968 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Thu, 9 Apr 2026 08:51:21 +0000 Subject: [PATCH 3/3] fix(storage): Address feedback on BidiWriteObject error handling --- .../cloud/storage/internal/async/connection_impl.cc | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index c32b9d8ec60ce..409a45fbf1f35 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -377,9 +377,16 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { BidiWriteRedirectInfo redirect_info = HandleBidiWriteRedirect(request, grpc_status); - // Update RoutingHeaderOptions for the next attempt. - current_routing_options->routing_token = - redirect_info.routing_token; + // Only update the routing token if the new info has a + // non-empty token. + // Otherwise, retain the existing token for subsequent + // retries. + if (!redirect_info.routing_token.empty() && + current_routing_options->routing_token != + redirect_info.routing_token) { + current_routing_options->routing_token = + redirect_info.routing_token; + } } return response; });