diff --git a/CHANGELOG.md b/CHANGELOG.md index 23985d6..27868cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 0.34.1 - 2025-04-29 + +### Enhancements +- Added `InstitutionalPrioritization` variant to `MatchingAlgorithm` + +### Bug fixes +- Improved memory usage of historical streaming requests (`TimeseriesGetRange`) + ## 0.34.0 - 2025-04-22 ### Enhancements diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c5c32d..2056216 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.34.0 + VERSION 0.34.1 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index d74e7e5..110b003 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -7,11 +7,12 @@ set(headers include/databento/dbn_decoder.hpp include/databento/dbn_encoder.hpp include/databento/dbn_file_store.hpp + include/databento/detail/buffer.hpp + include/databento/detail/dbn_buffer_decoder.hpp include/databento/detail/http_client.hpp include/databento/detail/json_helpers.hpp include/databento/detail/scoped_fd.hpp include/databento/detail/scoped_thread.hpp - include/databento/detail/shared_channel.hpp include/databento/detail/tcp_client.hpp include/databento/detail/zstd_stream.hpp include/databento/enums.hpp @@ -47,10 +48,11 @@ set(sources src/dbn_decoder.cpp src/dbn_encoder.cpp src/dbn_file_store.cpp + src/detail/buffer.cpp + src/detail/dbn_buffer_decoder.cpp src/detail/http_client.cpp src/detail/json_helpers.cpp src/detail/scoped_fd.cpp - src/detail/shared_channel.cpp src/detail/tcp_client.cpp src/detail/zstd_stream.cpp src/enums.cpp diff --git a/include/databento/dbn_decoder.hpp b/include/databento/dbn_decoder.hpp index c714be2..960f793 100644 --- a/include/databento/dbn_decoder.hpp +++ b/include/databento/dbn_decoder.hpp @@ -6,7 +6,6 @@ #include #include "databento/dbn.hpp" -#include "databento/detail/shared_channel.hpp" #include "databento/enums.hpp" // Upgrade Policy #include "databento/file_stream.hpp" #include "databento/ireadable.hpp" @@ -18,7 +17,6 @@ namespace databento { // handled. Defaults to upgrading DBNv1 data to version 2 (the current version). class DbnDecoder { public: - DbnDecoder(ILogReceiver* log_receiver, detail::SharedChannel channel); DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream); DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input); DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input, @@ -27,7 +25,8 @@ class DbnDecoder { static std::pair DecodeMetadataVersionAndSize( const std::byte* buffer, std::size_t size); static Metadata DecodeMetadataFields(std::uint8_t version, - const std::vector& buffer); + const std::byte* buffer, + const std::byte* buffer_end); // Decodes a record possibly applying upgrading the data according to the // given version and upgrade policy. If an upgrade is applied, // compat_buffer is modified. @@ -42,21 +41,17 @@ class DbnDecoder { const Record* DecodeRecord(); private: - static std::string DecodeSymbol( - std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it); + static std::string DecodeSymbol(std::size_t symbol_cstr_len, + const std::byte*& buffer); static std::vector DecodeRepeatedSymbol( - std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); + std::size_t symbol_cstr_len, const std::byte*& buffer, + const std::byte* buffer_end); static std::vector DecodeSymbolMappings( - std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); - static SymbolMapping DecodeSymbolMapping( - std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); + std::size_t symbol_cstr_len, const std::byte*& buffer, + const std::byte* buffer_end); + static SymbolMapping DecodeSymbolMapping(std::size_t symbol_cstr_len, + const std::byte*& buffer, + const std::byte* buffer_end); bool DetectCompression(); std::size_t FillBuffer(); std::size_t GetReadBufferSize() const; diff --git a/include/databento/detail/buffer.hpp b/include/databento/detail/buffer.hpp new file mode 100644 index 0000000..c82cb21 --- /dev/null +++ b/include/databento/detail/buffer.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include +#include + +#include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" + +namespace databento::detail { +class Buffer : public IReadable, public IWritable { + public: + Buffer() : Buffer(64 * std::size_t{1 << 10}) {} + explicit Buffer(std::size_t init_capacity) + : buf_{std::make_unique(init_capacity)}, + end_{buf_.get() + init_capacity}, + read_pos_{buf_.get()}, + write_pos_{buf_.get()} {} + + size_t Write(const char* data, std::size_t length); + size_t Write(const std::byte* data, std::size_t length); + void WriteAll(const char* data, std::size_t length); + void WriteAll(const std::byte* data, std::size_t length) override; + + std::byte*& WriteBegin() { return write_pos_; } + std::byte* WriteEnd() const { return end_; } + std::size_t WriteCapacity() const { + return static_cast(end_ - write_pos_); + } + + /// Will throw if `length > ReadCapacity()`. + void ReadExact(std::byte* buffer, std::size_t length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + + std::byte*& ReadBegin() { return read_pos_; } + std::byte* ReadEnd() const { return write_pos_; } + std::size_t ReadCapacity() const { + return static_cast(write_pos_ - read_pos_); + } + + std::size_t Capacity() const { + return static_cast(end_ - buf_.get()); + } + void Clear() { + read_pos_ = buf_.get(); + write_pos_ = buf_.get(); + } + void Reserve(std::size_t capacity); + void Shift(); + + private: + std::unique_ptr buf_; + std::byte* end_; + std::byte* read_pos_{}; + std::byte* write_pos_{}; +}; +} // namespace databento::detail diff --git a/include/databento/detail/dbn_buffer_decoder.hpp b/include/databento/detail/dbn_buffer_decoder.hpp new file mode 100644 index 0000000..a8f8a65 --- /dev/null +++ b/include/databento/detail/dbn_buffer_decoder.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include + +#include "databento/detail/buffer.hpp" +#include "databento/detail/zstd_stream.hpp" +#include "databento/ireadable.hpp" +#include "databento/record.hpp" +#include "databento/timeseries.hpp" + +namespace databento::detail { +class DbnBufferDecoder { + public: + // The instance cannot outlive the lifetime of these references. + DbnBufferDecoder(const MetadataCallback& metadata_callback, + const RecordCallback& record_callback) + : metadata_callback_{metadata_callback}, + record_callback_{record_callback}, + zstd_stream_{InitZstdBuffer()} {} + + KeepGoing Process(const char* data, std::size_t length); + + private: + enum class DecoderState : std::uint8_t { + Init, + Metadata, + Records, + }; + + std::unique_ptr InitZstdBuffer() { + auto zstd_buffer = std::make_unique(); + zstd_buffer_ = zstd_buffer.get(); + return zstd_buffer; + } + + const MetadataCallback& metadata_callback_; + const RecordCallback& record_callback_; + ZstdDecodeStream zstd_stream_; + Buffer* zstd_buffer_; + Buffer dbn_buffer_{}; + std::size_t bytes_needed_{}; + alignas(RecordHeader) std::array compat_buffer_{}; + std::uint8_t input_version_{}; + bool ts_out_; + DecoderState state_{DecoderState::Init}; +}; +} // namespace databento::detail diff --git a/include/databento/detail/shared_channel.hpp b/include/databento/detail/shared_channel.hpp deleted file mode 100644 index ae02819..0000000 --- a/include/databento/detail/shared_channel.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include // byte, size_t -#include // shared_ptr - -#include "databento/ireadable.hpp" - -namespace databento::detail { -// Copyable, thread-safe, unidirectional channel. -class SharedChannel : public IReadable { - public: - SharedChannel(); - - // Write `data` of `length` bytes to the channel. - void Write(const std::byte* data, std::size_t length); - // Signal the end of input. - void Finish(); - // Read exactly `length` bytes. - void ReadExact(std::byte* buffer, std::size_t length) override; - // Read at most `length` bytes. Returns the number of bytes read. Will only - // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::byte* buffer, std::size_t length) override; - - private: - class Channel; - - std::shared_ptr channel_; -}; -} // namespace databento::detail diff --git a/include/databento/enums.hpp b/include/databento/enums.hpp index 8206d12..b319bf6 100644 --- a/include/databento/enums.hpp +++ b/include/databento/enums.hpp @@ -208,6 +208,7 @@ enum MatchAlgorithm : char { ThresholdProRataLmm = 'Q', EurodollarFutures = 'Y', TimeProRata = 'P', + InstitutionalPrioritization = 'V', }; } // namespace match_algorithm using match_algorithm::MatchAlgorithm; diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 5227563..c55eeb1 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -9,8 +9,9 @@ #include // pair #include -#include "databento/datetime.hpp" // UnixNanos -#include "databento/dbn.hpp" // Metadata +#include "databento/datetime.hpp" // UnixNanos +#include "databento/dbn.hpp" // Metadata +#include "databento/detail/buffer.hpp" #include "databento/detail/tcp_client.hpp" // TcpClient #include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy #include "databento/live_subscription.hpp" @@ -118,10 +119,7 @@ class LiveBlocking { detail::TcpClient client_; std::uint32_t sub_counter_{}; std::vector subscriptions_; - // Must be 8-byte aligned for records - alignas(RecordHeader) std::array read_buffer_{}; - std::size_t buffer_size_{}; - std::size_t buffer_idx_{}; + detail::Buffer buffer_{}; // Must be 8-byte aligned for records alignas(RecordHeader) std::array compat_buffer_{}; std::uint64_t session_id_; diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index d3ab559..bd4bb75 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.34.0 +pkgver=0.34.1 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/dbn_decoder.cpp b/src/dbn_decoder.cpp index dbd18ed..b4570d5 100644 --- a/src/dbn_decoder.cpp +++ b/src/dbn_decoder.cpp @@ -20,29 +20,28 @@ using databento::DbnDecoder; namespace { template -T Consume(std::vector::const_iterator& byte_it) { - const auto res = *reinterpret_cast(&*byte_it); - byte_it += sizeof(T); +T Consume(const std::byte*& buf) { + const auto res = *reinterpret_cast(&*buf); + buf += sizeof(T); return res; } template <> -std::uint8_t Consume(std::vector::const_iterator& byte_it) { - const auto res = *byte_it; - byte_it += 1; +std::uint8_t Consume(const std::byte*& buf) { + const auto res = *buf; + buf += 1; return static_cast(res); } -const char* Consume(std::vector::const_iterator& byte_it, - const std::ptrdiff_t num_bytes) { - const auto* pos = &*byte_it; - byte_it += num_bytes; +const char* Consume(const std::byte*& buf, const std::ptrdiff_t num_bytes) { + const auto* pos = &*buf; + buf += num_bytes; return reinterpret_cast(pos); } -std::string Consume(std::vector::const_iterator& byte_it, - const std::ptrdiff_t num_bytes, const char* context) { - const auto cstr = Consume(byte_it, num_bytes); +std::string Consume(const std::byte*& buf, const std::ptrdiff_t num_bytes, + const char* context) { + const auto cstr = Consume(buf, num_bytes); // strnlen isn't portable const auto str_len = std::find(cstr, cstr + num_bytes, '\0') - cstr; if (str_len == num_bytes) { @@ -62,11 +61,6 @@ date::year_month_day DecodeIso8601Date(std::uint32_t yyyymmdd_int) { } } // namespace -DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, - detail::SharedChannel channel) - : DbnDecoder(log_receiver, - std::make_unique(std::move(channel))) {} - DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream) : DbnDecoder(log_receiver, std::make_unique(std::move(file_stream))) {} @@ -91,8 +85,8 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, read_buffer_.reserve(kBufferCapacity); read_buffer_.resize(kMagicSize); input_->ReadExact(read_buffer_.data(), kMagicSize); - auto read_buffer_it = read_buffer_.cbegin(); - if (std::strncmp(Consume(read_buffer_it, 3), kDbnPrefix, 3) != 0) { + const auto* buf_ptr = read_buffer_.data(); + if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) { throw DbnResponseError{"Found Zstd input, but not DBN prefix"}; } } @@ -116,7 +110,8 @@ std::pair DbnDecoder::DecodeMetadataVersionAndSize( } databento::Metadata DbnDecoder::DecodeMetadataFields( - std::uint8_t version, const std::vector& buffer) { + std::uint8_t version, const std::byte* buffer, + const std::byte* buffer_end) { Metadata res; res.version = version; if (res.version > kDbnVersion) { @@ -125,9 +120,8 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( std::to_string(kDbnVersion) + ", input version is " + std::to_string(res.version)}; } - auto read_buffer_it = buffer.cbegin(); - res.dataset = Consume(read_buffer_it, kDatasetCstrLen, "dataset"); - const auto raw_schema = Consume(read_buffer_it); + res.dataset = Consume(buffer, kDatasetCstrLen, "dataset"); + const auto raw_schema = Consume(buffer); if (raw_schema == kNullSchema) { res.has_mixed_schema = true; // must initialize @@ -136,16 +130,15 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( res.has_mixed_schema = false; res.schema = static_cast(raw_schema); } - res.start = UnixNanos{ - std::chrono::nanoseconds{Consume(read_buffer_it)}}; - res.end = UnixNanos{ - std::chrono::nanoseconds{Consume(read_buffer_it)}}; - res.limit = Consume(read_buffer_it); + res.start = + UnixNanos{std::chrono::nanoseconds{Consume(buffer)}}; + res.end = UnixNanos{std::chrono::nanoseconds{Consume(buffer)}}; + res.limit = Consume(buffer); if (version == 1) { // skip deprecated record_count - read_buffer_it += 8; + buffer += 8; } - const auto raw_stype_in = Consume(read_buffer_it); + const auto raw_stype_in = Consume(buffer); if (raw_stype_in == kNullSType) { res.has_mixed_stype_in = true; // must initialize @@ -154,34 +147,34 @@ databento::Metadata DbnDecoder::DecodeMetadataFields( res.has_mixed_stype_in = false; res.stype_in = static_cast(raw_stype_in); } - res.stype_out = static_cast(Consume(read_buffer_it)); - res.ts_out = static_cast(Consume(read_buffer_it)); + res.stype_out = static_cast(Consume(buffer)); + res.ts_out = static_cast(Consume(buffer)); if (version > 1) { res.symbol_cstr_len = - static_cast(Consume(read_buffer_it)); + static_cast(Consume(buffer)); } else { res.symbol_cstr_len = kSymbolCstrLenV1; } // skip reserved if (version == 1) { - read_buffer_it += kMetadataReservedLenV1; + buffer += kMetadataReservedLenV1; } else { - read_buffer_it += kMetadataReservedLen; + buffer += kMetadataReservedLen; } - const auto schema_definition_length = Consume(read_buffer_it); + const auto schema_definition_length = Consume(buffer); if (schema_definition_length != 0) { throw DbnResponseError{ "This version of dbn can't parse schema definitions"}; } - res.symbols = DbnDecoder::DecodeRepeatedSymbol(res.symbol_cstr_len, - read_buffer_it, buffer.cend()); - res.partial = DbnDecoder::DecodeRepeatedSymbol(res.symbol_cstr_len, - read_buffer_it, buffer.cend()); - res.not_found = DbnDecoder::DecodeRepeatedSymbol( - res.symbol_cstr_len, read_buffer_it, buffer.cend()); - res.mappings = DbnDecoder::DecodeSymbolMappings( - res.symbol_cstr_len, read_buffer_it, buffer.cend()); + res.symbols = + DbnDecoder::DecodeRepeatedSymbol(res.symbol_cstr_len, buffer, buffer_end); + res.partial = + DbnDecoder::DecodeRepeatedSymbol(res.symbol_cstr_len, buffer, buffer_end); + res.not_found = + DbnDecoder::DecodeRepeatedSymbol(res.symbol_cstr_len, buffer, buffer_end); + res.mappings = + DbnDecoder::DecodeSymbolMappings(res.symbol_cstr_len, buffer, buffer_end); return res; } @@ -196,7 +189,8 @@ databento::Metadata DbnDecoder::DecodeMetadata() { read_buffer_.resize(size); input_->ReadExact(read_buffer_.data(), read_buffer_.size()); buffer_idx_ = read_buffer_.size(); - auto metadata = DbnDecoder::DecodeMetadataFields(version_, read_buffer_); + auto metadata = DbnDecoder::DecodeMetadataFields( + version_, read_buffer_.data(), read_buffer_.data() + read_buffer_.size()); ts_out_ = metadata.ts_out; metadata.Upgrade(upgrade_policy_); return metadata; @@ -293,11 +287,11 @@ databento::RecordHeader* DbnDecoder::BufferRecordHeader() { bool DbnDecoder::DetectCompression() { read_buffer_.resize(kMagicSize); input_->ReadExact(read_buffer_.data(), kMagicSize); - auto read_buffer_it = read_buffer_.cbegin(); + const auto* read_buffer_it = read_buffer_.data(); if (std::strncmp(Consume(read_buffer_it, 3), kDbnPrefix, 3) == 0) { return false; } - read_buffer_it = read_buffer_.cbegin(); + read_buffer_it = read_buffer_.data(); auto x = Consume(read_buffer_it); if (x == kZstdMagicNumber) { return true; @@ -316,84 +310,79 @@ bool DbnDecoder::DetectCompression() { "DBN."}; } -std::string DbnDecoder::DecodeSymbol( - std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it) { +std::string DbnDecoder::DecodeSymbol(std::size_t symbol_cstr_len, + std::byte const*& read_buffer_it) { return Consume(read_buffer_it, static_cast(symbol_cstr_len), "symbol"); } std::vector DbnDecoder::DecodeRepeatedSymbol( - std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { - if (read_buffer_it + sizeof(std::uint32_t) > read_buffer_end_it) { + std::size_t symbol_cstr_len, const std::byte*& read_buf, + const std::byte* read_buf_end) { + if (read_buf + sizeof(std::uint32_t) > read_buf_end) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing symbol"}; } - const auto count = std::size_t{Consume(read_buffer_it)}; - if (read_buffer_it + static_cast(count * symbol_cstr_len) > - read_buffer_end_it) { + const auto count = std::size_t{Consume(read_buf)}; + if (read_buf + static_cast(count * symbol_cstr_len) > + read_buf_end) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing symbol"}; } std::vector res; res.reserve(count); for (std::size_t i = 0; i < count; ++i) { - res.emplace_back(DecodeSymbol(symbol_cstr_len, read_buffer_it)); + res.emplace_back(DecodeSymbol(symbol_cstr_len, read_buf)); } return res; } std::vector DbnDecoder::DecodeSymbolMappings( - std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { - if (read_buffer_it + sizeof(std::uint32_t) > read_buffer_end_it) { + std::size_t symbol_cstr_len, const std::byte*& read_buf, + const std::byte* read_buf_end) { + if (read_buf + sizeof(std::uint32_t) > read_buf_end) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing mappings"}; } - const auto count = std::size_t{Consume(read_buffer_it)}; + const auto count = std::size_t{Consume(read_buf)}; std::vector res; res.reserve(count); for (std::size_t i = 0; i < count; ++i) { - res.emplace_back(DbnDecoder::DecodeSymbolMapping( - symbol_cstr_len, read_buffer_it, read_buffer_end_it)); + res.emplace_back(DbnDecoder::DecodeSymbolMapping(symbol_cstr_len, read_buf, + read_buf_end)); } return res; } databento::SymbolMapping DbnDecoder::DecodeSymbolMapping( - std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { + std::size_t symbol_cstr_len, const std::byte*& read_buf, + const std::byte* read_buf_end) { const auto min_symbol_mapping_encoded_len = static_cast(symbol_cstr_len + sizeof(std::uint32_t)); const auto mapping_encoded_len = sizeof(std::uint32_t) * 2 + symbol_cstr_len; - if (read_buffer_it + min_symbol_mapping_encoded_len > read_buffer_end_it) { + if (read_buf + min_symbol_mapping_encoded_len > read_buf_end) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing symbol " "mapping"}; } SymbolMapping res; - res.raw_symbol = DecodeSymbol(symbol_cstr_len, read_buffer_it); - const auto interval_count = - std::size_t{Consume(read_buffer_it)}; + res.raw_symbol = DecodeSymbol(symbol_cstr_len, read_buf); + const auto interval_count = std::size_t{Consume(read_buf)}; const auto read_size = static_cast(interval_count * mapping_encoded_len); - if (read_buffer_it + read_size > read_buffer_end_it) { + if (read_buf + read_size > read_buf_end) { throw DbnResponseError{ "Symbol mapping interval_count doesn't match size of buffer"}; } res.intervals.reserve(interval_count); for (std::size_t i = 0; i < interval_count; ++i) { MappingInterval interval; - auto raw_start_date = Consume(read_buffer_it); + auto raw_start_date = Consume(read_buf); interval.start_date = DecodeIso8601Date(raw_start_date); - auto raw_end_date = Consume(read_buffer_it); + auto raw_end_date = Consume(read_buf); interval.end_date = DecodeIso8601Date(raw_end_date); - interval.symbol = DecodeSymbol(symbol_cstr_len, read_buffer_it); + interval.symbol = DecodeSymbol(symbol_cstr_len, read_buf); res.intervals.emplace_back(std::move(interval)); } return res; diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp new file mode 100644 index 0000000..a14c04c --- /dev/null +++ b/src/detail/buffer.cpp @@ -0,0 +1,73 @@ +#include "databento/detail/buffer.hpp" + +#include +#include + +#include "databento/exceptions.hpp" + +using databento::detail::Buffer; + +size_t Buffer::Write(const char* data, std::size_t length) { + return Write(reinterpret_cast(data), length); +} +size_t Buffer::Write(const std::byte* data, std::size_t length) { + if (length > WriteCapacity()) { + Shift(); + } + const auto write_size = std::min(WriteCapacity(), length); + std::copy(data, data + write_size, WriteBegin()); + WriteBegin() += write_size; + return write_size; +} + +void Buffer::WriteAll(const char* data, std::size_t length) { + WriteAll(reinterpret_cast(data), length); +} +void Buffer::WriteAll(const std::byte* data, std::size_t length) { + if (length > Capacity() - ReadCapacity()) { + Reserve(ReadCapacity() + length); + } else if (length >= WriteCapacity()) { + Shift(); + } + std::copy(data, data + length, WriteBegin()); + write_pos_ += length; +} + +void Buffer::ReadExact(std::byte* buffer, std::size_t length) { + if (length < ReadCapacity()) { + std::ostringstream err_msg; + err_msg << "Reached end of buffer without " << length << " bytes, only " + << ReadCapacity() << " bytes available"; + throw databento::Exception{err_msg.str()}; + } + ReadSome(buffer, length); +} + +std::size_t Buffer::ReadSome(std::byte* buffer, std::size_t max_length) { + const auto read_size = std::min(ReadCapacity(), max_length); + std::copy(ReadBegin(), ReadBegin() + read_size, buffer); + ReadBegin() += read_size; + return read_size; +} + +void Buffer::Reserve(std::size_t capacity) { + if (capacity <= Capacity()) { + return; + } + auto new_buf = std::make_unique(capacity); + const auto unread_bytes = ReadCapacity(); + std::copy(ReadBegin(), ReadEnd(), new_buf.get()); + buf_ = std::move(new_buf); + end_ = buf_.get() + capacity; + read_pos_ = buf_.get(); + write_pos_ = read_pos_ + unread_bytes; +} + +void Buffer::Shift() { + const auto unread_bytes = ReadCapacity(); + if (unread_bytes) { + std::copy(read_pos_, end_, buf_.get()); + } + read_pos_ = buf_.get(); + write_pos_ = read_pos_ + unread_bytes; +} diff --git a/src/detail/dbn_buffer_decoder.cpp b/src/detail/dbn_buffer_decoder.cpp new file mode 100644 index 0000000..7b31d42 --- /dev/null +++ b/src/detail/dbn_buffer_decoder.cpp @@ -0,0 +1,65 @@ +#include "databento/detail/dbn_buffer_decoder.hpp" + +#include "databento/dbn_decoder.hpp" +#include "databento/timeseries.hpp" +#include "dbn_constants.hpp" + +using databento::detail::DbnBufferDecoder; + +databento::KeepGoing DbnBufferDecoder::Process(const char* data, + std::size_t length) { + zstd_buffer_->WriteAll(data, length); + const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(), + dbn_buffer_.WriteCapacity()); + dbn_buffer_.WriteBegin() += read_size; + if (read_size == 0) { + return KeepGoing::Continue; + } + switch (state_) { + case DecoderState::Init: { + if (dbn_buffer_.ReadCapacity() < kMetadataPreludeSize) { + break; + } + std::tie(input_version_, bytes_needed_) = + DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(), + dbn_buffer_.ReadCapacity()); + dbn_buffer_.ReadBegin() += kMetadataPreludeSize; + dbn_buffer_.Reserve(bytes_needed_); + state_ = DecoderState::Metadata; + [[fallthrough]]; + } + case DecoderState::Metadata: { + if (dbn_buffer_.ReadCapacity() < bytes_needed_) { + break; + } + auto metadata = DbnDecoder::DecodeMetadataFields( + input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd()); + dbn_buffer_.ReadBegin() += bytes_needed_; + ts_out_ = metadata.ts_out; + metadata.Upgrade(VersionUpgradePolicy::UpgradeToV2); + if (metadata_callback_) { + metadata_callback_(std::move(metadata)); + } + state_ = DecoderState::Records; + [[fallthrough]]; + } + case DecoderState::Records: { + while (dbn_buffer_.ReadCapacity() > 0) { + auto record = + Record{reinterpret_cast(dbn_buffer_.ReadBegin())}; + bytes_needed_ = record.Size(); + if (dbn_buffer_.ReadCapacity() < bytes_needed_) { + break; + } + record = DbnDecoder::DecodeRecordCompat( + input_version_, VersionUpgradePolicy::UpgradeToV2, ts_out_, + &compat_buffer_, record); + if (record_callback_(record) == KeepGoing::Stop) { + return KeepGoing::Stop; + } + dbn_buffer_.ReadBegin() += bytes_needed_; + } + } + } + return KeepGoing::Continue; +} diff --git a/src/detail/shared_channel.cpp b/src/detail/shared_channel.cpp deleted file mode 100644 index a3d027b..0000000 --- a/src/detail/shared_channel.cpp +++ /dev/null @@ -1,108 +0,0 @@ -#include "databento/detail/shared_channel.hpp" - -#include -#include -#include -#include // stringstream - -#include "databento/exceptions.hpp" // DbnResponseError - -namespace databento::detail { -class SharedChannel::Channel { - public: - Channel() = default; - Channel(const Channel&) = delete; - Channel& operator=(const Channel&) = delete; - Channel(Channel&&) = delete; - Channel& operator=(Channel&&) = delete; - ~Channel(); - - void Write(const std::byte* data, std::size_t length); - void Finish(); - // Read exactly `length` bytes - void ReadExact(std::byte* buffer, std::size_t length); - // Read at most `length` bytes. Returns the number of bytes read. Will only - // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::byte* buffer, std::size_t length); - - private: - std::size_t Size(); - - // protects all other data members of this class - std::mutex mutex_; - // could use eofbit, but seekg clears this - bool is_finished_{false}; - std::condition_variable cv_; - std::stringstream stream_; -}; -} // namespace databento::detail - -using databento::detail::SharedChannel; - -SharedChannel::SharedChannel() : channel_{std::make_shared()} {} - -void SharedChannel::Write(const std::byte* data, std::size_t length) { - channel_->Write(data, length); -} - -void SharedChannel::Finish() { channel_->Finish(); } - -void SharedChannel::ReadExact(std::byte* buffer, std::size_t length) { - channel_->ReadExact(buffer, length); -} - -// Read at most `length` bytes. Returns the number of bytes read. Will only -// return 0 if the end of the stream is reached. -std::size_t SharedChannel::ReadSome(std::byte* buffer, std::size_t max_length) { - return channel_->ReadSome(buffer, max_length); -} - -SharedChannel::Channel::~Channel() { Finish(); } - -void SharedChannel::Channel::Write(const std::byte* data, std::size_t length) { - const std::lock_guard lock{mutex_}; - stream_.write(reinterpret_cast(data), - static_cast(length)); - cv_.notify_one(); -} - -void SharedChannel::Channel::Finish() { - const std::lock_guard lock{mutex_}; - is_finished_ = true; - cv_.notify_one(); -} - -void SharedChannel::Channel::ReadExact(std::byte* buffer, std::size_t length) { - std::unique_lock lock{mutex_}; - cv_.wait(lock, [this, length] { return Size() >= length || is_finished_; }); - if (Size() < length) { - std::ostringstream err_msg; - err_msg << "Reached end of the stream with only " << Size() - << " bytes remaining"; - throw DbnResponseError{err_msg.str()}; - } - stream_.read(reinterpret_cast(buffer), - static_cast(length)); -} - -std::size_t SharedChannel::Channel::ReadSome(std::byte* buffer, - std::size_t length) { - std::unique_lock lock{mutex_}; - cv_.wait(lock, [this] { return Size() > 0 || is_finished_; }); - if (Size() == 0) { - return 0; - } - stream_.read(reinterpret_cast(buffer), - static_cast(std::min(Size(), length))); - return static_cast(stream_.gcount()); -} - -// lock_ must be held to call this method -std::size_t SharedChannel::Channel::Size() { - const auto pos = stream_.tellg(); - stream_.seekg(0, std::ios::end); - const auto remaining = stream_.tellg() - pos; - // reset state - stream_.seekg(pos, std::ios::beg); - return static_cast(remaining); -} diff --git a/src/enums.cpp b/src/enums.cpp index 1ac00aa..26dad86 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -433,6 +433,9 @@ const char* ToString(MatchAlgorithm match_algorithm) { case match_algorithm::TimeProRata: { return "TimeProRata"; } + case match_algorithm::InstitutionalPrioritization: { + return "InstitutionalPrioritization"; + } default: { return "Unknown"; } diff --git a/src/historical.cpp b/src/historical.cpp index 0e289ed..75ee79f 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -4,10 +4,8 @@ #include #include // find_if -#include // atomic #include // size_t #include // get_env -#include // exception, exception_ptr #include #include // back_inserter #include @@ -18,15 +16,15 @@ #include "databento/datetime.hpp" #include "databento/dbn_decoder.hpp" #include "databento/dbn_file_store.hpp" +#include "databento/detail/dbn_buffer_decoder.hpp" #include "databento/detail/json_helpers.hpp" -#include "databento/detail/scoped_thread.hpp" -#include "databento/detail/shared_channel.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" // Exception, JsonResponseError #include "databento/file_stream.hpp" #include "databento/log.hpp" #include "databento/metadata.hpp" #include "databento/timeseries.hpp" +#include "dbn_constants.hpp" using databento::Historical; @@ -851,54 +849,23 @@ void Historical::TimeseriesGetRange( this->TimeseriesGetRange(params, metadata_callback, record_callback); } + +enum class DecoderState : std::uint8_t { + Init, + Metadata, + Records, +}; void Historical::TimeseriesGetRange(const HttplibParams& params, const MetadataCallback& metadata_callback, const RecordCallback& record_callback) { - std::atomic should_continue{true}; - detail::SharedChannel channel; - std::exception_ptr exception_ptr{}; - detail::ScopedThread stream{ - [this, &channel, &exception_ptr, ¶ms, &should_continue] { - try { - this->client_.GetRawStream( - kTimeseriesGetRangePath, params, - [channel, &should_continue](const char* data, - std::size_t length) mutable { - channel.Write(reinterpret_cast(data), length); - return should_continue.load(); - }); - channel.Finish(); - } catch (const std::exception&) { - channel.Finish(); - // rethrowing here will cause the process to be terminated - exception_ptr = std::current_exception(); - } - }}; - try { - DbnDecoder dbn_decoder{log_receiver_, channel}; - Metadata metadata = dbn_decoder.DecodeMetadata(); - if (metadata_callback) { - metadata_callback(std::move(metadata)); - } - const Record* record; - while ((record = dbn_decoder.DecodeRecord()) != nullptr) { - if (record_callback(*record) == KeepGoing::Stop) { - should_continue = false; - break; - } - } - } catch (const std::exception&) { - should_continue = false; - // wait for thread to finish before checking for exceptions - stream.Join(); - // check if there's an exception from stream thread. Thread safe because - // `stream` thread has been joined - if (exception_ptr) { - std::rethrow_exception(exception_ptr); - } - // otherwise rethrow original exception - throw; - } + detail::DbnBufferDecoder decoder{metadata_callback, record_callback}; + + this->client_.GetRawStream( + kTimeseriesGetRangePath, params, + [&decoder](const char* data, std::size_t length) mutable { + return decoder.Process(data, length) == KeepGoing::Continue; + }); + // FIXME: check if remaining partial records } static const std::string kTimeseriesGetRangeToFileEndpoint = diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 19a83e2..167b77f 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -137,12 +137,17 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, databento::Metadata LiveBlocking::Start() { client_.WriteAll("start_session\n"); - client_.ReadExact(read_buffer_.data(), kMetadataPreludeSize); + client_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize); + buffer_.WriteBegin() += kMetadataPreludeSize; const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( - read_buffer_.data(), kMetadataPreludeSize); - std::vector meta_buffer(size); - client_.ReadExact(meta_buffer.data(), size); - auto metadata = DbnDecoder::DecodeMetadataFields(version, meta_buffer); + buffer_.ReadBegin(), kMetadataPreludeSize); + buffer_.ReadBegin() += kMetadataPreludeSize; + buffer_.Reserve(size); + client_.ReadExact(buffer_.WriteBegin(), size); + buffer_.WriteBegin() += size; + auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(), + buffer_.ReadEnd()); + buffer_.ReadBegin() += size; version_ = metadata.version; metadata.Upgrade(upgrade_policy_); return metadata; @@ -153,7 +158,7 @@ const databento::Record& LiveBlocking::NextRecord() { return *NextRecord({}); } const databento::Record* LiveBlocking::NextRecord( std::chrono::milliseconds timeout) { // need some unread_bytes - const auto unread_bytes = buffer_size_ - buffer_idx_; + const auto unread_bytes = buffer_.ReadCapacity(); if (unread_bytes == 0) { const auto read_res = FillBuffer(timeout); if (read_res.status == detail::TcpClient::Status::Timeout) { @@ -164,7 +169,7 @@ const databento::Record* LiveBlocking::NextRecord( } } // check length - while (buffer_size_ - buffer_idx_ < BufferRecordHeader()->Size()) { + while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { const auto read_res = FillBuffer(timeout); if (read_res.status == detail::TcpClient::Status::Timeout) { return nullptr; @@ -174,7 +179,7 @@ const databento::Record* LiveBlocking::NextRecord( } } current_record_ = Record{BufferRecordHeader()}; - buffer_idx_ += current_record_.Size(); + buffer_.ReadBegin() += current_record_.Size(); current_record_ = DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_); @@ -208,14 +213,15 @@ void LiveBlocking::Resubscribe() { } std::string LiveBlocking::DecodeChallenge() { - buffer_size_ = - client_.ReadSome(read_buffer_.data(), read_buffer_.size()).read_size; - if (buffer_size_ == 0) { + const auto read_size = + client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; + if (read_size == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } + buffer_.WriteBegin() += read_size; // first line is version - std::string response{reinterpret_cast(read_buffer_.data()), - buffer_size_}; + std::string response{reinterpret_cast(buffer_.ReadBegin()), + buffer_.ReadCapacity()}; { std::ostringstream log_ss; log_ss << "[LiveBlocking::DecodeChallenge] Challenge: " << response; @@ -232,15 +238,14 @@ std::string LiveBlocking::DecodeChallenge() { : response.find('\n', find_start); while (next_nl_pos == std::string::npos) { // read more - buffer_size_ += client_ - .ReadSome(&read_buffer_[buffer_size_], - read_buffer_.size() - buffer_size_) - .read_size; - if (buffer_size_ == 0) { + buffer_.WriteBegin() += + client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()) + .read_size; + if (buffer_.ReadCapacity() == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } - response = {reinterpret_cast(read_buffer_.data()), - buffer_size_}; + response = {reinterpret_cast(buffer_.ReadBegin()), + buffer_.ReadCapacity()}; next_nl_pos = response.find('\n', find_start); } const auto challenge_line = @@ -314,27 +319,24 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { std::uint64_t LiveBlocking::DecodeAuthResp() { // handle split packet read - std::array::const_iterator nl_it; - buffer_size_ = 0; + const std::byte* newline_ptr; + buffer_.Clear(); do { - buffer_idx_ = buffer_size_; - const auto read_size = client_ - .ReadSome(&read_buffer_[buffer_idx_], - read_buffer_.size() - buffer_idx_) - .read_size; + const auto read_size = + client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()) + .read_size; if (read_size == 0) { throw LiveApiError{ "Unexpected end of message received from server after replying to " "CRAM"}; } - buffer_size_ += read_size; - nl_it = std::find(read_buffer_.begin() + buffer_idx_, - read_buffer_.begin() + buffer_size_, - static_cast('\n')); - } while (nl_it == read_buffer_.end()); + buffer_.WriteBegin() += read_size; + newline_ptr = std::find(buffer_.ReadBegin(), buffer_.ReadEnd(), + static_cast('\n')); + } while (newline_ptr == buffer_.ReadEnd()); const std::string response{ - reinterpret_cast(read_buffer_.data()), - static_cast(nl_it - read_buffer_.cbegin())}; + reinterpret_cast(buffer_.ReadBegin()), + static_cast(newline_ptr - buffer_.ReadBegin())}; { std::ostringstream log_ss; log_ss << "[LiveBlocking::DecodeAuthResp] Authentication response: " @@ -342,7 +344,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { log_receiver_->Receive(LogLevel::Debug, log_ss.str()); } // set in case Read call also read records. One beyond newline - buffer_idx_ = response.length() + 1; + buffer_.ReadBegin() += response.length() + 1; std::size_t pos{}; bool found_success{}; @@ -401,17 +403,13 @@ void LiveBlocking::IncrementSubCounter() { databento::detail::TcpClient::Result LiveBlocking::FillBuffer( std::chrono::milliseconds timeout) { - // Shift data forward - std::copy(read_buffer_.cbegin() + static_cast(buffer_idx_), - read_buffer_.cend(), read_buffer_.begin()); - buffer_size_ -= buffer_idx_; - buffer_idx_ = 0; - const auto read_res = client_.ReadSome( - &read_buffer_[buffer_size_], read_buffer_.size() - buffer_size_, timeout); - buffer_size_ += read_res.read_size; + buffer_.Shift(); + const auto read_res = + client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); + buffer_.WriteBegin() += read_res.read_size; return read_res; } databento::RecordHeader* LiveBlocking::BufferRecordHeader() { - return reinterpret_cast(&read_buffer_[buffer_idx_]); + return reinterpret_cast(buffer_.ReadBegin()); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d1868fa..c5f97ab 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -27,6 +27,7 @@ set( set( test_sources src/batch_tests.cpp + src/buffer_tests.cpp src/datetime_tests.cpp src/dbn_decoder_tests.cpp src/dbn_encoder_tests.cpp @@ -46,7 +47,6 @@ set( src/mock_tcp_server.cpp src/record_tests.cpp src/scoped_thread_tests.cpp - src/shared_channel_tests.cpp src/stream_op_helper_tests.cpp src/symbol_map_tests.cpp src/symbology_tests.cpp diff --git a/tests/src/buffer_tests.cpp b/tests/src/buffer_tests.cpp new file mode 100644 index 0000000..f006756 --- /dev/null +++ b/tests/src/buffer_tests.cpp @@ -0,0 +1,61 @@ +#include + +#include +#include + +#include "databento/detail/buffer.hpp" + +using namespace std::string_view_literals; + +namespace databento::detail::tests { +TEST(BufferTests, TestWriteAllPastCapacity) { + Buffer target{10}; + target.WriteBegin() += 4; + target.ReadBegin() += 2; + ASSERT_EQ(target.WriteCapacity(), 6); + ASSERT_EQ(target.ReadCapacity(), 2); + ASSERT_EQ(target.Capacity(), 10); + + target.WriteAll("TestWriteAllPastCapacity", 24); + ASSERT_EQ(target.WriteCapacity(), 0); + ASSERT_EQ(target.ReadCapacity(), 26); + ASSERT_EQ(target.Capacity(), 26); +} + +TEST(BufferTests, TestWriteAllShift) { + Buffer target{20}; + target.WriteAll("TestWriteAllShift", 17); + target.ReadBegin() += 4; + ASSERT_EQ(target.WriteCapacity(), 3); + ASSERT_EQ(target.ReadCapacity(), 13); + ASSERT_EQ(target.Capacity(), 20); + + target.WriteAll("Test", 4); + ASSERT_EQ(target.WriteCapacity(), 3); + ASSERT_EQ(target.ReadCapacity(), 17); + ASSERT_EQ(target.Capacity(), 20); +} + +TEST(BufferTests, TestWriteRead) { + Buffer target{10}; + target.WriteBegin() += 5; + target.ReadBegin() += 5; + const auto write_len = target.Write("BufferTests", 11); + ASSERT_EQ(write_len, 10); + std::array read_buf{}; + target.ReadExact(read_buf.data(), read_buf.size()); + std::string_view res{reinterpret_cast(read_buf.data()), + read_buf.size()}; + ASSERT_EQ(res, "BufferTest"sv); +} + +TEST(BufferTests, TestReserve) { + Buffer target{120}; + ASSERT_EQ(target.WriteCapacity(), 120); + ASSERT_EQ(target.ReadCapacity(), 0); + ASSERT_EQ(target.Capacity(), 120); + target.WriteAll("TestReserve", 11); + target.ReadBegin() += 4; +} + +} // namespace databento::detail::tests diff --git a/tests/src/dbn_decoder_tests.cpp b/tests/src/dbn_decoder_tests.cpp index 80973c0..e9bd1bb 100644 --- a/tests/src/dbn_decoder_tests.cpp +++ b/tests/src/dbn_decoder_tests.cpp @@ -19,7 +19,6 @@ #include "databento/dbn_decoder.hpp" #include "databento/dbn_encoder.hpp" #include "databento/detail/scoped_thread.hpp" -#include "databento/detail/shared_channel.hpp" #include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" @@ -36,9 +35,7 @@ namespace databento::tests { class DbnDecoderTests : public testing::Test { public: - detail::SharedChannel channel_; std::unique_ptr file_target_; - std::unique_ptr channel_target_; detail::ScopedThread write_thread_; std::unique_ptr logger_{std::make_unique()}; @@ -61,12 +58,7 @@ class DbnDecoderTests : public testing::Test { std::vector buffer(size); input_file.read(buffer.data(), static_cast(size)); ASSERT_EQ(input_file.gcount(), size); - channel_.Write(reinterpret_cast(buffer.data()), size); - channel_.Finish(); }}; - channel_target_ = std::make_unique( - logger_.get(), std::make_unique(channel_), - upgrade_policy); // File setup file_target_ = std::make_unique( logger_.get(), std::make_unique(file_path), @@ -85,6 +77,7 @@ class DbnDecoderTests : public testing::Test { } }; +// FIXME: update for only a single record template void AssertDefEq(const Record* ch_record, const Record* f_record) { ASSERT_TRUE(ch_record->Holds()); @@ -115,22 +108,21 @@ TEST_F(DbnDecoderTests, TestDecodeDbz) { TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) { ReadFromFile("definition", ".dbn", 1, VersionUpgradePolicy::UpgradeToV2); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, 2); - EXPECT_EQ(ch_metadata.dataset, dataset::kXnasItch); - EXPECT_EQ(ch_metadata.schema, Schema::Definition); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1633305600000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1641254400000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"MSFT"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - EXPECT_EQ(ch_metadata.mappings.size(), 1); - const auto& mapping = ch_metadata.mappings.at(0); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata, metadata); + EXPECT_EQ(metadata.version, 2); + EXPECT_EQ(metadata.dataset, dataset::kXnasItch); + EXPECT_EQ(metadata.schema, Schema::Definition); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1633305600000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1641254400000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"MSFT"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + EXPECT_EQ(metadata.mappings.size(), 1); + const auto& mapping = metadata.mappings.at(0); EXPECT_EQ(mapping.raw_symbol, "MSFT"); ASSERT_EQ(mapping.intervals.size(), 62); const auto& interval = mapping.intervals.at(0); @@ -138,17 +130,13 @@ TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) { EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - AssertDefEq(ch_record1, f_record1); - AssertDefEq(ch_record2, f_record2); + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + // AssertDefEq(ch_record1, f_record1); + // AssertDefEq(ch_record2, f_record2); } TEST_F(DbnDecoderTests, TestUpgradeSymbolMappingWithTsOut) { @@ -239,68 +227,57 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbo) { const auto version = GetParam().second; ReadFromFile("mbo", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Mbo); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_mbo1 = ch_record1->Get(); - const auto& f_mbo1 = f_record1->Get(); - EXPECT_EQ(ch_mbo1, f_mbo1); - EXPECT_EQ(ch_mbo1.hd.publisher_id, 1); - EXPECT_EQ(ch_mbo1.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbo1.hd.ts_event.time_since_epoch().count(), - 1609160400000429831); - EXPECT_EQ(ch_mbo1.order_id, 647784973705); - EXPECT_EQ(ch_mbo1.price, 3722750000000); - EXPECT_EQ(ch_mbo1.size, 1); - EXPECT_EQ(ch_mbo1.flags.Raw(), 128); - EXPECT_EQ(ch_mbo1.channel_id, 0); - EXPECT_EQ(ch_mbo1.action, Action::Cancel); - EXPECT_EQ(ch_mbo1.side, Side::Ask); - EXPECT_EQ(ch_mbo1.ts_recv.time_since_epoch().count(), 1609160400000704060); - EXPECT_EQ(ch_mbo1.ts_in_delta.count(), 22993); - EXPECT_EQ(ch_mbo1.sequence, 1170352); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_mbo2 = ch_record2->Get(); - const auto& f_mbo2 = f_record2->Get(); - EXPECT_EQ(ch_mbo2, f_mbo2); - EXPECT_EQ(ch_mbo2.hd.publisher_id, 1); - EXPECT_EQ(ch_mbo2.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbo2.hd.ts_event.time_since_epoch().count(), - 1609160400000431665); - EXPECT_EQ(ch_mbo2.order_id, 647784973631); - EXPECT_EQ(ch_mbo2.price, 3723000000000); - EXPECT_EQ(ch_mbo2.size, 1); - EXPECT_EQ(ch_mbo2.flags.Raw(), 128); - EXPECT_EQ(ch_mbo2.channel_id, 0); - EXPECT_EQ(ch_mbo2.action, Action::Cancel); - EXPECT_EQ(ch_mbo2.side, Side::Ask); - EXPECT_EQ(ch_mbo2.ts_recv.time_since_epoch().count(), 1609160400000711344); - EXPECT_EQ(ch_mbo2.ts_in_delta.count(), 19621); - EXPECT_EQ(ch_mbo2.sequence, 1170353); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata, metadata); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Mbo); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& mbo1 = record1->Get(); + EXPECT_EQ(mbo1, mbo1); + EXPECT_EQ(mbo1.hd.publisher_id, 1); + EXPECT_EQ(mbo1.hd.instrument_id, 5482); + EXPECT_EQ(mbo1.hd.ts_event.time_since_epoch().count(), 1609160400000429831); + EXPECT_EQ(mbo1.order_id, 647784973705); + EXPECT_EQ(mbo1.price, 3722750000000); + EXPECT_EQ(mbo1.size, 1); + EXPECT_EQ(mbo1.flags.Raw(), 128); + EXPECT_EQ(mbo1.channel_id, 0); + EXPECT_EQ(mbo1.action, Action::Cancel); + EXPECT_EQ(mbo1.side, Side::Ask); + EXPECT_EQ(mbo1.ts_recv.time_since_epoch().count(), 1609160400000704060); + EXPECT_EQ(mbo1.ts_in_delta.count(), 22993); + EXPECT_EQ(mbo1.sequence, 1170352); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& mbo2 = record2->Get(); + EXPECT_EQ(mbo2, mbo2); + EXPECT_EQ(mbo2.hd.publisher_id, 1); + EXPECT_EQ(mbo2.hd.instrument_id, 5482); + EXPECT_EQ(mbo2.hd.ts_event.time_since_epoch().count(), 1609160400000431665); + EXPECT_EQ(mbo2.order_id, 647784973631); + EXPECT_EQ(mbo2.price, 3723000000000); + EXPECT_EQ(mbo2.size, 1); + EXPECT_EQ(mbo2.flags.Raw(), 128); + EXPECT_EQ(mbo2.channel_id, 0); + EXPECT_EQ(mbo2.action, Action::Cancel); + EXPECT_EQ(mbo2.side, Side::Ask); + EXPECT_EQ(mbo2.ts_recv.time_since_epoch().count(), 1609160400000711344); + EXPECT_EQ(mbo2.ts_in_delta.count(), 19621); + EXPECT_EQ(mbo2.sequence, 1170353); } TEST_P(DbnDecoderSchemaTests, TestDecodeMbp1) { @@ -308,79 +285,67 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp1) { const auto version = GetParam().second; ReadFromFile("mbp-1", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Mbp1); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_mbp1 = ch_record1->Get(); - const auto& f_mbp1 = f_record1->Get(); - EXPECT_EQ(ch_mbp1, f_mbp1); - EXPECT_EQ(ch_mbp1.hd.publisher_id, 1); - EXPECT_EQ(ch_mbp1.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbp1.hd.ts_event.time_since_epoch().count(), - 1609160400006001487); - EXPECT_EQ(ch_mbp1.price, 3720500000000); - EXPECT_EQ(ch_mbp1.size, 1); - EXPECT_EQ(ch_mbp1.action, Action::Add); - EXPECT_EQ(ch_mbp1.side, Side::Ask); - EXPECT_EQ(ch_mbp1.flags.Raw(), 128); - EXPECT_EQ(ch_mbp1.depth, 0); - EXPECT_EQ(ch_mbp1.ts_recv.time_since_epoch().count(), 1609160400006136329); - EXPECT_EQ(ch_mbp1.ts_in_delta.count(), 17214); - EXPECT_EQ(ch_mbp1.sequence, 1170362); - EXPECT_EQ(ch_mbp1.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_mbp1.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_mbp1.levels[0].bid_sz, 24); - EXPECT_EQ(ch_mbp1.levels[0].ask_sz, 11); - EXPECT_EQ(ch_mbp1.levels[0].bid_ct, 15); - EXPECT_EQ(ch_mbp1.levels[0].ask_ct, 9); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_mbp2 = ch_record2->Get(); - const auto& f_mbp2 = f_record2->Get(); - EXPECT_EQ(ch_mbp2, f_mbp2); - EXPECT_EQ(ch_mbp2.hd.publisher_id, 1); - EXPECT_EQ(ch_mbp2.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbp2.hd.ts_event.time_since_epoch().count(), - 1609160400006146661); - EXPECT_EQ(ch_mbp2.price, 3720500000000); - EXPECT_EQ(ch_mbp2.size, 1); - EXPECT_EQ(ch_mbp2.action, Action::Add); - EXPECT_EQ(ch_mbp2.side, Side::Ask); - EXPECT_EQ(ch_mbp2.flags.Raw(), 128); - EXPECT_EQ(ch_mbp2.depth, 0); - EXPECT_EQ(ch_mbp2.ts_recv.time_since_epoch().count(), 1609160400006246513); - EXPECT_EQ(ch_mbp2.ts_in_delta.count(), 18858); - EXPECT_EQ(ch_mbp2.sequence, 1170364); - EXPECT_EQ(ch_mbp2.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_mbp2.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_mbp2.levels[0].bid_sz, 24); - EXPECT_EQ(ch_mbp2.levels[0].ask_sz, 12); - EXPECT_EQ(ch_mbp2.levels[0].bid_ct, 15); - EXPECT_EQ(ch_mbp2.levels[0].ask_ct, 10); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata, metadata); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Mbp1); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& mbp1 = record1->Get(); + EXPECT_EQ(mbp1.hd.publisher_id, 1); + EXPECT_EQ(mbp1.hd.instrument_id, 5482); + EXPECT_EQ(mbp1.hd.ts_event.time_since_epoch().count(), 1609160400006001487); + EXPECT_EQ(mbp1.price, 3720500000000); + EXPECT_EQ(mbp1.size, 1); + EXPECT_EQ(mbp1.action, Action::Add); + EXPECT_EQ(mbp1.side, Side::Ask); + EXPECT_EQ(mbp1.flags.Raw(), 128); + EXPECT_EQ(mbp1.depth, 0); + EXPECT_EQ(mbp1.ts_recv.time_since_epoch().count(), 1609160400006136329); + EXPECT_EQ(mbp1.ts_in_delta.count(), 17214); + EXPECT_EQ(mbp1.sequence, 1170362); + EXPECT_EQ(mbp1.levels[0].bid_px, 3720250000000); + EXPECT_EQ(mbp1.levels[0].ask_px, 3720500000000); + EXPECT_EQ(mbp1.levels[0].bid_sz, 24); + EXPECT_EQ(mbp1.levels[0].ask_sz, 11); + EXPECT_EQ(mbp1.levels[0].bid_ct, 15); + EXPECT_EQ(mbp1.levels[0].ask_ct, 9); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& mbp2 = record2->Get(); + EXPECT_EQ(mbp2, mbp2); + EXPECT_EQ(mbp2.hd.publisher_id, 1); + EXPECT_EQ(mbp2.hd.instrument_id, 5482); + EXPECT_EQ(mbp2.hd.ts_event.time_since_epoch().count(), 1609160400006146661); + EXPECT_EQ(mbp2.price, 3720500000000); + EXPECT_EQ(mbp2.size, 1); + EXPECT_EQ(mbp2.action, Action::Add); + EXPECT_EQ(mbp2.side, Side::Ask); + EXPECT_EQ(mbp2.flags.Raw(), 128); + EXPECT_EQ(mbp2.depth, 0); + EXPECT_EQ(mbp2.ts_recv.time_since_epoch().count(), 1609160400006246513); + EXPECT_EQ(mbp2.ts_in_delta.count(), 18858); + EXPECT_EQ(mbp2.sequence, 1170364); + EXPECT_EQ(mbp2.levels[0].bid_px, 3720250000000); + EXPECT_EQ(mbp2.levels[0].ask_px, 3720500000000); + EXPECT_EQ(mbp2.levels[0].bid_sz, 24); + EXPECT_EQ(mbp2.levels[0].ask_sz, 12); + EXPECT_EQ(mbp2.levels[0].bid_ct, 15); + EXPECT_EQ(mbp2.levels[0].ask_ct, 10); } TEST_P(DbnDecoderSchemaTests, TestDecodeMbp10) { @@ -388,103 +353,90 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp10) { const auto version = GetParam().second; ReadFromFile("mbp-10", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Mbp10); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_mbp1 = ch_record1->Get(); - const auto& f_mbp1 = f_record1->Get(); - EXPECT_EQ(ch_mbp1, f_mbp1); - EXPECT_EQ(ch_mbp1.hd.publisher_id, 1); - EXPECT_EQ(ch_mbp1.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbp1.hd.ts_event.time_since_epoch().count(), - 1609160400000429831); - EXPECT_EQ(ch_mbp1.price, 3722750000000); - EXPECT_EQ(ch_mbp1.size, 1); - EXPECT_EQ(ch_mbp1.action, Action::Cancel); - EXPECT_EQ(ch_mbp1.side, Side::Ask); - EXPECT_EQ(ch_mbp1.flags.Raw(), 128); - EXPECT_EQ(ch_mbp1.depth, 9); - EXPECT_EQ(ch_mbp1.ts_recv.time_since_epoch().count(), 1609160400000704060); - EXPECT_EQ(ch_mbp1.ts_in_delta.count(), 22993); - EXPECT_EQ(ch_mbp1.sequence, 1170352); - EXPECT_EQ(ch_mbp1.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_mbp1.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_mbp1.levels[0].bid_sz, 24); - EXPECT_EQ(ch_mbp1.levels[0].ask_sz, 10); - EXPECT_EQ(ch_mbp1.levels[0].bid_ct, 15); - EXPECT_EQ(ch_mbp1.levels[0].ask_ct, 8); - EXPECT_EQ(ch_mbp1.levels[1].bid_px, 3720000000000); - EXPECT_EQ(ch_mbp1.levels[1].ask_px, 3720750000000); - EXPECT_EQ(ch_mbp1.levels[1].bid_sz, 31); - EXPECT_EQ(ch_mbp1.levels[1].ask_sz, 34); - EXPECT_EQ(ch_mbp1.levels[1].bid_ct, 18); - EXPECT_EQ(ch_mbp1.levels[1].ask_ct, 24); - EXPECT_EQ(ch_mbp1.levels[2].bid_px, 3719750000000); - EXPECT_EQ(ch_mbp1.levels[2].ask_px, 3721000000000); - EXPECT_EQ(ch_mbp1.levels[2].bid_sz, 32); - EXPECT_EQ(ch_mbp1.levels[2].ask_sz, 39); - EXPECT_EQ(ch_mbp1.levels[2].bid_ct, 23); - EXPECT_EQ(ch_mbp1.levels[2].ask_ct, 25); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_mbp2 = ch_record2->Get(); - const auto& f_mbp2 = f_record2->Get(); - EXPECT_EQ(ch_mbp2, f_mbp2); - EXPECT_EQ(ch_mbp2.hd.publisher_id, 1); - EXPECT_EQ(ch_mbp2.hd.instrument_id, 5482); - EXPECT_EQ(ch_mbp2.hd.ts_event.time_since_epoch().count(), - 1609160400000435673); - EXPECT_EQ(ch_mbp2.price, 3720000000000); - EXPECT_EQ(ch_mbp2.size, 1); - EXPECT_EQ(ch_mbp2.action, Action::Cancel); - EXPECT_EQ(ch_mbp2.side, Side::Bid); - EXPECT_EQ(ch_mbp2.flags.Raw(), 128); - EXPECT_EQ(ch_mbp2.depth, 1); - EXPECT_EQ(ch_mbp2.ts_recv.time_since_epoch().count(), 1609160400000750544); - EXPECT_EQ(ch_mbp2.ts_in_delta.count(), 20625); - EXPECT_EQ(ch_mbp2.sequence, 1170356); - EXPECT_EQ(ch_mbp2.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_mbp2.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_mbp2.levels[0].bid_sz, 24); - EXPECT_EQ(ch_mbp2.levels[0].ask_sz, 10); - EXPECT_EQ(ch_mbp2.levels[0].bid_ct, 15); - EXPECT_EQ(ch_mbp2.levels[0].ask_ct, 8); - EXPECT_EQ(ch_mbp2.levels[1].bid_px, 3720000000000); - EXPECT_EQ(ch_mbp2.levels[1].ask_px, 3720750000000); - EXPECT_EQ(ch_mbp2.levels[1].bid_sz, 30); - EXPECT_EQ(ch_mbp2.levels[1].ask_sz, 34); - EXPECT_EQ(ch_mbp2.levels[1].bid_ct, 17); - EXPECT_EQ(ch_mbp2.levels[1].ask_ct, 24); - EXPECT_EQ(ch_mbp2.levels[2].bid_px, 3719750000000); - EXPECT_EQ(ch_mbp2.levels[2].ask_px, 3721000000000); - EXPECT_EQ(ch_mbp2.levels[2].bid_sz, 32); - EXPECT_EQ(ch_mbp2.levels[2].ask_sz, 39); - EXPECT_EQ(ch_mbp2.levels[2].bid_ct, 23); - EXPECT_EQ(ch_mbp2.levels[2].ask_ct, 25); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Mbp10); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& mbp1 = record1->Get(); + EXPECT_EQ(mbp1.hd.publisher_id, 1); + EXPECT_EQ(mbp1.hd.instrument_id, 5482); + EXPECT_EQ(mbp1.hd.ts_event.time_since_epoch().count(), 1609160400000429831); + EXPECT_EQ(mbp1.price, 3722750000000); + EXPECT_EQ(mbp1.size, 1); + EXPECT_EQ(mbp1.action, Action::Cancel); + EXPECT_EQ(mbp1.side, Side::Ask); + EXPECT_EQ(mbp1.flags.Raw(), 128); + EXPECT_EQ(mbp1.depth, 9); + EXPECT_EQ(mbp1.ts_recv.time_since_epoch().count(), 1609160400000704060); + EXPECT_EQ(mbp1.ts_in_delta.count(), 22993); + EXPECT_EQ(mbp1.sequence, 1170352); + EXPECT_EQ(mbp1.levels[0].bid_px, 3720250000000); + EXPECT_EQ(mbp1.levels[0].ask_px, 3720500000000); + EXPECT_EQ(mbp1.levels[0].bid_sz, 24); + EXPECT_EQ(mbp1.levels[0].ask_sz, 10); + EXPECT_EQ(mbp1.levels[0].bid_ct, 15); + EXPECT_EQ(mbp1.levels[0].ask_ct, 8); + EXPECT_EQ(mbp1.levels[1].bid_px, 3720000000000); + EXPECT_EQ(mbp1.levels[1].ask_px, 3720750000000); + EXPECT_EQ(mbp1.levels[1].bid_sz, 31); + EXPECT_EQ(mbp1.levels[1].ask_sz, 34); + EXPECT_EQ(mbp1.levels[1].bid_ct, 18); + EXPECT_EQ(mbp1.levels[1].ask_ct, 24); + EXPECT_EQ(mbp1.levels[2].bid_px, 3719750000000); + EXPECT_EQ(mbp1.levels[2].ask_px, 3721000000000); + EXPECT_EQ(mbp1.levels[2].bid_sz, 32); + EXPECT_EQ(mbp1.levels[2].ask_sz, 39); + EXPECT_EQ(mbp1.levels[2].bid_ct, 23); + EXPECT_EQ(mbp1.levels[2].ask_ct, 25); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& mbp2 = record2->Get(); + EXPECT_EQ(mbp2, mbp2); + EXPECT_EQ(mbp2.hd.publisher_id, 1); + EXPECT_EQ(mbp2.hd.instrument_id, 5482); + EXPECT_EQ(mbp2.hd.ts_event.time_since_epoch().count(), 1609160400000435673); + EXPECT_EQ(mbp2.price, 3720000000000); + EXPECT_EQ(mbp2.size, 1); + EXPECT_EQ(mbp2.action, Action::Cancel); + EXPECT_EQ(mbp2.side, Side::Bid); + EXPECT_EQ(mbp2.flags.Raw(), 128); + EXPECT_EQ(mbp2.depth, 1); + EXPECT_EQ(mbp2.ts_recv.time_since_epoch().count(), 1609160400000750544); + EXPECT_EQ(mbp2.ts_in_delta.count(), 20625); + EXPECT_EQ(mbp2.sequence, 1170356); + EXPECT_EQ(mbp2.levels[0].bid_px, 3720250000000); + EXPECT_EQ(mbp2.levels[0].ask_px, 3720500000000); + EXPECT_EQ(mbp2.levels[0].bid_sz, 24); + EXPECT_EQ(mbp2.levels[0].ask_sz, 10); + EXPECT_EQ(mbp2.levels[0].bid_ct, 15); + EXPECT_EQ(mbp2.levels[0].ask_ct, 8); + EXPECT_EQ(mbp2.levels[1].bid_px, 3720000000000); + EXPECT_EQ(mbp2.levels[1].ask_px, 3720750000000); + EXPECT_EQ(mbp2.levels[1].bid_sz, 30); + EXPECT_EQ(mbp2.levels[1].ask_sz, 34); + EXPECT_EQ(mbp2.levels[1].bid_ct, 17); + EXPECT_EQ(mbp2.levels[1].ask_ct, 24); + EXPECT_EQ(mbp2.levels[2].bid_px, 3719750000000); + EXPECT_EQ(mbp2.levels[2].ask_px, 3721000000000); + EXPECT_EQ(mbp2.levels[2].bid_sz, 32); + EXPECT_EQ(mbp2.levels[2].ask_sz, 39); + EXPECT_EQ(mbp2.levels[2].bid_ct, 23); + EXPECT_EQ(mbp2.levels[2].ask_ct, 25); } TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { @@ -492,75 +444,61 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { const auto version = GetParam().second; ReadFromFile("cmbp-1", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Cmbp1); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_cbbo1 = ch_record1->Get(); - const auto& f_mbp1 = f_record1->Get(); - EXPECT_EQ(ch_cbbo1, f_mbp1); - EXPECT_EQ(ch_cbbo1.hd.publisher_id, 1); - EXPECT_EQ(ch_cbbo1.hd.instrument_id, 5482); - EXPECT_EQ(ch_cbbo1.hd.ts_event.time_since_epoch().count(), - 1609160400006001487); - EXPECT_EQ(ch_cbbo1.price, 3720500000000); - EXPECT_EQ(ch_cbbo1.size, 1); - EXPECT_EQ(ch_cbbo1.action, Action::Add); - EXPECT_EQ(ch_cbbo1.side, Side::Ask); - EXPECT_EQ(ch_cbbo1.flags.Raw(), 128); - EXPECT_EQ(ch_cbbo1.ts_recv.time_since_epoch().count(), 1609160400006136329); - EXPECT_EQ(ch_cbbo1.ts_in_delta.count(), 17214); - EXPECT_EQ(ch_cbbo1.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_cbbo1.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_cbbo1.levels[0].bid_sz, 24); - EXPECT_EQ(ch_cbbo1.levels[0].ask_sz, 11); - EXPECT_EQ(ch_cbbo1.levels[0].bid_pb, 1); - EXPECT_EQ(ch_cbbo1.levels[0].ask_pb, 1); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_cbbo2 = ch_record2->Get(); - const auto& f_mbp2 = f_record2->Get(); - EXPECT_EQ(ch_cbbo2, f_mbp2); - EXPECT_EQ(ch_cbbo2.hd.publisher_id, 1); - EXPECT_EQ(ch_cbbo2.hd.instrument_id, 5482); - EXPECT_EQ(ch_cbbo2.hd.ts_event.time_since_epoch().count(), - 1609160400006146661); - EXPECT_EQ(ch_cbbo2.price, 3720500000000); - EXPECT_EQ(ch_cbbo2.size, 1); - EXPECT_EQ(ch_cbbo2.action, Action::Add); - EXPECT_EQ(ch_cbbo2.side, Side::Ask); - EXPECT_EQ(ch_cbbo2.flags.Raw(), 128); - EXPECT_EQ(ch_cbbo2.ts_recv.time_since_epoch().count(), 1609160400006246513); - EXPECT_EQ(ch_cbbo2.ts_in_delta.count(), 18858); - EXPECT_EQ(ch_cbbo2.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_cbbo2.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_cbbo2.levels[0].bid_sz, 24); - EXPECT_EQ(ch_cbbo2.levels[0].ask_sz, 12); - EXPECT_EQ(ch_cbbo2.levels[0].bid_pb, 1); - EXPECT_EQ(ch_cbbo2.levels[0].ask_pb, 1); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Cmbp1); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& cmbp1 = record1->Get(); + EXPECT_EQ(cmbp1.hd.publisher_id, 1); + EXPECT_EQ(cmbp1.hd.instrument_id, 5482); + EXPECT_EQ(cmbp1.hd.ts_event.time_since_epoch().count(), 1609160400006001487); + EXPECT_EQ(cmbp1.price, 3720500000000); + EXPECT_EQ(cmbp1.size, 1); + EXPECT_EQ(cmbp1.action, Action::Add); + EXPECT_EQ(cmbp1.side, Side::Ask); + EXPECT_EQ(cmbp1.flags.Raw(), 128); + EXPECT_EQ(cmbp1.ts_recv.time_since_epoch().count(), 1609160400006136329); + EXPECT_EQ(cmbp1.ts_in_delta.count(), 17214); + EXPECT_EQ(cmbp1.levels[0].bid_px, 3720250000000); + EXPECT_EQ(cmbp1.levels[0].ask_px, 3720500000000); + EXPECT_EQ(cmbp1.levels[0].bid_sz, 24); + EXPECT_EQ(cmbp1.levels[0].ask_sz, 11); + EXPECT_EQ(cmbp1.levels[0].bid_pb, 1); + EXPECT_EQ(cmbp1.levels[0].ask_pb, 1); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& cmbp2 = record2->Get(); + EXPECT_EQ(cmbp2.hd.publisher_id, 1); + EXPECT_EQ(cmbp2.hd.instrument_id, 5482); + EXPECT_EQ(cmbp2.hd.ts_event.time_since_epoch().count(), 1609160400006146661); + EXPECT_EQ(cmbp2.price, 3720500000000); + EXPECT_EQ(cmbp2.size, 1); + EXPECT_EQ(cmbp2.action, Action::Add); + EXPECT_EQ(cmbp2.side, Side::Ask); + EXPECT_EQ(cmbp2.flags.Raw(), 128); + EXPECT_EQ(cmbp2.ts_recv.time_since_epoch().count(), 1609160400006246513); + EXPECT_EQ(cmbp2.ts_in_delta.count(), 18858); + EXPECT_EQ(cmbp2.levels[0].bid_px, 3720250000000); + EXPECT_EQ(cmbp2.levels[0].ask_px, 3720500000000); + EXPECT_EQ(cmbp2.levels[0].bid_sz, 24); + EXPECT_EQ(cmbp2.levels[0].ask_sz, 12); + EXPECT_EQ(cmbp2.levels[0].bid_pb, 1); + EXPECT_EQ(cmbp2.levels[0].ask_pb, 1); } TEST_P(DbnDecoderSchemaTests, TestDecodeCbbo) { @@ -568,71 +506,57 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCbbo) { const auto version = GetParam().second; ReadFromFile("cbbo", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Cbbo1S); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_cbbo1 = ch_record1->Get(); - const auto& f_mbp1 = f_record1->Get(); - EXPECT_EQ(ch_cbbo1, f_mbp1); - EXPECT_EQ(ch_cbbo1.hd.publisher_id, 1); - EXPECT_EQ(ch_cbbo1.hd.instrument_id, 5482); - EXPECT_EQ(ch_cbbo1.hd.ts_event.time_since_epoch().count(), - 1609160400006001487); - EXPECT_EQ(ch_cbbo1.price, 3720500000000); - EXPECT_EQ(ch_cbbo1.size, 1); - EXPECT_EQ(ch_cbbo1.side, Side::Ask); - EXPECT_EQ(ch_cbbo1.flags.Raw(), 128); - EXPECT_EQ(ch_cbbo1.ts_recv.time_since_epoch().count(), 1609160400006136329); - EXPECT_EQ(ch_cbbo1.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_cbbo1.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_cbbo1.levels[0].bid_sz, 24); - EXPECT_EQ(ch_cbbo1.levels[0].ask_sz, 11); - EXPECT_EQ(ch_cbbo1.levels[0].bid_pb, 1); - EXPECT_EQ(ch_cbbo1.levels[0].ask_pb, 1); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_cbbo2 = ch_record2->Get(); - const auto& f_mbp2 = f_record2->Get(); - EXPECT_EQ(ch_cbbo2, f_mbp2); - EXPECT_EQ(ch_cbbo2.hd.publisher_id, 1); - EXPECT_EQ(ch_cbbo2.hd.instrument_id, 5482); - EXPECT_EQ(ch_cbbo2.hd.ts_event.time_since_epoch().count(), - 1609160400006146661); - EXPECT_EQ(ch_cbbo2.price, 3720500000000); - EXPECT_EQ(ch_cbbo2.size, 1); - EXPECT_EQ(ch_cbbo2.side, Side::Ask); - EXPECT_EQ(ch_cbbo2.flags.Raw(), 128); - EXPECT_EQ(ch_cbbo2.ts_recv.time_since_epoch().count(), 1609160400006246513); - EXPECT_EQ(ch_cbbo2.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_cbbo2.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_cbbo2.levels[0].bid_sz, 24); - EXPECT_EQ(ch_cbbo2.levels[0].ask_sz, 12); - EXPECT_EQ(ch_cbbo2.levels[0].bid_pb, 1); - EXPECT_EQ(ch_cbbo2.levels[0].ask_pb, 1); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Cbbo1S); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& cbbo1 = record1->Get(); + EXPECT_EQ(cbbo1.hd.publisher_id, 1); + EXPECT_EQ(cbbo1.hd.instrument_id, 5482); + EXPECT_EQ(cbbo1.hd.ts_event.time_since_epoch().count(), 1609160400006001487); + EXPECT_EQ(cbbo1.price, 3720500000000); + EXPECT_EQ(cbbo1.size, 1); + EXPECT_EQ(cbbo1.side, Side::Ask); + EXPECT_EQ(cbbo1.flags.Raw(), 128); + EXPECT_EQ(cbbo1.ts_recv.time_since_epoch().count(), 1609160400006136329); + EXPECT_EQ(cbbo1.levels[0].bid_px, 3720250000000); + EXPECT_EQ(cbbo1.levels[0].ask_px, 3720500000000); + EXPECT_EQ(cbbo1.levels[0].bid_sz, 24); + EXPECT_EQ(cbbo1.levels[0].ask_sz, 11); + EXPECT_EQ(cbbo1.levels[0].bid_pb, 1); + EXPECT_EQ(cbbo1.levels[0].ask_pb, 1); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& cbbo2 = record2->Get(); + EXPECT_EQ(cbbo2.hd.publisher_id, 1); + EXPECT_EQ(cbbo2.hd.instrument_id, 5482); + EXPECT_EQ(cbbo2.hd.ts_event.time_since_epoch().count(), 1609160400006146661); + EXPECT_EQ(cbbo2.price, 3720500000000); + EXPECT_EQ(cbbo2.size, 1); + EXPECT_EQ(cbbo2.side, Side::Ask); + EXPECT_EQ(cbbo2.flags.Raw(), 128); + EXPECT_EQ(cbbo2.ts_recv.time_since_epoch().count(), 1609160400006246513); + EXPECT_EQ(cbbo2.levels[0].bid_px, 3720250000000); + EXPECT_EQ(cbbo2.levels[0].ask_px, 3720500000000); + EXPECT_EQ(cbbo2.levels[0].bid_sz, 24); + EXPECT_EQ(cbbo2.levels[0].ask_sz, 12); + EXPECT_EQ(cbbo2.levels[0].bid_pb, 1); + EXPECT_EQ(cbbo2.levels[0].ask_pb, 1); } TEST_P(DbnDecoderSchemaTests, TestDecodeTbbo) { @@ -640,79 +564,65 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTbbo) { const auto version = GetParam().second; ReadFromFile("tbbo", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Tbbo); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_tbbo1 = ch_record1->Get(); - const auto& f_tbbo1 = f_record1->Get(); - EXPECT_EQ(ch_tbbo1, f_tbbo1); - EXPECT_EQ(ch_tbbo1.hd.publisher_id, 1); - EXPECT_EQ(ch_tbbo1.hd.instrument_id, 5482); - EXPECT_EQ(ch_tbbo1.hd.ts_event.time_since_epoch().count(), - 1609160400098821953); - EXPECT_EQ(ch_tbbo1.price, 3720250000000); - EXPECT_EQ(ch_tbbo1.size, 5); - EXPECT_EQ(ch_tbbo1.action, Action::Trade); - EXPECT_EQ(ch_tbbo1.side, Side::Ask); - EXPECT_EQ(ch_tbbo1.flags.Raw(), 129); - EXPECT_EQ(ch_tbbo1.depth, 0); - EXPECT_EQ(ch_tbbo1.ts_recv.time_since_epoch().count(), 1609160400099150057); - EXPECT_EQ(ch_tbbo1.ts_in_delta.count(), 19251); - EXPECT_EQ(ch_tbbo1.sequence, 1170380); - EXPECT_EQ(ch_tbbo1.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_tbbo1.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_tbbo1.levels[0].bid_sz, 26); - EXPECT_EQ(ch_tbbo1.levels[0].ask_sz, 7); - EXPECT_EQ(ch_tbbo1.levels[0].bid_ct, 16); - EXPECT_EQ(ch_tbbo1.levels[0].ask_ct, 6); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_tbbo2 = ch_record2->Get(); - const auto& f_tbbo2 = f_record2->Get(); - EXPECT_EQ(ch_tbbo2, f_tbbo2); - EXPECT_EQ(ch_tbbo2.hd.publisher_id, 1); - EXPECT_EQ(ch_tbbo2.hd.instrument_id, 5482); - EXPECT_EQ(ch_tbbo2.hd.ts_event.time_since_epoch().count(), - 1609160400107665963); - EXPECT_EQ(ch_tbbo2.price, 3720250000000); - EXPECT_EQ(ch_tbbo2.size, 21); - EXPECT_EQ(ch_tbbo2.action, Action::Trade); - EXPECT_EQ(ch_tbbo2.side, Side::Ask); - EXPECT_EQ(ch_tbbo2.flags.Raw(), 129); - EXPECT_EQ(ch_tbbo2.depth, 0); - EXPECT_EQ(ch_tbbo2.ts_recv.time_since_epoch().count(), 1609160400108142648); - EXPECT_EQ(ch_tbbo2.ts_in_delta.count(), 20728); - EXPECT_EQ(ch_tbbo2.sequence, 1170414); - EXPECT_EQ(ch_tbbo2.levels[0].bid_px, 3720250000000); - EXPECT_EQ(ch_tbbo2.levels[0].ask_px, 3720500000000); - EXPECT_EQ(ch_tbbo2.levels[0].bid_sz, 21); - EXPECT_EQ(ch_tbbo2.levels[0].ask_sz, 22); - EXPECT_EQ(ch_tbbo2.levels[0].bid_ct, 13); - EXPECT_EQ(ch_tbbo2.levels[0].ask_ct, 15); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Tbbo); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& tbbo1 = record1->Get(); + EXPECT_EQ(tbbo1.hd.publisher_id, 1); + EXPECT_EQ(tbbo1.hd.instrument_id, 5482); + EXPECT_EQ(tbbo1.hd.ts_event.time_since_epoch().count(), 1609160400098821953); + EXPECT_EQ(tbbo1.price, 3720250000000); + EXPECT_EQ(tbbo1.size, 5); + EXPECT_EQ(tbbo1.action, Action::Trade); + EXPECT_EQ(tbbo1.side, Side::Ask); + EXPECT_EQ(tbbo1.flags.Raw(), 129); + EXPECT_EQ(tbbo1.depth, 0); + EXPECT_EQ(tbbo1.ts_recv.time_since_epoch().count(), 1609160400099150057); + EXPECT_EQ(tbbo1.ts_in_delta.count(), 19251); + EXPECT_EQ(tbbo1.sequence, 1170380); + EXPECT_EQ(tbbo1.levels[0].bid_px, 3720250000000); + EXPECT_EQ(tbbo1.levels[0].ask_px, 3720500000000); + EXPECT_EQ(tbbo1.levels[0].bid_sz, 26); + EXPECT_EQ(tbbo1.levels[0].ask_sz, 7); + EXPECT_EQ(tbbo1.levels[0].bid_ct, 16); + EXPECT_EQ(tbbo1.levels[0].ask_ct, 6); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& tbbo2 = record2->Get(); + EXPECT_EQ(tbbo2.hd.publisher_id, 1); + EXPECT_EQ(tbbo2.hd.instrument_id, 5482); + EXPECT_EQ(tbbo2.hd.ts_event.time_since_epoch().count(), 1609160400107665963); + EXPECT_EQ(tbbo2.price, 3720250000000); + EXPECT_EQ(tbbo2.size, 21); + EXPECT_EQ(tbbo2.action, Action::Trade); + EXPECT_EQ(tbbo2.side, Side::Ask); + EXPECT_EQ(tbbo2.flags.Raw(), 129); + EXPECT_EQ(tbbo2.depth, 0); + EXPECT_EQ(tbbo2.ts_recv.time_since_epoch().count(), 1609160400108142648); + EXPECT_EQ(tbbo2.ts_in_delta.count(), 20728); + EXPECT_EQ(tbbo2.sequence, 1170414); + EXPECT_EQ(tbbo2.levels[0].bid_px, 3720250000000); + EXPECT_EQ(tbbo2.levels[0].ask_px, 3720500000000); + EXPECT_EQ(tbbo2.levels[0].bid_sz, 21); + EXPECT_EQ(tbbo2.levels[0].ask_sz, 22); + EXPECT_EQ(tbbo2.levels[0].bid_ct, 13); + EXPECT_EQ(tbbo2.levels[0].ask_ct, 15); } TEST_P(DbnDecoderSchemaTests, TestDecodeTrades) { @@ -720,67 +630,53 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTrades) { const auto version = GetParam().second; ReadFromFile("trades", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Trades); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_trade1 = ch_record1->Get(); - const auto& f_trade1 = f_record1->Get(); - EXPECT_EQ(ch_trade1, f_trade1); - EXPECT_EQ(ch_trade1.hd.publisher_id, 1); - EXPECT_EQ(ch_trade1.hd.instrument_id, 5482); - EXPECT_EQ(ch_trade1.hd.ts_event.time_since_epoch().count(), - 1609160400098821953); - EXPECT_EQ(ch_trade1.price, 3720250000000); - EXPECT_EQ(ch_trade1.size, 5); - EXPECT_EQ(ch_trade1.action, Action::Trade); - EXPECT_EQ(ch_trade1.side, Side::Ask); - EXPECT_EQ(ch_trade1.flags.Raw(), 129); - EXPECT_EQ(ch_trade1.depth, 0); - EXPECT_EQ(ch_trade1.ts_recv.time_since_epoch().count(), 1609160400099150057); - EXPECT_EQ(ch_trade1.ts_in_delta.count(), 19251); - EXPECT_EQ(ch_trade1.sequence, 1170380); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_trade2 = ch_record2->Get(); - const auto& f_trade2 = f_record2->Get(); - EXPECT_EQ(ch_trade2, f_trade2); - EXPECT_EQ(ch_trade2.hd.publisher_id, 1); - EXPECT_EQ(ch_trade2.hd.instrument_id, 5482); - EXPECT_EQ(ch_trade2.hd.ts_event.time_since_epoch().count(), - 1609160400107665963); - EXPECT_EQ(ch_trade2.price, 3720250000000); - EXPECT_EQ(ch_trade2.size, 21); - EXPECT_EQ(ch_trade2.action, Action::Trade); - EXPECT_EQ(ch_trade2.side, Side::Ask); - EXPECT_EQ(ch_trade2.flags.Raw(), 129); - EXPECT_EQ(ch_trade2.depth, 0); - EXPECT_EQ(ch_trade2.ts_recv.time_since_epoch().count(), 1609160400108142648); - EXPECT_EQ(ch_trade2.ts_in_delta.count(), 20728); - EXPECT_EQ(ch_trade2.sequence, 1170414); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Trades); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& trade1 = record1->Get(); + EXPECT_EQ(trade1.hd.publisher_id, 1); + EXPECT_EQ(trade1.hd.instrument_id, 5482); + EXPECT_EQ(trade1.hd.ts_event.time_since_epoch().count(), 1609160400098821953); + EXPECT_EQ(trade1.price, 3720250000000); + EXPECT_EQ(trade1.size, 5); + EXPECT_EQ(trade1.action, Action::Trade); + EXPECT_EQ(trade1.side, Side::Ask); + EXPECT_EQ(trade1.flags.Raw(), 129); + EXPECT_EQ(trade1.depth, 0); + EXPECT_EQ(trade1.ts_recv.time_since_epoch().count(), 1609160400099150057); + EXPECT_EQ(trade1.ts_in_delta.count(), 19251); + EXPECT_EQ(trade1.sequence, 1170380); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& trade2 = record2->Get(); + EXPECT_EQ(trade2.hd.publisher_id, 1); + EXPECT_EQ(trade2.hd.instrument_id, 5482); + EXPECT_EQ(trade2.hd.ts_event.time_since_epoch().count(), 1609160400107665963); + EXPECT_EQ(trade2.price, 3720250000000); + EXPECT_EQ(trade2.size, 21); + EXPECT_EQ(trade2.action, Action::Trade); + EXPECT_EQ(trade2.side, Side::Ask); + EXPECT_EQ(trade2.flags.Raw(), 129); + EXPECT_EQ(trade2.depth, 0); + EXPECT_EQ(trade2.ts_recv.time_since_epoch().count(), 1609160400108142648); + EXPECT_EQ(trade2.ts_in_delta.count(), 20728); + EXPECT_EQ(trade2.sequence, 1170414); } TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1D) { @@ -788,21 +684,19 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1D) { const auto version = GetParam().second; ReadFromFile("ohlcv-1d", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Ohlcv1D); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Ohlcv1D); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); } TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1H) { @@ -810,59 +704,45 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1H) { const auto version = GetParam().second; ReadFromFile("ohlcv-1h", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Ohlcv1H); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_ohlcv1 = ch_record1->Get(); - const auto& f_ohlcv1 = f_record1->Get(); - EXPECT_EQ(ch_ohlcv1, f_ohlcv1); - EXPECT_EQ(ch_ohlcv1.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv1.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv1.hd.ts_event.time_since_epoch().count(), - 1609160400000000000); - EXPECT_EQ(ch_ohlcv1.open, 372025000000000); - EXPECT_EQ(ch_ohlcv1.high, 372350000000000); - EXPECT_EQ(ch_ohlcv1.low, 372025000000000); - EXPECT_EQ(ch_ohlcv1.close, 372225000000000); - EXPECT_EQ(ch_ohlcv1.volume, 9385); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_ohlcv2 = ch_record2->Get(); - const auto& f_ohlcv2 = f_record2->Get(); - EXPECT_EQ(ch_ohlcv2, f_ohlcv2); - EXPECT_EQ(ch_ohlcv2.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv2.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv2.hd.ts_event.time_since_epoch().count(), - 1609164000000000000); - EXPECT_EQ(ch_ohlcv2.open, 372225000000000); - EXPECT_EQ(ch_ohlcv2.high, 372450000000000); - EXPECT_EQ(ch_ohlcv2.low, 371600000000000); - EXPECT_EQ(ch_ohlcv2.close, 371950000000000); - EXPECT_EQ(ch_ohlcv2.volume, 112698); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Ohlcv1H); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& ohlcv1 = record1->Get(); + EXPECT_EQ(ohlcv1.hd.publisher_id, 1); + EXPECT_EQ(ohlcv1.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv1.hd.ts_event.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(ohlcv1.open, 372025000000000); + EXPECT_EQ(ohlcv1.high, 372350000000000); + EXPECT_EQ(ohlcv1.low, 372025000000000); + EXPECT_EQ(ohlcv1.close, 372225000000000); + EXPECT_EQ(ohlcv1.volume, 9385); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& ohlcv2 = record2->Get(); + EXPECT_EQ(ohlcv2.hd.publisher_id, 1); + EXPECT_EQ(ohlcv2.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv2.hd.ts_event.time_since_epoch().count(), 1609164000000000000); + EXPECT_EQ(ohlcv2.open, 372225000000000); + EXPECT_EQ(ohlcv2.high, 372450000000000); + EXPECT_EQ(ohlcv2.low, 371600000000000); + EXPECT_EQ(ohlcv2.close, 371950000000000); + EXPECT_EQ(ohlcv2.volume, 112698); } TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1M) { @@ -870,59 +750,45 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1M) { const auto version = GetParam().second; ReadFromFile("ohlcv-1m", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Ohlcv1M); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_ohlcv1 = ch_record1->Get(); - const auto& f_ohlcv1 = f_record1->Get(); - EXPECT_EQ(ch_ohlcv1, f_ohlcv1); - EXPECT_EQ(ch_ohlcv1.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv1.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv1.hd.ts_event.time_since_epoch().count(), - 1609160400000000000); - EXPECT_EQ(ch_ohlcv1.open, 372025000000000); - EXPECT_EQ(ch_ohlcv1.high, 372150000000000); - EXPECT_EQ(ch_ohlcv1.low, 372025000000000); - EXPECT_EQ(ch_ohlcv1.close, 372100000000000); - EXPECT_EQ(ch_ohlcv1.volume, 353); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_ohlcv2 = ch_record2->Get(); - const auto& f_ohlcv2 = f_record2->Get(); - EXPECT_EQ(ch_ohlcv2, f_ohlcv2); - EXPECT_EQ(ch_ohlcv2.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv2.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv2.hd.ts_event.time_since_epoch().count(), - 1609160460000000000); - EXPECT_EQ(ch_ohlcv2.open, 372100000000000); - EXPECT_EQ(ch_ohlcv2.high, 372150000000000); - EXPECT_EQ(ch_ohlcv2.low, 372100000000000); - EXPECT_EQ(ch_ohlcv2.close, 372150000000000); - EXPECT_EQ(ch_ohlcv2.volume, 152); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Ohlcv1M); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& ohlcv1 = record1->Get(); + EXPECT_EQ(ohlcv1.hd.publisher_id, 1); + EXPECT_EQ(ohlcv1.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv1.hd.ts_event.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(ohlcv1.open, 372025000000000); + EXPECT_EQ(ohlcv1.high, 372150000000000); + EXPECT_EQ(ohlcv1.low, 372025000000000); + EXPECT_EQ(ohlcv1.close, 372100000000000); + EXPECT_EQ(ohlcv1.volume, 353); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& ohlcv2 = record2->Get(); + EXPECT_EQ(ohlcv2.hd.publisher_id, 1); + EXPECT_EQ(ohlcv2.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv2.hd.ts_event.time_since_epoch().count(), 1609160460000000000); + EXPECT_EQ(ohlcv2.open, 372100000000000); + EXPECT_EQ(ohlcv2.high, 372150000000000); + EXPECT_EQ(ohlcv2.low, 372100000000000); + EXPECT_EQ(ohlcv2.close, 372150000000000); + EXPECT_EQ(ohlcv2.volume, 152); } TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1S) { @@ -930,59 +796,45 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1S) { const auto version = GetParam().second; ReadFromFile("ohlcv-1s", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Ohlcv1S); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1609160400000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1609200000000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"ESH1"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - AssertMappings(ch_metadata.mappings); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_ohlcv1 = ch_record1->Get(); - const auto& f_ohlcv1 = f_record1->Get(); - EXPECT_EQ(ch_ohlcv1, f_ohlcv1); - EXPECT_EQ(ch_ohlcv1.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv1.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv1.hd.ts_event.time_since_epoch().count(), - 1609160400000000000); - EXPECT_EQ(ch_ohlcv1.open, 372025000000000); - EXPECT_EQ(ch_ohlcv1.high, 372050000000000); - EXPECT_EQ(ch_ohlcv1.low, 372025000000000); - EXPECT_EQ(ch_ohlcv1.close, 372050000000000); - EXPECT_EQ(ch_ohlcv1.volume, 57); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_ohlcv2 = ch_record2->Get(); - const auto& f_ohlcv2 = f_record2->Get(); - EXPECT_EQ(ch_ohlcv2, f_ohlcv2); - EXPECT_EQ(ch_ohlcv2.hd.publisher_id, 1); - EXPECT_EQ(ch_ohlcv2.hd.instrument_id, 5482); - EXPECT_EQ(ch_ohlcv2.hd.ts_event.time_since_epoch().count(), - 1609160401000000000); - EXPECT_EQ(ch_ohlcv2.open, 372050000000000); - EXPECT_EQ(ch_ohlcv2.high, 372050000000000); - EXPECT_EQ(ch_ohlcv2.low, 372050000000000); - EXPECT_EQ(ch_ohlcv2.close, 372050000000000); - EXPECT_EQ(ch_ohlcv2.volume, 13); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Ohlcv1S); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1609200000000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"ESH1"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + AssertMappings(metadata.mappings); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& ohlcv1 = record1->Get(); + EXPECT_EQ(ohlcv1.hd.publisher_id, 1); + EXPECT_EQ(ohlcv1.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv1.hd.ts_event.time_since_epoch().count(), 1609160400000000000); + EXPECT_EQ(ohlcv1.open, 372025000000000); + EXPECT_EQ(ohlcv1.high, 372050000000000); + EXPECT_EQ(ohlcv1.low, 372025000000000); + EXPECT_EQ(ohlcv1.close, 372050000000000); + EXPECT_EQ(ohlcv1.volume, 57); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& ohlcv2 = record2->Get(); + EXPECT_EQ(ohlcv2.hd.publisher_id, 1); + EXPECT_EQ(ohlcv2.hd.instrument_id, 5482); + EXPECT_EQ(ohlcv2.hd.ts_event.time_since_epoch().count(), 1609160401000000000); + EXPECT_EQ(ohlcv2.open, 372050000000000); + EXPECT_EQ(ohlcv2.high, 372050000000000); + EXPECT_EQ(ohlcv2.low, 372050000000000); + EXPECT_EQ(ohlcv2.close, 372050000000000); + EXPECT_EQ(ohlcv2.volume, 13); } TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { @@ -990,22 +842,20 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { const auto version = GetParam().second; ReadFromFile("definition", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kXnasItch); - EXPECT_EQ(ch_metadata.schema, Schema::Definition); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1633305600000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1641254400000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"MSFT"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - EXPECT_EQ(ch_metadata.mappings.size(), 1); - const auto& mapping = ch_metadata.mappings.at(0); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kXnasItch); + EXPECT_EQ(metadata.schema, Schema::Definition); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1633305600000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1641254400000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"MSFT"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + EXPECT_EQ(metadata.mappings.size(), 1); + const auto& mapping = metadata.mappings.at(0); EXPECT_EQ(mapping.raw_symbol, "MSFT"); ASSERT_EQ(mapping.intervals.size(), 62); const auto& interval = mapping.intervals.at(0); @@ -1013,21 +863,19 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_NE(record1, nullptr); - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_NE(record2, nullptr); if (version == 1) { - AssertDefEq(ch_record1, f_record1); - AssertDefEq(ch_record2, f_record2); + AssertDefEq(record1, record1); + AssertDefEq(record2, record2); } else { - AssertDefEq(ch_record1, f_record1); - AssertDefEq(ch_record2, f_record2); + AssertDefEq(record1, record1); + AssertDefEq(record2, record2); } } @@ -1036,43 +884,32 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeImbalance) { const auto version = GetParam().second; ReadFromFile("imbalance", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kXnasItch); - EXPECT_EQ(ch_metadata.schema, Schema::Imbalance); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 1633305600000000000); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 1641254400000000000); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::RawSymbol); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_EQ(ch_metadata.symbols, std::vector{"SPOT"}); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - EXPECT_EQ(ch_metadata.mappings.size(), 1); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_imbalance1 = ch_record1->Get(); - const auto& f_imbalance1 = f_record1->Get(); - EXPECT_EQ(ch_imbalance1, f_imbalance1); - EXPECT_EQ(ch_imbalance1.ref_price, 229430000000); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_imbalance2 = ch_record2->Get(); - const auto& f_imbalance2 = f_record2->Get(); - EXPECT_EQ(ch_imbalance2, f_imbalance2); - EXPECT_EQ(ch_imbalance2.ref_price, 229990000000); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kXnasItch); + EXPECT_EQ(metadata.schema, Schema::Imbalance); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 1633305600000000000); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 1641254400000000000); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::RawSymbol); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_EQ(metadata.symbols, std::vector{"SPOT"}); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + EXPECT_EQ(metadata.mappings.size(), 1); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& imbalance1 = record1->Get(); + EXPECT_EQ(imbalance1, imbalance1); + EXPECT_EQ(imbalance1.ref_price, 229430000000); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& imbalance2 = record2->Get(); + EXPECT_EQ(imbalance2.ref_price, 229990000000); } TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { @@ -1080,45 +917,33 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { const auto version = GetParam().second; ReadFromFile("statistics", extension, version); - const Metadata ch_metadata = channel_target_->DecodeMetadata(); - const Metadata f_metadata = file_target_->DecodeMetadata(); - EXPECT_EQ(ch_metadata, f_metadata); - EXPECT_EQ(ch_metadata.version, version); - EXPECT_EQ(ch_metadata.dataset, dataset::kGlbxMdp3); - EXPECT_EQ(ch_metadata.schema, Schema::Statistics); - EXPECT_EQ(ch_metadata.start.time_since_epoch().count(), 2814749767106560); - EXPECT_EQ(ch_metadata.end.time_since_epoch().count(), 18446744073709551615UL); - EXPECT_EQ(ch_metadata.limit, 2); - EXPECT_EQ(ch_metadata.stype_in, SType::InstrumentId); - EXPECT_EQ(ch_metadata.stype_out, SType::InstrumentId); - EXPECT_TRUE(ch_metadata.symbols.empty()); - EXPECT_TRUE(ch_metadata.partial.empty()); - EXPECT_TRUE(ch_metadata.not_found.empty()); - EXPECT_TRUE(ch_metadata.mappings.empty()); - - const auto ch_record1 = channel_target_->DecodeRecord(); - const auto f_record1 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record1, nullptr); - ASSERT_NE(f_record1, nullptr); - ASSERT_TRUE(ch_record1->Holds()); - ASSERT_TRUE(f_record1->Holds()); - const auto& ch_stat1 = ch_record1->Get(); - const auto& f_stat1 = f_record1->Get(); - EXPECT_EQ(ch_stat1, f_stat1); - EXPECT_EQ(ch_stat1.stat_type, StatType::LowestOffer); - EXPECT_EQ(ch_stat1.price, 100 * kFixedPriceScale); - - const auto ch_record2 = channel_target_->DecodeRecord(); - const auto f_record2 = file_target_->DecodeRecord(); - ASSERT_NE(ch_record2, nullptr); - ASSERT_NE(f_record2, nullptr); - ASSERT_TRUE(ch_record2->Holds()); - ASSERT_TRUE(f_record2->Holds()); - const auto& ch_stat2 = ch_record2->Get(); - const auto& f_stat2 = f_record2->Get(); - EXPECT_EQ(ch_stat2, f_stat2); - EXPECT_EQ(ch_stat2.stat_type, StatType::TradingSessionHighPrice); - EXPECT_EQ(ch_stat2.price, 100 * kFixedPriceScale); + const Metadata metadata = file_target_->DecodeMetadata(); + EXPECT_EQ(metadata.version, version); + EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); + EXPECT_EQ(metadata.schema, Schema::Statistics); + EXPECT_EQ(metadata.start.time_since_epoch().count(), 2814749767106560); + EXPECT_EQ(metadata.end.time_since_epoch().count(), 18446744073709551615UL); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.stype_in, SType::InstrumentId); + EXPECT_EQ(metadata.stype_out, SType::InstrumentId); + EXPECT_TRUE(metadata.symbols.empty()); + EXPECT_TRUE(metadata.partial.empty()); + EXPECT_TRUE(metadata.not_found.empty()); + EXPECT_TRUE(metadata.mappings.empty()); + + const auto record1 = file_target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + ASSERT_TRUE(record1->Holds()); + const auto& stat1 = record1->Get(); + EXPECT_EQ(stat1.stat_type, StatType::LowestOffer); + EXPECT_EQ(stat1.price, 100 * kFixedPriceScale); + + const auto record2 = file_target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + ASSERT_TRUE(record2->Holds()); + const auto& stat2 = record2->Get(); + EXPECT_EQ(stat2.stat_type, StatType::TradingSessionHighPrice); + EXPECT_EQ(stat2.price, 100 * kFixedPriceScale); } class DbnIdentityTests : public testing::TestWithParam< diff --git a/tests/src/historical_tests.cpp b/tests/src/historical_tests.cpp index 721e970..637be74 100644 --- a/tests/src/historical_tests.cpp +++ b/tests/src/historical_tests.cpp @@ -637,6 +637,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_Basic) { mbo_records.emplace_back(record.Get()); return KeepGoing::Continue; }); + ASSERT_NE(metadata_ptr, nullptr) << "metadata callback wasn't called"; EXPECT_EQ(metadata_ptr->limit, 2); EXPECT_EQ(metadata_ptr->schema, Schema::Mbo); EXPECT_EQ(mbo_records.size(), 2); diff --git a/tests/src/shared_channel_tests.cpp b/tests/src/shared_channel_tests.cpp deleted file mode 100644 index e8d7390..0000000 --- a/tests/src/shared_channel_tests.cpp +++ /dev/null @@ -1,84 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include - -#include "databento/detail/scoped_thread.hpp" -#include "databento/detail/shared_channel.hpp" -#include "databento/exceptions.hpp" - -namespace databento::detail::test { -class SharedChannelTests : public testing::Test { - protected: - void Write(const std::vector& inputs) { - for (const auto& input : inputs) { - target_.Write(reinterpret_cast(input.data()), - input.size()); - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - } - target_.Finish(); - } - - SharedChannel target_; - ScopedThread write_thread_; -}; - -TEST_F(SharedChannelTests, TestReadExact) { - write_thread_ = ScopedThread{ - [this] { this->Write({"parse", "stream", "tests", "end"}); }}; - std::array buffer{}; - target_.ReadExact(buffer.data(), 3); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "par"); - target_.ReadExact(buffer.data(), 8); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "sestream"); - target_.ReadExact(buffer.data(), 8); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "testsend"); - ASSERT_THROW(target_.ReadExact(buffer.data(), 1), DbnResponseError); -} - -TEST_F(SharedChannelTests, TestReadExactAfterFinished) { - // write on same thread, so all reading happens after writing - this->Write({"parse", "exact"}); - std::array buffer{}; - target_.ReadExact(buffer.data(), 7); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "parseex"); - // reset buffer - buffer = {}; - target_.ReadExact(buffer.data(), 3); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "act"); -} - -TEST_F(SharedChannelTests, TestInterleavedReadsAndWrites) { - std::array buffer{}; - target_.Write(reinterpret_cast("hello"), 5); - ASSERT_EQ(target_.ReadSome(buffer.data(), buffer.size()), 5); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "hello"); - buffer = {}; - target_.Write(reinterpret_cast("longer message"), 14); - target_.Finish(); - target_.ReadSome(buffer.data(), 6); - target_.ReadSome(&buffer[6], 1); - target_.ReadSome(&buffer[7], 7); - EXPECT_STREQ(reinterpret_cast(buffer.data()), "longer message"); -} - -TEST_F(SharedChannelTests, TestReadSome) { - write_thread_ = ScopedThread{ - [this] { this->Write({"parse", "stream", "tests", "some", "last"}); }}; - std::array buffer{}; - std::string res; - // -1 to keep last null byte - while (res.size() < 23) { - auto read_size = target_.ReadSome(buffer.data(), buffer.size()); - res.append(reinterpret_cast(buffer.data()), read_size); - std::this_thread::sleep_for(std::chrono::milliseconds{10}); - buffer = {}; - } - - ASSERT_EQ(res, "parsestreamtestssomelast"); -} -} // namespace databento::detail::test