diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 365ae6f..ce1a346 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -24,7 +24,7 @@ jobs: sudo apt-get install libpcre3 libpcre3-dev libzstd-dev ninja-build - name: Install cppcheck run: | - git clone https://github.com/danmar/cppcheck.git --branch 2.14.1 + git clone https://github.com/danmar/cppcheck.git --branch 2.17.1 cd cppcheck cmake -S. -B build \ -DCMAKE_BUILD_TYPE=Release \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 27868cb..73944a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.34.2 - 2025-05-06 + +#### Bug fixes +- Fixed potential for unaligned records in live and historical streaming requests + ## 0.34.1 - 2025-04-29 ### Enhancements diff --git a/CMakeLists.txt b/CMakeLists.txt index 2056216..db048cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.34.1 + VERSION 0.34.2 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/include/databento/dbn_decoder.hpp b/include/databento/dbn_decoder.hpp index 960f793..dd56a89 100644 --- a/include/databento/dbn_decoder.hpp +++ b/include/databento/dbn_decoder.hpp @@ -6,6 +6,7 @@ #include #include "databento/dbn.hpp" +#include "databento/detail/buffer.hpp" #include "databento/enums.hpp" // Upgrade Policy #include "databento/file_stream.hpp" #include "databento/ireadable.hpp" @@ -54,7 +55,6 @@ class DbnDecoder { const std::byte* buffer_end); bool DetectCompression(); std::size_t FillBuffer(); - std::size_t GetReadBufferSize() const; RecordHeader* BufferRecordHeader(); ILogReceiver* log_receiver_; @@ -62,8 +62,7 @@ class DbnDecoder { VersionUpgradePolicy upgrade_policy_; bool ts_out_{}; std::unique_ptr input_; - std::vector read_buffer_; - std::size_t buffer_idx_{}; + detail::Buffer buffer_{}; // Must be 8-byte aligned for records alignas(RecordHeader) std::array compat_buffer_{}; Record current_record_{nullptr}; diff --git a/include/databento/detail/buffer.hpp b/include/databento/detail/buffer.hpp index c82cb21..47fa725 100644 --- a/include/databento/detail/buffer.hpp +++ b/include/databento/detail/buffer.hpp @@ -2,6 +2,7 @@ #include #include +#include #include "databento/ireadable.hpp" #include "databento/iwritable.hpp" @@ -11,7 +12,7 @@ class Buffer : public IReadable, public IWritable { public: Buffer() : Buffer(64 * std::size_t{1 << 10}) {} explicit Buffer(std::size_t init_capacity) - : buf_{std::make_unique(init_capacity)}, + : buf_{AlignedNew(init_capacity), AlignedDelete}, end_{buf_.get() + init_capacity}, read_pos_{buf_.get()}, write_pos_{buf_.get()} {} @@ -22,7 +23,9 @@ class Buffer : public IReadable, public IWritable { void WriteAll(const std::byte* data, std::size_t length) override; std::byte*& WriteBegin() { return write_pos_; } - std::byte* WriteEnd() const { return end_; } + std::byte* WriteEnd() { return end_; } + const std::byte* WriteBegin() const { return write_pos_; } + const std::byte* WriteEnd() const { return end_; } std::size_t WriteCapacity() const { return static_cast(end_ - write_pos_); } @@ -32,7 +35,9 @@ class Buffer : public IReadable, public IWritable { std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; std::byte*& ReadBegin() { return read_pos_; } - std::byte* ReadEnd() const { return write_pos_; } + std::byte* ReadEnd() { return write_pos_; } + const std::byte* ReadBegin() const { return read_pos_; } + const std::byte* ReadEnd() const { return write_pos_; } std::size_t ReadCapacity() const { return static_cast(write_pos_ - read_pos_); } @@ -48,7 +53,19 @@ class Buffer : public IReadable, public IWritable { void Shift(); private: - std::unique_ptr buf_; + static constexpr std::align_val_t kAlignment{8}; + + using UniqueBufPtr = std::unique_ptr; + + std::byte* AlignedNew(std::size_t capacity) { + // Can't use `new` expression due to MSVC bug + // See + // https://developercommunity.visualstudio.com/t/using-c17-new-stdalign-val-tn-syntax-results-in-er/528320 + return static_cast(operator new[](capacity, kAlignment)); + } + static void AlignedDelete(std::byte* p) { operator delete[](p, kAlignment); } + + UniqueBufPtr buf_; std::byte* end_; std::byte* read_pos_{}; std::byte* write_pos_{}; diff --git a/include/databento/detail/dbn_buffer_decoder.hpp b/include/databento/detail/dbn_buffer_decoder.hpp index a8f8a65..4e1cba4 100644 --- a/include/databento/detail/dbn_buffer_decoder.hpp +++ b/include/databento/detail/dbn_buffer_decoder.hpp @@ -6,7 +6,6 @@ #include "databento/detail/buffer.hpp" #include "databento/detail/zstd_stream.hpp" -#include "databento/ireadable.hpp" #include "databento/record.hpp" #include "databento/timeseries.hpp" @@ -18,7 +17,8 @@ class DbnBufferDecoder { const RecordCallback& record_callback) : metadata_callback_{metadata_callback}, record_callback_{record_callback}, - zstd_stream_{InitZstdBuffer()} {} + zstd_stream_{std::make_unique()}, + zstd_buffer_{static_cast(zstd_stream_.Input())} {} KeepGoing Process(const char* data, std::size_t length); @@ -29,12 +29,6 @@ class DbnBufferDecoder { Records, }; - std::unique_ptr InitZstdBuffer() { - auto zstd_buffer = std::make_unique(); - zstd_buffer_ = zstd_buffer.get(); - return zstd_buffer; - } - const MetadataCallback& metadata_callback_; const RecordCallback& record_callback_; ZstdDecodeStream zstd_stream_; @@ -43,7 +37,7 @@ class DbnBufferDecoder { std::size_t bytes_needed_{}; alignas(RecordHeader) std::array compat_buffer_{}; std::uint8_t input_version_{}; - bool ts_out_; + bool ts_out_{}; DecoderState state_{DecoderState::Init}; }; } // namespace databento::detail diff --git a/include/databento/detail/zstd_stream.hpp b/include/databento/detail/zstd_stream.hpp index 56da12b..0fff739 100644 --- a/include/databento/detail/zstd_stream.hpp +++ b/include/databento/detail/zstd_stream.hpp @@ -6,6 +6,7 @@ #include // unique_ptr #include +#include "databento/detail/buffer.hpp" #include "databento/ireadable.hpp" #include "databento/iwritable.hpp" #include "databento/log.hpp" @@ -14,8 +15,7 @@ namespace databento::detail { class ZstdDecodeStream : public IReadable { public: explicit ZstdDecodeStream(std::unique_ptr input); - ZstdDecodeStream(std::unique_ptr input, - std::vector&& in_buffer); + ZstdDecodeStream(std::unique_ptr input, detail::Buffer& in_buffer); // Read exactly `length` bytes into `buffer`. void ReadExact(std::byte* buffer, std::size_t length) override; @@ -23,6 +23,8 @@ class ZstdDecodeStream : public IReadable { // return 0 if the end of the stream is reached. std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + IReadable* Input() const { return input_.get(); } + private: std::unique_ptr input_; std::unique_ptr z_dstream_; diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index bd4bb75..38dd21c 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.34.1 +pkgver=0.34.2 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/dbn_constants.hpp b/src/dbn_constants.hpp index 9908953..059d73d 100644 --- a/src/dbn_constants.hpp +++ b/src/dbn_constants.hpp @@ -13,7 +13,6 @@ constexpr std::size_t kFixedMetadataLen = 100; constexpr std::size_t kDatasetCstrLen = 16; constexpr std::size_t kMetadataReservedLen = 53; constexpr std::size_t kMetadataReservedLenV1 = 47; -constexpr std::size_t kBufferCapacity = 8UL * 1024; constexpr std::uint16_t kNullSchema = std::numeric_limits::max(); constexpr std::uint8_t kNullSType = std::numeric_limits::max(); constexpr std::uint64_t kNullRecordCount = diff --git a/src/dbn_decoder.cpp b/src/dbn_decoder.cpp index b4570d5..b5ef248 100644 --- a/src/dbn_decoder.cpp +++ b/src/dbn_decoder.cpp @@ -9,6 +9,7 @@ #include "databento/compat.hpp" #include "databento/constants.hpp" #include "databento/datetime.hpp" +#include "databento/detail/buffer.hpp" #include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" @@ -21,7 +22,7 @@ using databento::DbnDecoder; namespace { template T Consume(const std::byte*& buf) { - const auto res = *reinterpret_cast(&*buf); + const auto res = *reinterpret_cast(buf); buf += sizeof(T); return res; } @@ -34,7 +35,7 @@ std::uint8_t Consume(const std::byte*& buf) { } const char* Consume(const std::byte*& buf, const std::ptrdiff_t num_bytes) { - const auto* pos = &*buf; + const auto* pos = buf; buf += num_bytes; return reinterpret_cast(pos); } @@ -76,16 +77,12 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, : log_receiver_{log_receiver}, upgrade_policy_{upgrade_policy}, input_{std::move(input)} { - read_buffer_.reserve(kBufferCapacity); if (DetectCompression()) { - input_ = std::make_unique( - std::move(input_), std::move(read_buffer_)); - // Reinitialize buffer and get it into the same state as uncompressed input - read_buffer_ = std::vector(); - read_buffer_.reserve(kBufferCapacity); - read_buffer_.resize(kMagicSize); - input_->ReadExact(read_buffer_.data(), kMagicSize); - const auto* buf_ptr = read_buffer_.data(); + input_ = + std::make_unique(std::move(input_), buffer_); + input_->ReadExact(buffer_.WriteBegin(), kMagicSize); + buffer_.WriteBegin() += kMagicSize; + const auto* buf_ptr = buffer_.ReadBegin(); if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) { throw DbnResponseError{"Found Zstd input, but not DBN prefix"}; } @@ -181,16 +178,22 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( databento::Metadata DbnDecoder::DecodeMetadata() { // already read first 4 bytes detecting compression - read_buffer_.resize(kMetadataPreludeSize); - input_->ReadExact(&read_buffer_[4], 4); + const auto read_size = kMetadataPreludeSize - kMagicSize; + input_->ReadExact(buffer_.WriteBegin(), read_size); + buffer_.WriteBegin() += read_size; const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( - read_buffer_.data(), kMetadataPreludeSize); + buffer_.ReadBegin(), kMetadataPreludeSize); + buffer_.ReadBegin() += kMetadataPreludeSize; version_ = version; - read_buffer_.resize(size); - input_->ReadExact(read_buffer_.data(), read_buffer_.size()); - buffer_idx_ = read_buffer_.size(); + buffer_.Reserve(size); + input_->ReadExact(buffer_.WriteBegin(), size); + buffer_.WriteBegin() += size; auto metadata = DbnDecoder::DecodeMetadataFields( - version_, read_buffer_.data(), read_buffer_.data() + read_buffer_.size()); + version_, buffer_.ReadBegin(), buffer_.ReadEnd()); + buffer_.ReadBegin() += size; + // Metadata may leave buffer misaligned. Shift records to ensure 8-byte + // alignment + buffer_.Shift(); ts_out_ = metadata.ts_out; metadata.Upgrade(upgrade_policy_); return metadata; @@ -239,60 +242,53 @@ databento::Record DbnDecoder::DecodeRecordCompat( // assumes DecodeMetadata has been called const databento::Record* DbnDecoder::DecodeRecord() { // need some unread unread_bytes - if (GetReadBufferSize() == 0) { + if (buffer_.ReadCapacity() == 0) { if (FillBuffer() == 0) { return nullptr; } } // check length - while (GetReadBufferSize() < BufferRecordHeader()->Size()) { + while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { if (FillBuffer() == 0) { - if (GetReadBufferSize() > 0) { + if (buffer_.ReadCapacity() > 0) { log_receiver_->Receive( LogLevel::Warning, "Unexpected partial record remaining in stream: " + - std::to_string(GetReadBufferSize()) + " bytes"); + std::to_string(buffer_.ReadCapacity()) + " bytes"); } return nullptr; } } current_record_ = Record{BufferRecordHeader()}; - buffer_idx_ += current_record_.Size(); + buffer_.ReadBegin() += current_record_.Size(); current_record_ = DbnDecoder::DecodeRecordCompat( version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_); return ¤t_record_; } size_t DbnDecoder::FillBuffer() { - // Shift data forward - std::copy(read_buffer_.cbegin() + static_cast(buffer_idx_), - read_buffer_.cend(), read_buffer_.begin()); - const auto unread_size = read_buffer_.size() - buffer_idx_; - buffer_idx_ = 0; - read_buffer_.resize(kBufferCapacity); - const auto fill_size = input_->ReadSome(&read_buffer_[unread_size], - kBufferCapacity - unread_size); - read_buffer_.resize(unread_size + fill_size); + if (buffer_.WriteCapacity() < kMaxRecordLen) { + buffer_.Shift(); + } + const auto fill_size = + input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()); + buffer_.WriteBegin() += fill_size; return fill_size; } -std::size_t DbnDecoder::GetReadBufferSize() const { - return read_buffer_.size() - buffer_idx_; -} - databento::RecordHeader* DbnDecoder::BufferRecordHeader() { - return reinterpret_cast(&read_buffer_[buffer_idx_]); + return reinterpret_cast(buffer_.ReadBegin()); } bool DbnDecoder::DetectCompression() { - read_buffer_.resize(kMagicSize); - input_->ReadExact(read_buffer_.data(), kMagicSize); - const auto* read_buffer_it = read_buffer_.data(); - if (std::strncmp(Consume(read_buffer_it, 3), kDbnPrefix, 3) == 0) { + input_->ReadExact(buffer_.WriteBegin(), kMagicSize); + buffer_.WriteBegin() += kMagicSize; + const auto* buffer_it = buffer_.ReadBegin(); + if (std::strncmp(Consume(buffer_it, 3), kDbnPrefix, 3) == 0) { return false; } - read_buffer_it = read_buffer_.data(); - auto x = Consume(read_buffer_it); + buffer_it = buffer_.ReadBegin(); + auto x = Consume(buffer_it); if (x == kZstdMagicNumber) { return true; } @@ -302,12 +298,10 @@ bool DbnDecoder::DetectCompression() { if ((x & kZstdSkippableFrame) == kZstdSkippableFrame) { throw DbnResponseError{ "Legacy DBZ encoding is not supported. Please use the dbn CLI tool " - "to " - "convert it to DBN."}; + "to convert it to DBN."}; } throw DbnResponseError{ - "Couldn't detect input type. It doesn't appear to be Zstd or " - "DBN."}; + "Couldn't detect input type. It doesn't appear to be Zstd or DBN."}; } std::string DbnDecoder::DecodeSymbol(std::size_t symbol_cstr_len, diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index a14c04c..7822fd4 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -54,7 +54,7 @@ void Buffer::Reserve(std::size_t capacity) { if (capacity <= Capacity()) { return; } - auto new_buf = std::make_unique(capacity); + UniqueBufPtr new_buf{AlignedNew(capacity), AlignedDelete}; const auto unread_bytes = ReadCapacity(); std::copy(ReadBegin(), ReadEnd(), new_buf.get()); buf_ = std::move(new_buf); diff --git a/src/detail/dbn_buffer_decoder.cpp b/src/detail/dbn_buffer_decoder.cpp index 7b31d42..2bb95e2 100644 --- a/src/detail/dbn_buffer_decoder.cpp +++ b/src/detail/dbn_buffer_decoder.cpp @@ -35,6 +35,9 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, auto metadata = DbnDecoder::DecodeMetadataFields( input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd()); dbn_buffer_.ReadBegin() += bytes_needed_; + // Metadata may leave buffer misaligned. Shift records to ensure 8-byte + // alignment + dbn_buffer_.Shift(); ts_out_ = metadata.ts_out; metadata.Upgrade(VersionUpgradePolicy::UpgradeToV2); if (metadata_callback_) { diff --git a/src/detail/zstd_stream.cpp b/src/detail/zstd_stream.cpp index 234ad52..c521379 100644 --- a/src/detail/zstd_stream.cpp +++ b/src/detail/zstd_stream.cpp @@ -4,21 +4,28 @@ #include #include // move +#include "databento/detail/buffer.hpp" #include "databento/exceptions.hpp" #include "databento/log.hpp" using databento::detail::ZstdDecodeStream; ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input) - : ZstdDecodeStream{std::move(input), {}} {} + : input_{std::move(input)}, + z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream}, + read_suggestion_{::ZSTD_initDStream(z_dstream_.get())}, + in_buffer_{}, + z_in_buffer_{in_buffer_.data(), 0, 0} {} ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input, - std::vector&& in_buffer) + detail::Buffer& in_buffer) : input_{std::move(input)}, z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream}, read_suggestion_{::ZSTD_initDStream(z_dstream_.get())}, - in_buffer_{std::move(in_buffer)}, - z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {} + in_buffer_{in_buffer.ReadBegin(), in_buffer.ReadEnd()}, + z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} { + in_buffer.ReadBegin() += in_buffer.ReadCapacity(); +} void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) { std::size_t size{}; diff --git a/src/historical.cpp b/src/historical.cpp index 75ee79f..07d20a6 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -14,7 +14,6 @@ #include "databento/constants.hpp" #include "databento/datetime.hpp" -#include "databento/dbn_decoder.hpp" #include "databento/dbn_file_store.hpp" #include "databento/detail/dbn_buffer_decoder.hpp" #include "databento/detail/json_helpers.hpp" @@ -24,7 +23,6 @@ #include "databento/log.hpp" #include "databento/metadata.hpp" #include "databento/timeseries.hpp" -#include "dbn_constants.hpp" using databento::Historical; @@ -676,7 +674,6 @@ double Historical::MetadataGetCost( const std::string& dataset, const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, FeedMode mode, SType stype_in, std::uint64_t limit) { - static const std::string kPath = ::BuildMetadataPath(".get_cost"); httplib::Params params{ {"dataset", dataset}, {"start", ToString(datetime_range.start)}, @@ -693,7 +690,6 @@ double Historical::MetadataGetCost( const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, FeedMode mode, SType stype_in, std::uint64_t limit) { - static const std::string kPath = ::BuildMetadataPath(".get_cost"); httplib::Params params{ {"dataset", dataset}, {"start", datetime_range.start}, diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 167b77f..d36172c 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -148,6 +148,9 @@ databento::Metadata LiveBlocking::Start() { auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(), buffer_.ReadEnd()); buffer_.ReadBegin() += size; + // Metadata may leave buffer misaligned. Shift records to ensure 8-byte + // alignment + buffer_.Shift(); version_ = metadata.version; metadata.Upgrade(upgrade_policy_); return metadata;