From 49e9ecb128a52f4ab10504c4dcf16055f3585b98 Mon Sep 17 00:00:00 2001 From: Andrey Kashcheev Date: Wed, 8 Jan 2025 18:39:08 +0100 Subject: [PATCH] Check write buffer on stream closed check Helps to avoid "empty JSON" errors when stream is being reset Relates-To: DATASDK-57 Signed-off-by: Andrey Kashcheev --- .../src/repositories/AsyncJsonStream.cpp | 9 ++++++--- .../tests/PartitionsRepositoryTest.cpp | 10 ++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp index 2d9993317..959a01013 100644 --- a/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp +++ b/olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp @@ -50,7 +50,10 @@ size_t RapidJsonByteStream::PutEnd(char*) { return 0; } bool RapidJsonByteStream::ReadEmpty() const { return count_ == read_buffer_.size(); } -bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); } +bool RapidJsonByteStream::WriteEmpty() const { + std::unique_lock lock(mutex_); + return write_buffer_.empty(); +} void RapidJsonByteStream::AppendContent(const char* content, size_t length) { std::unique_lock lock(mutex_); @@ -64,7 +67,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) { void RapidJsonByteStream::SwapBuffers() { std::unique_lock lock(mutex_); - cv_.wait(lock, [&]() { return !WriteEmpty(); }); + cv_.wait(lock, [&]() { return !write_buffer_.empty(); }); std::swap(read_buffer_, write_buffer_); write_buffer_.clear(); count_ = 0; @@ -114,7 +117,7 @@ boost::optional AsyncJsonStream::GetError() const { bool AsyncJsonStream::IsClosed() const { std::unique_lock lock(mutex_); - return closed_; + return closed_ && (error_ || current_stream_->WriteEmpty()); } } // namespace repository diff --git a/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp b/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp index c1d170c70..48a29f23b 100644 --- a/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp +++ b/olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp @@ -2076,10 +2076,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) { repository.StreamPartitions(async_stream, kVersion, additional_fields, billing_tag, context); - EXPECT_TRUE(async_stream->IsClosed()); + // Not closed since the stream is not empty + EXPECT_FALSE(async_stream->IsClosed()); EXPECT_FALSE(async_stream->GetError()); EXPECT_STREQ(ref_stream_data.c_str(), get_stream_content(*async_stream).c_str()); + // Now it's closed as we read all its content + EXPECT_TRUE(async_stream->IsClosed()); { SCOPED_TRACE("Data with offset is in the stream"); @@ -2091,10 +2094,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) { repository.StreamPartitions(second_stream, kVersion, additional_fields, billing_tag, context); - EXPECT_TRUE(second_stream->IsClosed()); + // Not closed since the stream is not empty + EXPECT_FALSE(second_stream->IsClosed()); EXPECT_FALSE(second_stream->GetError()); EXPECT_STREQ((initial_value + ref_stream_data).c_str(), get_stream_content(*second_stream).c_str()); + // Now it's closed as we read all its content + EXPECT_TRUE(second_stream->IsClosed()); } } }