Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ void TranscribeStreamingServiceClient::StartCallAnalyticsStreamTranscriptionAsyn

#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient());
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient(), 8 * 1024,
m_clientConfiguration.requestTimeoutMs);
auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);

auto eventEncoderStream = Aws::MakeShared<Model::AudioStream>(ALLOCATION_TAG, writeDataStreamBuf);
Expand Down Expand Up @@ -361,7 +362,8 @@ void TranscribeStreamingServiceClient::StartMedicalScribeStreamAsync(

#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient());
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient(), 8 * 1024,
m_clientConfiguration.requestTimeoutMs);
auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);

auto eventEncoderStream = Aws::MakeShared<Model::MedicalScribeInputStream>(ALLOCATION_TAG, writeDataStreamBuf);
Expand Down Expand Up @@ -477,7 +479,8 @@ void TranscribeStreamingServiceClient::StartMedicalStreamTranscriptionAsync(

#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient());
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient(), 8 * 1024,
m_clientConfiguration.requestTimeoutMs);
auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);

auto eventEncoderStream = Aws::MakeShared<Model::AudioStream>(ALLOCATION_TAG, writeDataStreamBuf);
Expand Down Expand Up @@ -569,7 +572,8 @@ void TranscribeStreamingServiceClient::StartStreamTranscriptionAsync(

#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient());
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient(), 8 * 1024,
m_clientConfiguration.requestTimeoutMs);
auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);

auto eventEncoderStream = Aws::MakeShared<Model::AudioStream>(ALLOCATION_TAG, writeDataStreamBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <aws/core/utils/Array.h>
#include <aws/crt/Types.h>

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
Expand All @@ -35,7 +36,7 @@ namespace Stream {
*/
class AWS_CORE_API HttpWriteDataStreamBuf : public std::streambuf {
public:
explicit HttpWriteDataStreamBuf(const std::shared_ptr<Aws::Http::HttpClient>& client, size_t bufferLength = 8 * 1024);
explicit HttpWriteDataStreamBuf(const std::shared_ptr<Aws::Http::HttpClient>& client, size_t bufferLength = 8 * 1024, size_t requestTimeoutMs = 0);
HttpWriteDataStreamBuf(const HttpWriteDataStreamBuf& other) = delete;
HttpWriteDataStreamBuf(HttpWriteDataStreamBuf&& other) noexcept = delete;
HttpWriteDataStreamBuf& operator=(const HttpWriteDataStreamBuf& other) = delete;
Expand Down Expand Up @@ -102,6 +103,8 @@ class AWS_CORE_API HttpWriteDataStreamBuf : public std::streambuf {
std::condition_variable m_writeComplete;
bool m_writeInProgress{false};
bool m_writeError{false};
bool m_hasDeadline{false};
std::chrono::steady_clock::time_point m_deadline;

// State management
enum class STATE {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@
#include <aws/core/http/HttpClient.h>
#include <aws/core/utils/stream/HttpWriteDataStreamBuf.h>

#include <chrono>
#include <utility>

namespace {
const char* WRITE_DATA_BUF_LOG_NAME = "HttpWriteDataStreamBuf";
}

Aws::Utils::Stream::HttpWriteDataStreamBuf::HttpWriteDataStreamBuf(const std::shared_ptr<Aws::Http::HttpClient>& client,
size_t bufferLength)
size_t bufferLength,
size_t requestTimeoutMs)
: m_client{client}, m_buffer{bufferLength} {
if (requestTimeoutMs > 0) {
m_hasDeadline = true;
m_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(requestTimeoutMs);
}
ResetPutArea();
}

Expand Down Expand Up @@ -163,7 +169,18 @@ bool Aws::Utils::Stream::HttpWriteDataStreamBuf::SendBuffer(bool endStream) {
endStream);

std::unique_lock<std::mutex> lock{m_writeMutex};
m_writeComplete.wait(lock, [this]() -> bool { return !m_writeInProgress; });
if (m_hasDeadline) {
bool completed = m_writeComplete.wait_until(lock, m_deadline,
[this]() -> bool { return !m_writeInProgress; });
if (!completed) {
m_writeError = true;
m_writeInProgress = false;
ResetPutArea();
return false;
}
} else {
m_writeComplete.wait(lock, [this]() -> bool { return !m_writeInProgress; });
}

ResetPutArea();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name}
#set($streamModelNameWithFirstLetterCapitalized = $CppViewHelper.capitalizeFirstChar($streamModelName))
\#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient());
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, GetHttpClient(), 8 * 1024, m_clientConfiguration.requestTimeoutMs);
auto signer = GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);

auto eventEncoderStream = Aws::MakeShared<Model::${streamModelType}>(ALLOCATION_TAG, writeDataStreamBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void ${className}::${operation.name}Async(Model::${operation.request.shape.name}
#set($streamModelNameWithFirstLetterCapitalized = $CppViewHelper.capitalizeFirstChar($streamModelName))
\#if AWS_SDK_USE_CRT_HTTP
// Push-based WriteData path (CRT HTTP client only)
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, m_httpClient);
auto writeDataStreamBuf = Aws::MakeShared<Aws::Utils::Stream::HttpWriteDataStreamBuf>(ALLOCATION_TAG, m_httpClient, 8 * 1024, m_clientConfig->requestTimeoutMs);
auto eventEncoderStream = Aws::MakeShared<Model::${streamModelType}>(ALLOCATION_TAG, writeDataStreamBuf);
request.Set${streamModelNameWithFirstLetterCapitalized}(eventEncoderStream);

Expand Down
Loading