diff --git a/.gitignore b/.gitignore index d45b01f..13c6701 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ build CMakeSettings.json vcpkg_installed .vs +.cache !*.zst diff --git a/CHANGELOG.md b/CHANGELOG.md index c99f459..dc1f2cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,41 @@ # Changelog +## 0.36.0 - 2025-05-27 + +This version marks the release of DBN version 3 (DBNv3), which is the new default. +API methods and `DbnDecoder` support decoding all versions of DBN, but now default to +upgrading data to version 3. + +### Enhancements +- Added `UpgradeToV3` variant to `VersionUpgradePolicy` enum +- Version 1 and 2 structs can be converted to version 3 structs with templated `Upgrade` + method +- Metadata will now always be encoded with a length divisible by 8 bytes for better + alignment +- Added `is_last` field to live subscription requests which will be used to improve the + handling of split subscription requests + +### Breaking changes +- Release of DBN version 3: + - Definition schema: + - Updated `InstrumentDefMsg` with new `leg_` fields to support multi-leg strategy + definitions. + - Expanded `raw_instrument_id` to 64 bits to support more venues. Like other 64-bit + integer fields, its value will now be quoted in JSON + - Removed `trading_reference_date`, `trading_reference_price`, and + `settl_price_type` fields which will be normalized in the statistics schema + - Removed `md_security_trading_status` better served by the status schema + - Updated `asset` to 11 bytes and `kAssetCstrLen` to match + - Statistics schema: + - Updated `StatMsg` has an expanded 64-bit `quantity` field. `kUndefStatQuantity` + has been updated to match + - The previous `StatMsg` has been moved to `v2::StatMsg` or `StatMsgV2` + +### Bug fixes +- Fixed "Zstd error decompressing: Operation made no progress over multiple calls, due + to output buffer being full" error with `TimeseriesGetRange` +- Fixed missing implementation of `HistoricalBuilder::SetLogReceiver` + ## 0.35.1 - 2025-05-20 ### Bug fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index ccae230..fe1cbc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.35.1 + VERSION 0.36.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index 110b003..093490b 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -71,5 +71,5 @@ set(sources src/symbol_map.cpp src/symbology.cpp src/v1.cpp - src/v3.cpp + src/v2.cpp ) diff --git a/examples/historical/timeseries_get_range.cpp b/examples/historical/timeseries_get_range.cpp index 0bb65de..ea3c2f4 100644 --- a/examples/historical/timeseries_get_range.cpp +++ b/examples/historical/timeseries_get_range.cpp @@ -1,34 +1,15 @@ -#include -#include -#include #include // setw -#ifdef _WIN32 -// _mkgmtime is equivalent to timegm -#define timegm _mkgmtime -#endif #include "databento/constants.hpp" -#include "databento/datetime.hpp" #include "databento/enums.hpp" #include "databento/historical.hpp" -// Converts a date to Unix Epoch nanoseconds -databento::UnixNanos DateToUnixNanos(int year, int month, int day) { - std::tm time{}; - time.tm_year = year - 1900; - // January is 0 - time.tm_mon = month - 1; - time.tm_mday = day; - return databento::UnixNanos{std::chrono::seconds{::timegm(&time)}}; -} - int main() { auto client = databento::HistoricalBuilder{}.SetKeyFromEnv().Build(); - const databento::UnixNanos start = DateToUnixNanos(2022, 10, 3); - const databento::UnixNanos end = DateToUnixNanos(2022, 10, 4); const auto limit = 1000; client.TimeseriesGetRange( - databento::dataset::kGlbxMdp3, {start, end}, {"ESZ2"}, + databento::dataset::kGlbxMdp3, + databento::DateTimeRange{"2022-10-03"}, {"ESZ2"}, databento::Schema::Trades, databento::SType::RawSymbol, databento::SType::InstrumentId, limit, [](databento::Metadata&& metadata) { std::cout << metadata << '\n'; }, diff --git a/include/databento/constants.hpp b/include/databento/constants.hpp index ec7e215..2043503 100644 --- a/include/databento/constants.hpp +++ b/include/databento/constants.hpp @@ -16,16 +16,16 @@ static constexpr auto kUndefOrderSize = std::numeric_limits::max(); // The sentinel value for an unset statistic quantity. static constexpr auto kUndefStatQuantity = - std::numeric_limits::max(); + std::numeric_limits::max(); // The sentinel value for an unset or null timestamp. static constexpr auto kUndefTimestamp = std::numeric_limits::max(); // The current version of the DBN encoding. -static constexpr auto kDbnVersion = 2; +static constexpr auto kDbnVersion = 3; // The length of fixed-length symbol strings. static constexpr auto kSymbolCstrLen = 71; // The length of fixed-length asset string. -static constexpr auto kAssetCstrLen = 7; +static constexpr auto kAssetCstrLen = 11; // The multiplier for converting the `length` field in `RecordHeader` to bytes. static constexpr std::size_t kRecordHeaderLengthMultiplier = 4; diff --git a/include/databento/dbn_decoder.hpp b/include/databento/dbn_decoder.hpp index dd56a89..97f4898 100644 --- a/include/databento/dbn_decoder.hpp +++ b/include/databento/dbn_decoder.hpp @@ -15,7 +15,8 @@ namespace databento { // DBN decoder. Set upgrade_policy to control how DBN version 1 data should be -// handled. Defaults to upgrading DBNv1 data to version 2 (the current version). +// handled. Defaults to upgrading DBN versions 1 and 2 to version 3 (the current +// version). class DbnDecoder { public: DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream); diff --git a/include/databento/dbn_encoder.hpp b/include/databento/dbn_encoder.hpp index 9bc251f..62f8a01 100644 --- a/include/databento/dbn_encoder.hpp +++ b/include/databento/dbn_encoder.hpp @@ -32,7 +32,8 @@ class DbnEncoder { void EncodeRecord(const Record& record); private: - static std::uint32_t CalcLength(const Metadata& metadata); + static std::pair CalcLength( + const Metadata& metadata); IWritable* output_; }; diff --git a/include/databento/detail/buffer.hpp b/include/databento/detail/buffer.hpp index 47fa725..539d2c1 100644 --- a/include/databento/detail/buffer.hpp +++ b/include/databento/detail/buffer.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "databento/ireadable.hpp" #include "databento/iwritable.hpp" @@ -22,22 +23,32 @@ class Buffer : public IReadable, public IWritable { void WriteAll(const char* data, std::size_t length); void WriteAll(const std::byte* data, std::size_t length) override; - std::byte*& WriteBegin() { return write_pos_; } + std::byte* WriteBegin() { return write_pos_; } std::byte* WriteEnd() { return end_; } const std::byte* WriteBegin() const { return write_pos_; } const std::byte* WriteEnd() const { return end_; } + // Indicate how many bytes were written + void Fill(std::size_t length) { write_pos_ += length; } std::size_t WriteCapacity() const { return static_cast(end_ - write_pos_); } - /// Will throw if `length > ReadCapacity()`. + // Will throw if `length > ReadCapacity()`. void ReadExact(std::byte* buffer, std::size_t length) override; std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; - std::byte*& ReadBegin() { return read_pos_; } + std::byte* ReadBegin() { return read_pos_; } std::byte* ReadEnd() { return write_pos_; } const std::byte* ReadBegin() const { return read_pos_; } const std::byte* ReadEnd() const { return write_pos_; } + // Indicate how mnay bytes were read + void Consume(std::size_t length) { + read_pos_ += length; + if (static_cast(read_pos_ - buf_.get()) > (Capacity() / 2)) { + Shift(); + } + } + void ConsumeNoShift(std::size_t length) { read_pos_ += length; } std::size_t ReadCapacity() const { return static_cast(write_pos_ - read_pos_); } @@ -52,6 +63,8 @@ class Buffer : public IReadable, public IWritable { void Reserve(std::size_t capacity); void Shift(); + friend std::ostream& operator<<(std::ostream& stream, const Buffer& buffer); + private: static constexpr std::align_val_t kAlignment{8}; diff --git a/include/databento/enums.hpp b/include/databento/enums.hpp index dffd475..4068865 100644 --- a/include/databento/enums.hpp +++ b/include/databento/enums.hpp @@ -303,6 +303,7 @@ enum class StatUpdateAction : std::uint8_t { enum class VersionUpgradePolicy : std::uint8_t { AsIs = 0, UpgradeToV2 = 1, + UpgradeToV3 = 2, }; namespace status_action { diff --git a/include/databento/live.hpp b/include/databento/live.hpp index ca498c4..149e871 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -52,7 +52,7 @@ class LiveBuilder { std::string key_; std::string dataset_; bool send_ts_out_{false}; - VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV2}; + VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3}; std::chrono::seconds heartbeat_interval_{}; }; } // namespace databento diff --git a/include/databento/record.hpp b/include/databento/record.hpp index 2c050fe..e7fd04b 100644 --- a/include/databento/record.hpp +++ b/include/databento/record.hpp @@ -17,12 +17,6 @@ #include "databento/publishers.hpp" // Publisher namespace databento { -// Forward declare -namespace v3 { -struct InstrumentDefMsg; -struct StatMsg; -} // namespace v3 - // Common data for all Databento Records. struct RecordHeader { static constexpr std::size_t kLengthMultiplier = @@ -379,7 +373,6 @@ static_assert(alignof(StatusMsg) == 8, "Must have 8-byte alignment"); struct InstrumentDefMsg { static bool HasRType(RType rtype) { return rtype == RType::InstrumentDef; } - v3::InstrumentDefMsg ToV3() const; UnixNanos IndexTs() const { return ts_recv; } const char* Currency() const { return currency.data(); } const char* SettlCurrency() const { return settl_currency.data(); } @@ -395,6 +388,7 @@ struct InstrumentDefMsg { const char* StrikePriceCurrency() const { return strike_price_currency.data(); } + const char* LegRawSymbol() const { return leg_raw_symbol.data(); } RecordHeader hd; UnixNanos ts_recv; @@ -405,14 +399,15 @@ struct InstrumentDefMsg { std::int64_t high_limit_price; std::int64_t low_limit_price; std::int64_t max_price_variation; - std::int64_t trading_reference_price; std::int64_t unit_of_measure_qty; std::int64_t min_price_increment_amount; std::int64_t price_ratio; std::int64_t strike_price; + std::uint64_t raw_instrument_id; + std::int64_t leg_price; + std::int64_t leg_delta; std::int32_t inst_attrib_value; std::uint32_t underlying_id; - std::uint32_t raw_instrument_id; std::int32_t market_depth_implied; std::int32_t market_depth; std::uint32_t market_segment_id; @@ -424,11 +419,18 @@ struct InstrumentDefMsg { std::int32_t contract_multiplier; std::int32_t decay_quantity; std::int32_t original_contract_size; - std::uint16_t trading_reference_date; + std::uint32_t leg_instrument_id; + std::int32_t leg_ratio_price_numerator; + std::int32_t leg_ratio_price_denominator; + std::int32_t leg_ratio_qty_numerator; + std::int32_t leg_ratio_qty_denominator; + std::uint32_t leg_underlying_id; std::int16_t appl_id; std::uint16_t maturity_year; std::uint16_t decay_start_date; std::uint16_t channel_id; + std::uint16_t leg_count; + std::uint16_t leg_index; std::array currency; std::array settl_currency; std::array secsubtype; @@ -441,12 +443,11 @@ struct InstrumentDefMsg { std::array unit_of_measure; std::array underlying; std::array strike_price_currency; + std::array leg_raw_symbol; InstrumentClass instrument_class; MatchAlgorithm match_algorithm; - std::uint8_t md_security_trading_status; std::uint8_t main_fraction; std::uint8_t price_display_format; - std::uint8_t settl_price_type; std::uint8_t sub_fraction; std::uint8_t underlying_product; SecurityUpdateAction security_update_action; @@ -457,9 +458,12 @@ struct InstrumentDefMsg { std::int8_t contract_multiplier_unit; std::int8_t flow_schedule_type; std::uint8_t tick_rule; - std::array reserved; + InstrumentClass leg_instrument_class; + Side leg_side; + // padding for alignment + std::array reserved; }; -static_assert(sizeof(InstrumentDefMsg) == 400, +static_assert(sizeof(InstrumentDefMsg) == 520, "InstrumentDefMsg size must match Rust"); static_assert(alignof(InstrumentDefMsg) == 8, "Must have 8-byte alignment"); @@ -502,23 +506,22 @@ static_assert(alignof(ImbalanceMsg) == 8, "Must have 8-byte alignment"); struct StatMsg { static bool HasRType(RType rtype) { return rtype == RType::Statistics; } - v3::StatMsg ToV3() const; UnixNanos IndexTs() const { return ts_recv; } RecordHeader hd; UnixNanos ts_recv; UnixNanos ts_ref; std::int64_t price; - std::int32_t quantity; + std::int64_t quantity; std::uint32_t sequence; TimeDeltaNanos ts_in_delta; StatType stat_type; std::uint16_t channel_id; StatUpdateAction update_action; std::uint8_t stat_flags; - std::array reserved; + std::array reserved; }; -static_assert(sizeof(StatMsg) == 64, "StatMsg size must match Rust"); +static_assert(sizeof(StatMsg) == 80, "StatMsg size must match Rust"); static_assert(alignof(StatMsg) == 8, "Must have 8-byte alignment"); // An error message from the Live Subscription Gateway (LSG). This will never @@ -798,4 +801,6 @@ std::ostream& operator<<(std::ostream& stream, // The length in bytes of the largest record type. static constexpr std::size_t kMaxRecordLen = 520 + 8; +static_assert(kMaxRecordLen == sizeof(InstrumentDefMsg) + sizeof(UnixNanos), + "v3 definition with ts_out should be the largest record"); } // namespace databento diff --git a/include/databento/v1.hpp b/include/databento/v1.hpp index cf2b98a..6c44b61 100644 --- a/include/databento/v1.hpp +++ b/include/databento/v1.hpp @@ -1,23 +1,21 @@ #pragma once -#include "databento/constants.hpp" // kAssetCstrLen, kSymbolCstrLen, kUndefStatQuantity #include "databento/datetime.hpp" // UnixNanos #include "databento/enums.hpp" #include "databento/record.hpp" -#include "databento/v2.hpp" namespace databento { // Forward declare -namespace v3 { +namespace v2 { struct InstrumentDefMsg; -} +} // namespace v2 namespace v1 { static constexpr std::uint8_t kDbnVersion = 1; static constexpr std::size_t kSymbolCstrLen = 22; -static constexpr std::size_t kAssetCstrLen = databento::kAssetCstrLen; -static constexpr std::int32_t kUndefStatQuantity = - databento::kUndefStatQuantity; +static constexpr std::size_t kAssetCstrLen = 7; +static constexpr auto kUndefStatQuantity = + std::numeric_limits::max(); using MboMsg = databento::MboMsg; using TradeMsg = databento::TradeMsg; @@ -35,13 +33,14 @@ using Cbbo1MMsg = databento::Cbbo1MMsg; using OhlcvMsg = databento::OhlcvMsg; using StatusMsg = databento::StatusMsg; using ImbalanceMsg = databento::ImbalanceMsg; -using StatMsg = databento::StatMsg; struct InstrumentDefMsg { static bool HasRType(RType rtype) { return rtype == RType::InstrumentDef; } v2::InstrumentDefMsg ToV2() const; - v3::InstrumentDefMsg ToV3() const; + databento::InstrumentDefMsg ToV3() const; + template + T Upgrade() const; const char* Currency() const { return currency.data(); } const char* SettlCurrency() const { return settl_currency.data(); } const char* SecSubType() const { return secsubtype.data(); } @@ -125,21 +124,58 @@ struct InstrumentDefMsg { // padding for alignment std::array dummy; }; +template <> +v2::InstrumentDefMsg InstrumentDefMsg::Upgrade() const; +template <> +databento::InstrumentDefMsg InstrumentDefMsg::Upgrade() const; static_assert(sizeof(InstrumentDefMsg) == 360, "Size must match Rust"); static_assert(alignof(InstrumentDefMsg) == 8, "Must have 8-byte alignment"); +/// A statistics message. A catchall for various data disseminated by +/// publishers. The `stat_type` indicates the statistic contained in the +/// message. +struct StatMsg { + static bool HasRType(RType rtype) { return rtype == RType::Statistics; } + + databento::StatMsg ToV3() const; + template + T Upgrade() const; + UnixNanos IndexTs() const { return ts_recv; } + + RecordHeader hd; + UnixNanos ts_recv; + UnixNanos ts_ref; + std::int64_t price; + std::int32_t quantity; + std::uint32_t sequence; + TimeDeltaNanos ts_in_delta; + StatType stat_type; + std::uint16_t channel_id; + StatUpdateAction update_action; + std::uint8_t stat_flags; + std::array reserved; +}; +template <> +databento::StatMsg StatMsg::Upgrade() const; +static_assert(sizeof(StatMsg) == 64, "StatMsg size must match Rust"); +static_assert(alignof(StatMsg) == 8, "Must have 8-byte alignment"); + // An error message from the Live Subscription Gateway (LSG). This will never // be present in historical data. struct ErrorMsg { static bool HasRType(RType rtype) { return rtype == RType::Error; } - v2::ErrorMsg ToV2() const; + databento::ErrorMsg ToV2() const; + template + T Upgrade() const; UnixNanos IndexTs() const { return hd.ts_event; } const char* Err() const { return err.data(); } RecordHeader hd; std::array err; }; +template <> +ErrorMsg ErrorMsg::Upgrade() const; static_assert(sizeof(ErrorMsg) == 80, "ErrorMsg size must match Rust"); static_assert(alignof(ErrorMsg) == 8, "Must have 8-byte alignment"); @@ -147,7 +183,9 @@ static_assert(alignof(ErrorMsg) == 8, "Must have 8-byte alignment"); struct SymbolMappingMsg { static bool HasRType(RType rtype) { return rtype == RType::SymbolMapping; } - v2::SymbolMappingMsg ToV2() const; + databento::SymbolMappingMsg ToV2() const; + template + T Upgrade() const; const char* STypeInSymbol() const { return stype_in_symbol.data(); } const char* STypeOutSymbol() const { return stype_out_symbol.data(); } @@ -159,13 +197,17 @@ struct SymbolMappingMsg { UnixNanos start_ts; UnixNanos end_ts; }; +template <> +SymbolMappingMsg SymbolMappingMsg::Upgrade() const; static_assert(sizeof(SymbolMappingMsg) == 80, "Size must match Rust"); static_assert(alignof(SymbolMappingMsg) == 8, "Must have 8-byte alignment"); struct SystemMsg { static bool HasRType(RType rtype) { return rtype == RType::System; } - v2::SystemMsg ToV2() const; + databento::SystemMsg ToV2() const; + template + T Upgrade() const; UnixNanos IndexTs() const { return hd.ts_event; } const char* Msg() const { return msg.data(); } bool IsHeartbeat() const { @@ -175,6 +217,8 @@ struct SystemMsg { RecordHeader hd; std::array msg; }; +template <> +SystemMsg SystemMsg::Upgrade() const; static_assert(sizeof(SystemMsg) == 80, "SystemMsg size must match Rust"); static_assert(alignof(SystemMsg) == 8, "Must have 8-byte alignment"); @@ -183,6 +227,17 @@ inline bool operator!=(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { return !(lhs == rhs); } +inline bool operator==(const StatMsg& lhs, const StatMsg& rhs) { + return std::tie(lhs.hd, lhs.ts_recv, lhs.ts_ref, lhs.price, lhs.quantity, + lhs.sequence, lhs.ts_in_delta, lhs.stat_type, lhs.channel_id, + lhs.update_action, lhs.stat_flags) == + std::tie(rhs.hd, rhs.ts_recv, rhs.ts_ref, rhs.price, rhs.quantity, + rhs.sequence, rhs.ts_in_delta, rhs.stat_type, rhs.channel_id, + rhs.update_action, rhs.stat_flags); +} +inline bool operator!=(const StatMsg& lhs, const StatMsg& rhs) { + return !(lhs == rhs); +} inline bool operator==(const ErrorMsg& lhs, const ErrorMsg& rhs) { return std::tie(lhs.hd, lhs.err) == std::tie(rhs.hd, rhs.err); } @@ -209,6 +264,8 @@ inline bool operator!=(const SystemMsg& lhs, const SystemMsg& rhs) { std::string ToString(const InstrumentDefMsg& instr_def_msg); std::ostream& operator<<(std::ostream& stream, const InstrumentDefMsg& instr_def_msg); +std::string ToString(const StatMsg& stat_msg); +std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg); std::string ToString(const ErrorMsg& err_msg); std::ostream& operator<<(std::ostream& stream, const ErrorMsg& err_msg); std::string ToString(const SymbolMappingMsg& symbol_mapping_msg); diff --git a/include/databento/v2.hpp b/include/databento/v2.hpp index b8adabc..408e5a2 100644 --- a/include/databento/v2.hpp +++ b/include/databento/v2.hpp @@ -2,13 +2,13 @@ #include "databento/constants.hpp" // kAssetCstrLen, kSymbolCstrLen, kUndefStatQuantity #include "databento/record.hpp" +#include "databento/v1.hpp" namespace databento::v2 { static constexpr std::uint8_t kDbnVersion = 2; static constexpr std::size_t kSymbolCstrLen = databento::kSymbolCstrLen; -static constexpr std::size_t kAssetCstrLen = databento::kAssetCstrLen; -static constexpr std::int32_t kUndefStatQuantity = - databento::kUndefStatQuantity; +static constexpr std::size_t kAssetCstrLen = 7; +static constexpr std::int32_t kUndefStatQuantity = v1::kUndefStatQuantity; using MboMsg = databento::MboMsg; using TradeMsg = databento::TradeMsg; @@ -25,10 +25,112 @@ using Cbbo1SMsg = databento::Cbbo1SMsg; using Cbbo1MMsg = databento::Cbbo1MMsg; using OhlcvMsg = databento::OhlcvMsg; using StatusMsg = databento::StatusMsg; -using InstrumentDefMsg = databento::InstrumentDefMsg; using ImbalanceMsg = databento::ImbalanceMsg; -using StatMsg = databento::StatMsg; +using StatMsg = databento::v1::StatMsg; using ErrorMsg = databento::ErrorMsg; using SymbolMappingMsg = databento::SymbolMappingMsg; using SystemMsg = databento::SystemMsg; + +// An instrument definition message in DBN version 2. +struct InstrumentDefMsg { + static bool HasRType(RType rtype) { return rtype == RType::InstrumentDef; } + + databento::InstrumentDefMsg ToV3() const; + template + T Upgrade() const; + UnixNanos IndexTs() const { return ts_recv; } + + const char* Currency() const { return currency.data(); } + const char* SettlCurrency() const { return settl_currency.data(); } + const char* SecSubType() const { return secsubtype.data(); } + const char* RawSymbol() const { return raw_symbol.data(); } + const char* Group() const { return group.data(); } + const char* Exchange() const { return exchange.data(); } + const char* Asset() const { return asset.data(); } + const char* Cfi() const { return cfi.data(); } + const char* SecurityType() const { return security_type.data(); } + const char* UnitOfMeasure() const { return unit_of_measure.data(); } + const char* Underlying() const { return underlying.data(); } + const char* StrikePriceCurrency() const { + return strike_price_currency.data(); + } + + RecordHeader hd; + UnixNanos ts_recv; + std::int64_t min_price_increment; + std::int64_t display_factor; + UnixNanos expiration; + UnixNanos activation; + std::int64_t high_limit_price; + std::int64_t low_limit_price; + std::int64_t max_price_variation; + std::int64_t trading_reference_price; + std::int64_t unit_of_measure_qty; + std::int64_t min_price_increment_amount; + std::int64_t price_ratio; + std::int64_t strike_price; + std::int32_t inst_attrib_value; + std::uint32_t underlying_id; + std::uint32_t raw_instrument_id; + std::int32_t market_depth_implied; + std::int32_t market_depth; + std::uint32_t market_segment_id; + std::uint32_t max_trade_vol; + std::int32_t min_lot_size; + std::int32_t min_lot_size_block; + std::int32_t min_lot_size_round_lot; + std::uint32_t min_trade_vol; + std::int32_t contract_multiplier; + std::int32_t decay_quantity; + std::int32_t original_contract_size; + std::uint16_t trading_reference_date; + std::int16_t appl_id; + std::uint16_t maturity_year; + std::uint16_t decay_start_date; + std::uint16_t channel_id; + std::array currency; + std::array settl_currency; + std::array secsubtype; + std::array raw_symbol; + std::array group; + std::array exchange; + std::array asset; + std::array cfi; + std::array security_type; + std::array unit_of_measure; + std::array underlying; + std::array strike_price_currency; + InstrumentClass instrument_class; + MatchAlgorithm match_algorithm; + std::uint8_t md_security_trading_status; + std::uint8_t main_fraction; + std::uint8_t price_display_format; + std::uint8_t settl_price_type; + std::uint8_t sub_fraction; + std::uint8_t underlying_product; + SecurityUpdateAction security_update_action; + std::uint8_t maturity_month; + std::uint8_t maturity_day; + std::uint8_t maturity_week; + UserDefinedInstrument user_defined_instrument; + std::int8_t contract_multiplier_unit; + std::int8_t flow_schedule_type; + std::uint8_t tick_rule; + std::array reserved; +}; +template <> +databento::InstrumentDefMsg InstrumentDefMsg::Upgrade() const; +static_assert(sizeof(InstrumentDefMsg) == 400, + "InstrumentDefMsg size must match Rust"); +static_assert(alignof(InstrumentDefMsg) == 8, "Must have 8-byte alignment"); + +bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs); +inline bool operator!=(const InstrumentDefMsg& lhs, + const InstrumentDefMsg& rhs) { + return !(lhs == rhs); +} + +std::string ToString(const InstrumentDefMsg& instr_def_msg); +std::ostream& operator<<(std::ostream& stream, + const InstrumentDefMsg& instr_def_msg); } // namespace databento::v2 diff --git a/include/databento/v3.hpp b/include/databento/v3.hpp index d7ac98c..b68961d 100644 --- a/include/databento/v3.hpp +++ b/include/databento/v3.hpp @@ -2,19 +2,16 @@ #include #include -#include -#include "databento/constants.hpp" // kSymbolCstrLen -#include "databento/datetime.hpp" // UnixNanos -#include "databento/enums.hpp" // InstrumentClass, MatchingAlgorithm, RType, SecurityUpdateAction, Side, UserDefinedInstrument -#include "databento/record.hpp" // RecordHeader +#include "databento/constants.hpp" // kAssetCstrLen, kSymbolCstrLen, kUndefStatQuantity +#include "databento/record.hpp" namespace databento::v3 { static constexpr std::uint8_t kDbnVersion = 3; static constexpr std::size_t kSymbolCstrLen = databento::kSymbolCstrLen; -static constexpr std::size_t kAssetCstrLen = 11; +static constexpr std::size_t kAssetCstrLen = databento::kAssetCstrLen; static constexpr std::int64_t kUndefStatQuantity = - std::numeric_limits::max(); + databento::kUndefStatQuantity; using MboMsg = databento::MboMsg; using TradeMsg = databento::TradeMsg; @@ -31,155 +28,10 @@ using Cbbo1SMsg = databento::Cbbo1SMsg; using Cbbo1MMsg = databento::Cbbo1MMsg; using OhlcvMsg = databento::OhlcvMsg; using StatusMsg = databento::StatusMsg; +using InstrumentDefMsg = databento::InstrumentDefMsg; using ImbalanceMsg = databento::ImbalanceMsg; +using StatMsg = databento::StatMsg; using ErrorMsg = databento::ErrorMsg; using SymbolMappingMsg = databento::SymbolMappingMsg; using SystemMsg = databento::SystemMsg; - -// An instrument definition in DBN version 3. -struct InstrumentDefMsg { - static bool HasRType(RType rtype) { return rtype == RType::InstrumentDef; } - - UnixNanos IndexTs() const { return ts_recv; } - const char* Currency() const { return currency.data(); } - const char* SettlCurrency() const { return settl_currency.data(); } - const char* SecSubType() const { return secsubtype.data(); } - const char* RawSymbol() const { return raw_symbol.data(); } - const char* Group() const { return group.data(); } - const char* Exchange() const { return exchange.data(); } - const char* Asset() const { return asset.data(); } - const char* Cfi() const { return cfi.data(); } - const char* SecurityType() const { return security_type.data(); } - const char* UnitOfMeasure() const { return unit_of_measure.data(); } - const char* Underlying() const { return underlying.data(); } - const char* StrikePriceCurrency() const { - return strike_price_currency.data(); - } - const char* LegRawSymbol() const { return leg_raw_symbol.data(); } - - RecordHeader hd; - UnixNanos ts_recv; - std::int64_t min_price_increment; - std::int64_t display_factor; - UnixNanos expiration; - UnixNanos activation; - std::int64_t high_limit_price; - std::int64_t low_limit_price; - std::int64_t max_price_variation; - std::int64_t unit_of_measure_qty; - std::int64_t min_price_increment_amount; - std::int64_t price_ratio; - std::int64_t strike_price; - std::uint64_t raw_instrument_id; - std::int64_t leg_price; - std::int64_t leg_delta; - std::int32_t inst_attrib_value; - std::uint32_t underlying_id; - std::int32_t market_depth_implied; - std::int32_t market_depth; - std::uint32_t market_segment_id; - std::uint32_t max_trade_vol; - std::int32_t min_lot_size; - std::int32_t min_lot_size_block; - std::int32_t min_lot_size_round_lot; - std::uint32_t min_trade_vol; - std::int32_t contract_multiplier; - std::int32_t decay_quantity; - std::int32_t original_contract_size; - std::uint32_t leg_instrument_id; - std::int32_t leg_ratio_price_numerator; - std::int32_t leg_ratio_price_denominator; - std::int32_t leg_ratio_qty_numerator; - std::int32_t leg_ratio_qty_denominator; - std::uint32_t leg_underlying_id; - std::int16_t appl_id; - std::uint16_t maturity_year; - std::uint16_t decay_start_date; - std::uint16_t channel_id; - std::uint16_t leg_count; - std::uint16_t leg_index; - std::array currency; - std::array settl_currency; - std::array secsubtype; - std::array raw_symbol; - std::array group; - std::array exchange; - std::array asset; - std::array cfi; - std::array security_type; - std::array unit_of_measure; - std::array underlying; - std::array strike_price_currency; - std::array leg_raw_symbol; - InstrumentClass instrument_class; - MatchAlgorithm match_algorithm; - std::uint8_t main_fraction; - std::uint8_t price_display_format; - std::uint8_t sub_fraction; - std::uint8_t underlying_product; - SecurityUpdateAction security_update_action; - std::uint8_t maturity_month; - std::uint8_t maturity_day; - std::uint8_t maturity_week; - UserDefinedInstrument user_defined_instrument; - std::int8_t contract_multiplier_unit; - std::int8_t flow_schedule_type; - std::uint8_t tick_rule; - InstrumentClass leg_instrument_class; - Side leg_side; - // padding for alignment - std::array reserved; -}; -static_assert(sizeof(InstrumentDefMsg) == 520, - "InstrumentDefMsg size must match Rust"); -static_assert(alignof(InstrumentDefMsg) == 8, "Must have 8-byte alignment"); -static_assert(kMaxRecordLen == sizeof(InstrumentDefMsg) + sizeof(UnixNanos), - "v3 definition with ts_out should be the largest record"); - -/// A statistics message. A catchall for various data disseminated by -/// publishers. The `stat_type` indicates the statistic contained in the -/// message. -struct StatMsg { - static bool HasRType(RType rtype) { return rtype == RType::Statistics; } - - UnixNanos IndexTs() const { return ts_recv; } - - RecordHeader hd; - UnixNanos ts_recv; - UnixNanos ts_ref; - std::int64_t price; - std::int64_t quantity; - std::uint32_t sequence; - TimeDeltaNanos ts_in_delta; - StatType stat_type; - std::uint16_t channel_id; - StatUpdateAction update_action; - std::uint8_t stat_flags; - std::array reserved; -}; -static_assert(sizeof(StatMsg) == 80, "StatMsg size must match Rust"); -static_assert(alignof(StatMsg) == 8, "Must have 8-byte alignment"); - -bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs); -inline bool operator!=(const InstrumentDefMsg& lhs, - const InstrumentDefMsg& rhs) { - return !(lhs == rhs); -} -inline bool operator==(const StatMsg& lhs, const StatMsg& rhs) { - return std::tie(lhs.hd, lhs.ts_recv, lhs.ts_ref, lhs.price, lhs.quantity, - lhs.sequence, lhs.ts_in_delta, lhs.stat_type, lhs.channel_id, - lhs.update_action, lhs.stat_flags) == - std::tie(rhs.hd, rhs.ts_recv, rhs.ts_ref, rhs.price, rhs.quantity, - rhs.sequence, rhs.ts_in_delta, rhs.stat_type, rhs.channel_id, - rhs.update_action, rhs.stat_flags); -} -inline bool operator!=(const StatMsg& lhs, const StatMsg& rhs) { - return !(lhs == rhs); -} - -std::string ToString(const InstrumentDefMsg& instr_def_msg); -std::ostream& operator<<(std::ostream& stream, - const InstrumentDefMsg& instr_def_msg); -std::string ToString(const StatMsg& stat_msg); -std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg); } // namespace databento::v3 diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index f62eeae..899c80b 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.35.1 +pkgver=0.36.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/dbn.cpp b/src/dbn.cpp index 070c8a2..cad114e 100644 --- a/src/dbn.cpp +++ b/src/dbn.cpp @@ -16,8 +16,11 @@ PitSymbolMap Metadata::CreateSymbolMapForDate(date::year_month_day date) const { TsSymbolMap Metadata::CreateSymbolMap() const { return TsSymbolMap{*this}; } void Metadata::Upgrade(VersionUpgradePolicy upgrade_policy) { - if (version < kDbnVersion && - upgrade_policy == VersionUpgradePolicy::UpgradeToV2) { + if (upgrade_policy == VersionUpgradePolicy::UpgradeToV2 && version < 2) { + version = 2; + symbol_cstr_len = kSymbolCstrLen; + } else if (upgrade_policy == VersionUpgradePolicy::UpgradeToV3 && + version < 3) { version = kDbnVersion; symbol_cstr_len = kSymbolCstrLen; } diff --git a/src/dbn_decoder.cpp b/src/dbn_decoder.cpp index b5ef248..08a0dbc 100644 --- a/src/dbn_decoder.cpp +++ b/src/dbn_decoder.cpp @@ -14,6 +14,7 @@ #include "databento/enums.hpp" #include "databento/exceptions.hpp" #include "databento/record.hpp" +#include "databento/v3.hpp" #include "databento/with_ts_out.hpp" #include "dbn_constants.hpp" @@ -69,7 +70,7 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, InFileStream file_stream) DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input) : DbnDecoder(log_receiver, std::move(input), - VersionUpgradePolicy::UpgradeToV2) {} + VersionUpgradePolicy::UpgradeToV3) {} DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, std::unique_ptr input, @@ -81,7 +82,7 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, input_ = std::make_unique(std::move(input_), buffer_); input_->ReadExact(buffer_.WriteBegin(), kMagicSize); - buffer_.WriteBegin() += kMagicSize; + buffer_.Fill(kMagicSize); const auto* buf_ptr = buffer_.ReadBegin(); if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) { throw DbnResponseError{"Found Zstd input, but not DBN prefix"}; @@ -180,17 +181,17 @@ databento::Metadata DbnDecoder::DecodeMetadata() { // already read first 4 bytes detecting compression const auto read_size = kMetadataPreludeSize - kMagicSize; input_->ReadExact(buffer_.WriteBegin(), read_size); - buffer_.WriteBegin() += read_size; + buffer_.Fill(read_size); const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( buffer_.ReadBegin(), kMetadataPreludeSize); - buffer_.ReadBegin() += kMetadataPreludeSize; + buffer_.Consume(kMetadataPreludeSize); version_ = version; buffer_.Reserve(size); input_->ReadExact(buffer_.WriteBegin(), size); - buffer_.WriteBegin() += size; + buffer_.Fill(size); auto metadata = DbnDecoder::DecodeMetadataFields( version_, buffer_.ReadBegin(), buffer_.ReadEnd()); - buffer_.ReadBegin() += size; + buffer_.Consume(size); // Metadata may leave buffer misaligned. Shift records to ensure 8-byte // alignment buffer_.Shift(); @@ -206,13 +207,17 @@ databento::Record UpgradeRecord( databento::Record rec) { if (ts_out) { const auto orig = rec.Get>(); - const databento::WithTsOut v2{orig.rec.ToV2(), orig.ts_out}; - const auto v2_ptr = reinterpret_cast(&v2); - std::copy(v2_ptr, v2_ptr + v2.rec.hd.Size(), compat_buffer->data()); + const databento::WithTsOut upgraded = {orig.rec.template Upgrade(), + orig.ts_out}; + const auto upgraded_ptr = reinterpret_cast(&upgraded); + std::copy(upgraded_ptr, upgraded_ptr + upgraded.rec.hd.Size(), + compat_buffer->data()); } else { - const auto v2 = rec.Get().ToV2(); - const auto v2_ptr = reinterpret_cast(&v2); - std::copy(v2_ptr, v2_ptr + v2.hd.Size(), compat_buffer->data()); + const auto& orig = rec.Get(); + const U upgraded = orig.template Upgrade(); + const auto upgraded_ptr = reinterpret_cast(&upgraded); + std::copy(upgraded_ptr, upgraded_ptr + upgraded.hd.Size(), + compat_buffer->data()); } return databento::Record{ reinterpret_cast(compat_buffer->data())}; @@ -223,17 +228,68 @@ databento::Record DbnDecoder::DecodeRecordCompat( std::uint8_t version, VersionUpgradePolicy upgrade_policy, bool ts_out, std::array* compat_buffer, Record rec) { if (version == 1 && upgrade_policy == VersionUpgradePolicy::UpgradeToV2) { - if (rec.RType() == RType::InstrumentDef) { - return UpgradeRecord( - ts_out, compat_buffer, rec); - } else if (rec.RType() == RType::SymbolMapping) { - return UpgradeRecord( - ts_out, compat_buffer, rec); - } else if (rec.RType() == RType::Error) { - return UpgradeRecord(ts_out, compat_buffer, rec); - } else if (rec.RType() == RType::System) { - return UpgradeRecord(ts_out, compat_buffer, - rec); + switch (rec.RType()) { + case RType::InstrumentDef: { + return UpgradeRecord( + ts_out, compat_buffer, rec); + } + case RType::SymbolMapping: { + return UpgradeRecord( + ts_out, compat_buffer, rec); + } + case RType::Error: { + return UpgradeRecord(ts_out, compat_buffer, + rec); + } + case RType::System: { + return UpgradeRecord(ts_out, + compat_buffer, rec); + } + default: { + break; + } + } + } else if (version == 1 && + upgrade_policy == VersionUpgradePolicy::UpgradeToV3) { + switch (rec.RType()) { + case RType::InstrumentDef: { + return UpgradeRecord( + ts_out, compat_buffer, rec); + } + case RType::Statistics: { + return UpgradeRecord(ts_out, compat_buffer, + rec); + } + case RType::SymbolMapping: { + return UpgradeRecord( + ts_out, compat_buffer, rec); + } + case RType::Error: { + return UpgradeRecord(ts_out, compat_buffer, + rec); + } + case RType::System: { + return UpgradeRecord(ts_out, + compat_buffer, rec); + } + default: { + break; + } + } + } else if (version == 2 && + upgrade_policy == VersionUpgradePolicy::UpgradeToV3) { + switch (rec.RType()) { + case RType::InstrumentDef: { + return UpgradeRecord( + ts_out, compat_buffer, rec); + } + case RType::Statistics: { + return UpgradeRecord(ts_out, compat_buffer, + rec); + } + default: { + break; + } } } return rec; @@ -260,7 +316,7 @@ const databento::Record* DbnDecoder::DecodeRecord() { } } current_record_ = Record{BufferRecordHeader()}; - buffer_.ReadBegin() += current_record_.Size(); + buffer_.Consume(current_record_.Size()); current_record_ = DbnDecoder::DecodeRecordCompat( version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_); return ¤t_record_; @@ -272,7 +328,7 @@ size_t DbnDecoder::FillBuffer() { } const auto fill_size = input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()); - buffer_.WriteBegin() += fill_size; + buffer_.Fill(fill_size); return fill_size; } @@ -282,7 +338,7 @@ databento::RecordHeader* DbnDecoder::BufferRecordHeader() { bool DbnDecoder::DetectCompression() { input_->ReadExact(buffer_.WriteBegin(), kMagicSize); - buffer_.WriteBegin() += kMagicSize; + buffer_.Fill(kMagicSize); const auto* buffer_it = buffer_.ReadBegin(); if (std::strncmp(Consume(buffer_it, 3), kDbnPrefix, 3) == 0) { return false; diff --git a/src/dbn_encoder.cpp b/src/dbn_encoder.cpp index e4ad52a..290ef4b 100644 --- a/src/dbn_encoder.cpp +++ b/src/dbn_encoder.cpp @@ -91,7 +91,7 @@ void DbnEncoder::EncodeMetadata(const Metadata& metadata, IWritable* output) { std::max(1, metadata.version), kDbnVersion); EncodeChars(kDbnPrefix, kMagicSize - 1, output); EncodeAsBytes(version, output); - const std::uint32_t length = CalcLength(metadata); + const auto [length, end_padding] = CalcLength(metadata); EncodeAsBytes(length, output); EncodeFixedLenCStr(kDatasetCstrLen, metadata.dataset, output); if (metadata.has_mixed_schema) { @@ -130,6 +130,10 @@ void DbnEncoder::EncodeMetadata(const Metadata& metadata, IWritable* output) { EncodeRepeatedSymbolCStr(metadata.symbol_cstr_len, metadata.not_found, output); EncodeSymbolMappings(metadata.symbol_cstr_len, metadata.mappings, output); + if (end_padding > 0) { + std::array end_padding_buf{}; + output->WriteAll(end_padding_buf.data(), end_padding); + } } void DbnEncoder::EncodeRecord(const Record& record, IWritable* output) { @@ -141,7 +145,8 @@ void DbnEncoder::EncodeRecord(const Record& record) { EncodeRecord(record, output_); } -std::uint32_t DbnEncoder::CalcLength(const Metadata& metadata) { +std::pair DbnEncoder::CalcLength( + const Metadata& metadata) { const auto symbol_cstr_len = metadata.symbol_cstr_len; const auto mapping_interval_len = sizeof(std::uint32_t) * 2 + symbol_cstr_len; // schema_definition_length, symbols_count, partial_count, not_found_count, @@ -157,7 +162,13 @@ std::uint32_t DbnEncoder::CalcLength(const Metadata& metadata) { return acc + symbol_cstr_len + sizeof(std::uint32_t) + m.intervals.size() * mapping_interval_len; }); - return static_cast(kFixedMetadataLen + var_len_counts_size + - c_str_count * symbol_cstr_len + - mappings_len); + const auto needed_len = + static_cast(kFixedMetadataLen + var_len_counts_size + + c_str_count * symbol_cstr_len + mappings_len); + const auto rem = needed_len % 8; + if (metadata.version < 3 || rem == 0) { + return {needed_len, 0}; + } + const auto end_padding = 8 - rem; + return {needed_len + end_padding, end_padding}; } diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index 7822fd4..6c5565d 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -4,6 +4,7 @@ #include #include "databento/exceptions.hpp" +#include "stream_op_helper.hpp" using databento::detail::Buffer; @@ -16,7 +17,7 @@ size_t Buffer::Write(const std::byte* data, std::size_t length) { } const auto write_size = std::min(WriteCapacity(), length); std::copy(data, data + write_size, WriteBegin()); - WriteBegin() += write_size; + Fill(length); return write_size; } @@ -34,7 +35,7 @@ void Buffer::WriteAll(const std::byte* data, std::size_t length) { } void Buffer::ReadExact(std::byte* buffer, std::size_t length) { - if (length < ReadCapacity()) { + if (length > ReadCapacity()) { std::ostringstream err_msg; err_msg << "Reached end of buffer without " << length << " bytes, only " << ReadCapacity() << " bytes available"; @@ -46,7 +47,7 @@ void Buffer::ReadExact(std::byte* buffer, std::size_t length) { std::size_t Buffer::ReadSome(std::byte* buffer, std::size_t max_length) { const auto read_size = std::min(ReadCapacity(), max_length); std::copy(ReadBegin(), ReadBegin() + read_size, buffer); - ReadBegin() += read_size; + Consume(read_size); return read_size; } @@ -71,3 +72,19 @@ void Buffer::Shift() { read_pos_ = buf_.get(); write_pos_ = read_pos_ + unread_bytes; } + +namespace databento::detail { +std::ostream& operator<<(std::ostream& stream, const Buffer& buffer) { + return StreamOpBuilder{stream} + .SetTypeName("Buffer") + .SetSpacer(" ") + .Build() + .AddField("buf_", buffer.buf_.get()) + .AddField("end_", buffer.end_) + .AddField("read_pos", buffer.read_pos_) + .AddField("write_pos_", buffer.write_pos_) + .AddField("ReadCapacity", buffer.ReadCapacity()) + .AddField("WriteCapacity", buffer.WriteCapacity()) + .Finish(); +} +} // namespace databento::detail diff --git a/src/detail/dbn_buffer_decoder.cpp b/src/detail/dbn_buffer_decoder.cpp index 2bb95e2..e499c67 100644 --- a/src/detail/dbn_buffer_decoder.cpp +++ b/src/detail/dbn_buffer_decoder.cpp @@ -8,10 +8,12 @@ using databento::detail::DbnBufferDecoder; databento::KeepGoing DbnBufferDecoder::Process(const char* data, std::size_t length) { + constexpr auto kUpgradePolicy = VersionUpgradePolicy::UpgradeToV3; + zstd_buffer_->WriteAll(data, length); const auto read_size = zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(), dbn_buffer_.WriteCapacity()); - dbn_buffer_.WriteBegin() += read_size; + dbn_buffer_.Fill(read_size); if (read_size == 0) { return KeepGoing::Continue; } @@ -23,7 +25,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, std::tie(input_version_, bytes_needed_) = DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(), dbn_buffer_.ReadCapacity()); - dbn_buffer_.ReadBegin() += kMetadataPreludeSize; + dbn_buffer_.Consume(kMetadataPreludeSize); dbn_buffer_.Reserve(bytes_needed_); state_ = DecoderState::Metadata; [[fallthrough]]; @@ -34,12 +36,12 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, } auto metadata = DbnDecoder::DecodeMetadataFields( input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd()); - dbn_buffer_.ReadBegin() += bytes_needed_; + dbn_buffer_.Consume(bytes_needed_); // Metadata may leave buffer misaligned. Shift records to ensure 8-byte // alignment dbn_buffer_.Shift(); ts_out_ = metadata.ts_out; - metadata.Upgrade(VersionUpgradePolicy::UpgradeToV2); + metadata.Upgrade(kUpgradePolicy); if (metadata_callback_) { metadata_callback_(std::move(metadata)); } @@ -55,12 +57,11 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, break; } record = DbnDecoder::DecodeRecordCompat( - input_version_, VersionUpgradePolicy::UpgradeToV2, ts_out_, - &compat_buffer_, record); + input_version_, kUpgradePolicy, ts_out_, &compat_buffer_, record); if (record_callback_(record) == KeepGoing::Stop) { return KeepGoing::Stop; } - dbn_buffer_.ReadBegin() += bytes_needed_; + dbn_buffer_.Consume(bytes_needed_); } } } diff --git a/src/detail/zstd_stream.cpp b/src/detail/zstd_stream.cpp index c521379..d908716 100644 --- a/src/detail/zstd_stream.cpp +++ b/src/detail/zstd_stream.cpp @@ -24,7 +24,7 @@ ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input, read_suggestion_{::ZSTD_initDStream(z_dstream_.get())}, in_buffer_{in_buffer.ReadBegin(), in_buffer.ReadEnd()}, z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} { - in_buffer.ReadBegin() += in_buffer.ReadCapacity(); + in_buffer.Consume(in_buffer.ReadCapacity()); } void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) { diff --git a/src/enums.cpp b/src/enums.cpp index 26dad86..bd322be 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -755,7 +755,10 @@ const char* ToString(VersionUpgradePolicy upgrade_policy) { return "AsIs"; } case VersionUpgradePolicy::UpgradeToV2: { - return "Upgrade"; + return "UpgradeToV2"; + } + case VersionUpgradePolicy::UpgradeToV3: { + return "UpgradeToV3"; } default: { return "Unknown"; diff --git a/src/file_stream.cpp b/src/file_stream.cpp index 063ecb8..178d39b 100644 --- a/src/file_stream.cpp +++ b/src/file_stream.cpp @@ -11,7 +11,7 @@ InFileStream::InFileStream(const std::string& file_path) : stream_{file_path, std::ios::binary} { if (stream_.fail()) { throw InvalidArgumentError{"InFileStream", "file_path", - "Non-existent or invalid file"}; + "Non-existent or invalid file at " + file_path}; } } diff --git a/src/historical.cpp b/src/historical.cpp index 07d20a6..19b1c6d 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -925,7 +925,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( const HttplibParams& params, const std::string& file_path) { StreamToFile(kTimeseriesGetRangePath, params, file_path); return DbnFileStore{log_receiver_, file_path, - VersionUpgradePolicy::UpgradeToV2}; + VersionUpgradePolicy::UpgradeToV3}; } using databento::HistoricalBuilder; @@ -949,6 +949,12 @@ HistoricalBuilder& HistoricalBuilder::SetGateway(HistoricalGateway gateway) { return *this; } +HistoricalBuilder& HistoricalBuilder::SetLogReceiver( + ILogReceiver* log_receiver) { + log_receiver_ = log_receiver; + return *this; +} + Historical HistoricalBuilder::Build() { if (key_.empty()) { throw Exception{"'key' is unset"}; diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 1fdeb3b..f5450ed 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -128,7 +128,9 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, chunked_sub_msg << sub_msg << "|symbols=" << JoinSymbolStrings(kMethodName, symbols_it, symbols_it + chunk_size) - << "|snapshot=" << use_snapshot << '\n'; + << "|snapshot=" << use_snapshot + << "|is_last=" << (distance_from_end <= kSymbolMaxChunkSize) + << '\n'; client_.WriteAll(chunked_sub_msg.str()); symbols_it += chunk_size; @@ -138,16 +140,16 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, databento::Metadata LiveBlocking::Start() { client_.WriteAll("start_session\n"); client_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize); - buffer_.WriteBegin() += kMetadataPreludeSize; + buffer_.Fill(kMetadataPreludeSize); const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( buffer_.ReadBegin(), kMetadataPreludeSize); - buffer_.ReadBegin() += kMetadataPreludeSize; + buffer_.Consume(kMetadataPreludeSize); buffer_.Reserve(size); client_.ReadExact(buffer_.WriteBegin(), size); - buffer_.WriteBegin() += size; + buffer_.Fill(size); auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(), buffer_.ReadEnd()); - buffer_.ReadBegin() += size; + buffer_.Consume(size); // Metadata may leave buffer misaligned. Shift records to ensure 8-byte // alignment buffer_.Shift(); @@ -182,7 +184,7 @@ const databento::Record* LiveBlocking::NextRecord( } } current_record_ = Record{BufferRecordHeader()}; - buffer_.ReadBegin() += current_record_.Size(); + buffer_.Consume(current_record_.Size()); current_record_ = DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_); @@ -221,7 +223,7 @@ std::string LiveBlocking::DecodeChallenge() { if (read_size == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } - buffer_.WriteBegin() += read_size; + buffer_.Fill(read_size); // first line is version std::string response{reinterpret_cast(buffer_.ReadBegin()), buffer_.ReadCapacity()}; @@ -241,9 +243,8 @@ std::string LiveBlocking::DecodeChallenge() { : response.find('\n', find_start); while (next_nl_pos == std::string::npos) { // read more - buffer_.WriteBegin() += - client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()) - .read_size; + buffer_.Fill(client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()) + .read_size); if (buffer_.ReadCapacity() == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } @@ -333,7 +334,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { "Unexpected end of message received from server after replying to " "CRAM"}; } - buffer_.WriteBegin() += read_size; + buffer_.Fill(read_size); newline_ptr = std::find(buffer_.ReadBegin(), buffer_.ReadEnd(), static_cast('\n')); } while (newline_ptr == buffer_.ReadEnd()); @@ -347,7 +348,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { log_receiver_->Receive(LogLevel::Debug, log_ss.str()); } // set in case Read call also read records. One beyond newline - buffer_.ReadBegin() += response.length() + 1; + buffer_.Consume(response.length() + 1); std::size_t pos{}; bool found_success{}; @@ -409,7 +410,7 @@ databento::detail::TcpClient::Result LiveBlocking::FillBuffer( buffer_.Shift(); const auto read_res = client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); - buffer_.WriteBegin() += read_res.read_size; + buffer_.Fill(read_res.read_size); return read_res; } diff --git a/src/record.cpp b/src/record.cpp index 5212f86..cf3da88 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -2,11 +2,9 @@ #include -#include "databento/constants.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" // InvalidArgumentError #include "databento/fixed_price.hpp" -#include "databento/v3.hpp" #include "stream_op_helper.hpp" using databento::Record; @@ -111,116 +109,6 @@ databento::RType Record::RTypeFromSchema(const Schema schema) { using databento::InstrumentDefMsg; -databento::v3::InstrumentDefMsg InstrumentDefMsg::ToV3() const { - v3::InstrumentDefMsg ret{ - RecordHeader{ - sizeof(v3::InstrumentDefMsg) / RecordHeader::kLengthMultiplier, - RType::InstrumentDef, hd.publisher_id, hd.instrument_id, hd.ts_event}, - ts_recv, - min_price_increment, - display_factor, - expiration, - activation, - high_limit_price, - low_limit_price, - max_price_variation, - unit_of_measure_qty, - min_price_increment_amount, - price_ratio, - strike_price, - raw_instrument_id, - kUndefPrice, - kUndefPrice, - inst_attrib_value, - underlying_id, - market_depth_implied, - market_depth, - market_segment_id, - max_trade_vol, - min_lot_size, - min_lot_size_block, - min_lot_size_round_lot, - min_trade_vol, - contract_multiplier, - decay_quantity, - original_contract_size, - {}, - {}, - {}, - {}, - {}, - {}, - appl_id, - maturity_year, - decay_start_date, - channel_id, - {}, - {}, - currency, - settl_currency, - secsubtype, - {}, - {}, - {}, - {}, - {}, - {}, - {}, - {}, - {}, - {}, - instrument_class, - match_algorithm, - main_fraction, - price_display_format, - sub_fraction, - underlying_product, - security_update_action, - maturity_month, - maturity_day, - maturity_week, - user_defined_instrument, - contract_multiplier_unit, - flow_schedule_type, - tick_rule, - {}, - Side::None, - {}}; - std::copy(raw_symbol.begin(), raw_symbol.end(), ret.raw_symbol.begin()); - std::copy(group.begin(), group.end(), ret.group.begin()); - std::copy(exchange.begin(), exchange.end(), ret.exchange.begin()); - std::copy(asset.begin(), asset.end(), ret.asset.begin()); - std::copy(cfi.begin(), cfi.end(), ret.cfi.begin()); - std::copy(security_type.begin(), security_type.end(), - ret.security_type.begin()); - std::copy(unit_of_measure.begin(), unit_of_measure.end(), - ret.unit_of_measure.begin()); - std::copy(underlying.begin(), underlying.end(), ret.underlying.begin()); - std::copy(strike_price_currency.begin(), strike_price_currency.end(), - ret.strike_price_currency.begin()); - return ret; -} - -using databento::StatMsg; - -databento::v3::StatMsg StatMsg::ToV3() const { - return databento::v3::StatMsg{ - RecordHeader{sizeof(v3::StatMsg) / RecordHeader::kLengthMultiplier, - RType::Statistics, hd.publisher_id, hd.instrument_id, - hd.ts_event}, - ts_recv, - ts_ref, - price, - quantity == kUndefStatQuantity ? v3::kUndefStatQuantity : quantity, - sequence, - ts_in_delta, - stat_type, - channel_id, - update_action, - stat_flags, - {}}; -} - bool databento::operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { return lhs.hd == rhs.hd && lhs.ts_recv == rhs.ts_recv && @@ -230,14 +118,14 @@ bool databento::operator==(const InstrumentDefMsg& lhs, lhs.high_limit_price == rhs.high_limit_price && lhs.low_limit_price == rhs.low_limit_price && lhs.max_price_variation == rhs.max_price_variation && - lhs.trading_reference_price == rhs.trading_reference_price && lhs.unit_of_measure_qty == rhs.unit_of_measure_qty && lhs.min_price_increment_amount == rhs.min_price_increment_amount && lhs.price_ratio == rhs.price_ratio && lhs.strike_price == rhs.strike_price && + lhs.raw_instrument_id == rhs.raw_instrument_id && + lhs.leg_price == rhs.leg_price && lhs.leg_delta == rhs.leg_delta && lhs.inst_attrib_value == rhs.inst_attrib_value && lhs.underlying_id == rhs.underlying_id && - lhs.raw_instrument_id == rhs.raw_instrument_id && lhs.market_depth_implied == rhs.market_depth_implied && lhs.market_depth == rhs.market_depth && lhs.market_segment_id == rhs.market_segment_id && @@ -249,10 +137,16 @@ bool databento::operator==(const InstrumentDefMsg& lhs, lhs.contract_multiplier == rhs.contract_multiplier && lhs.decay_quantity == rhs.decay_quantity && lhs.original_contract_size == rhs.original_contract_size && - lhs.trading_reference_date == rhs.trading_reference_date && + lhs.leg_instrument_id == rhs.leg_instrument_id && + lhs.leg_ratio_price_numerator == rhs.leg_ratio_price_numerator && + lhs.leg_ratio_price_denominator == rhs.leg_ratio_price_denominator && + lhs.leg_ratio_qty_numerator == rhs.leg_ratio_qty_numerator && + lhs.leg_ratio_qty_denominator == rhs.leg_ratio_qty_denominator && + lhs.leg_underlying_id == rhs.leg_underlying_id && lhs.appl_id == rhs.appl_id && lhs.maturity_year == rhs.maturity_year && lhs.decay_start_date == rhs.decay_start_date && - lhs.channel_id == rhs.channel_id && lhs.currency == rhs.currency && + lhs.channel_id == rhs.channel_id && lhs.leg_count == rhs.leg_count && + lhs.leg_index == rhs.leg_index && lhs.currency == rhs.currency && lhs.settl_currency == rhs.settl_currency && lhs.secsubtype == rhs.secsubtype && lhs.raw_symbol == rhs.raw_symbol && lhs.group == rhs.group && lhs.exchange == rhs.exchange && @@ -261,12 +155,11 @@ bool databento::operator==(const InstrumentDefMsg& lhs, lhs.unit_of_measure == rhs.unit_of_measure && lhs.underlying == rhs.underlying && lhs.strike_price_currency == rhs.strike_price_currency && + lhs.leg_raw_symbol == rhs.leg_raw_symbol && lhs.instrument_class == rhs.instrument_class && lhs.match_algorithm == rhs.match_algorithm && - lhs.md_security_trading_status == rhs.md_security_trading_status && lhs.main_fraction == rhs.main_fraction && lhs.price_display_format == rhs.price_display_format && - lhs.settl_price_type == rhs.settl_price_type && lhs.sub_fraction == rhs.sub_fraction && lhs.underlying_product == rhs.underlying_product && lhs.security_update_action == rhs.security_update_action && @@ -276,7 +169,9 @@ bool databento::operator==(const InstrumentDefMsg& lhs, lhs.user_defined_instrument == rhs.user_defined_instrument && lhs.contract_multiplier_unit == rhs.contract_multiplier_unit && lhs.flow_schedule_type == rhs.flow_schedule_type && - lhs.tick_rule == rhs.tick_rule; + lhs.tick_rule == rhs.tick_rule && + lhs.leg_instrument_class == rhs.leg_instrument_class && + lhs.leg_side == rhs.leg_side; } using databento::ImbalanceMsg; @@ -541,16 +436,16 @@ std::ostream& operator<<(std::ostream& stream, .AddField("high_limit_price", FixPx{instr_def_msg.high_limit_price}) .AddField("low_limit_price", FixPx{instr_def_msg.low_limit_price}) .AddField("max_price_variation", FixPx{instr_def_msg.max_price_variation}) - .AddField("trading_reference_price", - FixPx{instr_def_msg.trading_reference_price}) .AddField("unit_of_measure_qty", FixPx{instr_def_msg.unit_of_measure_qty}) .AddField("min_price_increment_amount", FixPx{instr_def_msg.min_price_increment_amount}) .AddField("price_ratio", FixPx{instr_def_msg.price_ratio}) .AddField("strike_price", FixPx{instr_def_msg.strike_price}) + .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) + .AddField("leg_price", FixPx{instr_def_msg.leg_price}) + .AddField("leg_delta", FixPx{instr_def_msg.leg_delta}) .AddField("inst_attrib_value", instr_def_msg.inst_attrib_value) .AddField("underlying_id", instr_def_msg.underlying_id) - .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) .AddField("market_depth_implied", instr_def_msg.market_depth_implied) .AddField("market_depth", instr_def_msg.market_depth) .AddField("market_segment_id", instr_def_msg.market_segment_id) @@ -562,11 +457,22 @@ std::ostream& operator<<(std::ostream& stream, .AddField("contract_multiplier", instr_def_msg.contract_multiplier) .AddField("decay_quantity", instr_def_msg.decay_quantity) .AddField("original_contract_size", instr_def_msg.original_contract_size) - .AddField("trading_reference_date", instr_def_msg.trading_reference_date) + .AddField("leg_instrument_id", instr_def_msg.leg_instrument_id) + .AddField("leg_ratio_price_numerator", + instr_def_msg.leg_ratio_price_numerator) + .AddField("leg_ratio_price_denominator", + instr_def_msg.leg_ratio_price_denominator) + .AddField("leg_ratio_qty_numerator", + instr_def_msg.leg_ratio_qty_numerator) + .AddField("leg_ratio_qty_denominator", + instr_def_msg.leg_ratio_qty_denominator) + .AddField("leg_underlying_id", instr_def_msg.leg_underlying_id) .AddField("appl_id", instr_def_msg.appl_id) .AddField("maturity_year", instr_def_msg.maturity_year) .AddField("decay_start_date", instr_def_msg.decay_start_date) .AddField("channel_id", instr_def_msg.channel_id) + .AddField("leg_count", instr_def_msg.leg_count) + .AddField("leg_index", instr_def_msg.leg_index) .AddField("currency", instr_def_msg.currency) .AddField("settl_currency", instr_def_msg.settl_currency) .AddField("secsubtype", instr_def_msg.secsubtype) @@ -579,13 +485,11 @@ std::ostream& operator<<(std::ostream& stream, .AddField("unit_of_measure", instr_def_msg.unit_of_measure) .AddField("underlying", instr_def_msg.underlying) .AddField("strike_price_currency", instr_def_msg.strike_price_currency) + .AddField("leg_raw_symbol", instr_def_msg.leg_raw_symbol) .AddField("instrument_class", instr_def_msg.instrument_class) .AddField("match_algorithm", instr_def_msg.match_algorithm) - .AddField("md_security_trading_status", - instr_def_msg.md_security_trading_status) .AddField("main_fraction", instr_def_msg.main_fraction) .AddField("price_display_format", instr_def_msg.price_display_format) - .AddField("settl_price_type", instr_def_msg.settl_price_type) .AddField("sub_fraction", instr_def_msg.sub_fraction) .AddField("underlying_product", instr_def_msg.underlying_product) .AddField("security_update_action", instr_def_msg.security_update_action) @@ -598,6 +502,8 @@ std::ostream& operator<<(std::ostream& stream, instr_def_msg.contract_multiplier_unit) .AddField("flow_schedule_type", instr_def_msg.flow_schedule_type) .AddField("tick_rule", instr_def_msg.tick_rule) + .AddField("leg_instrument_class", instr_def_msg.leg_instrument_class) + .AddField("leg_side", instr_def_msg.leg_side) .Finish(); } diff --git a/src/v1.cpp b/src/v1.cpp index 2d7a137..f3abe2d 100644 --- a/src/v1.cpp +++ b/src/v1.cpp @@ -2,10 +2,8 @@ #include // copy #include -#include #include // numeric_limits -#include "databento/constants.hpp" #include "databento/enums.hpp" #include "databento/fixed_price.hpp" // FixedPx #include "databento/record.hpp" @@ -189,6 +187,38 @@ v3::InstrumentDefMsg InstrumentDefMsg::ToV3() const { return ret; } +template <> +v2::InstrumentDefMsg InstrumentDefMsg::Upgrade() const { + return ToV2(); +} +template <> +v3::InstrumentDefMsg InstrumentDefMsg::Upgrade() const { + return ToV3(); +} + +v3::StatMsg StatMsg::ToV3() const { + return v3::StatMsg{ + RecordHeader{sizeof(v3::StatMsg) / RecordHeader::kLengthMultiplier, + RType::Statistics, hd.publisher_id, hd.instrument_id, + hd.ts_event}, + ts_recv, + ts_ref, + price, + quantity == kUndefStatQuantity ? v3::kUndefStatQuantity : quantity, + sequence, + ts_in_delta, + stat_type, + channel_id, + update_action, + stat_flags, + {}}; +} + +template <> +v3::StatMsg StatMsg::Upgrade() const { + return ToV3(); +} + v2::ErrorMsg ErrorMsg::ToV2() const { v2::ErrorMsg ret{ RecordHeader{sizeof(v2::ErrorMsg) / RecordHeader::kLengthMultiplier, @@ -201,6 +231,11 @@ v2::ErrorMsg ErrorMsg::ToV2() const { return ret; } +template <> +v2::ErrorMsg ErrorMsg::Upgrade() const { + return ToV2(); +} + v2::SymbolMappingMsg SymbolMappingMsg::ToV2() const { v2::SymbolMappingMsg ret{ RecordHeader{ @@ -223,6 +258,11 @@ v2::SymbolMappingMsg SymbolMappingMsg::ToV2() const { return ret; } +template <> +v2::SymbolMappingMsg SymbolMappingMsg::Upgrade() const { + return ToV2(); +} + v2::SystemMsg SystemMsg::ToV2() const { v2::SystemMsg ret{ RecordHeader{sizeof(v2::SystemMsg) / RecordHeader::kLengthMultiplier, @@ -234,6 +274,11 @@ v2::SystemMsg SystemMsg::ToV2() const { return ret; } +template <> +v2::SystemMsg SystemMsg::Upgrade() const { + return ToV2(); +} + bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { return lhs.hd == rhs.hd && lhs.ts_recv == rhs.ts_recv && lhs.min_price_increment == rhs.min_price_increment && @@ -368,6 +413,27 @@ std::ostream& operator<<(std::ostream& stream, .AddField("tick_rule", instr_def_msg.tick_rule) .Finish(); } + +std::string ToString(const StatMsg& stat_msg) { return MakeString(stat_msg); } +std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { + return StreamOpBuilder{stream} + .SetSpacer("\n ") + .SetTypeName("StatMsg") + .Build() + .AddField("hd", stat_msg.hd) + .AddField("ts_recv", stat_msg.ts_recv) + .AddField("ts_ref", stat_msg.ts_ref) + .AddField("price", FixPx{stat_msg.price}) + .AddField("quantity", stat_msg.quantity) + .AddField("sequence", stat_msg.sequence) + .AddField("ts_in_delta", stat_msg.ts_in_delta) + .AddField("stat_type", stat_msg.stat_type) + .AddField("channel_id", stat_msg.channel_id) + .AddField("update_action", stat_msg.update_action) + .AddField("stat_flags", stat_msg.stat_flags) + .Finish(); +} + std::string ToString(const ErrorMsg& err_msg) { return MakeString(err_msg); } std::ostream& operator<<(std::ostream& stream, const ErrorMsg& err_msg) { return StreamOpBuilder{stream} diff --git a/src/v3.cpp b/src/v2.cpp similarity index 68% rename from src/v3.cpp rename to src/v2.cpp index a021f43..1ebd8da 100644 --- a/src/v3.cpp +++ b/src/v2.cpp @@ -1,9 +1,104 @@ -#include "databento/v3.hpp" +#include "databento/v2.hpp" #include "databento/fixed_price.hpp" +#include "databento/v3.hpp" #include "stream_op_helper.hpp" -namespace databento::v3 { +namespace databento::v2 { +databento::v3::InstrumentDefMsg InstrumentDefMsg::ToV3() const { + v3::InstrumentDefMsg ret{ + RecordHeader{ + sizeof(v3::InstrumentDefMsg) / RecordHeader::kLengthMultiplier, + RType::InstrumentDef, hd.publisher_id, hd.instrument_id, hd.ts_event}, + ts_recv, + min_price_increment, + display_factor, + expiration, + activation, + high_limit_price, + low_limit_price, + max_price_variation, + unit_of_measure_qty, + min_price_increment_amount, + price_ratio, + strike_price, + raw_instrument_id, + kUndefPrice, + kUndefPrice, + inst_attrib_value, + underlying_id, + market_depth_implied, + market_depth, + market_segment_id, + max_trade_vol, + min_lot_size, + min_lot_size_block, + min_lot_size_round_lot, + min_trade_vol, + contract_multiplier, + decay_quantity, + original_contract_size, + {}, + {}, + {}, + {}, + {}, + {}, + appl_id, + maturity_year, + decay_start_date, + channel_id, + {}, + {}, + currency, + settl_currency, + secsubtype, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + {}, + instrument_class, + match_algorithm, + main_fraction, + price_display_format, + sub_fraction, + underlying_product, + security_update_action, + maturity_month, + maturity_day, + maturity_week, + user_defined_instrument, + contract_multiplier_unit, + flow_schedule_type, + tick_rule, + {}, + Side::None, + {}}; + std::copy(raw_symbol.begin(), raw_symbol.end(), ret.raw_symbol.begin()); + std::copy(group.begin(), group.end(), ret.group.begin()); + std::copy(exchange.begin(), exchange.end(), ret.exchange.begin()); + std::copy(asset.begin(), asset.end(), ret.asset.begin()); + std::copy(cfi.begin(), cfi.end(), ret.cfi.begin()); + std::copy(security_type.begin(), security_type.end(), + ret.security_type.begin()); + std::copy(unit_of_measure.begin(), unit_of_measure.end(), + ret.unit_of_measure.begin()); + std::copy(underlying.begin(), underlying.end(), ret.underlying.begin()); + std::copy(strike_price_currency.begin(), strike_price_currency.end(), + ret.strike_price_currency.begin()); + return ret; +} + +template <> +v3::InstrumentDefMsg InstrumentDefMsg::Upgrade() const { + return ToV3(); +} bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { return lhs.hd == rhs.hd && lhs.ts_recv == rhs.ts_recv && @@ -13,14 +108,14 @@ bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { lhs.high_limit_price == rhs.high_limit_price && lhs.low_limit_price == rhs.low_limit_price && lhs.max_price_variation == rhs.max_price_variation && + lhs.trading_reference_price == rhs.trading_reference_price && lhs.unit_of_measure_qty == rhs.unit_of_measure_qty && lhs.min_price_increment_amount == rhs.min_price_increment_amount && lhs.price_ratio == rhs.price_ratio && lhs.strike_price == rhs.strike_price && - lhs.raw_instrument_id == rhs.raw_instrument_id && - lhs.leg_price == rhs.leg_price && lhs.leg_delta == rhs.leg_delta && lhs.inst_attrib_value == rhs.inst_attrib_value && lhs.underlying_id == rhs.underlying_id && + lhs.raw_instrument_id == rhs.raw_instrument_id && lhs.market_depth_implied == rhs.market_depth_implied && lhs.market_depth == rhs.market_depth && lhs.market_segment_id == rhs.market_segment_id && @@ -32,16 +127,10 @@ bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { lhs.contract_multiplier == rhs.contract_multiplier && lhs.decay_quantity == rhs.decay_quantity && lhs.original_contract_size == rhs.original_contract_size && - lhs.leg_instrument_id == rhs.leg_instrument_id && - lhs.leg_ratio_price_numerator == rhs.leg_ratio_price_numerator && - lhs.leg_ratio_price_denominator == rhs.leg_ratio_price_denominator && - lhs.leg_ratio_qty_numerator == rhs.leg_ratio_qty_numerator && - lhs.leg_ratio_qty_denominator == rhs.leg_ratio_qty_denominator && - lhs.leg_underlying_id == rhs.leg_underlying_id && + lhs.trading_reference_date == rhs.trading_reference_date && lhs.appl_id == rhs.appl_id && lhs.maturity_year == rhs.maturity_year && lhs.decay_start_date == rhs.decay_start_date && - lhs.channel_id == rhs.channel_id && lhs.leg_count == rhs.leg_count && - lhs.leg_index == rhs.leg_index && lhs.currency == rhs.currency && + lhs.channel_id == rhs.channel_id && lhs.currency == rhs.currency && lhs.settl_currency == rhs.settl_currency && lhs.secsubtype == rhs.secsubtype && lhs.raw_symbol == rhs.raw_symbol && lhs.group == rhs.group && lhs.exchange == rhs.exchange && @@ -50,11 +139,12 @@ bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { lhs.unit_of_measure == rhs.unit_of_measure && lhs.underlying == rhs.underlying && lhs.strike_price_currency == rhs.strike_price_currency && - lhs.leg_raw_symbol == rhs.leg_raw_symbol && lhs.instrument_class == rhs.instrument_class && lhs.match_algorithm == rhs.match_algorithm && + lhs.md_security_trading_status == rhs.md_security_trading_status && lhs.main_fraction == rhs.main_fraction && lhs.price_display_format == rhs.price_display_format && + lhs.settl_price_type == rhs.settl_price_type && lhs.sub_fraction == rhs.sub_fraction && lhs.underlying_product == rhs.underlying_product && lhs.security_update_action == rhs.security_update_action && @@ -64,9 +154,7 @@ bool operator==(const InstrumentDefMsg& lhs, const InstrumentDefMsg& rhs) { lhs.user_defined_instrument == rhs.user_defined_instrument && lhs.contract_multiplier_unit == rhs.contract_multiplier_unit && lhs.flow_schedule_type == rhs.flow_schedule_type && - lhs.tick_rule == rhs.tick_rule && - lhs.leg_instrument_class == rhs.leg_instrument_class && - lhs.leg_side == rhs.leg_side; + lhs.tick_rule == rhs.tick_rule; } std::string ToString(const InstrumentDefMsg& instr_def_msg) { @@ -76,7 +164,7 @@ std::ostream& operator<<(std::ostream& stream, const InstrumentDefMsg& instr_def_msg) { return StreamOpBuilder{stream} .SetSpacer("\n ") - .SetTypeName("v1::InstrumentDefMsg") + .SetTypeName("v2::InstrumentDefMsg") .Build() .AddField("hd", instr_def_msg.hd) .AddField("ts_recv", instr_def_msg.ts_recv) @@ -87,16 +175,16 @@ std::ostream& operator<<(std::ostream& stream, .AddField("high_limit_price", FixPx{instr_def_msg.high_limit_price}) .AddField("low_limit_price", FixPx{instr_def_msg.low_limit_price}) .AddField("max_price_variation", FixPx{instr_def_msg.max_price_variation}) + .AddField("trading_reference_price", + FixPx{instr_def_msg.trading_reference_price}) .AddField("unit_of_measure_qty", FixPx{instr_def_msg.unit_of_measure_qty}) .AddField("min_price_increment_amount", FixPx{instr_def_msg.min_price_increment_amount}) .AddField("price_ratio", FixPx{instr_def_msg.price_ratio}) .AddField("strike_price", FixPx{instr_def_msg.strike_price}) - .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) - .AddField("leg_price", FixPx{instr_def_msg.leg_price}) - .AddField("leg_delta", FixPx{instr_def_msg.leg_delta}) .AddField("inst_attrib_value", instr_def_msg.inst_attrib_value) .AddField("underlying_id", instr_def_msg.underlying_id) + .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) .AddField("market_depth_implied", instr_def_msg.market_depth_implied) .AddField("market_depth", instr_def_msg.market_depth) .AddField("market_segment_id", instr_def_msg.market_segment_id) @@ -108,22 +196,11 @@ std::ostream& operator<<(std::ostream& stream, .AddField("contract_multiplier", instr_def_msg.contract_multiplier) .AddField("decay_quantity", instr_def_msg.decay_quantity) .AddField("original_contract_size", instr_def_msg.original_contract_size) - .AddField("leg_instrument_id", instr_def_msg.leg_instrument_id) - .AddField("leg_ratio_price_numerator", - instr_def_msg.leg_ratio_price_numerator) - .AddField("leg_ratio_price_denominator", - instr_def_msg.leg_ratio_price_denominator) - .AddField("leg_ratio_qty_numerator", - instr_def_msg.leg_ratio_qty_numerator) - .AddField("leg_ratio_qty_denominator", - instr_def_msg.leg_ratio_qty_denominator) - .AddField("leg_underlying_id", instr_def_msg.leg_underlying_id) + .AddField("trading_reference_date", instr_def_msg.trading_reference_date) .AddField("appl_id", instr_def_msg.appl_id) .AddField("maturity_year", instr_def_msg.maturity_year) .AddField("decay_start_date", instr_def_msg.decay_start_date) .AddField("channel_id", instr_def_msg.channel_id) - .AddField("leg_count", instr_def_msg.leg_count) - .AddField("leg_index", instr_def_msg.leg_index) .AddField("currency", instr_def_msg.currency) .AddField("settl_currency", instr_def_msg.settl_currency) .AddField("secsubtype", instr_def_msg.secsubtype) @@ -136,11 +213,13 @@ std::ostream& operator<<(std::ostream& stream, .AddField("unit_of_measure", instr_def_msg.unit_of_measure) .AddField("underlying", instr_def_msg.underlying) .AddField("strike_price_currency", instr_def_msg.strike_price_currency) - .AddField("leg_raw_symbol", instr_def_msg.leg_raw_symbol) .AddField("instrument_class", instr_def_msg.instrument_class) .AddField("match_algorithm", instr_def_msg.match_algorithm) + .AddField("md_security_trading_status", + instr_def_msg.md_security_trading_status) .AddField("main_fraction", instr_def_msg.main_fraction) .AddField("price_display_format", instr_def_msg.price_display_format) + .AddField("settl_price_type", instr_def_msg.settl_price_type) .AddField("sub_fraction", instr_def_msg.sub_fraction) .AddField("underlying_product", instr_def_msg.underlying_product) .AddField("security_update_action", instr_def_msg.security_update_action) @@ -153,28 +232,7 @@ std::ostream& operator<<(std::ostream& stream, instr_def_msg.contract_multiplier_unit) .AddField("flow_schedule_type", instr_def_msg.flow_schedule_type) .AddField("tick_rule", instr_def_msg.tick_rule) - .AddField("leg_instrument_class", instr_def_msg.leg_instrument_class) - .AddField("leg_side", instr_def_msg.leg_side) .Finish(); } -std::string ToString(const StatMsg& stat_msg) { return MakeString(stat_msg); } -std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { - return StreamOpBuilder{stream} - .SetSpacer("\n ") - .SetTypeName("StatMsg") - .Build() - .AddField("hd", stat_msg.hd) - .AddField("ts_recv", stat_msg.ts_recv) - .AddField("ts_ref", stat_msg.ts_ref) - .AddField("price", FixPx{stat_msg.price}) - .AddField("quantity", stat_msg.quantity) - .AddField("sequence", stat_msg.sequence) - .AddField("ts_in_delta", stat_msg.ts_in_delta) - .AddField("stat_type", stat_msg.stat_type) - .AddField("channel_id", stat_msg.channel_id) - .AddField("update_action", stat_msg.update_action) - .AddField("stat_flags", stat_msg.stat_flags) - .Finish(); -} -} // namespace databento::v3 +} // namespace databento::v2 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c5f97ab..f099627 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -158,7 +158,7 @@ gtest_discover_tests(${PROJECT_NAME}) # file(COPY data DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) -add_compile_definitions(TEST_BUILD_DIR="${CMAKE_CURRENT_BINARY_DIR}") +add_compile_definitions(TEST_DATA_DIR="${CMAKE_CURRENT_SOURCE_DIR}/data") verbose_message("Finished adding unit tests for ${CMAKE_PROJECT_NAME}.") diff --git a/tests/data/multi-frame.definition.v1.dbn.zst b/tests/data/multi-frame.definition.v1.dbn.frag.zst similarity index 100% rename from tests/data/multi-frame.definition.v1.dbn.zst rename to tests/data/multi-frame.definition.v1.dbn.frag.zst diff --git a/tests/data/test_data.bbo-1m.v2.dbn.zst b/tests/data/test_data.bbo-1m.v2.dbn.zst new file mode 100644 index 0000000..abd773f Binary files /dev/null and b/tests/data/test_data.bbo-1m.v2.dbn.zst differ diff --git a/tests/data/test_data.bbo-1m.v3.dbn.zst b/tests/data/test_data.bbo-1m.v3.dbn.zst new file mode 100644 index 0000000..fd5d96d Binary files /dev/null and b/tests/data/test_data.bbo-1m.v3.dbn.zst differ diff --git a/tests/data/test_data.bbo-1s.v2.dbn.zst b/tests/data/test_data.bbo-1s.v2.dbn.zst new file mode 100644 index 0000000..040d758 Binary files /dev/null and b/tests/data/test_data.bbo-1s.v2.dbn.zst differ diff --git a/tests/data/test_data.bbo-1s.v3.dbn.zst b/tests/data/test_data.bbo-1s.v3.dbn.zst new file mode 100644 index 0000000..586823b Binary files /dev/null and b/tests/data/test_data.bbo-1s.v3.dbn.zst differ diff --git a/tests/data/test_data.cbbo-1s.v1.dbn.zst b/tests/data/test_data.cbbo-1s.v1.dbn.zst new file mode 100644 index 0000000..fa265cd Binary files /dev/null and b/tests/data/test_data.cbbo-1s.v1.dbn.zst differ diff --git a/tests/data/test_data.cbbo.dbn.zst b/tests/data/test_data.cbbo-1s.v2.dbn.zst similarity index 100% rename from tests/data/test_data.cbbo.dbn.zst rename to tests/data/test_data.cbbo-1s.v2.dbn.zst diff --git a/tests/data/test_data.cbbo-1s.v3.dbn.zst b/tests/data/test_data.cbbo-1s.v3.dbn.zst new file mode 100644 index 0000000..18c7479 Binary files /dev/null and b/tests/data/test_data.cbbo-1s.v3.dbn.zst differ diff --git a/tests/data/test_data.cbbo.v1.dbn.zst b/tests/data/test_data.cbbo.v1.dbn.zst deleted file mode 100644 index 1ea8fa0..0000000 Binary files a/tests/data/test_data.cbbo.v1.dbn.zst and /dev/null differ diff --git a/tests/data/test_data.cmbp-1.v1.dbn.zst b/tests/data/test_data.cmbp-1.v1.dbn.zst index 3a3c776..fba3591 100644 Binary files a/tests/data/test_data.cmbp-1.v1.dbn.zst and b/tests/data/test_data.cmbp-1.v1.dbn.zst differ diff --git a/tests/data/test_data.cmbp-1.dbn.zst b/tests/data/test_data.cmbp-1.v2.dbn.zst similarity index 100% rename from tests/data/test_data.cmbp-1.dbn.zst rename to tests/data/test_data.cmbp-1.v2.dbn.zst diff --git a/tests/data/test_data.cmbp-1.v3.dbn.zst b/tests/data/test_data.cmbp-1.v3.dbn.zst new file mode 100644 index 0000000..4d41c5b Binary files /dev/null and b/tests/data/test_data.cmbp-1.v3.dbn.zst differ diff --git a/tests/data/test_data.definition.dbn.frag.zst b/tests/data/test_data.definition.dbn.frag.zst new file mode 100644 index 0000000..cb84c9a Binary files /dev/null and b/tests/data/test_data.definition.dbn.frag.zst differ diff --git a/tests/data/test_data.definition.v1.dbn.frag b/tests/data/test_data.definition.v1.dbn.frag new file mode 100644 index 0000000..137e28d Binary files /dev/null and b/tests/data/test_data.definition.v1.dbn.frag differ diff --git a/tests/data/test_data.definition.v1.dbn.frag.zst b/tests/data/test_data.definition.v1.dbn.frag.zst new file mode 100644 index 0000000..89c1e16 Binary files /dev/null and b/tests/data/test_data.definition.v1.dbn.frag.zst differ diff --git a/tests/data/test_data.definition.v2.dbn.frag b/tests/data/test_data.definition.v2.dbn.frag new file mode 100644 index 0000000..a67a401 Binary files /dev/null and b/tests/data/test_data.definition.v2.dbn.frag differ diff --git a/tests/data/test_data.definition.dbn.zst b/tests/data/test_data.definition.v2.dbn.zst similarity index 100% rename from tests/data/test_data.definition.dbn.zst rename to tests/data/test_data.definition.v2.dbn.zst diff --git a/tests/data/test_data.definition.v3.dbn.frag b/tests/data/test_data.definition.v3.dbn.frag new file mode 100644 index 0000000..e16fed6 Binary files /dev/null and b/tests/data/test_data.definition.v3.dbn.frag differ diff --git a/tests/data/test_data.definition.v3.dbn.frag.zst b/tests/data/test_data.definition.v3.dbn.frag.zst new file mode 100644 index 0000000..8ee0789 Binary files /dev/null and b/tests/data/test_data.definition.v3.dbn.frag.zst differ diff --git a/tests/data/test_data.definition.v3.dbn.zst b/tests/data/test_data.definition.v3.dbn.zst new file mode 100644 index 0000000..ee0ed90 Binary files /dev/null and b/tests/data/test_data.definition.v3.dbn.zst differ diff --git a/tests/data/test_data.imbalance.dbn.zst b/tests/data/test_data.imbalance.v2.dbn.zst similarity index 100% rename from tests/data/test_data.imbalance.dbn.zst rename to tests/data/test_data.imbalance.v2.dbn.zst diff --git a/tests/data/test_data.imbalance.v3.dbn.zst b/tests/data/test_data.imbalance.v3.dbn.zst new file mode 100644 index 0000000..eb4e8a6 Binary files /dev/null and b/tests/data/test_data.imbalance.v3.dbn.zst differ diff --git a/tests/data/test_data.mbo.v1.dbn.zst b/tests/data/test_data.mbo.v1.dbn.zst index 34d90c7..30869f5 100644 Binary files a/tests/data/test_data.mbo.v1.dbn.zst and b/tests/data/test_data.mbo.v1.dbn.zst differ diff --git a/tests/data/test_data.mbo.dbn.zst b/tests/data/test_data.mbo.v2.dbn.zst similarity index 100% rename from tests/data/test_data.mbo.dbn.zst rename to tests/data/test_data.mbo.v2.dbn.zst diff --git a/tests/data/test_data.mbo.v3.dbn b/tests/data/test_data.mbo.v3.dbn new file mode 100644 index 0000000..de0f31d Binary files /dev/null and b/tests/data/test_data.mbo.v3.dbn differ diff --git a/tests/data/test_data.mbo.v3.dbn.zst b/tests/data/test_data.mbo.v3.dbn.zst new file mode 100644 index 0000000..e360b6e Binary files /dev/null and b/tests/data/test_data.mbo.v3.dbn.zst differ diff --git a/tests/data/test_data.mbp-1.v1.dbn.zst b/tests/data/test_data.mbp-1.v1.dbn.zst index 57e6f17..ae8fb95 100644 Binary files a/tests/data/test_data.mbp-1.v1.dbn.zst and b/tests/data/test_data.mbp-1.v1.dbn.zst differ diff --git a/tests/data/test_data.mbp-1.dbn.zst b/tests/data/test_data.mbp-1.v2.dbn.zst similarity index 100% rename from tests/data/test_data.mbp-1.dbn.zst rename to tests/data/test_data.mbp-1.v2.dbn.zst diff --git a/tests/data/test_data.mbp-1.v3.dbn.zst b/tests/data/test_data.mbp-1.v3.dbn.zst new file mode 100644 index 0000000..c1253bb Binary files /dev/null and b/tests/data/test_data.mbp-1.v3.dbn.zst differ diff --git a/tests/data/test_data.mbp-10.v1.dbn.zst b/tests/data/test_data.mbp-10.v1.dbn.zst index 69bda8d..bb011bf 100644 Binary files a/tests/data/test_data.mbp-10.v1.dbn.zst and b/tests/data/test_data.mbp-10.v1.dbn.zst differ diff --git a/tests/data/test_data.mbp-10.dbn.zst b/tests/data/test_data.mbp-10.v2.dbn.zst similarity index 100% rename from tests/data/test_data.mbp-10.dbn.zst rename to tests/data/test_data.mbp-10.v2.dbn.zst diff --git a/tests/data/test_data.mbp-10.v3.dbn.zst b/tests/data/test_data.mbp-10.v3.dbn.zst new file mode 100644 index 0000000..f7e8774 Binary files /dev/null and b/tests/data/test_data.mbp-10.v3.dbn.zst differ diff --git a/tests/data/test_data.ohlcv-1d.v1.dbn.zst b/tests/data/test_data.ohlcv-1d.v1.dbn.zst index e9d4166..b000d06 100644 Binary files a/tests/data/test_data.ohlcv-1d.v1.dbn.zst and b/tests/data/test_data.ohlcv-1d.v1.dbn.zst differ diff --git a/tests/data/test_data.ohlcv-1d.dbn.zst b/tests/data/test_data.ohlcv-1d.v2.dbn.zst similarity index 100% rename from tests/data/test_data.ohlcv-1d.dbn.zst rename to tests/data/test_data.ohlcv-1d.v2.dbn.zst diff --git a/tests/data/test_data.ohlcv-1d.v3.dbn.zst b/tests/data/test_data.ohlcv-1d.v3.dbn.zst new file mode 100644 index 0000000..68d0426 Binary files /dev/null and b/tests/data/test_data.ohlcv-1d.v3.dbn.zst differ diff --git a/tests/data/test_data.ohlcv-1h.dbn.zst b/tests/data/test_data.ohlcv-1h.v2.dbn.zst similarity index 100% rename from tests/data/test_data.ohlcv-1h.dbn.zst rename to tests/data/test_data.ohlcv-1h.v2.dbn.zst diff --git a/tests/data/test_data.ohlcv-1h.v3.dbn.zst b/tests/data/test_data.ohlcv-1h.v3.dbn.zst new file mode 100644 index 0000000..a614b13 Binary files /dev/null and b/tests/data/test_data.ohlcv-1h.v3.dbn.zst differ diff --git a/tests/data/test_data.ohlcv-1m.dbn.zst b/tests/data/test_data.ohlcv-1m.v2.dbn.zst similarity index 100% rename from tests/data/test_data.ohlcv-1m.dbn.zst rename to tests/data/test_data.ohlcv-1m.v2.dbn.zst diff --git a/tests/data/test_data.ohlcv-1m.v3.dbn.zst b/tests/data/test_data.ohlcv-1m.v3.dbn.zst new file mode 100644 index 0000000..128dc65 Binary files /dev/null and b/tests/data/test_data.ohlcv-1m.v3.dbn.zst differ diff --git a/tests/data/test_data.ohlcv-1s.dbn.zst b/tests/data/test_data.ohlcv-1s.v2.dbn.zst similarity index 100% rename from tests/data/test_data.ohlcv-1s.dbn.zst rename to tests/data/test_data.ohlcv-1s.v2.dbn.zst diff --git a/tests/data/test_data.ohlcv-1s.v3.dbn.zst b/tests/data/test_data.ohlcv-1s.v3.dbn.zst new file mode 100644 index 0000000..bcc5fef Binary files /dev/null and b/tests/data/test_data.ohlcv-1s.v3.dbn.zst differ diff --git a/tests/data/test_data.statistics.dbn.zst b/tests/data/test_data.statistics.v2.dbn.zst similarity index 100% rename from tests/data/test_data.statistics.dbn.zst rename to tests/data/test_data.statistics.v2.dbn.zst diff --git a/tests/data/test_data.statistics.v3.dbn.zst b/tests/data/test_data.statistics.v3.dbn.zst new file mode 100644 index 0000000..c78af00 Binary files /dev/null and b/tests/data/test_data.statistics.v3.dbn.zst differ diff --git a/tests/data/test_data.status.v2.dbn.zst b/tests/data/test_data.status.v2.dbn.zst new file mode 100644 index 0000000..1ef5d7a Binary files /dev/null and b/tests/data/test_data.status.v2.dbn.zst differ diff --git a/tests/data/test_data.status.v3.dbn.zst b/tests/data/test_data.status.v3.dbn.zst new file mode 100644 index 0000000..b1c7d5d Binary files /dev/null and b/tests/data/test_data.status.v3.dbn.zst differ diff --git a/tests/data/test_data.tbbo.v1.dbn.zst b/tests/data/test_data.tbbo.v1.dbn.zst index c464edb..dba29c9 100644 Binary files a/tests/data/test_data.tbbo.v1.dbn.zst and b/tests/data/test_data.tbbo.v1.dbn.zst differ diff --git a/tests/data/test_data.tbbo.dbn.zst b/tests/data/test_data.tbbo.v2.dbn.zst similarity index 100% rename from tests/data/test_data.tbbo.dbn.zst rename to tests/data/test_data.tbbo.v2.dbn.zst diff --git a/tests/data/test_data.tbbo.v3.dbn.zst b/tests/data/test_data.tbbo.v3.dbn.zst new file mode 100644 index 0000000..5c81ac9 Binary files /dev/null and b/tests/data/test_data.tbbo.v3.dbn.zst differ diff --git a/tests/data/test_data.trades.v1.dbn.zst b/tests/data/test_data.trades.v1.dbn.zst index 733a97c..9a3094c 100644 Binary files a/tests/data/test_data.trades.v1.dbn.zst and b/tests/data/test_data.trades.v1.dbn.zst differ diff --git a/tests/data/test_data.trades.dbn.zst b/tests/data/test_data.trades.v2.dbn.zst similarity index 100% rename from tests/data/test_data.trades.dbn.zst rename to tests/data/test_data.trades.v2.dbn.zst diff --git a/tests/data/test_data.trades.v3.dbn.zst b/tests/data/test_data.trades.v3.dbn.zst new file mode 100644 index 0000000..1d4353d Binary files /dev/null and b/tests/data/test_data.trades.v3.dbn.zst differ diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index 7b7a65a..6571b33 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -51,11 +51,11 @@ class MockLsgServer { void Accept(); void Authenticate(); void Subscribe(const std::vector& symbols, Schema schema, - SType stype); + SType stype, bool is_last); void Subscribe(const std::vector& symbols, Schema schema, - SType stype, const std::string& start); + SType stype, const std::string& start, bool is_last); void SubscribeWithSnapshot(const std::vector& symbols, - Schema schema, SType stype); + Schema schema, SType stype, bool is_last); void Start(); std::size_t Send(const std::string& msg); ::ssize_t UncheckedSend(const std::string& msg); diff --git a/tests/include/temp_file.hpp b/tests/include/temp_file.hpp index 60d5986..46333d6 100644 --- a/tests/include/temp_file.hpp +++ b/tests/include/temp_file.hpp @@ -4,6 +4,7 @@ #include // assert #include // remove +#include #include // ifstream #include #include // move @@ -15,6 +16,8 @@ namespace databento { // goes out of scope. class TempFile { public: + explicit TempFile(const std::filesystem::path& path) + : TempFile{path.string()} {} explicit TempFile(std::string path) : path_{std::move(path)} { std::ifstream f{path_}; if (Exists()) { diff --git a/tests/src/buffer_tests.cpp b/tests/src/buffer_tests.cpp index f006756..1d2c238 100644 --- a/tests/src/buffer_tests.cpp +++ b/tests/src/buffer_tests.cpp @@ -10,8 +10,8 @@ using namespace std::string_view_literals; namespace databento::detail::tests { TEST(BufferTests, TestWriteAllPastCapacity) { Buffer target{10}; - target.WriteBegin() += 4; - target.ReadBegin() += 2; + target.Fill(4); + target.ConsumeNoShift(2); ASSERT_EQ(target.WriteCapacity(), 6); ASSERT_EQ(target.ReadCapacity(), 2); ASSERT_EQ(target.Capacity(), 10); @@ -25,7 +25,7 @@ TEST(BufferTests, TestWriteAllPastCapacity) { TEST(BufferTests, TestWriteAllShift) { Buffer target{20}; target.WriteAll("TestWriteAllShift", 17); - target.ReadBegin() += 4; + target.ConsumeNoShift(4); ASSERT_EQ(target.WriteCapacity(), 3); ASSERT_EQ(target.ReadCapacity(), 13); ASSERT_EQ(target.Capacity(), 20); @@ -38,8 +38,8 @@ TEST(BufferTests, TestWriteAllShift) { TEST(BufferTests, TestWriteRead) { Buffer target{10}; - target.WriteBegin() += 5; - target.ReadBegin() += 5; + target.Fill(5); + target.ConsumeNoShift(5); const auto write_len = target.Write("BufferTests", 11); ASSERT_EQ(write_len, 10); std::array read_buf{}; @@ -55,7 +55,16 @@ TEST(BufferTests, TestReserve) { ASSERT_EQ(target.ReadCapacity(), 0); ASSERT_EQ(target.Capacity(), 120); target.WriteAll("TestReserve", 11); - target.ReadBegin() += 4; + target.ConsumeNoShift(4); } +TEST(BufferTests, TestConsumeShift) { + Buffer target{120}; + target.Fill(120); + ASSERT_EQ(target.WriteCapacity(), 0); + target.ConsumeNoShift(100); + ASSERT_EQ(target.WriteCapacity(), 0); + target.Consume(1); + ASSERT_EQ(target.WriteCapacity(), 101); +} } // namespace databento::detail::tests diff --git a/tests/src/dbn_decoder_tests.cpp b/tests/src/dbn_decoder_tests.cpp index e9bd1bb..7e545c4 100644 --- a/tests/src/dbn_decoder_tests.cpp +++ b/tests/src/dbn_decoder_tests.cpp @@ -35,7 +35,7 @@ namespace databento::tests { class DbnDecoderTests : public testing::Test { public: - std::unique_ptr file_target_; + std::unique_ptr target_; detail::ScopedThread write_thread_; std::unique_ptr logger_{std::make_unique()}; @@ -46,13 +46,13 @@ class DbnDecoderTests : public testing::Test { void ReadFromFile(const std::string& schema_str, const std::string& extension, std::uint8_t version, VersionUpgradePolicy upgrade_policy) { - const char* version_str = version == 1 ? ".v1" : ""; - const std::string file_path = TEST_BUILD_DIR "/data/test_data." + - schema_str + version_str + extension; + const std::string file_path = + TEST_DATA_DIR "/test_data." + schema_str + + (version == 0 ? "" : ".v" + std::to_string(+version)) + extension; // Channel setup write_thread_ = detail::ScopedThread{[this, file_path] { std::ifstream input_file{file_path, std::ios::binary | std::ios::ate}; - ASSERT_TRUE(input_file.good()); + ASSERT_TRUE(input_file.good()) << "Failed to open: " << file_path; const auto size = static_cast(input_file.tellg()); input_file.seekg(0, std::ios::beg); std::vector buffer(size); @@ -60,7 +60,7 @@ class DbnDecoderTests : public testing::Test { ASSERT_EQ(input_file.gcount(), size); }}; // File setup - file_target_ = std::make_unique( + target_ = std::make_unique( logger_.get(), std::make_unique(file_path), upgrade_policy); } @@ -77,20 +77,25 @@ class DbnDecoderTests : public testing::Test { } }; -// FIXME: update for only a single record template -void AssertDefEq(const Record* ch_record, const Record* f_record) { - ASSERT_TRUE(ch_record->Holds()); - ASSERT_TRUE(f_record->Holds()); - const auto& ch_def = ch_record->Get(); - const auto& f_def = f_record->Get(); - EXPECT_EQ(ch_def, f_def); - EXPECT_STREQ(ch_def.Exchange(), "XNAS"); - EXPECT_STREQ(ch_def.RawSymbol(), "MSFT"); - EXPECT_EQ(ch_def.security_update_action, SecurityUpdateAction::Add); - EXPECT_EQ(ch_def.min_lot_size_round_lot, 100); - EXPECT_EQ(ch_def.instrument_class, InstrumentClass::Stock); - EXPECT_EQ(ch_def.strike_price, kUndefPrice); +void AssertDefHas(const Record* record) { + ASSERT_TRUE(record->Holds()); + const auto& def = record->Get(); + EXPECT_STREQ(def.Exchange(), "XNAS"); + EXPECT_EQ(def.security_update_action, SecurityUpdateAction::Add); + EXPECT_EQ(def.min_lot_size_round_lot, 100); + EXPECT_EQ(def.instrument_class, InstrumentClass::Stock); + EXPECT_EQ(def.strike_price, kUndefPrice); +} + +template +void AssertStatHas(const Record* record, StatType stat_type, std::int64_t price, + Q quantity) { + ASSERT_TRUE(record->Holds()); + const auto& stat = record->Get(); + ASSERT_EQ(stat.stat_type, stat_type); + ASSERT_EQ(stat.price, price); + ASSERT_EQ(stat.quantity, quantity); } TEST_F(DbnDecoderTests, TestDecodeDbz) { @@ -106,11 +111,11 @@ TEST_F(DbnDecoderTests, TestDecodeDbz) { } TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) { - ReadFromFile("definition", ".dbn", 1, VersionUpgradePolicy::UpgradeToV2); + ReadFromFile("definition", ".dbn.zst", 1, VersionUpgradePolicy::UpgradeToV3); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata, metadata); - EXPECT_EQ(metadata.version, 2); + EXPECT_EQ(metadata.version, kDbnVersion); EXPECT_EQ(metadata.dataset, dataset::kXnasItch); EXPECT_EQ(metadata.schema, Schema::Definition); EXPECT_EQ(metadata.start.time_since_epoch().count(), 1633305600000000000); @@ -130,13 +135,30 @@ TEST_F(DbnDecoderTests, TestDecodeDefinitionUpgrade) { EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); + ASSERT_NE(record1, nullptr); + record1->Get(); + + const auto record2 = target_->DecodeRecord(); + ASSERT_NE(record2, nullptr); + record2->Get(); +} + +TEST_F(DbnDecoderTests, TestDecodeStatUpgrade) { + ReadFromFile("statistics", ".dbn.zst", 2, VersionUpgradePolicy::UpgradeToV3); + const Metadata metadata = target_->DecodeMetadata(); + EXPECT_EQ(metadata, metadata); + EXPECT_EQ(metadata.version, kDbnVersion); + + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); + AssertStatHas(record1, StatType::LowestOffer, + 100 * kFixedPriceScale, v3::kUndefStatQuantity); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); - // AssertDefEq(ch_record1, f_record1); - // AssertDefEq(ch_record2, f_record2); + AssertStatHas(record2, StatType::TradingSessionHighPrice, + 100 * kFixedPriceScale, v3::kUndefStatQuantity); } TEST_F(DbnDecoderTests, TestUpgradeSymbolMappingWithTsOut) { @@ -205,20 +227,17 @@ class DbnDecoderSchemaTests public testing::WithParamInterface> { }; -INSTANTIATE_TEST_SUITE_P( - TestFiles, DbnDecoderSchemaTests, - testing::Values(std::make_pair(".dbn", 1), std::make_pair(".dbn", 2), - std::make_pair(".dbn.zst", 1), - std::make_pair(".dbn.zst", 2)), - [](const testing::TestParamInfo>& - test_info) { - const auto extension = test_info.param.first; - const auto version = test_info.param.second; - const auto size = ::strlen(extension); - return ::strncmp(extension + size - 3, "zst", 3) == 0 - ? "ZstdDBNv" + std::to_string(version) - : "UncompressedDBNv" + std::to_string(version); - }); +INSTANTIATE_TEST_SUITE_P(TestFiles, DbnDecoderSchemaTests, + testing::Values(std::make_pair(".dbn.zst", 1), + std::make_pair(".dbn.zst", 2), + std::make_pair(".dbn.zst", 3)), + [](const testing::TestParamInfo< + std::pair>& test_info) { + const auto extension = test_info.param.first; + const auto version = test_info.param.second; + const auto size = ::strlen(extension); + return "ZstdDBNv" + std::to_string(version); + }); // Expected data for these tests obtained using the `dbn` CLI tool @@ -227,7 +246,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbo) { const auto version = GetParam().second; ReadFromFile("mbo", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata, metadata); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); @@ -241,7 +260,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbo) { EXPECT_TRUE(metadata.partial.empty()); EXPECT_TRUE(metadata.not_found.empty()); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& mbo1 = record1->Get(); @@ -260,7 +279,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbo) { EXPECT_EQ(mbo1.ts_in_delta.count(), 22993); EXPECT_EQ(mbo1.sequence, 1170352); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& mbo2 = record2->Get(); @@ -285,7 +304,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp1) { const auto version = GetParam().second; ReadFromFile("mbp-1", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata, metadata); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); @@ -300,7 +319,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp1) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& mbp1 = record1->Get(); @@ -323,7 +342,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp1) { EXPECT_EQ(mbp1.levels[0].bid_ct, 15); EXPECT_EQ(mbp1.levels[0].ask_ct, 9); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& mbp2 = record2->Get(); @@ -353,7 +372,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp10) { const auto version = GetParam().second; ReadFromFile("mbp-10", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Mbp10); @@ -367,7 +386,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp10) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& mbp1 = record1->Get(); @@ -402,7 +421,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeMbp10) { EXPECT_EQ(mbp1.levels[2].bid_ct, 23); EXPECT_EQ(mbp1.levels[2].ask_ct, 25); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& mbp2 = record2->Get(); @@ -444,7 +463,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { const auto version = GetParam().second; ReadFromFile("cmbp-1", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Cmbp1); @@ -458,7 +477,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& cmbp1 = record1->Get(); @@ -479,7 +498,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { EXPECT_EQ(cmbp1.levels[0].bid_pb, 1); EXPECT_EQ(cmbp1.levels[0].ask_pb, 1); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& cmbp2 = record2->Get(); @@ -504,9 +523,9 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCmbp1) { TEST_P(DbnDecoderSchemaTests, TestDecodeCbbo) { const auto extension = GetParam().first; const auto version = GetParam().second; - ReadFromFile("cbbo", extension, version); + ReadFromFile("cbbo-1s", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Cbbo1S); @@ -520,7 +539,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCbbo) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& cbbo1 = record1->Get(); @@ -539,7 +558,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeCbbo) { EXPECT_EQ(cbbo1.levels[0].bid_pb, 1); EXPECT_EQ(cbbo1.levels[0].ask_pb, 1); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& cbbo2 = record2->Get(); @@ -564,7 +583,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTbbo) { const auto version = GetParam().second; ReadFromFile("tbbo", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Tbbo); @@ -578,7 +597,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTbbo) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& tbbo1 = record1->Get(); @@ -601,7 +620,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTbbo) { EXPECT_EQ(tbbo1.levels[0].bid_ct, 16); EXPECT_EQ(tbbo1.levels[0].ask_ct, 6); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& tbbo2 = record2->Get(); @@ -630,7 +649,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTrades) { const auto version = GetParam().second; ReadFromFile("trades", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Trades); @@ -644,7 +663,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTrades) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& trade1 = record1->Get(); @@ -661,7 +680,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeTrades) { EXPECT_EQ(trade1.ts_in_delta.count(), 19251); EXPECT_EQ(trade1.sequence, 1170380); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& trade2 = record2->Get(); @@ -684,7 +703,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1D) { const auto version = GetParam().second; ReadFromFile("ohlcv-1d", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Ohlcv1D); @@ -704,7 +723,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1H) { const auto version = GetParam().second; ReadFromFile("ohlcv-1h", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Ohlcv1H); @@ -718,7 +737,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1H) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& ohlcv1 = record1->Get(); @@ -731,7 +750,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1H) { EXPECT_EQ(ohlcv1.close, 372225000000000); EXPECT_EQ(ohlcv1.volume, 9385); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& ohlcv2 = record2->Get(); @@ -750,7 +769,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1M) { const auto version = GetParam().second; ReadFromFile("ohlcv-1m", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Ohlcv1M); @@ -764,7 +783,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1M) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& ohlcv1 = record1->Get(); @@ -777,7 +796,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1M) { EXPECT_EQ(ohlcv1.close, 372100000000000); EXPECT_EQ(ohlcv1.volume, 353); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& ohlcv2 = record2->Get(); @@ -796,7 +815,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1S) { const auto version = GetParam().second; ReadFromFile("ohlcv-1s", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Ohlcv1S); @@ -810,7 +829,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1S) { EXPECT_TRUE(metadata.not_found.empty()); AssertMappings(metadata.mappings); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& ohlcv1 = record1->Get(); @@ -823,7 +842,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeOhlcv1S) { EXPECT_EQ(ohlcv1.close, 372050000000000); EXPECT_EQ(ohlcv1.volume, 57); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& ohlcv2 = record2->Get(); @@ -842,7 +861,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { const auto version = GetParam().second; ReadFromFile("definition", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kXnasItch); EXPECT_EQ(metadata.schema, Schema::Definition); @@ -863,19 +882,22 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeDefinition) { EXPECT_EQ(interval.start_date, date::year{2021} / 10 / 4); EXPECT_EQ(interval.end_date, date::year{2021} / 10 / 5); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_NE(record1, nullptr); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_NE(record2, nullptr); if (version == 1) { - AssertDefEq(record1, record1); - AssertDefEq(record2, record2); + AssertDefHas(record1); + AssertDefHas(record2); + } else if (version == 2) { + AssertDefHas(record1); + AssertDefHas(record2); } else { - AssertDefEq(record1, record1); - AssertDefEq(record2, record2); + AssertDefHas(record1); + AssertDefHas(record2); } } @@ -884,7 +906,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeImbalance) { const auto version = GetParam().second; ReadFromFile("imbalance", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kXnasItch); EXPECT_EQ(metadata.schema, Schema::Imbalance); @@ -898,14 +920,14 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeImbalance) { EXPECT_TRUE(metadata.not_found.empty()); EXPECT_EQ(metadata.mappings.size(), 1); - const auto record1 = file_target_->DecodeRecord(); + const auto record1 = target_->DecodeRecord(); ASSERT_NE(record1, nullptr); ASSERT_TRUE(record1->Holds()); const auto& imbalance1 = record1->Get(); EXPECT_EQ(imbalance1, imbalance1); EXPECT_EQ(imbalance1.ref_price, 229430000000); - const auto record2 = file_target_->DecodeRecord(); + const auto record2 = target_->DecodeRecord(); ASSERT_NE(record2, nullptr); ASSERT_TRUE(record2->Holds()); const auto& imbalance2 = record2->Get(); @@ -917,7 +939,7 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { const auto version = GetParam().second; ReadFromFile("statistics", extension, version); - const Metadata metadata = file_target_->DecodeMetadata(); + const Metadata metadata = target_->DecodeMetadata(); EXPECT_EQ(metadata.version, version); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); EXPECT_EQ(metadata.schema, Schema::Statistics); @@ -931,19 +953,23 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { EXPECT_TRUE(metadata.not_found.empty()); EXPECT_TRUE(metadata.mappings.empty()); - const auto record1 = file_target_->DecodeRecord(); - ASSERT_NE(record1, nullptr); - ASSERT_TRUE(record1->Holds()); - const auto& stat1 = record1->Get(); - EXPECT_EQ(stat1.stat_type, StatType::LowestOffer); - EXPECT_EQ(stat1.price, 100 * kFixedPriceScale); + const auto record1 = target_->DecodeRecord(); + if (version < 3) { + AssertStatHas(record1, StatType::LowestOffer, + 100 * kFixedPriceScale, v1::kUndefStatQuantity); + } else { + AssertStatHas(record1, StatType::LowestOffer, + 100 * kFixedPriceScale, v3::kUndefStatQuantity); + } - const auto record2 = file_target_->DecodeRecord(); - ASSERT_NE(record2, nullptr); - ASSERT_TRUE(record2->Holds()); - const auto& stat2 = record2->Get(); - EXPECT_EQ(stat2.stat_type, StatType::TradingSessionHighPrice); - EXPECT_EQ(stat2.price, 100 * kFixedPriceScale); + const auto record2 = target_->DecodeRecord(); + if (version < 3) { + AssertStatHas(record2, StatType::TradingSessionHighPrice, + 100 * kFixedPriceScale, v1::kUndefStatQuantity); + } else { + AssertStatHas(record2, StatType::TradingSessionHighPrice, + 100 * kFixedPriceScale, v3::kUndefStatQuantity); + } } class DbnIdentityTests : public testing::TestWithParam< @@ -954,19 +980,7 @@ class DbnIdentityTests : public testing::TestWithParam< INSTANTIATE_TEST_SUITE_P( TestFiles, DbnIdentityTests, - testing::Values(std::make_tuple(1, Schema::Mbo, Compression::None), - std::make_tuple(1, Schema::Trades, Compression::None), - std::make_tuple(1, Schema::Mbp1, Compression::None), - std::make_tuple(1, Schema::Tbbo, Compression::None), - std::make_tuple(1, Schema::Mbp10, Compression::None), - std::make_tuple(1, Schema::Ohlcv1D, Compression::None), - std::make_tuple(1, Schema::Ohlcv1H, Compression::None), - std::make_tuple(1, Schema::Ohlcv1M, Compression::None), - std::make_tuple(1, Schema::Ohlcv1S, Compression::None), - std::make_tuple(1, Schema::Definition, Compression::None), - std::make_tuple(1, Schema::Imbalance, Compression::None), - std::make_tuple(1, Schema::Statistics, Compression::None), - std::make_tuple(1, Schema::Mbo, Compression::Zstd), + testing::Values(std::make_tuple(1, Schema::Mbo, Compression::Zstd), std::make_tuple(1, Schema::Trades, Compression::Zstd), std::make_tuple(1, Schema::Mbp1, Compression::Zstd), std::make_tuple(1, Schema::Tbbo, Compression::Zstd), @@ -978,18 +992,8 @@ INSTANTIATE_TEST_SUITE_P( std::make_tuple(1, Schema::Definition, Compression::Zstd), std::make_tuple(1, Schema::Imbalance, Compression::Zstd), std::make_tuple(1, Schema::Statistics, Compression::Zstd), - std::make_tuple(2, Schema::Mbo, Compression::None), - std::make_tuple(2, Schema::Trades, Compression::None), - std::make_tuple(2, Schema::Tbbo, Compression::None), - std::make_tuple(2, Schema::Mbp1, Compression::None), - std::make_tuple(2, Schema::Mbp10, Compression::None), - std::make_tuple(2, Schema::Ohlcv1D, Compression::None), - std::make_tuple(2, Schema::Ohlcv1H, Compression::None), - std::make_tuple(2, Schema::Ohlcv1M, Compression::None), - std::make_tuple(2, Schema::Ohlcv1S, Compression::None), - std::make_tuple(2, Schema::Definition, Compression::None), - std::make_tuple(2, Schema::Imbalance, Compression::None), - std::make_tuple(2, Schema::Statistics, Compression::None), + std::make_tuple(1, Schema::Cmbp1, Compression::Zstd), + std::make_tuple(1, Schema::Cbbo1S, Compression::Zstd), std::make_tuple(2, Schema::Mbo, Compression::Zstd), std::make_tuple(2, Schema::Trades, Compression::Zstd), std::make_tuple(2, Schema::Tbbo, Compression::Zstd), @@ -1001,7 +1005,30 @@ INSTANTIATE_TEST_SUITE_P( std::make_tuple(2, Schema::Ohlcv1S, Compression::Zstd), std::make_tuple(2, Schema::Definition, Compression::Zstd), std::make_tuple(2, Schema::Imbalance, Compression::Zstd), - std::make_tuple(2, Schema::Statistics, Compression::Zstd)), + std::make_tuple(2, Schema::Statistics, Compression::Zstd), + std::make_tuple(2, Schema::Bbo1S, Compression::Zstd), + std::make_tuple(2, Schema::Bbo1M, Compression::Zstd), + std::make_tuple(2, Schema::Cmbp1, Compression::Zstd), + std::make_tuple(2, Schema::Cbbo1S, Compression::Zstd), + std::make_tuple(2, Schema::Status, Compression::Zstd), + std::make_tuple(3, Schema::Mbo, Compression::None), + std::make_tuple(3, Schema::Mbo, Compression::Zstd), + std::make_tuple(3, Schema::Trades, Compression::Zstd), + std::make_tuple(3, Schema::Tbbo, Compression::Zstd), + std::make_tuple(3, Schema::Mbp1, Compression::Zstd), + std::make_tuple(3, Schema::Mbp10, Compression::Zstd), + std::make_tuple(3, Schema::Ohlcv1D, Compression::Zstd), + std::make_tuple(3, Schema::Ohlcv1H, Compression::Zstd), + std::make_tuple(3, Schema::Ohlcv1M, Compression::Zstd), + std::make_tuple(3, Schema::Ohlcv1S, Compression::Zstd), + std::make_tuple(3, Schema::Definition, Compression::Zstd), + std::make_tuple(3, Schema::Imbalance, Compression::Zstd), + std::make_tuple(3, Schema::Statistics, Compression::Zstd), + std::make_tuple(3, Schema::Bbo1S, Compression::Zstd), + std::make_tuple(3, Schema::Bbo1M, Compression::Zstd), + std::make_tuple(3, Schema::Cmbp1, Compression::Zstd), + std::make_tuple(3, Schema::Cbbo1S, Compression::Zstd), + std::make_tuple(3, Schema::Status, Compression::Zstd)), [](const testing::TestParamInfo< std::tuple>& test_info) { const auto version = std::get<0>(test_info.param); @@ -1022,8 +1049,8 @@ TEST_P(DbnIdentityTests, TestIdentity) { const auto schema = std::get<1>(GetParam()); const auto compression = std::get<2>(GetParam()); const auto file_name = - std::string{TEST_BUILD_DIR "/data/test_data."} + ToString(schema) + - (version == 1 ? ".v1" : "") + + std::string{TEST_DATA_DIR "/test_data."} + ToString(schema) + ".v" + + std::to_string(+version) + (compression == Compression::Zstd ? ".dbn.zst" : ".dbn"); DbnDecoder file_decoder{logger_.get(), std::make_unique(file_name), @@ -1065,6 +1092,14 @@ TEST_P(DbnIdentityTests, TestIdentity) { EXPECT_EQ(*mbp1, file_record->Get()); } else if (auto* mbp10 = buf_record->GetIf()) { EXPECT_EQ(*mbp10, file_record->Get()); + } else if (auto* cmbp = buf_record->GetIf()) { + EXPECT_EQ(*cmbp, file_record->Get()); + } else if (auto* bbo = buf_record->GetIf()) { + EXPECT_EQ(*bbo, file_record->Get()); + } else if (auto* cbbo = buf_record->GetIf()) { + EXPECT_EQ(*cbbo, file_record->Get()); + } else if (auto* status = buf_record->GetIf()) { + EXPECT_EQ(*status, file_record->Get()); } else if (auto* ohlcv = buf_record->GetIf()) { EXPECT_EQ(*ohlcv, file_record->Get()); } else if (auto* trade = buf_record->GetIf()) { @@ -1084,8 +1119,16 @@ TEST_P(DbnIdentityTests, TestIdentity) { } else { FAIL() << "Unknown definition size"; } - } else if (auto* stats = buf_record->GetIf()) { - EXPECT_EQ(*stats, file_record->Get()); + } else if (buf_record->Header().rtype == RType::Statistics) { + if (buf_record->Size() == sizeof(v1::StatMsg)) { + EXPECT_EQ(buf_record->Get(), + file_record->Get()); + } else if (buf_record->Size() == sizeof(v3::StatMsg)) { + EXPECT_EQ(buf_record->Get(), + file_record->Get()); + } else { + FAIL() << "Unknown stats size"; + } } else { FAIL() << "Unexpected rtype " << static_cast(file_record->Header().rtype); diff --git a/tests/src/dbn_tests.cpp b/tests/src/dbn_tests.cpp index e4d941c..31c5f66 100644 --- a/tests/src/dbn_tests.cpp +++ b/tests/src/dbn_tests.cpp @@ -28,7 +28,7 @@ TEST(DbnTests, TestMetadataToString) { {"NGQ4", {{date::year{2022} / 6 / 1, date::year{2022} / 7 / 1, "4"}}}}}; const auto res = ToString(target); ASSERT_EQ(res, R"(Metadata { - version = 2, + version = 3, dataset = "GLBX.MDP3", has_mixed_schema = false, schema = ohlcv-1d, diff --git a/tests/src/file_stream_tests.cpp b/tests/src/file_stream_tests.cpp index d50e9c0..a4a5328 100644 --- a/tests/src/file_stream_tests.cpp +++ b/tests/src/file_stream_tests.cpp @@ -1,6 +1,7 @@ #include #include +#include #include "databento/exceptions.hpp" #include "databento/file_stream.hpp" @@ -8,8 +9,7 @@ namespace databento::tests { TEST(InFileStreamTests, TestReadExactInsufficient) { - const std::string file_path = - TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; + const std::string file_path = TEST_DATA_DIR "/test_data.mbo.v3.dbn"; InFileStream target{file_path}; std::vector buffer(1024); // File is less than 1KiB try { @@ -17,13 +17,12 @@ TEST(InFileStreamTests, TestReadExactInsufficient) { FAIL() << "Expected throw"; } catch (const databento::Exception& exc) { ASSERT_STREQ(exc.what(), - "Unexpected end of file, expected 1024 bytes, got 206"); + "Unexpected end of file, expected 1024 bytes, got 472"); } } TEST(InFileStreamTests, TestReadSomeLessThanMax) { - const std::string file_path = - TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; + const std::string file_path = TEST_DATA_DIR "/test_data.ohlcv-1d.v1.dbn.zst"; InFileStream target{file_path}; std::vector buffer(1024); // File is less than 1KiB const auto read_size = target.ReadSome(buffer.data(), buffer.size()); @@ -35,7 +34,7 @@ TEST(InFileStreamTests, TestReadSomeLessThanMax) { TEST(OutFileStreamTests, TestWriteAllCanBeRead) { constexpr auto data = "abcdefgh"; - TempFile temp_file{"out"}; + TempFile temp_file{std::filesystem::temp_directory_path() / "out"}; ASSERT_FALSE(temp_file.Exists()); { OutFileStream target{temp_file.Path()}; diff --git a/tests/src/historical_tests.cpp b/tests/src/historical_tests.cpp index 637be74..3731fe8 100644 --- a/tests/src/historical_tests.cpp +++ b/tests/src/historical_tests.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include // logic_error #include // move @@ -40,6 +41,7 @@ constexpr auto kApiKey = "HIST_SECRET"; class HistoricalTests : public ::testing::Test { protected: + std::filesystem::path tmp_path_{std::filesystem::temp_directory_path()}; mock::MockHttpServer mock_server_{kApiKey}; std::unique_ptr logger_{std::make_unique()}; }; @@ -229,13 +231,12 @@ static const nlohmann::json kListFilesResp{ TEST_F(HistoricalTests, TestBatchDownloadAll) { const auto kJobId = "job123"; - const TempFile temp_metadata_file{TEST_BUILD_DIR - "/job123/test_metadata.json"}; - const TempFile temp_dbn_file{TEST_BUILD_DIR "/job123/test.dbn"}; + const TempFile temp_metadata_file{tmp_path_ / "job123/test_metadata.json"}; + const TempFile temp_dbn_file{tmp_path_ / "job123/test.dbn"}; mock_server_.MockGetJson("/v0/batch.list_files", {{"job_id", kJobId}}, kListFilesResp); mock_server_.MockStreamDbn("/v0/job_id/test.dbn", {}, - TEST_BUILD_DIR "/data/test_data.mbo.dbn"); + TEST_DATA_DIR "/test_data.mbo.v3.dbn"); mock_server_.MockGetJson("/v0/job_id/test_metadata.json", {{"key", "value"}}); const auto port = mock_server_.ListenOnThread(); @@ -244,19 +245,31 @@ TEST_F(HistoricalTests, TestBatchDownloadAll) { ASSERT_FALSE(temp_metadata_file.Exists()); ASSERT_FALSE(temp_dbn_file.Exists()); const std::vector paths = - target.BatchDownload(TEST_BUILD_DIR, kJobId); + target.BatchDownload(tmp_path_.string(), kJobId); EXPECT_TRUE(temp_metadata_file.Exists()); EXPECT_TRUE(temp_dbn_file.Exists()); ASSERT_EQ(paths.size(), 2); - EXPECT_NE(std::find(paths.begin(), paths.end(), temp_metadata_file.Path()), - paths.end()); - EXPECT_NE(std::find(paths.begin(), paths.end(), temp_dbn_file.Path()), - paths.end()); + EXPECT_NE( + std::find_if(paths.begin(), paths.end(), + [&temp_metadata_file](const auto& path) { + return std::filesystem::path{path}.lexically_normal() == + std::filesystem::path{temp_metadata_file.Path()} + .lexically_normal(); + }), + paths.end()); + EXPECT_NE( + std::find_if(paths.begin(), paths.end(), + [&temp_dbn_file](const auto& path) { + return std::filesystem::path{path}.lexically_normal() == + std::filesystem::path{temp_dbn_file.Path()} + .lexically_normal(); + }), + paths.end()); } TEST_F(HistoricalTests, TestBatchDownloadSingle) { const auto kJobId = "654"; - const TempFile temp_metadata_file{TEST_BUILD_DIR "/654/test_metadata.json"}; + const TempFile temp_metadata_file{tmp_path_ / "654/test_metadata.json"}; mock_server_.MockGetJson("/v0/batch.list_files", {{"job_id", kJobId}}, kListFilesResp); mock_server_.MockGetJson("/v0/job_id/test_metadata.json", {{"key", "value"}}); @@ -266,9 +279,11 @@ TEST_F(HistoricalTests, TestBatchDownloadSingle) { static_cast(port)}; ASSERT_FALSE(temp_metadata_file.Exists()); const std::string path = - target.BatchDownload(TEST_BUILD_DIR, kJobId, "test_metadata.json"); + target.BatchDownload(tmp_path_.string(), kJobId, "test_metadata.json"); EXPECT_TRUE(temp_metadata_file.Exists()); - EXPECT_EQ(path, temp_metadata_file.Path()); + EXPECT_EQ( + std::filesystem::path{path}.lexically_normal(), + std::filesystem::path{temp_metadata_file.Path()}.lexically_normal()); } TEST_F(HistoricalTests, TestBatchDownloadSingleInvalidFile) { @@ -279,8 +294,9 @@ TEST_F(HistoricalTests, TestBatchDownloadSingleInvalidFile) { databento::Historical target{logger_.get(), kApiKey, "localhost", static_cast(port)}; - ASSERT_THROW(target.BatchDownload(TEST_BUILD_DIR, kJobId, "test_metadata.js"), - InvalidArgumentError); + ASSERT_THROW( + target.BatchDownload(tmp_path_.string(), kJobId, "test_metadata.js"), + InvalidArgumentError); } TEST_F(HistoricalTests, TestMetadataListPublishers) { @@ -618,7 +634,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_Basic) { {"stype_in", "raw_symbol"}, {"stype_out", "instrument_id"}, {"limit", "2"}}, - TEST_BUILD_DIR "/data/test_data.mbo.dbn.zst"); + TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); databento::Historical target{logger_.get(), kApiKey, "localhost", @@ -653,7 +669,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_NoMetadataCallback) { {"encoding", "dbn"}, {"stype_in", "raw_symbol"}, {"stype_out", "instrument_id"}}, - TEST_BUILD_DIR "/data/test_data.tbbo.dbn.zst"); + TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); databento::Historical target{logger_.get(), kApiKey, "localhost", @@ -698,7 +714,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_BadRequest) { TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) { mock_server_.MockStreamDbn("/v0/timeseries.get_range", {}, - TEST_BUILD_DIR "/data/test_data.mbo.dbn.zst"); + TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); databento::Historical target{logger_.get(), kApiKey, "localhost", @@ -716,7 +732,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) { TEST_F(HistoricalTests, TestTimeseriesGetRangeCancellation) { mock_server_.MockStreamDbn("/v0/timeseries.get_range", {}, - TEST_BUILD_DIR "/data/test_data.mbo.dbn.zst"); + TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); databento::Historical target{logger_.get(), kApiKey, "localhost", @@ -747,7 +763,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRangeToFile) { {"encoding", "dbn"}, {"stype_in", "raw_symbol"}, {"stype_out", "instrument_id"}}, - TEST_BUILD_DIR "/data/test_data.tbbo.dbn.zst"); + TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); databento::Historical target{logger_.get(), kApiKey, "localhost", diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 91517e6..98a7091 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -59,9 +59,10 @@ TEST_F(LiveBlockingTests, TestAuthentication) { TEST_F(LiveBlockingTests, TestStartAndUpgrade) { constexpr auto kTsOut = true; - for (const auto policy_and_version : + for (const auto [upgrade_policy, exp_version] : {std::make_pair(VersionUpgradePolicy::AsIs, 1), - std::make_pair(VersionUpgradePolicy::UpgradeToV2, 2)}) { + std::make_pair(VersionUpgradePolicy::UpgradeToV2, 2), + std::make_pair(VersionUpgradePolicy::UpgradeToV3, 3)}) { const mock::MockLsgServer mock_server{dataset::kGlbxMdp3, kTsOut, [](mock::MockLsgServer& self) { self.Accept(); @@ -72,10 +73,10 @@ TEST_F(LiveBlockingTests, TestStartAndUpgrade) { LiveBlocking target = builder_.SetAddress(kLocalhost, mock_server.Port()) .SetSendTsOut(kTsOut) .SetDataset(dataset::kGlbxMdp3) - .SetUpgradePolicy(policy_and_version.first) + .SetUpgradePolicy(upgrade_policy) .BuildBlocking(); const auto metadata = target.Start(); - EXPECT_EQ(metadata.version, policy_and_version.second); + EXPECT_EQ(metadata.version, exp_version); EXPECT_TRUE(metadata.has_mixed_schema); EXPECT_EQ(metadata.dataset, dataset::kGlbxMdp3); } @@ -93,7 +94,7 @@ TEST_F(LiveBlockingTests, TestSubscribe) { [&kSymbols, kSchema, kSType](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); - self.Subscribe(kSymbols, kSchema, kSType); + self.Subscribe(kSymbols, kSchema, kSType, true); }}; LiveBlocking target = builder_.SetDataset(kDataset) @@ -121,7 +122,8 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingUnixNanos) { const auto chunk_size = std::min(static_cast(500), kSymbolCount - i); const std::vector symbols_chunk(chunk_size, kSymbol); - self.Subscribe(symbols_chunk, kSchema, kSType); + self.Subscribe(symbols_chunk, kSchema, kSType, + i + chunk_size == kSymbolCount); i += chunk_size; } }}; @@ -148,7 +150,7 @@ TEST_F(LiveBlockingTests, TestSubscriptionUnixNanos0) { self.Accept(); self.Authenticate(); std::size_t i{}; - self.Subscribe(kSymbols, kSchema, kSType, "0"); + self.Subscribe(kSymbols, kSchema, kSType, "0", true); }}; LiveBlocking target = builder_.SetDataset(kDataset) @@ -178,7 +180,8 @@ TEST_F(LiveBlockingTests, TestSubscriptionChunkingStringStart) { const auto chunk_size = std::min(static_cast(500), kSymbolCount - i); const std::vector symbols_chunk(chunk_size, kSymbol); - self.Subscribe(symbols_chunk, kSchema, kSType, kStart); + self.Subscribe(symbols_chunk, kSchema, kSType, kStart, + i + chunk_size == kSymbolCount); i += chunk_size; } }}; @@ -211,7 +214,8 @@ TEST_F(LiveBlockingTests, TestSubscribeSnapshot) { const auto chunk_size = std::min(static_cast(500), kSymbolCount - i); const std::vector symbols_chunk(chunk_size, kSymbol); - self.SubscribeWithSnapshot(symbols_chunk, kSchema, kSType); + self.SubscribeWithSnapshot(symbols_chunk, kSchema, kSType, + i + chunk_size == kSymbolCount); i += chunk_size; } }}; @@ -500,7 +504,8 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { &should_close_cv, &should_close_mutex](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); - self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0"); + self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, "0", + true); self.Start(); self.SendRecord(kRec); { @@ -517,7 +522,7 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { // Wait for reconnect self.Accept(); self.Authenticate(); - self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol); + self.Subscribe(kAllSymbols, Schema::Trades, SType::RawSymbol, true); self.Start(); self.SendRecord(kRec); }); diff --git a/tests/src/live_threaded_tests.cpp b/tests/src/live_threaded_tests.cpp index 9d13b9a..cd55e51 100644 --- a/tests/src/live_threaded_tests.cpp +++ b/tests/src/live_threaded_tests.cpp @@ -199,7 +199,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) { kSType, kUseSnapshot](mock::MockLsgServer& self) { self.Accept(); self.Authenticate(); - self.Subscribe(kAllSymbols, kSchema, kSType, "0"); + self.Subscribe(kAllSymbols, kSchema, kSType, "0", true); self.Start(); self.SendRecord(kRec); { @@ -210,7 +210,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) { self.Close(); self.Accept(); self.Authenticate(); - self.Subscribe(kAllSymbols, kSchema, kSType); + self.Subscribe(kAllSymbols, kSchema, kSType, true); self.Start(); self.SendRecord(kRec); }}; @@ -290,7 +290,7 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) { self.Close(); self.Accept(); self.Authenticate(); - self.Subscribe(kSymbols, kSchema, kSType); + self.Subscribe(kSymbols, kSchema, kSType, true); }}; LiveThreaded target = builder_.SetLogReceiver(ILogReceiver::Default()) .SetDataset(dataset::kXnasItch) diff --git a/tests/src/mock_lsg_server.cpp b/tests/src/mock_lsg_server.cpp index 84a9aa7..44ec657 100644 --- a/tests/src/mock_lsg_server.cpp +++ b/tests/src/mock_lsg_server.cpp @@ -9,6 +9,7 @@ #include // SHA256_DIGEST_LENGTH #include +#include #include "databento/compat.hpp" #include "databento/constants.hpp" @@ -119,12 +120,13 @@ void MockLsgServer::Authenticate() { } void MockLsgServer::Subscribe(const std::vector& symbols, - Schema schema, SType stype) { - Subscribe(symbols, schema, stype, ""); + Schema schema, SType stype, bool is_last) { + Subscribe(symbols, schema, stype, "", is_last); } void MockLsgServer::SubscribeWithSnapshot( - const std::vector& symbols, Schema schema, SType stype) { + const std::vector& symbols, Schema schema, SType stype, + bool is_last) { const auto received = Receive(); EXPECT_NE( received.find("symbols=" + @@ -137,11 +139,13 @@ void MockLsgServer::SubscribeWithSnapshot( EXPECT_EQ(received.find("start="), std::string::npos); EXPECT_NE(received.find("id="), std::string::npos); EXPECT_NE(received.find("snapshot=1"), std::string::npos); + EXPECT_NE(received.find(std::string{"is_last="} + std::to_string(is_last)), + std::string::npos); } void MockLsgServer::Subscribe(const std::vector& symbols, Schema schema, SType stype, - const std::string& start) { + const std::string& start, bool is_last) { const auto received = Receive(); EXPECT_NE( received.find("symbols=" + @@ -158,6 +162,8 @@ void MockLsgServer::Subscribe(const std::vector& symbols, } EXPECT_NE(received.find("id="), std::string::npos); EXPECT_NE(received.find("snapshot=0"), std::string::npos); + EXPECT_NE(received.find(std::string{"is_last="} + std::to_string(is_last)), + std::string::npos); } void MockLsgServer::Start() { diff --git a/tests/src/record_tests.cpp b/tests/src/record_tests.cpp index 6980749..4576e8f 100644 --- a/tests/src/record_tests.cpp +++ b/tests/src/record_tests.cpp @@ -116,8 +116,8 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { 8, 9, 10, - 11, kUndefPrice, + 11, 12, 13, 14, @@ -137,6 +137,14 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { 28, 29, 30, + 31, + 32, + 33, + 34, + 35, + 36, + 37, + 38, {'U', 'S', 'D'}, {'U', 'S', 'D'}, {'A'}, @@ -149,14 +157,13 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { {'D'}, {}, {}, + {'E', 'S', 'M', '5'}, InstrumentClass::Future, MatchAlgorithm::Fifo, 33, 34, 35, 36, - 37, - 38, SecurityUpdateAction::Add, 39, 40, @@ -165,10 +172,11 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { 42, 43, 44, - {}}; + InstrumentClass::CommoditySpot, + Side::Bid}; const auto res = ToString(target); ASSERT_EQ(res, R"(InstrumentDefMsg { - hd = RecordHeader { length = 100, rtype = InstrumentDef, publisher_id = 1, instrument_id = 1, ts_event = 1970-01-01T00:00:00.000000000Z }, + hd = RecordHeader { length = 130, rtype = InstrumentDef, publisher_id = 1, instrument_id = 1, ts_event = 1970-01-01T00:00:00.000000000Z }, ts_recv = 1970-01-01T00:00:00.000000000Z, min_price_increment = 0.000000001, display_factor = 0.000000002, @@ -177,30 +185,38 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { high_limit_price = 0.000000005, low_limit_price = 0.000000006, max_price_variation = 0.000000007, - trading_reference_price = 0.000000008, - unit_of_measure_qty = 0.000000009, - min_price_increment_amount = 0.000000010, - price_ratio = 0.000000011, + unit_of_measure_qty = 0.000000008, + min_price_increment_amount = 0.000000009, + price_ratio = 0.000000010, strike_price = kUndefPrice, - inst_attrib_value = 12, - underlying_id = 13, - raw_instrument_id = 14, - market_depth_implied = 15, - market_depth = 16, - market_segment_id = 17, - max_trade_vol = 18, - min_lot_size = 19, - min_lot_size_block = 20, - min_lot_size_round_lot = 21, - min_trade_vol = 22, - contract_multiplier = 23, - decay_quantity = 24, - original_contract_size = 25, - trading_reference_date = 26, - appl_id = 27, - maturity_year = 28, - decay_start_date = 29, - channel_id = 30, + raw_instrument_id = 11, + leg_price = 0.000000012, + leg_delta = 0.000000013, + inst_attrib_value = 14, + underlying_id = 15, + market_depth_implied = 16, + market_depth = 17, + market_segment_id = 18, + max_trade_vol = 19, + min_lot_size = 20, + min_lot_size_block = 21, + min_lot_size_round_lot = 22, + min_trade_vol = 23, + contract_multiplier = 24, + decay_quantity = 25, + original_contract_size = 26, + leg_instrument_id = 27, + leg_ratio_price_numerator = 28, + leg_ratio_price_denominator = 29, + leg_ratio_qty_numerator = 30, + leg_ratio_qty_denominator = 31, + leg_underlying_id = 32, + appl_id = 33, + maturity_year = 34, + decay_start_date = 35, + channel_id = 36, + leg_count = 37, + leg_index = 38, currency = "USD", settl_currency = "USD", secsubtype = "A", @@ -213,14 +229,13 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { unit_of_measure = "D", underlying = "", strike_price_currency = "", + leg_raw_symbol = "ESM5", instrument_class = Future, match_algorithm = Fifo, - md_security_trading_status = 33, - main_fraction = 34, - price_display_format = 35, - settl_price_type = 36, - sub_fraction = 37, - underlying_product = 38, + main_fraction = 33, + price_display_format = 34, + sub_fraction = 35, + underlying_product = 36, security_update_action = Add, maturity_month = 39, maturity_day = 40, @@ -228,7 +243,9 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { user_defined_instrument = No, contract_multiplier_unit = 42, flow_schedule_type = 43, - tick_rule = 44 + tick_rule = 44, + leg_instrument_class = CommoditySpot, + leg_side = Bid })"); } diff --git a/tests/src/zstd_stream_tests.cpp b/tests/src/zstd_stream_tests.cpp index 3f2d966..ea7378b 100644 --- a/tests/src/zstd_stream_tests.cpp +++ b/tests/src/zstd_stream_tests.cpp @@ -15,7 +15,7 @@ namespace databento::detail::tests { TEST(ZstdStreamTests, TestMultiFrameFiles) { constexpr auto kRecordCount = 8; const std::string file_path = - TEST_BUILD_DIR "/data/multi-frame.definition.v1.dbn.zst"; + TEST_DATA_DIR "/multi-frame.definition.v1.dbn.frag.zst"; databento::detail::ZstdDecodeStream target{ std::make_unique(file_path)};