From 003945f1dacf844cdd12af3a6f178af652fe710e Mon Sep 17 00:00:00 2001 From: kai lin Date: Wed, 17 Jun 2026 16:55:12 -0400 Subject: [PATCH] Honor requestTimeoutMs for CRT event stream writes When requestTimeoutMs > 0, compute a deadline at stream creation and use wait_until in SendBuffer(). If the deadline passes, the write fails immediately instead of blocking indefinitely when network drops. When requestTimeoutMs = 0, behavior is unchanged (infinite wait). --- .../TranscribeStreamingServiceClient.cpp | 12 +++++++---- .../utils/stream/HttpWriteDataStreamBuf.h | 5 ++++- .../utils/stream/HttpWriteDataStreamBuf.cpp | 21 +++++++++++++++++-- .../JsonServiceEventStreamOperationsSource.vm | 2 +- ...yJsonServiceEventStreamOperationsSource.vm | 2 +- 5 files changed, 33 insertions(+), 9 deletions(-) diff --git a/generated/src/aws-cpp-sdk-transcribestreaming/source/TranscribeStreamingServiceClient.cpp b/generated/src/aws-cpp-sdk-transcribestreaming/source/TranscribeStreamingServiceClient.cpp index 8917477057c..7ade3c14b22 100644 --- a/generated/src/aws-cpp-sdk-transcribestreaming/source/TranscribeStreamingServiceClient.cpp +++ b/generated/src/aws-cpp-sdk-transcribestreaming/source/TranscribeStreamingServiceClient.cpp @@ -261,7 +261,8 @@ void TranscribeStreamingServiceClient::StartCallAnalyticsStreamTranscriptionAsyn #if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient()); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, + m_clientConfiguration.requestTimeoutMs); auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); @@ -361,7 +362,8 @@ void TranscribeStreamingServiceClient::StartMedicalScribeStreamAsync( #if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient()); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, + m_clientConfiguration.requestTimeoutMs); auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); @@ -477,7 +479,8 @@ void TranscribeStreamingServiceClient::StartMedicalStreamTranscriptionAsync( #if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient()); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, + m_clientConfiguration.requestTimeoutMs); auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); @@ -569,7 +572,8 @@ void TranscribeStreamingServiceClient::StartStreamTranscriptionAsync( #if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient()); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, + m_clientConfiguration.requestTimeoutMs); auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/HttpWriteDataStreamBuf.h b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/HttpWriteDataStreamBuf.h index d76c463db6d..f3b47f59cea 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/HttpWriteDataStreamBuf.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/HttpWriteDataStreamBuf.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -35,7 +36,7 @@ namespace Stream { */ class AWS_CORE_API HttpWriteDataStreamBuf : public std::streambuf { public: - explicit HttpWriteDataStreamBuf(const std::shared_ptr& client, size_t bufferLength = 8 * 1024); + explicit HttpWriteDataStreamBuf(const std::shared_ptr& client, size_t bufferLength = 8 * 1024, size_t requestTimeoutMs = 0); HttpWriteDataStreamBuf(const HttpWriteDataStreamBuf& other) = delete; HttpWriteDataStreamBuf(HttpWriteDataStreamBuf&& other) noexcept = delete; HttpWriteDataStreamBuf& operator=(const HttpWriteDataStreamBuf& other) = delete; @@ -102,6 +103,8 @@ class AWS_CORE_API HttpWriteDataStreamBuf : public std::streambuf { std::condition_variable m_writeComplete; bool m_writeInProgress{false}; bool m_writeError{false}; + bool m_hasDeadline{false}; + std::chrono::steady_clock::time_point m_deadline; // State management enum class STATE { diff --git a/src/aws-cpp-sdk-core/source/utils/stream/HttpWriteDataStreamBuf.cpp b/src/aws-cpp-sdk-core/source/utils/stream/HttpWriteDataStreamBuf.cpp index c04849cc2ee..0c808da0cd1 100644 --- a/src/aws-cpp-sdk-core/source/utils/stream/HttpWriteDataStreamBuf.cpp +++ b/src/aws-cpp-sdk-core/source/utils/stream/HttpWriteDataStreamBuf.cpp @@ -5,6 +5,7 @@ #include #include +#include #include namespace { @@ -12,8 +13,13 @@ const char* WRITE_DATA_BUF_LOG_NAME = "HttpWriteDataStreamBuf"; } Aws::Utils::Stream::HttpWriteDataStreamBuf::HttpWriteDataStreamBuf(const std::shared_ptr& client, - size_t bufferLength) + size_t bufferLength, + size_t requestTimeoutMs) : m_client{client}, m_buffer{bufferLength} { + if (requestTimeoutMs > 0) { + m_hasDeadline = true; + m_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(requestTimeoutMs); + } ResetPutArea(); } @@ -163,7 +169,18 @@ bool Aws::Utils::Stream::HttpWriteDataStreamBuf::SendBuffer(bool endStream) { endStream); std::unique_lock lock{m_writeMutex}; - m_writeComplete.wait(lock, [this]() -> bool { return !m_writeInProgress; }); + if (m_hasDeadline) { + bool completed = m_writeComplete.wait_until(lock, m_deadline, + [this]() -> bool { return !m_writeInProgress; }); + if (!completed) { + m_writeError = true; + m_writeInProgress = false; + ResetPutArea(); + return false; + } + } else { + m_writeComplete.wait(lock, [this]() -> bool { return !m_writeInProgress; }); + } ResetPutArea(); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm index a71775efd9e..e9272d24db2 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/JsonServiceEventStreamOperationsSource.vm @@ -27,7 +27,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name} #set($streamModelNameWithFirstLetterCapitalized = $CppViewHelper.capitalizeFirstChar($streamModelName)) \#if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient()); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, m_clientConfiguration.requestTimeoutMs); auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/smithy/SmithyJsonServiceEventStreamOperationsSource.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/smithy/SmithyJsonServiceEventStreamOperationsSource.vm index 7ece21874a2..a4732f67f9f 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/smithy/SmithyJsonServiceEventStreamOperationsSource.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/smithy/SmithyJsonServiceEventStreamOperationsSource.vm @@ -29,7 +29,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name} #set($streamModelNameWithFirstLetterCapitalized = $CppViewHelper.capitalizeFirstChar($streamModelName)) \#if AWS_SDK_USE_CRT_HTTP // Push-based WriteData path (CRT HTTP client only) - auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, m_httpClient); + auto writeDataStreamBuf = Aws::MakeShared(ALLOCATION_TAG, m_httpClient, 8 * 1024, m_clientConfig->requestTimeoutMs); auto eventEncoderStream = Aws::MakeShared(ALLOCATION_TAG, writeDataStreamBuf); request.Set${streamModelNameWithFirstLetterCapitalized}(eventEncoderStream);