Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 0.37.1 - 2025-06-03

### Bug fixes
- Fixed issue where not all data was processed in `Historical::TimeseriesGetRange()`
- Fixed issue with pointer arithmetic in `Buffer::Write()`
- Fixed issue where more data than necessary was copied in `Buffer::Shift()`

## 0.37.0 - 2025-06-03

### Breaking changes
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0)

project(
databento
VERSION 0.37.0
VERSION 0.37.1
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
Expand Down
20 changes: 20 additions & 0 deletions include/databento/detail/dbn_buffer_decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>

#include "databento/detail/buffer.hpp"
#include "databento/detail/zstd_stream.hpp"
Expand All @@ -22,13 +23,32 @@ class DbnBufferDecoder {

KeepGoing Process(const char* data, std::size_t length);

// Returns the number of decoded bytes buffered but not yet consumed, i.e.
// the read capacity of the internal DBN buffer. Non-zero after the stream
// ends indicates a partial or incomplete record was left over.
std::size_t UnreadBytes() const { return dbn_buffer_.ReadCapacity(); }
friend std::ostream& operator<<(std::ostream& stream,
const DbnBufferDecoder& buffer);

private:
enum class DecoderState : std::uint8_t {
Init,
Metadata,
Records,
};

// Streams a lowercase human-readable label for the decoder state.
// States outside the enumeration (which should not occur) emit nothing.
friend std::ostream& operator<<(std::ostream& stream, DecoderState state) {
if (state == DecoderState::Init) {
stream << "init";
} else if (state == DecoderState::Metadata) {
stream << "metadata";
} else if (state == DecoderState::Records) {
stream << "records";
}
return stream;
}

const MetadataCallback& metadata_callback_;
const RecordCallback& record_callback_;
ZstdDecodeStream zstd_stream_;
Expand Down
2 changes: 1 addition & 1 deletion pkg/PKGBUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Maintainer: Databento <support@databento.com>
_pkgname=databento-cpp
pkgname=databento-cpp-git
pkgver=0.37.0
pkgver=0.37.1
pkgrel=1
pkgdesc="Official C++ client for Databento"
arch=('any')
Expand Down
4 changes: 2 additions & 2 deletions src/detail/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ size_t Buffer::Write(const std::byte* data, std::size_t length) {
}
const auto write_size = std::min(WriteCapacity(), length);
std::copy(data, data + write_size, WriteBegin());
Fill(length);
Fill(write_size);
return write_size;
}

Expand Down Expand Up @@ -67,7 +67,7 @@ void Buffer::Reserve(std::size_t capacity) {
void Buffer::Shift() {
const auto unread_bytes = ReadCapacity();
if (unread_bytes) {
std::copy(read_pos_, end_, buf_.get());
std::copy(read_pos_, write_pos_, buf_.get());
}
read_pos_ = buf_.get();
write_pos_ = read_pos_ + unread_bytes;
Expand Down
109 changes: 63 additions & 46 deletions src/detail/dbn_buffer_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "databento/dbn_decoder.hpp"
#include "databento/timeseries.hpp"
#include "dbn_constants.hpp"
#include "stream_op_helper.hpp"

using databento::detail::DbnBufferDecoder;

Expand All @@ -11,59 +12,75 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
constexpr auto kUpgradePolicy = VersionUpgradePolicy::UpgradeToV3;

zstd_buffer_->WriteAll(data, length);
const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(),
dbn_buffer_.WriteCapacity());
dbn_buffer_.Fill(read_size);
if (read_size == 0) {
return KeepGoing::Continue;
}
switch (state_) {
case DecoderState::Init: {
if (dbn_buffer_.ReadCapacity() < kMetadataPreludeSize) {
break;
}
std::tie(input_version_, bytes_needed_) =
DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(),
dbn_buffer_.ReadCapacity());
dbn_buffer_.Consume(kMetadataPreludeSize);
dbn_buffer_.Reserve(bytes_needed_);
state_ = DecoderState::Metadata;
[[fallthrough]];
while (true) {
const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(),
dbn_buffer_.WriteCapacity());
dbn_buffer_.Fill(read_size);
if (read_size == 0) {
return KeepGoing::Continue;
}
case DecoderState::Metadata: {
if (dbn_buffer_.ReadCapacity() < bytes_needed_) {
break;
}
auto metadata = DbnDecoder::DecodeMetadataFields(
input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd());
dbn_buffer_.Consume(bytes_needed_);
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
// alignment
dbn_buffer_.Shift();
ts_out_ = metadata.ts_out;
metadata.Upgrade(kUpgradePolicy);
if (metadata_callback_) {
metadata_callback_(std::move(metadata));
switch (state_) {
case DecoderState::Init: {
if (dbn_buffer_.ReadCapacity() < kMetadataPreludeSize) {
break;
}
std::tie(input_version_, bytes_needed_) =
DbnDecoder::DecodeMetadataVersionAndSize(
dbn_buffer_.ReadBegin(), dbn_buffer_.ReadCapacity());
dbn_buffer_.Consume(kMetadataPreludeSize);
dbn_buffer_.Reserve(bytes_needed_);
state_ = DecoderState::Metadata;
[[fallthrough]];
}
state_ = DecoderState::Records;
[[fallthrough]];
}
case DecoderState::Records: {
while (dbn_buffer_.ReadCapacity() > 0) {
auto record =
Record{reinterpret_cast<RecordHeader*>(dbn_buffer_.ReadBegin())};
bytes_needed_ = record.Size();
case DecoderState::Metadata: {
if (dbn_buffer_.ReadCapacity() < bytes_needed_) {
break;
}
record = DbnDecoder::DecodeRecordCompat(
input_version_, kUpgradePolicy, ts_out_, &compat_buffer_, record);
if (record_callback_(record) == KeepGoing::Stop) {
return KeepGoing::Stop;
}
auto metadata = DbnDecoder::DecodeMetadataFields(
input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd());
dbn_buffer_.Consume(bytes_needed_);
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
// alignment
dbn_buffer_.Shift();
ts_out_ = metadata.ts_out;
metadata.Upgrade(kUpgradePolicy);
if (metadata_callback_) {
metadata_callback_(std::move(metadata));
}
state_ = DecoderState::Records;
[[fallthrough]];
}
case DecoderState::Records: {
while (dbn_buffer_.ReadCapacity() > 0) {
auto record =
Record{reinterpret_cast<RecordHeader*>(dbn_buffer_.ReadBegin())};
bytes_needed_ = record.Size();
if (dbn_buffer_.ReadCapacity() < bytes_needed_) {
break;
}
record = DbnDecoder::DecodeRecordCompat(
input_version_, kUpgradePolicy, ts_out_, &compat_buffer_, record);
if (record_callback_(record) == KeepGoing::Stop) {
return KeepGoing::Stop;
}
dbn_buffer_.Consume(bytes_needed_);
}
}
}
}
return KeepGoing::Continue;
}

namespace databento::detail {
// Streams a multi-field, human-readable representation of the decoder's
// internal state (buffer, bytes needed, input DBN version, ts_out flag, and
// the current DecoderState), formatted via the project's StreamOpBuilder.
std::ostream& operator<<(std::ostream& stream, const DbnBufferDecoder& buffer) {
  return StreamOpBuilder{stream}
      .SetSpacer("\n ")
      .SetTypeName("DbnBufferDecoder")
      .Build()
      .AddField("dbn_buffer_", buffer.dbn_buffer_)
      .AddField("bytes_needed_", buffer.bytes_needed_)
      .AddField("input_version_", buffer.input_version_)
      .AddField("ts_out_", buffer.ts_out_)
      .AddField("state_", buffer.state_)
      .Finish();
}
}  // namespace databento::detail
18 changes: 15 additions & 3 deletions src/historical.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstdlib> // get_env
#include <filesystem>
#include <iterator> // back_inserter
#include <sstream>
#include <string>
#include <system_error>
#include <utility> // move
Expand Down Expand Up @@ -846,12 +847,23 @@ void Historical::TimeseriesGetRange(const HttplibParams& params,
const RecordCallback& record_callback) {
detail::DbnBufferDecoder decoder{metadata_callback, record_callback};

bool early_exit = false;
this->client_.GetRawStream(
kTimeseriesGetRangePath, params,
[&decoder](const char* data, std::size_t length) mutable {
return decoder.Process(data, length) == KeepGoing::Continue;
[&decoder, &early_exit](const char* data, std::size_t length) mutable {
if (decoder.Process(data, length) == KeepGoing::Continue) {
return true;
}
early_exit = true;
return false;
});
// Warn if the stream ended with a partial or incomplete record still buffered
if (!early_exit && decoder.UnreadBytes() > 0) {
std::ostringstream ss;
ss << "[Historical::TimeseriesGetRange] Partial or incomplete record "
"remaining of "
<< decoder.UnreadBytes() << " bytes";
log_receiver_->Receive(LogLevel::Warning, ss.str());
}
}

static const std::string kTimeseriesGetRangeToFileEndpoint =
Expand Down
Loading