Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); }
bool RapidJsonByteStream::WriteEmpty() const {
std::unique_lock<std::mutex> lock(mutex_);
return write_buffer_.empty();
}

void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);
Expand All @@ -64,7 +67,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) {

void RapidJsonByteStream::SwapBuffers() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&]() { return !WriteEmpty(); });
cv_.wait(lock, [&]() { return !write_buffer_.empty(); });
std::swap(read_buffer_, write_buffer_);
write_buffer_.clear();
count_ = 0;
Expand Down Expand Up @@ -114,7 +117,7 @@ boost::optional<client::ApiError> AsyncJsonStream::GetError() const {

bool AsyncJsonStream::IsClosed() const {
std::unique_lock<std::mutex> lock(mutex_);
return closed_;
return closed_ && (error_ || current_stream_->WriteEmpty());
}

} // namespace repository
Expand Down
10 changes: 8 additions & 2 deletions olp-cpp-sdk-dataservice-read/tests/PartitionsRepositoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2076,10 +2076,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {

repository.StreamPartitions(async_stream, kVersion, additional_fields,
billing_tag, context);
EXPECT_TRUE(async_stream->IsClosed());
// Not closed since the stream is not empty
EXPECT_FALSE(async_stream->IsClosed());
EXPECT_FALSE(async_stream->GetError());
EXPECT_STREQ(ref_stream_data.c_str(),
get_stream_content(*async_stream).c_str());
// Now it's closed as we read all its content
EXPECT_TRUE(async_stream->IsClosed());

{
SCOPED_TRACE("Data with offset is in the stream");
Expand All @@ -2091,10 +2094,13 @@ TEST_F(PartitionsRepositoryTest, StreamPartitions) {
repository.StreamPartitions(second_stream, kVersion, additional_fields,
billing_tag, context);

EXPECT_TRUE(second_stream->IsClosed());
// Not closed since the stream is not empty
EXPECT_FALSE(second_stream->IsClosed());
EXPECT_FALSE(second_stream->GetError());
EXPECT_STREQ((initial_value + ref_stream_data).c_str(),
get_stream_content(*second_stream).c_str());
// Now it's closed as we read all its content
EXPECT_TRUE(second_stream->IsClosed());
}
}
}
Expand Down
Loading