From d1c16215147f799f4acb0845b5f86e9542ffba28 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 3 Jun 2025 15:08:06 -0500 Subject: [PATCH] FIX: Fix C++ client timeseries stream --- CHANGELOG.md | 7 ++ CMakeLists.txt | 2 +- .../databento/detail/dbn_buffer_decoder.hpp | 20 ++++ pkg/PKGBUILD | 2 +- src/detail/buffer.cpp | 4 +- src/detail/dbn_buffer_decoder.cpp | 109 ++++++++++-------- src/historical.cpp | 18 ++- 7 files changed, 109 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69a9f1a..8cd8506 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.37.1 - 2025-06-03 + +### Bug fixes +- Fixed issue where not all data was processed in `Historical::TimeseriesGetRange()` +- Fixed issue with pointer arithmetic in `Buffer::Write()` +- Fixed issue where more data than necessary was copied in `Buffer::Shift()` + ## 0.37.0 - 2025-06-03 ### Breaking changes diff --git a/CMakeLists.txt b/CMakeLists.txt index 77f3484..97a5fe0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.37.0 + VERSION 0.37.1 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/include/databento/detail/dbn_buffer_decoder.hpp b/include/databento/detail/dbn_buffer_decoder.hpp index 4e1cba4..204ef72 100644 --- a/include/databento/detail/dbn_buffer_decoder.hpp +++ b/include/databento/detail/dbn_buffer_decoder.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "databento/detail/buffer.hpp" #include "databento/detail/zstd_stream.hpp" @@ -22,6 +23,10 @@ class DbnBufferDecoder { KeepGoing Process(const char* data, std::size_t length); + std::size_t UnreadBytes() const { return dbn_buffer_.ReadCapacity(); } + friend std::ostream& operator<<(std::ostream& stream, + const DbnBufferDecoder& buffer); + private: enum class DecoderState : std::uint8_t { Init, @@ -29,6 +34,21 @@ class DbnBufferDecoder { Records, }; + friend std::ostream& operator<<(std::ostream& stream, DecoderState state) { + switch (state) { + case DbnBufferDecoder::DecoderState::Init: + stream << "init"; + break; + case DbnBufferDecoder::DecoderState::Metadata: + stream << "metadata"; + break; + case DbnBufferDecoder::DecoderState::Records: + stream << "records"; + break; + } + return stream; + } + const MetadataCallback& metadata_callback_; const RecordCallback& record_callback_; ZstdDecodeStream zstd_stream_; diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index a9c3205..26ac2a9 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.37.0 +pkgver=0.37.1 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index 6c5565d..44b2889 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -17,7 +17,7 @@ size_t Buffer::Write(const std::byte* data, std::size_t length) { } const auto write_size = std::min(WriteCapacity(), length); std::copy(data, data + write_size, WriteBegin()); - Fill(length); + Fill(write_size); return write_size; } @@ -67,7 +67,7 @@ void Buffer::Reserve(std::size_t capacity) { void Buffer::Shift() { const auto unread_bytes = ReadCapacity(); if (unread_bytes) { - std::copy(read_pos_, end_, buf_.get()); + std::copy(read_pos_, write_pos_, buf_.get()); } read_pos_ = buf_.get(); write_pos_ = read_pos_ + unread_bytes; diff --git a/src/detail/dbn_buffer_decoder.cpp b/src/detail/dbn_buffer_decoder.cpp index e499c67..88b669c 100644 --- a/src/detail/dbn_buffer_decoder.cpp +++ b/src/detail/dbn_buffer_decoder.cpp @@ -3,6 +3,7 @@ #include "databento/dbn_decoder.hpp" #include "databento/timeseries.hpp" #include "dbn_constants.hpp" +#include "stream_op_helper.hpp" using databento::detail::DbnBufferDecoder; @@ -11,59 +12,75 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, constexpr auto kUpgradePolicy = VersionUpgradePolicy::UpgradeToV3; zstd_buffer_->WriteAll(data, length); - const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(), - dbn_buffer_.WriteCapacity()); - dbn_buffer_.Fill(read_size); - if (read_size == 0) { - return KeepGoing::Continue; - } - switch (state_) { - case DecoderState::Init: { - if (dbn_buffer_.ReadCapacity() < kMetadataPreludeSize) { - break; - } - std::tie(input_version_, bytes_needed_) = - DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(), - dbn_buffer_.ReadCapacity()); - dbn_buffer_.Consume(kMetadataPreludeSize); - dbn_buffer_.Reserve(bytes_needed_); - state_ = DecoderState::Metadata; - [[fallthrough]]; + while (true) { + const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(), + dbn_buffer_.WriteCapacity()); + dbn_buffer_.Fill(read_size); + if (read_size == 0) { + return KeepGoing::Continue; } - case DecoderState::Metadata: { - if (dbn_buffer_.ReadCapacity() < bytes_needed_) { - break; - } - auto metadata = DbnDecoder::DecodeMetadataFields( - input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd()); - dbn_buffer_.Consume(bytes_needed_); - // Metadata may leave buffer misaligned. Shift records to ensure 8-byte - // alignment - dbn_buffer_.Shift(); - ts_out_ = metadata.ts_out; - metadata.Upgrade(kUpgradePolicy); - if (metadata_callback_) { - metadata_callback_(std::move(metadata)); + switch (state_) { + case DecoderState::Init: { + if (dbn_buffer_.ReadCapacity() < kMetadataPreludeSize) { + break; + } + std::tie(input_version_, bytes_needed_) = + DbnDecoder::DecodeMetadataVersionAndSize( + dbn_buffer_.ReadBegin(), dbn_buffer_.ReadCapacity()); + dbn_buffer_.Consume(kMetadataPreludeSize); + dbn_buffer_.Reserve(bytes_needed_); + state_ = DecoderState::Metadata; + [[fallthrough]]; } - state_ = DecoderState::Records; - [[fallthrough]]; - } - case DecoderState::Records: { - while (dbn_buffer_.ReadCapacity() > 0) { - auto record = - Record{reinterpret_cast(dbn_buffer_.ReadBegin())}; - bytes_needed_ = record.Size(); + case DecoderState::Metadata: { if (dbn_buffer_.ReadCapacity() < bytes_needed_) { break; } - record = DbnDecoder::DecodeRecordCompat( - input_version_, kUpgradePolicy, ts_out_, &compat_buffer_, record); - if (record_callback_(record) == KeepGoing::Stop) { - return KeepGoing::Stop; - } + auto metadata = DbnDecoder::DecodeMetadataFields( + input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd()); dbn_buffer_.Consume(bytes_needed_); + // Metadata may leave buffer misaligned. Shift records to ensure 8-byte + // alignment + dbn_buffer_.Shift(); + ts_out_ = metadata.ts_out; + metadata.Upgrade(kUpgradePolicy); + if (metadata_callback_) { + metadata_callback_(std::move(metadata)); + } + state_ = DecoderState::Records; + [[fallthrough]]; + } + case DecoderState::Records: { + while (dbn_buffer_.ReadCapacity() > 0) { + auto record = + Record{reinterpret_cast(dbn_buffer_.ReadBegin())}; + bytes_needed_ = record.Size(); + if (dbn_buffer_.ReadCapacity() < bytes_needed_) { + break; + } + record = DbnDecoder::DecodeRecordCompat( + input_version_, kUpgradePolicy, ts_out_, &compat_buffer_, record); + if (record_callback_(record) == KeepGoing::Stop) { + return KeepGoing::Stop; + } + dbn_buffer_.Consume(bytes_needed_); + } } } } - return KeepGoing::Continue; } + +namespace databento::detail { +std::ostream& operator<<(std::ostream& stream, const DbnBufferDecoder& buffer) { + return StreamOpBuilder{stream} + .SetSpacer("\n ") + .SetTypeName("DbnBufferDecoder") + .Build() + .AddField("dbn_buffer_", buffer.dbn_buffer_) + .AddField("bytes_needed_", buffer.bytes_needed_) + .AddField("input_version_", buffer.input_version_) + .AddField("ts_out_", buffer.ts_out_) + .AddField("state_", buffer.state_) + .Finish(); +} +} // namespace databento::detail diff --git a/src/historical.cpp b/src/historical.cpp index f742498..d627ec1 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -8,6 +8,7 @@ #include // get_env #include #include // back_inserter +#include #include #include #include // move @@ -846,12 +847,23 @@ void Historical::TimeseriesGetRange(const HttplibParams& params, const RecordCallback& record_callback) { detail::DbnBufferDecoder decoder{metadata_callback, record_callback}; + bool early_exit = false; this->client_.GetRawStream( kTimeseriesGetRangePath, params, - [&decoder](const char* data, std::size_t length) mutable { - return decoder.Process(data, length) == KeepGoing::Continue; + [&decoder, &early_exit](const char* data, std::size_t length) mutable { + if (decoder.Process(data, length) == KeepGoing::Continue) { + return true; + } + early_exit = true; + return false; }); - // FIXME: check if remaining partial records + if (!early_exit && decoder.UnreadBytes() > 0) { + std::ostringstream ss; + ss << "[Historical::TimeseriesGetRange] Partial or incomplete record " + "remaining of " + << decoder.UnreadBytes() << " bytes"; + log_receiver_->Receive(LogLevel::Warning, ss.str()); + } } static const std::string kTimeseriesGetRangeToFileEndpoint =