diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ce1a346..1115adb 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -32,13 +32,12 @@ jobs: -DHAVE_RULES=ON cmake --build build --config Release --parallel sudo cmake --install build --prefix /usr - - name: Install gtest - uses: MarkusJx/googletest-installer@v1.1 - name: CMake configure run: | cmake -S . -B build \ -GNinja \ -DCMAKE_CXX_COMPILER=${{ matrix.compiler }} \ + -DDATABENTO_USE_EXTERNAL_GTEST=0 \ -DDATABENTO_ENABLE_UNIT_TESTING=1 \ -DDATABENTO_ENABLE_EXAMPLES=1 \ -DDATABENTO_ENABLE_CLANG_TIDY=1 \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cd8506..ea9fe88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,30 @@ # Changelog +## 0.38.0 - 2025-06-10 + +### Enhancements +- Made the buffer size used by the live clients when reading from the TCP socket + configurable through the `LiveBuilder::SetBufferSize()` method +- Added log level prefix to `ConsoleLogReceiver` output +- Added `iomanip` compatibility: fill, precision, and width to `pretty::Px` (formerly + `FixPx`) +- Added new `pretty::Ts` helper type for human-readable formatting of `UnixNanos` + +### Breaking changes +- Live client instances can only be created through the `LiveBuilder` class +- Changed `HeartbeatInterval()` getters on `LiveBlocking` and `LiveThreaded` to return + an `std::optional` +- Added new optional `ShouldLog` virtual method to `ILogReceiver` to + filter the levels of log messages that will be sent to the receiver + +### Deprecations +- Deprecated `FixPx` in favor of `pretty::Px` which has consistent naming with the API + and the Python and Rust client libraries + +### Bug fixes +- Ensure `CPPHTTPLIB_OPENSSL_SUPPORT` is defined at all locations where `cpp-httplib` + is included + ## 0.37.1 - 2025-06-03 ### Bug fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index 97a5fe0..264eeda 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0) project( databento - VERSION 0.37.1 + VERSION 0.38.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index 093490b..2c51ce8 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -28,6 +28,7 @@ set(headers include/databento/live_threaded.hpp include/databento/log.hpp include/databento/metadata.hpp + include/databento/pretty.hpp include/databento/publishers.hpp include/databento/record.hpp include/databento/symbol_map.hpp @@ -58,7 +59,6 @@ set(sources src/enums.cpp src/exceptions.cpp src/file_stream.cpp - src/fixed_price.cpp src/flag_set.cpp src/historical.cpp src/live.cpp @@ -66,6 +66,7 @@ set(sources src/live_threaded.cpp src/log.cpp src/metadata.cpp + src/pretty.cpp src/publishers.cpp src/record.cpp src/symbol_map.cpp diff --git a/include/databento/detail/buffer.hpp b/include/databento/detail/buffer.hpp index 539d2c1..d618970 100644 --- a/include/databento/detail/buffer.hpp +++ b/include/databento/detail/buffer.hpp @@ -11,7 +11,9 @@ namespace databento::detail { class Buffer : public IReadable, public IWritable { public: - Buffer() : Buffer(64 * std::size_t{1 << 10}) {} + static constexpr std::size_t kDefaultBufSize = 64 * std::size_t{1 << 10}; + + Buffer() : Buffer(kDefaultBufSize) {} explicit Buffer(std::size_t init_capacity) : buf_{AlignedNew(init_capacity), AlignedDelete}, end_{buf_.get() + init_capacity}, @@ -80,7 +82,7 @@ class Buffer : public IReadable, public IWritable { UniqueBufPtr buf_; std::byte* end_; - std::byte* read_pos_{}; - std::byte* write_pos_{}; + std::byte* read_pos_; + std::byte* write_pos_; }; } // namespace databento::detail diff --git a/include/databento/exceptions.hpp b/include/databento/exceptions.hpp index d0851c2..3e91053 100644 --- a/include/databento/exceptions.hpp +++ b/include/databento/exceptions.hpp @@ -1,5 +1,8 @@ #pragma once +#ifndef CPPHTTPLIB_OPENSSL_SUPPORT +#define CPPHTTPLIB_OPENSSL_SUPPORT +#endif #include // Error #include // json, parse_error diff --git a/include/databento/fixed_price.hpp b/include/databento/fixed_price.hpp index 6f998c6..1322d3e 100644 --- a/include/databento/fixed_price.hpp +++ b/include/databento/fixed_price.hpp @@ -1,25 +1,16 @@ #pragma once #include -#include #include -#include "databento/constants.hpp" +#include "databento/pretty.hpp" namespace databento { -// A fixed-precision price. -struct FixPx { - bool IsUndefined() const { return val == databento::kUndefPrice; } +// Has been renamed to pretty::Px +using FixPx [[deprecated]] = pretty::Px; - std::int64_t val; -}; - -std::ostream& operator<<(std::ostream& stream, FixPx fix_px); - -// Convert a fixed-precision price to a formatted string. -inline std::string PxToString(std::int64_t px) { - std::ostringstream ss; - ss << FixPx{px}; - return ss.str(); +// Has been moved to the pretty namespace +[[deprecated]] inline std::string PxToString(std::int64_t px) { + return pretty::PxToString(px); } } // namespace databento diff --git a/include/databento/live.hpp b/include/databento/live.hpp index 149e871..9496fd5 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "databento/enums.hpp" // VersionUpgradePolicy @@ -9,13 +10,18 @@ #include "databento/publishers.hpp" namespace databento { +// Forward declarations class ILogReceiver; // A helper class for constructing a Live client, either an instance of // LiveBlocking or LiveThreaded. class LiveBuilder { public: - LiveBuilder() = default; + LiveBuilder(); + + /* + * Required settters + */ // Sets `key_` based on the environment variable DATABENTO_API_KEY. // @@ -23,8 +29,13 @@ class LiveBuilder { // program. LiveBuilder& SetKeyFromEnv(); LiveBuilder& SetKey(std::string key); - LiveBuilder& SetDataset(std::string dataset); LiveBuilder& SetDataset(Dataset dataset); + LiveBuilder& SetDataset(std::string dataset); + + /* + * Optional settters + */ + // Whether to append the gateway send timestamp after each DBN message. LiveBuilder& SetSendTsOut(bool send_ts_out); // Set the version upgrade policy for when receiving DBN data from a prior @@ -36,6 +47,13 @@ class LiveBuilder { LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval); // Overrides the gateway and port. This is an advanced method. LiveBuilder& SetAddress(std::string gateway, std::uint16_t port); + // Overrides the size of the buffer used for reading data from the TCP socket. + LiveBuilder& SetBufferSize(std::size_t size); + + /* + * Build a live client instance + */ + // Attempts to construct an instance of a blocking live client or throws an // exception. LiveBlocking BuildBlocking(); @@ -51,8 +69,10 @@ class LiveBuilder { std::uint16_t port_{}; std::string key_; std::string dataset_; + bool send_ts_out_{false}; VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3}; - std::chrono::seconds heartbeat_interval_{}; + std::optional heartbeat_interval_{}; + std::size_t buffer_size_; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index c55eeb1..2ec5517 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -4,6 +4,7 @@ #include // milliseconds #include #include +#include #include #include #include // pair @@ -18,7 +19,10 @@ #include "databento/record.hpp" // Record, RecordHeader namespace databento { +// Forward declaration class ILogReceiver; +class LiveBuilder; +class LiveThreaded; // A client for interfacing with Databento's real-time and intraday replay // market data API. This client provides a blocking API for getting the next @@ -26,13 +30,6 @@ class ILogReceiver; // particular dataset. class LiveBlocking { public: - LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval); - LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, - std::string gateway, std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval); /* * Getters */ @@ -45,8 +42,8 @@ class LiveBlocking { VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; } // The the first member of the pair will be true, when the heartbeat interval // was overridden. - std::pair HeartbeatInterval() const { - return {heartbeat_interval_.count() > 0, heartbeat_interval_}; + std::optional HeartbeatInterval() const { + return heartbeat_interval_; } const std::vector& Subscriptions() const { return subscriptions_; @@ -93,6 +90,19 @@ class LiveBlocking { void Resubscribe(); private: + friend LiveBuilder; + friend LiveThreaded; + + LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + std::string DetermineGateway() const; std::uint64_t Authenticate(); std::string DecodeChallenge(); @@ -115,11 +125,11 @@ class LiveBlocking { bool send_ts_out_; std::uint8_t version_{}; VersionUpgradePolicy upgrade_policy_; - std::chrono::seconds heartbeat_interval_; + std::optional heartbeat_interval_; detail::TcpClient client_; std::uint32_t sub_counter_{}; std::vector subscriptions_; - detail::Buffer buffer_{}; + detail::Buffer buffer_; // Must be 8-byte aligned for records alignas(RecordHeader) std::array compat_buffer_{}; std::uint64_t session_id_; diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index c44b8fa..05e9bba 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -1,9 +1,12 @@ #pragma once #include +#include #include // function #include // unique_ptr +#include #include +#include #include // pair #include @@ -14,7 +17,9 @@ #include "databento/timeseries.hpp" // MetadataCallback, RecordCallback namespace databento { +// Forward declaration class ILogReceiver; +class LiveBuilder; // A client for interfacing with Databento's real-time and intraday replay // market data API. This client provides a threaded event-driven API for @@ -22,7 +27,7 @@ class ILogReceiver; // is associated with a particular dataset. class LiveThreaded { public: - enum class ExceptionAction { + enum class ExceptionAction : std::uint8_t { // Start a new session. Return this instead of calling `Start`, which would // cause a deadlock. Restart, @@ -32,13 +37,6 @@ class LiveThreaded { using ExceptionCallback = std::function; - LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, - bool send_ts_out, VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval); - LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, - std::string gateway, std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval); LiveThreaded(const LiveThreaded&) = delete; LiveThreaded& operator=(const LiveThreaded&) = delete; LiveThreaded(LiveThreaded&& other) noexcept; @@ -57,7 +55,7 @@ class LiveThreaded { VersionUpgradePolicy UpgradePolicy() const; // The the first member of the pair will be true, when the heartbeat interval // was overridden. - std::pair HeartbeatInterval() const; + std::optional HeartbeatInterval() const; const std::vector& Subscriptions() const; std::vector& Subscriptions(); @@ -96,6 +94,8 @@ class LiveThreaded { KeepGoing BlockForStop(std::chrono::milliseconds timeout); private: + friend LiveBuilder; + struct Impl; static void ProcessingThread(Impl* impl, MetadataCallback&& metadata_callback, @@ -103,8 +103,18 @@ class LiveThreaded { ExceptionCallback&& exception_callback); static ExceptionAction ExceptionHandler( Impl* impl, const ExceptionCallback& exception_callback, - const std::exception& exc, const char* pretty_function_name, - const char* message); + const std::exception& exc, std::string_view pretty_function_name, + std::string_view message); + + LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); // unique_ptr to be movable std::unique_ptr impl_; diff --git a/include/databento/log.hpp b/include/databento/log.hpp index e4c7525..d8acf18 100644 --- a/include/databento/log.hpp +++ b/include/databento/log.hpp @@ -19,11 +19,13 @@ class ILogReceiver { virtual ~ILogReceiver() = default; virtual void Receive(databento::LogLevel level, const std::string& msg) = 0; + virtual bool ShouldLog(databento::LogLevel) const { return true; } }; class NullLogReceiver : public ILogReceiver { public: void Receive(databento::LogLevel, const std::string&) override {} + bool ShouldLog(databento::LogLevel) const override { return false; } }; class ConsoleLogReceiver : public ILogReceiver { @@ -34,9 +36,15 @@ class ConsoleLogReceiver : public ILogReceiver { ConsoleLogReceiver(LogLevel min_level, std::ostream& stream); void Receive(LogLevel level, const std::string& msg) override; + bool ShouldLog(databento::LogLevel level) const override { + return level > min_level_; + } private: std::ostream& stream_; const databento::LogLevel min_level_; }; + +std::ostream& operator<<(std::ostream& out, LogLevel level); +const char* ToString(LogLevel level); } // namespace databento diff --git a/include/databento/pretty.hpp b/include/databento/pretty.hpp new file mode 100644 index 0000000..848a047 --- /dev/null +++ b/include/databento/pretty.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include + +#include "databento/constants.hpp" +#include "databento/datetime.hpp" + +namespace databento::pretty { +// A helper type for formatting the fixed-precision prices used in DBN. +// +// Supports configurable fill, width, and precision [0, 6). By default +// will print all 9 decimal places of precision. +struct Px { + bool IsUndefined() const { return val == databento::kUndefPrice; } + + std::int64_t val; +}; + +std::ostream& operator<<(std::ostream& stream, Px px); + +// A helper type for formatting the nanosecond UNIX timestamps used in DBN to +// the canonical ISO 8601 format used by Databento. +// +// Supports configurable fill and width. +struct Ts { + bool IsUndefined() const { + return val.time_since_epoch().count() == databento::kUndefTimestamp; + } + + UnixNanos val; +}; + +std::ostream& operator<<(std::ostream& stream, Ts ts); + +// Convert a fixed-precision price to a formatted string. +inline std::string PxToString(std::int64_t px) { + std::ostringstream ss; + ss << Px{px}; + return ss.str(); +} +} // namespace databento::pretty diff --git a/include/databento/publishers.hpp b/include/databento/publishers.hpp index baafcf4..9a20062 100644 --- a/include/databento/publishers.hpp +++ b/include/databento/publishers.hpp @@ -335,13 +335,13 @@ enum class Publisher : std::uint16_t { EqusAllFiny = 69, // Databento US Equities (All Feeds) - FINRA/Nasdaq TRF Chicago EqusAllFinc = 70, - // Databento US Equities (All Feeds) - CBOE BZX + // Databento US Equities (All Feeds) - Cboe BZX EqusAllBats = 71, - // Databento US Equities (All Feeds) - CBOE BYX + // Databento US Equities (All Feeds) - Cboe BYX EqusAllBaty = 72, - // Databento US Equities (All Feeds) - CBOE EDGA + // Databento US Equities (All Feeds) - Cboe EDGA EqusAllEdga = 73, - // Databento US Equities (All Feeds) - CBOE EDGX + // Databento US Equities (All Feeds) - Cboe EDGX EqusAllEdgx = 74, // Databento US Equities (All Feeds) - Nasdaq BX EqusAllXbos = 75, diff --git a/live.hpp b/live.hpp new file mode 100644 index 0000000..9496fd5 --- /dev/null +++ b/live.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include + +#include "databento/enums.hpp" // VersionUpgradePolicy +#include "databento/live_blocking.hpp" +#include "databento/live_threaded.hpp" +#include "databento/publishers.hpp" + +namespace databento { +// Forward declarations +class ILogReceiver; + +// A helper class for constructing a Live client, either an instance of +// LiveBlocking or LiveThreaded. +class LiveBuilder { + public: + LiveBuilder(); + + /* + * Required settters + */ + + // Sets `key_` based on the environment variable DATABENTO_API_KEY. + // + // NOTE: This is not thread-safe if `std::setenv` is used elsewhere in the + // program. + LiveBuilder& SetKeyFromEnv(); + LiveBuilder& SetKey(std::string key); + LiveBuilder& SetDataset(Dataset dataset); + LiveBuilder& SetDataset(std::string dataset); + + /* + * Optional settters + */ + + // Whether to append the gateway send timestamp after each DBN message. + LiveBuilder& SetSendTsOut(bool send_ts_out); + // Set the version upgrade policy for when receiving DBN data from a prior + // version. Defaults to upgrading to DBNv2 (if not already). + LiveBuilder& SetUpgradePolicy(VersionUpgradePolicy upgrade_policy); + // Sets the receiver of the logs to be used by the client. + LiveBuilder& SetLogReceiver(ILogReceiver* log_receiver); + // Overrides the heartbeat interval. + LiveBuilder& SetHeartbeatInterval(std::chrono::seconds heartbeat_interval); + // Overrides the gateway and port. This is an advanced method. + LiveBuilder& SetAddress(std::string gateway, std::uint16_t port); + // Overrides the size of the buffer used for reading data from the TCP socket. + LiveBuilder& SetBufferSize(std::size_t size); + + /* + * Build a live client instance + */ + + // Attempts to construct an instance of a blocking live client or throws an + // exception. + LiveBlocking BuildBlocking(); + // Attempts to construct an instance of a threaded live client or throws an + // exception. + LiveThreaded BuildThreaded(); + + private: + void Validate(); + + ILogReceiver* log_receiver_{}; + std::string gateway_{}; + std::uint16_t port_{}; + std::string key_; + std::string dataset_; + + bool send_ts_out_{false}; + VersionUpgradePolicy upgrade_policy_{VersionUpgradePolicy::UpgradeToV3}; + std::optional heartbeat_interval_{}; + std::size_t buffer_size_; +}; +} // namespace databento diff --git a/live_blocking.hpp b/live_blocking.hpp new file mode 100644 index 0000000..2ec5517 --- /dev/null +++ b/live_blocking.hpp @@ -0,0 +1,138 @@ +#pragma once + +#include +#include // milliseconds +#include +#include +#include +#include +#include +#include // pair +#include + +#include "databento/datetime.hpp" // UnixNanos +#include "databento/dbn.hpp" // Metadata +#include "databento/detail/buffer.hpp" +#include "databento/detail/tcp_client.hpp" // TcpClient +#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy +#include "databento/live_subscription.hpp" +#include "databento/record.hpp" // Record, RecordHeader + +namespace databento { +// Forward declaration +class ILogReceiver; +class LiveBuilder; +class LiveThreaded; + +// A client for interfacing with Databento's real-time and intraday replay +// market data API. This client provides a blocking API for getting the next +// record. Unlike Historical, each instance of LiveBlocking is associated with a +// particular dataset. +class LiveBlocking { + public: + /* + * Getters + */ + + const std::string& Key() const { return key_; } + const std::string& Dataset() const { return dataset_; } + const std::string& Gateway() const { return gateway_; } + std::uint16_t Port() const { return port_; } + bool SendTsOut() const { return send_ts_out_; } + VersionUpgradePolicy UpgradePolicy() const { return upgrade_policy_; } + // The the first member of the pair will be true, when the heartbeat interval + // was overridden. + std::optional HeartbeatInterval() const { + return heartbeat_interval_; + } + const std::vector& Subscriptions() const { + return subscriptions_; + } + std::vector& Subscriptions() { return subscriptions_; } + + /* + * Methods + */ + + // Add a new subscription. A single client instance supports multiple + // subscriptions. Note there is no unsubscribe method. Subscriptions end + // when the client disconnects in its destructor. + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in); + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in, UnixNanos start); + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in, const std::string& start); + void SubscribeWithSnapshot(const std::vector& symbols, + Schema schema, SType stype_in); + // Notifies the gateway to start sending messages for all subscriptions. + // + // This method should only be called once per instance. + Metadata Start(); + // Block on getting the next record. The returned reference is valid until + // this method is called again. + // + // This method should only be called after `Start`. + const Record& NextRecord(); + // Block on getting the next record. The returned pointer is valid until + // this method is called again. Will return `nullptr` if the `timeout` is + // reached. + // + // This method should only be called after `Start`. + const Record* NextRecord(std::chrono::milliseconds timeout); + // Stops the session with the gateway. Once stopped, the session cannot be + // restarted. + void Stop(); + // Closes the current connection and attempts to reconnect to the gateway. + void Reconnect(); + // Resubscribes to all subscriptions, removing the original `start` time, if + // any. Usually performed after a `Reconnect()`. + void Resubscribe(); + + private: + friend LiveBuilder; + friend LiveThreaded; + + LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + + std::string DetermineGateway() const; + std::uint64_t Authenticate(); + std::string DecodeChallenge(); + std::string GenerateCramReply(std::string_view challenge_key); + std::string EncodeAuthReq(std::string_view auth); + std::uint64_t DecodeAuthResp(); + void IncrementSubCounter(); + void Subscribe(std::string_view sub_msg, + const std::vector& symbols, bool use_snapshot); + detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout); + RecordHeader* BufferRecordHeader(); + + static constexpr std::size_t kMaxStrLen = 24L * 1024; + + ILogReceiver* log_receiver_; + std::string key_; + std::string dataset_; + std::string gateway_; + std::uint16_t port_; + bool send_ts_out_; + std::uint8_t version_{}; + VersionUpgradePolicy upgrade_policy_; + std::optional heartbeat_interval_; + detail::TcpClient client_; + std::uint32_t sub_counter_{}; + std::vector subscriptions_; + detail::Buffer buffer_; + // Must be 8-byte aligned for records + alignas(RecordHeader) std::array compat_buffer_{}; + std::uint64_t session_id_; + Record current_record_{nullptr}; +}; +} // namespace databento diff --git a/live_subscription.hpp b/live_subscription.hpp new file mode 100644 index 0000000..50d7313 --- /dev/null +++ b/live_subscription.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include +#include + +#include "databento/datetime.hpp" // UnixNanos +#include "databento/enums.hpp" // Schema, SType + +namespace databento { +struct LiveSubscription { + struct Snapshot {}; + struct NoStart {}; + using Start = std::variant; + + std::vector symbols; + Schema schema; + SType stype_in; + Start start; + std::uint32_t id{}; +}; +} // namespace databento diff --git a/live_threaded.hpp b/live_threaded.hpp new file mode 100644 index 0000000..05e9bba --- /dev/null +++ b/live_threaded.hpp @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include // function +#include // unique_ptr +#include +#include +#include +#include // pair +#include + +#include "databento/datetime.hpp" // UnixNanos +#include "databento/detail/scoped_thread.hpp" // ScopedThread +#include "databento/enums.hpp" // Schema, SType +#include "databento/live_subscription.hpp" +#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback + +namespace databento { +// Forward declaration +class ILogReceiver; +class LiveBuilder; + +// A client for interfacing with Databento's real-time and intraday replay +// market data API. This client provides a threaded event-driven API for +// receiving the next record. Unlike Historical, each instance of LiveThreaded +// is associated with a particular dataset. +class LiveThreaded { + public: + enum class ExceptionAction : std::uint8_t { + // Start a new session. Return this instead of calling `Start`, which would + // cause a deadlock. + Restart, + // Close the connection and stop the callback thread. + Stop, + }; + using ExceptionCallback = + std::function; + + LiveThreaded(const LiveThreaded&) = delete; + LiveThreaded& operator=(const LiveThreaded&) = delete; + LiveThreaded(LiveThreaded&& other) noexcept; + LiveThreaded& operator=(LiveThreaded&& rhs) noexcept; + ~LiveThreaded(); + + /* + * Getters + */ + + const std::string& Key() const; + const std::string& Dataset() const; + const std::string& Gateway() const; + std::uint16_t Port() const; + bool SendTsOut() const; + VersionUpgradePolicy UpgradePolicy() const; + // The the first member of the pair will be true, when the heartbeat interval + // was overridden. + std::optional HeartbeatInterval() const; + const std::vector& Subscriptions() const; + std::vector& Subscriptions(); + + /* + * Methods + */ + + // Add a new subscription. A single client instance supports multiple + // subscriptions. Note there is no unsubscribe method. Subscriptions end + // when the client disconnects when it's destroyed. + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in); + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in, UnixNanos start); + void Subscribe(const std::vector& symbols, Schema schema, + SType stype_in, const std::string& start); + void SubscribeWithSnapshot(const std::vector& symbols, + Schema schema, SType stype_in); + // Notifies the gateway to start sending messages for all subscriptions. + // `metadata_callback` will be called exactly once, before any calls to + // `record_callback`. `record_callback` will be called for records from all + // subscriptions. + // + // This method should only be called once per instance. + void Start(RecordCallback record_callback); + void Start(MetadataCallback metadata_callback, + RecordCallback record_callback); + void Start(MetadataCallback metadata_callback, RecordCallback record_callback, + ExceptionCallback exception_callback); + // Closes the current connection, and attempts to reconnect to the gateway. + void Reconnect(); + void Resubscribe(); + // Blocking wait with an optional timeout for the session to close when the + // record_callback or the exception_callback return Stop. + void BlockForStop(); + KeepGoing BlockForStop(std::chrono::milliseconds timeout); + + private: + friend LiveBuilder; + + struct Impl; + + static void ProcessingThread(Impl* impl, MetadataCallback&& metadata_callback, + RecordCallback&& record_callback, + ExceptionCallback&& exception_callback); + static ExceptionAction ExceptionHandler( + Impl* impl, const ExceptionCallback& exception_callback, + const std::exception& exc, std::string_view pretty_function_name, + std::string_view message); + + LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size); + + // unique_ptr to be movable + std::unique_ptr impl_; + detail::ScopedThread thread_; +}; +} // namespace databento diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index 26ac2a9..b57d262 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.37.1 +pkgver=0.38.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index 44b2889..4659411 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -67,7 +67,7 @@ void Buffer::Reserve(std::size_t capacity) { void Buffer::Shift() { const auto unread_bytes = ReadCapacity(); if (unread_bytes) { - std::copy(read_pos_, write_pos_, buf_.get()); + std::copy(ReadBegin(), ReadEnd(), buf_.get()); } read_pos_ = buf_.get(); write_pos_ = read_pos_ + unread_bytes; diff --git a/src/fixed_price.cpp b/src/fixed_price.cpp deleted file mode 100644 index dbbf841..0000000 --- a/src/fixed_price.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include "databento/fixed_price.hpp" - -#include - -namespace databento { -std::ostream& operator<<(std::ostream& stream, FixPx fix_px) { - if (fix_px.IsUndefined()) { - stream << "kUndefPrice"; - return stream; - } - const bool price_neg = (fix_px.val < 0); - const int64_t fixed_price_abs = price_neg ? -fix_px.val : fix_px.val; - const int64_t price_integer = fixed_price_abs / kFixedPriceScale; - const int64_t price_fraction = fixed_price_abs % kFixedPriceScale; - if (price_neg) { - stream << '-'; - } - stream << price_integer << '.' << std::setw(9) << std::setfill('0') - << price_fraction; - return stream; -} -} // namespace databento diff --git a/src/historical.cpp b/src/historical.cpp index d627ec1..3088183 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -1,5 +1,8 @@ #include "databento/historical.hpp" +#ifndef CPPHTTPLIB_OPENSSL_SUPPORT +#define CPPHTTPLIB_OPENSSL_SUPPORT +#endif #include #include diff --git a/src/live.cpp b/src/live.cpp index 8680bde..7345311 100644 --- a/src/live.cpp +++ b/src/live.cpp @@ -4,6 +4,7 @@ #include // move #include "databento/constants.hpp" // kApiKeyLength +#include "databento/detail/buffer.hpp" // kDefaultBufSize #include "databento/exceptions.hpp" // InvalidArgumentError, LiveApiError #include "databento/live_blocking.hpp" // LiveBlocking #include "databento/live_threaded.hpp" // LiveThreaded @@ -11,6 +12,8 @@ using databento::LiveBuilder; +LiveBuilder::LiveBuilder() : buffer_size_{detail::Buffer::kDefaultBufSize} {} + LiveBuilder& LiveBuilder::SetKeyFromEnv() { char const* env_key = std::getenv("DATABENTO_API_KEY"); if (env_key == nullptr) { @@ -74,11 +77,13 @@ databento::LiveBlocking LiveBuilder::BuildBlocking() { if (gateway_.empty()) { return databento::LiveBlocking{log_receiver_, key_, dataset_, send_ts_out_, - upgrade_policy_, heartbeat_interval_}; + upgrade_policy_, heartbeat_interval_, + buffer_size_}; } return databento::LiveBlocking{ log_receiver_, key_, dataset_, gateway_, - port_, send_ts_out_, upgrade_policy_, heartbeat_interval_}; + port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, + buffer_size_}; } databento::LiveThreaded LiveBuilder::BuildThreaded() { @@ -86,11 +91,13 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() { if (gateway_.empty()) { return databento::LiveThreaded{log_receiver_, key_, dataset_, send_ts_out_, - upgrade_policy_, heartbeat_interval_}; + upgrade_policy_, heartbeat_interval_, + buffer_size_}; } return databento::LiveThreaded{ log_receiver_, key_, dataset_, gateway_, - port_, send_ts_out_, upgrade_policy_, heartbeat_interval_}; + port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, + buffer_size_}; } void LiveBuilder::Validate() { diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index f5450ed..d6356dc 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -28,10 +28,11 @@ namespace { constexpr std::size_t kBucketIdLength = 5; } // namespace -LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, - std::string dataset, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval) +LiveBlocking::LiveBlocking( + ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size) : log_receiver_{log_receiver}, key_{std::move(key)}, @@ -42,13 +43,15 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, client_{gateway_, port_}, + buffer_{buffer_size}, session_id_{this->Authenticate()} {} -LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, - std::string dataset, std::string gateway, - std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval) +LiveBlocking::LiveBlocking( + ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -58,6 +61,7 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, client_{gateway_, port_}, + buffer_{buffer_size}, session_id_{this->Authenticate()} {} void LiveBlocking::Subscribe(const std::vector& symbols, @@ -194,8 +198,13 @@ const databento::Record* LiveBlocking::NextRecord( void LiveBlocking::Stop() { client_.Close(); } void LiveBlocking::Reconnect() { - log_receiver_->Receive(LogLevel::Info, "Reconnecting"); + if (log_receiver_->ShouldLog(LogLevel::Info)) { + std::ostringstream log_msg; + log_msg << "Reconnecting to " << gateway_ << ':' << port_; + log_receiver_->Receive(LogLevel::Info, log_msg.str()); + } client_ = detail::TcpClient{gateway_, port_}; + buffer_.Clear(); sub_counter_ = 0; session_id_ = this->Authenticate(); } @@ -227,7 +236,7 @@ std::string LiveBlocking::DecodeChallenge() { // first line is version std::string response{reinterpret_cast(buffer_.ReadBegin()), buffer_.ReadCapacity()}; - { + if (log_receiver_->ShouldLog(LogLevel::Debug)) { std::ostringstream log_ss; log_ss << "[LiveBlocking::DecodeChallenge] Challenge: " << response; log_receiver_->Receive(LogLevel::Debug, log_ss.str()); @@ -283,12 +292,13 @@ std::uint64_t LiveBlocking::Authenticate() { client_.WriteAll(req); const std::uint64_t session_id = DecodeAuthResp(); - std::ostringstream log_ss; - log_ss << "[LiveBlocking::Authenticate] Successfully authenticated with " - "session_id " - << session_id; - log_receiver_->Receive(LogLevel::Info, log_ss.str()); - + if (log_receiver_->ShouldLog(LogLevel::Info)) { + std::ostringstream log_ss; + log_ss << "[LiveBlocking::Authenticate] Successfully authenticated with " + "session_id " + << session_id; + log_receiver_->Receive(LogLevel::Info, log_ss.str()); + } return session_id; } @@ -314,8 +324,8 @@ std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { std::ostringstream req_stream; req_stream << "auth=" << auth << "|dataset=" << dataset_ << "|encoding=dbn|" << "ts_out=" << send_ts_out_ << "|client=C++ " DATABENTO_VERSION; - if (heartbeat_interval_.count() > 0) { - req_stream << "|heartbeat_interval_s=" << heartbeat_interval_.count(); + if (heartbeat_interval_.has_value()) { + req_stream << "|heartbeat_interval_s=" << heartbeat_interval_->count(); } req_stream << '\n'; return req_stream.str(); diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index e4b0937..6b4d5ad 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -55,22 +55,25 @@ LiveThreaded::~LiveThreaded() { } } -LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, - std::string dataset, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval) - : impl_{std::make_unique(log_receiver, std::move(key), - std::move(dataset), send_ts_out, - upgrade_policy, heartbeat_interval)} {} - -LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, - std::string dataset, std::string gateway, - std::uint16_t port, bool send_ts_out, - VersionUpgradePolicy upgrade_policy, - std::chrono::seconds heartbeat_interval) +LiveThreaded::LiveThreaded( + ILogReceiver* log_receiver, std::string key, std::string dataset, + bool send_ts_out, VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size) + : impl_{std::make_unique( + log_receiver, std::move(key), std::move(dataset), send_ts_out, + upgrade_policy, heartbeat_interval, buffer_size)} {} + +LiveThreaded::LiveThreaded( + ILogReceiver* log_receiver, std::string key, std::string dataset, + std::string gateway, std::uint16_t port, bool send_ts_out, + VersionUpgradePolicy upgrade_policy, + std::optional heartbeat_interval, + std::size_t buffer_size) : impl_{std::make_unique( log_receiver, std::move(key), std::move(dataset), std::move(gateway), - port, send_ts_out, upgrade_policy, heartbeat_interval)} {} + port, send_ts_out, upgrade_policy, heartbeat_interval, buffer_size)} { +} const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); } @@ -90,7 +93,7 @@ databento::VersionUpgradePolicy LiveThreaded::UpgradePolicy() const { return impl_->blocking.UpgradePolicy(); } -std::pair LiveThreaded::HeartbeatInterval() const { +std::optional LiveThreaded::HeartbeatInterval() const { return impl_->blocking.HeartbeatInterval(); } @@ -233,8 +236,8 @@ void LiveThreaded::ProcessingThread(Impl* impl, LiveThreaded::ExceptionAction LiveThreaded::ExceptionHandler( Impl* impl, const ExceptionCallback& exception_callback, - const std::exception& exc, const char* pretty_function_name, - const char* message) { + const std::exception& exc, std::string_view pretty_function_name, + std::string_view message) { if (exception_callback && exception_callback(exc) == ExceptionAction::Restart) { std::ostringstream log_ss; diff --git a/src/log.cpp b/src/log.cpp index 3f30efe..b63987a 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -21,7 +21,33 @@ ConsoleLogReceiver::ConsoleLogReceiver(LogLevel min_level, std::ostream& stream) : stream_{stream}, min_level_{min_level} {} void ConsoleLogReceiver::Receive(LogLevel level, const std::string& msg) { - if (level >= min_level_) { - stream_ << msg << '\n'; + if (ShouldLog(level)) { + stream_ << level << ": " << msg << '\n'; } } + +namespace databento { +std::ostream& operator<<(std::ostream& out, LogLevel level) { + out << ToString(level); + return out; +} +const char* ToString(LogLevel level) { + switch (level) { + case LogLevel::Debug: { + return "DEBUG"; + } + case LogLevel::Info: { + return "INFO"; + } + case LogLevel::Warning: { + return "WARN"; + } + case LogLevel::Error: { + return "ERROR"; + } + default: { + return "UNKNOWN"; + }; + } +} +} // namespace databento diff --git a/src/pretty.cpp b/src/pretty.cpp new file mode 100644 index 0000000..1f6f193 --- /dev/null +++ b/src/pretty.cpp @@ -0,0 +1,47 @@ +#include "databento/pretty.hpp" + +#include +#include +#include +#include + +#include "databento/datetime.hpp" + +namespace databento::pretty { +std::ostream& operator<<(std::ostream& stream, Px px) { + constexpr std::array kDivisors = { + 0, 100'000'000, 10'000'000, 1'000'000, 100'000, 10'000}; + if (px.IsUndefined()) { + stream << "UNDEF_PRICE"; + return stream; + } + const bool is_negative = (px.val < 0); + const int64_t px_abs = is_negative ? -px.val : px.val; + const int64_t price_integer = px_abs / kFixedPriceScale; + const int64_t price_fraction = px_abs % kFixedPriceScale; + std::ostringstream ss; + if (is_negative) { + ss << '-'; + } + const auto orig_precision = static_cast(stream.precision()); + if (orig_precision == 0) { + ss << price_integer; + } else { + // Don't support precision 6-8 (inclusive). 6 is the default and there's no + // way to disambiguate between explicitly set 6 and the default value, + // however by default we want to print all 9 digits + const auto precision = orig_precision < 6 ? orig_precision : 9; + ss << price_integer << '.' << std::setw(precision) << std::setfill('0') + << (precision == 9 ? price_fraction + : price_fraction / + kDivisors[static_cast(precision)]); + } + stream << ss.str(); + return stream; +} + +std::ostream& operator<<(std::ostream& stream, Ts ts) { + stream << ToIso8601(ts.val); + return stream; +} +} // namespace databento::pretty diff --git a/src/record.cpp b/src/record.cpp index cf3da88..27f4746 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -4,7 +4,7 @@ #include "databento/enums.hpp" #include "databento/exceptions.hpp" // InvalidArgumentError -#include "databento/fixed_price.hpp" +#include "databento/pretty.hpp" // Px #include "stream_op_helper.hpp" using databento::Record; @@ -229,7 +229,7 @@ std::ostream& operator<<(std::ostream& stream, const MboMsg& mbo_msg) { .Build() .AddField("hd", mbo_msg.hd) .AddField("order_id", mbo_msg.order_id) - .AddField("price", FixPx{mbo_msg.price}) + .AddField("price", pretty::Px{mbo_msg.price}) .AddField("size", mbo_msg.size) .AddField("flags", mbo_msg.flags) .AddField("channel_id", mbo_msg.channel_id) @@ -246,8 +246,8 @@ std::ostream& operator<<(std::ostream& stream, const BidAskPair& ba_pair) { .SetSpacer(" ") .SetTypeName("BidAskPair") .Build() - .AddField("bid_px", FixPx{ba_pair.bid_px}) - .AddField("ask_px", FixPx{ba_pair.ask_px}) + .AddField("bid_px", pretty::Px{ba_pair.bid_px}) + .AddField("ask_px", pretty::Px{ba_pair.ask_px}) .AddField("bid_sz", ba_pair.bid_sz) .AddField("ask_sz", ba_pair.ask_sz) .AddField("bid_ct", ba_pair.bid_ct) @@ -263,8 +263,8 @@ std::ostream& operator<<(std::ostream& stream, .SetSpacer(" ") .SetTypeName("ConsolidatedBidAskPair") .Build() - .AddField("bid_px", FixPx{ba_pair.bid_px}) - .AddField("ask_px", FixPx{ba_pair.ask_px}) + .AddField("bid_px", pretty::Px{ba_pair.bid_px}) + .AddField("ask_px", pretty::Px{ba_pair.ask_px}) .AddField("bid_sz", ba_pair.bid_sz) .AddField("ask_sz", ba_pair.ask_sz) .AddField("bid_pb", ba_pair.bid_pb) @@ -278,7 +278,7 @@ std::ostream& operator<<(std::ostream& stream, const Mbp1Msg& mbp_msg) { .SetSpacer("\n ") .Build() .AddField("hd", mbp_msg.hd) - .AddField("price", FixPx{mbp_msg.price}) + .AddField("price", pretty::Px{mbp_msg.price}) .AddField("size", mbp_msg.size) .AddField("action", mbp_msg.action) .AddField("side", mbp_msg.side) @@ -303,7 +303,7 @@ std::ostream& operator<<(std::ostream& stream, const Mbp10Msg& mbp_msg) { .SetSpacer("\n ") .Build() .AddField("hd", mbp_msg.hd) - .AddField("price", FixPx{mbp_msg.price}) + .AddField("price", pretty::Px{mbp_msg.price}) .AddField("size", mbp_msg.size) .AddField("action", mbp_msg.action) .AddField("side", mbp_msg.side) @@ -323,7 +323,7 @@ std::ostream& operator<<(std::ostream& stream, const BboMsg& bbo_msg) { .SetSpacer("\n ") .Build() .AddField("hd", bbo_msg.hd) - .AddField("price", FixPx{bbo_msg.price}) + .AddField("price", pretty::Px{bbo_msg.price}) .AddField("size", bbo_msg.size) .AddField("side", bbo_msg.side) .AddField("flags", bbo_msg.flags) @@ -339,7 +339,7 @@ std::ostream& operator<<(std::ostream& stream, const Cmbp1Msg& cmbp1_msg) { .SetSpacer("\n ") .Build() .AddField("hd", cmbp1_msg.hd) - .AddField("price", FixPx{cmbp1_msg.price}) + .AddField("price", pretty::Px{cmbp1_msg.price}) .AddField("size", cmbp1_msg.size) .AddField("action", cmbp1_msg.action) .AddField("side", cmbp1_msg.side) @@ -356,7 +356,7 @@ std::ostream& operator<<(std::ostream& stream, const CbboMsg& cbbo_msg) { .SetSpacer("\n ") .Build() .AddField("hd", cbbo_msg.hd) - .AddField("price", FixPx{cbbo_msg.price}) + .AddField("price", pretty::Px{cbbo_msg.price}) .AddField("size", cbbo_msg.size) .AddField("side", cbbo_msg.side) .AddField("flags", cbbo_msg.flags) @@ -373,7 +373,7 @@ std::ostream& operator<<(std::ostream& stream, const TradeMsg& trade_msg) { .SetTypeName("TradeMsg") .Build() .AddField("hd", trade_msg.hd) - .AddField("price", FixPx{trade_msg.price}) + .AddField("price", pretty::Px{trade_msg.price}) .AddField("size", trade_msg.size) .AddField("action", trade_msg.action) .AddField("side", trade_msg.side) @@ -393,10 +393,10 @@ std::ostream& operator<<(std::ostream& stream, const OhlcvMsg& ohlcv_msg) { .SetTypeName("OhlcvMsg") .Build() .AddField("hd", ohlcv_msg.hd) - .AddField("open", FixPx{ohlcv_msg.open}) - .AddField("high", FixPx{ohlcv_msg.high}) - .AddField("low", FixPx{ohlcv_msg.low}) - .AddField("close", FixPx{ohlcv_msg.close}) + .AddField("open", pretty::Px{ohlcv_msg.open}) + .AddField("high", pretty::Px{ohlcv_msg.high}) + .AddField("low", pretty::Px{ohlcv_msg.low}) + .AddField("close", pretty::Px{ohlcv_msg.close}) .AddField("volume", ohlcv_msg.volume) .Finish(); } @@ -429,21 +429,24 @@ std::ostream& operator<<(std::ostream& stream, .Build() .AddField("hd", instr_def_msg.hd) .AddField("ts_recv", instr_def_msg.ts_recv) - .AddField("min_price_increment", FixPx{instr_def_msg.min_price_increment}) - .AddField("display_factor", FixPx{instr_def_msg.display_factor}) + .AddField("min_price_increment", + pretty::Px{instr_def_msg.min_price_increment}) + .AddField("display_factor", pretty::Px{instr_def_msg.display_factor}) .AddField("expiration", instr_def_msg.expiration) .AddField("activation", instr_def_msg.activation) - .AddField("high_limit_price", FixPx{instr_def_msg.high_limit_price}) - .AddField("low_limit_price", FixPx{instr_def_msg.low_limit_price}) - .AddField("max_price_variation", FixPx{instr_def_msg.max_price_variation}) - .AddField("unit_of_measure_qty", FixPx{instr_def_msg.unit_of_measure_qty}) + .AddField("high_limit_price", pretty::Px{instr_def_msg.high_limit_price}) + .AddField("low_limit_price", pretty::Px{instr_def_msg.low_limit_price}) + .AddField("max_price_variation", + pretty::Px{instr_def_msg.max_price_variation}) + .AddField("unit_of_measure_qty", + pretty::Px{instr_def_msg.unit_of_measure_qty}) .AddField("min_price_increment_amount", - FixPx{instr_def_msg.min_price_increment_amount}) - .AddField("price_ratio", FixPx{instr_def_msg.price_ratio}) - .AddField("strike_price", FixPx{instr_def_msg.strike_price}) + pretty::Px{instr_def_msg.min_price_increment_amount}) + .AddField("price_ratio", pretty::Px{instr_def_msg.price_ratio}) + .AddField("strike_price", pretty::Px{instr_def_msg.strike_price}) .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) - .AddField("leg_price", FixPx{instr_def_msg.leg_price}) - .AddField("leg_delta", FixPx{instr_def_msg.leg_delta}) + .AddField("leg_price", pretty::Px{instr_def_msg.leg_price}) + .AddField("leg_delta", pretty::Px{instr_def_msg.leg_delta}) .AddField("inst_attrib_value", instr_def_msg.inst_attrib_value) .AddField("underlying_id", instr_def_msg.underlying_id) .AddField("market_depth_implied", instr_def_msg.market_depth_implied) @@ -518,15 +521,17 @@ std::ostream& operator<<(std::ostream& stream, .Build() .AddField("hd", imbalance_msg.hd) .AddField("ts_recv", imbalance_msg.ts_recv) - .AddField("ref_price", FixPx{imbalance_msg.ref_price}) + .AddField("ref_price", pretty::Px{imbalance_msg.ref_price}) .AddField("auction_time", imbalance_msg.auction_time) - .AddField("cont_book_clr_price", FixPx{imbalance_msg.cont_book_clr_price}) + .AddField("cont_book_clr_price", + pretty::Px{imbalance_msg.cont_book_clr_price}) .AddField("auct_interest_clr_price", - FixPx{imbalance_msg.auct_interest_clr_price}) - .AddField("ssr_filling_price", FixPx{imbalance_msg.ssr_filling_price}) - .AddField("ind_match_price", FixPx{imbalance_msg.ind_match_price}) - .AddField("upper_collar", FixPx{imbalance_msg.upper_collar}) - .AddField("lower_collar", FixPx{imbalance_msg.lower_collar}) + pretty::Px{imbalance_msg.auct_interest_clr_price}) + .AddField("ssr_filling_price", + pretty::Px{imbalance_msg.ssr_filling_price}) + .AddField("ind_match_price", pretty::Px{imbalance_msg.ind_match_price}) + .AddField("upper_collar", pretty::Px{imbalance_msg.upper_collar}) + .AddField("lower_collar", pretty::Px{imbalance_msg.lower_collar}) .AddField("paired_qty", imbalance_msg.paired_qty) .AddField("total_imbalance_qty", imbalance_msg.total_imbalance_qty) .AddField("market_imbalance_qty", imbalance_msg.market_imbalance_qty) @@ -550,7 +555,7 @@ std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { .AddField("hd", stat_msg.hd) .AddField("ts_recv", stat_msg.ts_recv) .AddField("ts_ref", stat_msg.ts_ref) - .AddField("price", FixPx{stat_msg.price}) + .AddField("price", pretty::Px{stat_msg.price}) .AddField("quantity", stat_msg.quantity) .AddField("sequence", stat_msg.sequence) .AddField("ts_in_delta", stat_msg.ts_in_delta) diff --git a/src/v1.cpp b/src/v1.cpp index f3abe2d..8ad1a1a 100644 --- a/src/v1.cpp +++ b/src/v1.cpp @@ -5,7 +5,7 @@ #include // numeric_limits #include "databento/enums.hpp" -#include "databento/fixed_price.hpp" // FixedPx +#include "databento/pretty.hpp" // Px #include "databento/record.hpp" #include "databento/v2.hpp" #include "databento/v3.hpp" @@ -347,19 +347,22 @@ std::ostream& operator<<(std::ostream& stream, .Build() .AddField("hd", instr_def_msg.hd) .AddField("ts_recv", instr_def_msg.ts_recv) - .AddField("min_price_increment", FixPx{instr_def_msg.min_price_increment}) - .AddField("display_factor", FixPx{instr_def_msg.display_factor}) + .AddField("min_price_increment", + pretty::Px{instr_def_msg.min_price_increment}) + .AddField("display_factor", pretty::Px{instr_def_msg.display_factor}) .AddField("expiration", instr_def_msg.expiration) .AddField("activation", instr_def_msg.activation) - .AddField("high_limit_price", FixPx{instr_def_msg.high_limit_price}) - .AddField("low_limit_price", FixPx{instr_def_msg.low_limit_price}) - .AddField("max_price_variation", FixPx{instr_def_msg.max_price_variation}) + .AddField("high_limit_price", pretty::Px{instr_def_msg.high_limit_price}) + .AddField("low_limit_price", pretty::Px{instr_def_msg.low_limit_price}) + .AddField("max_price_variation", + pretty::Px{instr_def_msg.max_price_variation}) .AddField("trading_reference_price", - FixPx{instr_def_msg.trading_reference_price}) - .AddField("unit_of_measure_qty", FixPx{instr_def_msg.unit_of_measure_qty}) + pretty::Px{instr_def_msg.trading_reference_price}) + .AddField("unit_of_measure_qty", + pretty::Px{instr_def_msg.unit_of_measure_qty}) .AddField("min_price_increment_amount", - FixPx{instr_def_msg.min_price_increment_amount}) - .AddField("price_ratio", FixPx{instr_def_msg.price_ratio}) + pretty::Px{instr_def_msg.min_price_increment_amount}) + .AddField("price_ratio", pretty::Px{instr_def_msg.price_ratio}) .AddField("inst_attrib_value", instr_def_msg.inst_attrib_value) .AddField("underlying_id", instr_def_msg.underlying_id) .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) @@ -392,7 +395,7 @@ std::ostream& operator<<(std::ostream& stream, .AddField("underlying", instr_def_msg.underlying) .AddField("strike_price_currency", instr_def_msg.strike_price_currency) .AddField("instrument_class", instr_def_msg.instrument_class) - .AddField("strike_price", FixPx{instr_def_msg.strike_price}) + .AddField("strike_price", pretty::Px{instr_def_msg.strike_price}) .AddField("match_algorithm", instr_def_msg.match_algorithm) .AddField("md_security_trading_status", instr_def_msg.md_security_trading_status) @@ -423,7 +426,7 @@ std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { .AddField("hd", stat_msg.hd) .AddField("ts_recv", stat_msg.ts_recv) .AddField("ts_ref", stat_msg.ts_ref) - .AddField("price", FixPx{stat_msg.price}) + .AddField("price", pretty::Px{stat_msg.price}) .AddField("quantity", stat_msg.quantity) .AddField("sequence", stat_msg.sequence) .AddField("ts_in_delta", stat_msg.ts_in_delta) diff --git a/src/v2.cpp b/src/v2.cpp index 1ebd8da..7000dc9 100644 --- a/src/v2.cpp +++ b/src/v2.cpp @@ -1,6 +1,6 @@ #include "databento/v2.hpp" -#include "databento/fixed_price.hpp" +#include "databento/pretty.hpp" // Px #include "databento/v3.hpp" #include "stream_op_helper.hpp" @@ -168,20 +168,23 @@ std::ostream& operator<<(std::ostream& stream, .Build() .AddField("hd", instr_def_msg.hd) .AddField("ts_recv", instr_def_msg.ts_recv) - .AddField("min_price_increment", FixPx{instr_def_msg.min_price_increment}) - .AddField("display_factor", FixPx{instr_def_msg.display_factor}) + .AddField("min_price_increment", + pretty::Px{instr_def_msg.min_price_increment}) + .AddField("display_factor", pretty::Px{instr_def_msg.display_factor}) .AddField("expiration", instr_def_msg.expiration) .AddField("activation", instr_def_msg.activation) - .AddField("high_limit_price", FixPx{instr_def_msg.high_limit_price}) - .AddField("low_limit_price", FixPx{instr_def_msg.low_limit_price}) - .AddField("max_price_variation", FixPx{instr_def_msg.max_price_variation}) + .AddField("high_limit_price", pretty::Px{instr_def_msg.high_limit_price}) + .AddField("low_limit_price", pretty::Px{instr_def_msg.low_limit_price}) + .AddField("max_price_variation", + pretty::Px{instr_def_msg.max_price_variation}) .AddField("trading_reference_price", - FixPx{instr_def_msg.trading_reference_price}) - .AddField("unit_of_measure_qty", FixPx{instr_def_msg.unit_of_measure_qty}) + pretty::Px{instr_def_msg.trading_reference_price}) + .AddField("unit_of_measure_qty", + pretty::Px{instr_def_msg.unit_of_measure_qty}) .AddField("min_price_increment_amount", - FixPx{instr_def_msg.min_price_increment_amount}) - .AddField("price_ratio", FixPx{instr_def_msg.price_ratio}) - .AddField("strike_price", FixPx{instr_def_msg.strike_price}) + pretty::Px{instr_def_msg.min_price_increment_amount}) + .AddField("price_ratio", pretty::Px{instr_def_msg.price_ratio}) + .AddField("strike_price", pretty::Px{instr_def_msg.strike_price}) .AddField("inst_attrib_value", instr_def_msg.inst_attrib_value) .AddField("underlying_id", instr_def_msg.underlying_id) .AddField("raw_instrument_id", instr_def_msg.raw_instrument_id) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f099627..c555bfa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -18,7 +18,7 @@ verbose_message("Adding tests under ${CMAKE_PROJECT_NAME}Tests...") set( test_headers include/mock/mock_http_server.hpp - include/mock/mock_io.hpp + include/mock/mock_log_receiver.hpp include/mock/mock_lsg_server.hpp include/mock/mock_tcp_server.hpp include/temp_file.hpp @@ -42,9 +42,9 @@ set( src/log_tests.cpp src/metadata_tests.cpp src/mock_http_server.cpp - src/mock_io.cpp src/mock_lsg_server.cpp src/mock_tcp_server.cpp + src/pretty_tests.cpp src/record_tests.cpp src/scoped_thread_tests.cpp src/stream_op_helper_tests.cpp @@ -120,25 +120,16 @@ else() # Visual Studio by default links C runtimes dynamically but gtest by default links them statically set(gtest_force_shared_crt ON CACHE BOOL "Link dynamic CRT" FORCE) endif() - if(CMAKE_VERSION VERSION_LESS 3.24) - FetchContent_Declare( - googletest - URL https://github.com/google/googletest/archive/refs/tags/release-1.12.1.tar.gz - ) - else() - # DOWNLOAD_EXTRACT_TIMESTAMP added in 3.24 - FetchContent_Declare( - googletest - URL https://github.com/google/googletest/archive/refs/tags/release-1.12.1.tar.gz - DOWNLOAD_EXTRACT_TIMESTAMP TRUE - ) - endif() + FetchContent_Declare( + googletest + URL https://github.com/google/googletest/releases/download/v1.17.0/googletest-1.17.0.tar.gz + DOWNLOAD_EXTRACT_TIMESTAMP TRUE + ) FetchContent_MakeAvailable(googletest) target_link_libraries( ${PROJECT_NAME} PUBLIC - gtest - gtest_main + gmock_main ${${CMAKE_PROJECT_NAME}_TEST_LIB} ) # Ignore compiler warnings in headers diff --git a/tests/include/mock/mock_http_server.hpp b/tests/include/mock/mock_http_server.hpp index 931b38a..724127a 100644 --- a/tests/include/mock/mock_http_server.hpp +++ b/tests/include/mock/mock_http_server.hpp @@ -3,10 +3,12 @@ #include #include +#include #include #include #include "databento/detail/scoped_thread.hpp" +#include "databento/record.hpp" namespace databento::tests::mock { class MockHttpServer { @@ -35,6 +37,13 @@ class MockHttpServer { void MockStreamDbn(const std::string& path, const std::map& params, const std::string& dbn_path); + void MockStreamDbn(const std::string& path, + const std::map& params, + Record record, std::size_t count, std::size_t chunk_size); + void MockStreamDbn(const std::string& path, + const std::map& params, + Record record, std::size_t count, std::size_t extra_bytes, + std::size_t chunk_size); private: static void CheckParams(const std::map& params, diff --git a/tests/include/mock/mock_io.hpp b/tests/include/mock/mock_io.hpp deleted file mode 100644 index 0b2befa..0000000 --- a/tests/include/mock/mock_io.hpp +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include - -#include "databento/ireadable.hpp" -#include "databento/iwritable.hpp" - -namespace databento::tests::mock { -class MockIo : public databento::IWritable, public databento::IReadable { - public: - void WriteAll(const std::byte* buffer, std::size_t length); - - void ReadExact(std::byte* buffer, std::size_t length); - - std::size_t ReadSome(std::byte* buffer, std::size_t max_length); - - const std::vector& GetContents() const { return contents_; } - - private: - std::vector contents_; - std::ptrdiff_t read_idx_{0}; -}; -} // namespace databento::tests::mock diff --git a/tests/include/mock/mock_log_receiver.hpp b/tests/include/mock/mock_log_receiver.hpp new file mode 100644 index 0000000..b7a6fda --- /dev/null +++ b/tests/include/mock/mock_log_receiver.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include + +#include +#include +#include + +#include "databento/log.hpp" + +namespace databento::tests::mock { +class MockLogReceiver : public databento::ILogReceiver { + public: + using LogCallback = + std::function; + + explicit MockLogReceiver(LogCallback callback) + : MockLogReceiver{LogLevel::Info, std::move(callback)} {} + MockLogReceiver(LogLevel min_level, LogCallback callback) + : callback_{std::move(callback)}, min_level_{min_level} {} + + static MockLogReceiver AssertNoLogs(databento::LogLevel min_level) { + return MockLogReceiver{ + min_level, + [min_level](auto, databento::LogLevel level, const std::string& msg) { + FAIL() << "Received unexpected log message with level " << level + << ": " << msg; + }}; + } + + void Receive(databento::LogLevel level, const std::string& msg) override { + if (level >= min_level_) { + callback_(call_count_, level, msg); + ++call_count_; + } + } + + bool ShouldLog(databento::LogLevel level) const override { + return level >= min_level_; + } + + std::size_t CallCount() const { return call_count_; } + + private: + LogCallback callback_; + LogLevel min_level_{}; + std::size_t call_count_{}; +}; +} // namespace databento::tests::mock diff --git a/tests/src/dbn_decoder_tests.cpp b/tests/src/dbn_decoder_tests.cpp index 7e545c4..fbab127 100644 --- a/tests/src/dbn_decoder_tests.cpp +++ b/tests/src/dbn_decoder_tests.cpp @@ -18,6 +18,7 @@ #include "databento/dbn.hpp" #include "databento/dbn_decoder.hpp" #include "databento/dbn_encoder.hpp" +#include "databento/detail/buffer.hpp" #include "databento/detail/scoped_thread.hpp" #include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" @@ -30,14 +31,15 @@ #include "databento/v2.hpp" #include "databento/v3.hpp" #include "databento/with_ts_out.hpp" -#include "mock/mock_io.hpp" +#include "mock/mock_log_receiver.hpp" namespace databento::tests { class DbnDecoderTests : public testing::Test { public: std::unique_ptr target_; detail::ScopedThread write_thread_; - std::unique_ptr logger_{std::make_unique()}; + mock::MockLogReceiver logger_ = + mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); void ReadFromFile(const std::string& schema_str, const std::string& extension, std::uint8_t version) { @@ -61,8 +63,7 @@ class DbnDecoderTests : public testing::Test { }}; // File setup target_ = std::make_unique( - logger_.get(), std::make_unique(file_path), - upgrade_policy); + &logger_, std::make_unique(file_path), upgrade_policy); } static void AssertMappings(const std::vector& mappings) { @@ -975,7 +976,8 @@ TEST_P(DbnDecoderSchemaTests, TestDecodeStatistics) { class DbnIdentityTests : public testing::TestWithParam< std::tuple> { protected: - std::unique_ptr logger_{std::make_unique()}; + mock::MockLogReceiver logger_ = + mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); }; INSTANTIATE_TEST_SUITE_P( @@ -1052,12 +1054,11 @@ TEST_P(DbnIdentityTests, TestIdentity) { std::string{TEST_DATA_DIR "/test_data."} + ToString(schema) + ".v" + std::to_string(+version) + (compression == Compression::Zstd ? ".dbn.zst" : ".dbn"); - DbnDecoder file_decoder{logger_.get(), - std::make_unique(file_name), + DbnDecoder file_decoder{&logger_, std::make_unique(file_name), VersionUpgradePolicy::AsIs}; const Metadata file_metadata = file_decoder.DecodeMetadata(); - mock::MockIo buf_io; + detail::Buffer buf_io; { std::unique_ptr zstd_io; if (compression == Compression::Zstd) { @@ -1072,12 +1073,12 @@ TEST_P(DbnIdentityTests, TestIdentity) { // Free zstd_io and flush } - file_decoder = {logger_.get(), std::make_unique(file_name), + file_decoder = {&logger_, std::make_unique(file_name), VersionUpgradePolicy::AsIs}; file_decoder.DecodeMetadata(); - auto input = std::make_unique(std::move(buf_io)); - DbnDecoder buf_decoder{logger_.get(), std::move(input), + auto input = std::make_unique(std::move(buf_io)); + DbnDecoder buf_decoder{&logger_, std::move(input), VersionUpgradePolicy::AsIs}; const auto buf_metadata = buf_decoder.DecodeMetadata(); EXPECT_EQ(file_metadata, buf_metadata); diff --git a/tests/src/dbn_encoder_tests.cpp b/tests/src/dbn_encoder_tests.cpp index e9c2097..b99e154 100644 --- a/tests/src/dbn_encoder_tests.cpp +++ b/tests/src/dbn_encoder_tests.cpp @@ -9,13 +9,14 @@ #include "databento/dbn.hpp" #include "databento/dbn_decoder.hpp" #include "databento/dbn_encoder.hpp" +#include "databento/detail/buffer.hpp" #include "databento/exceptions.hpp" #include "databento/log.hpp" -#include "mock/mock_io.hpp" +#include "mock/mock_log_receiver.hpp" namespace databento::tests { TEST(DbnEncoderTests, TestEncodeDecodeMetadataIdentity) { - auto logger = std::make_unique(); + auto logger = mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); const Metadata metadata{ kDbnVersion, dataset::kGlbxMdp3, @@ -35,16 +36,14 @@ TEST(DbnEncoderTests, TestEncodeDecodeMetadataIdentity) { {"NG.0", {{date::year{2022} / 7 / 26, date::year{2022} / 8 / 29, "NGU2"}, {date::year{2022} / 8 / 29, date::year{2022} / 9 / 1, "NGV2"}}}}}; - mock::MockIo io{}; + detail::Buffer io{}; DbnEncoder::EncodeMetadata(metadata, &io); - DbnDecoder decoder{logger.get(), - std::make_unique(std::move(io))}; + DbnDecoder decoder{&logger, std::make_unique(std::move(io))}; const auto res = decoder.DecodeMetadata(); ASSERT_EQ(res, metadata); } TEST(DbnEncoderTests, TestEncodeNewerMetadataErrors) { - auto logger = std::make_unique(); const Metadata metadata{kDbnVersion + 1, dataset::kGlbxMdp3, Schema::Mbp10, @@ -59,7 +58,7 @@ TEST(DbnEncoderTests, TestEncodeNewerMetadataErrors) { {}, {}, {}}; - mock::MockIo io{}; + detail::Buffer io{}; ASSERT_THROW(DbnEncoder::EncodeMetadata(metadata, &io), databento::InvalidArgumentError); } diff --git a/tests/src/historical_tests.cpp b/tests/src/historical_tests.cpp index cb3be88..858ae3a 100644 --- a/tests/src/historical_tests.cpp +++ b/tests/src/historical_tests.cpp @@ -1,10 +1,13 @@ #include +#include #include #include #include #include #include +#include +#include #include #include #include // logic_error @@ -23,6 +26,7 @@ #include "databento/symbology.hpp" // kAllSymbols #include "databento/timeseries.hpp" #include "mock/mock_http_server.hpp" +#include "mock/mock_log_receiver.hpp" #include "temp_file.hpp" #ifdef _WIN32 @@ -43,7 +47,8 @@ class HistoricalTests : public ::testing::Test { protected: std::filesystem::path tmp_path_{std::filesystem::temp_directory_path()}; mock::MockHttpServer mock_server_{kApiKey}; - std::unique_ptr logger_{std::make_unique()}; + mock::MockLogReceiver logger_ = + mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); }; TEST_F(HistoricalTests, TestBatchSubmitJob) { @@ -97,7 +102,7 @@ TEST_F(HistoricalTests, TestBatchSubmitJob) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.BatchSubmitJob(dataset::kXnasItch, {"CLH3"}, Schema::Trades, @@ -181,7 +186,7 @@ TEST_F(HistoricalTests, TestBatchListJobs) { mock_server_.MockGetJson("/v0/batch.list_jobs", kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.BatchListJobs(); ASSERT_EQ(res.size(), 2); @@ -202,7 +207,7 @@ TEST_F(HistoricalTests, TestBatchListFiles) { mock_server_.MockGetJson("/v0/batch.list_files", {{"job_id", kJobId}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.BatchListFiles(kJobId); ASSERT_EQ(res.size(), 1); @@ -240,7 +245,7 @@ TEST_F(HistoricalTests, TestBatchDownloadAll) { mock_server_.MockGetJson("/v0/job_id/test_metadata.json", {{"key", "value"}}); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; ASSERT_FALSE(temp_metadata_file.Exists()); ASSERT_FALSE(temp_dbn_file.Exists()); @@ -273,7 +278,7 @@ TEST_F(HistoricalTests, TestBatchDownloadSingle) { mock_server_.MockGetJson("/v0/job_id/test_metadata.json", {{"key", "value"}}); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; ASSERT_FALSE(temp_metadata_file.Exists()); const std::filesystem::path path = @@ -289,7 +294,7 @@ TEST_F(HistoricalTests, TestBatchDownloadSingleInvalidFile) { kListFilesResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; ASSERT_THROW(target.BatchDownload(tmp_path_, kJobId, "test_metadata.js"), InvalidArgumentError); @@ -309,7 +314,7 @@ TEST_F(HistoricalTests, TestMetadataListPublishers) { mock_server_.MockGetJson("/v0/metadata.list_publishers", kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListPublishers(); EXPECT_EQ(res.size(), kResp.size()); @@ -329,7 +334,7 @@ TEST_F(HistoricalTests, TestMetadataListDatasets_Simple) { mock_server_.MockGetJson("/v0/metadata.list_datasets", kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListDatasets(); EXPECT_EQ(res.size(), kResp.size()); @@ -343,7 +348,7 @@ TEST_F(HistoricalTests, TestMetadataListDatasets_Full) { {{"start_date", "2021-01-05"}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListDatasets(DateRange{"2021-01-05"}); EXPECT_EQ(res.size(), kResp.size()); @@ -358,7 +363,7 @@ TEST_F(HistoricalTests, TestMetadataListSchemas_Simple) { {{"dataset", dataset::kGlbxMdp3}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListSchemas(dataset::kGlbxMdp3); const std::vector kExp{ @@ -379,7 +384,7 @@ TEST_F(HistoricalTests, TestMetadataListSchemas_Full) { {{"dataset", dataset::kGlbxMdp3}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListSchemas(dataset::kGlbxMdp3); const std::vector kExp{Schema::Mbo, Schema::Mbp1, Schema::Ohlcv1M, @@ -399,7 +404,7 @@ TEST_F(HistoricalTests, TestMetadataListFields) { {{"encoding", "dbn"}, {"schema", "trades"}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListFields(Encoding::Dbn, Schema::Trades); const std::vector kExp{ @@ -427,7 +432,7 @@ TEST_F(HistoricalTests, TestMetadataGetDatasetCondition) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetDatasetCondition( dataset::kXnasItch, {"2022-11-06", "2022-11-10"}); @@ -448,7 +453,7 @@ TEST_F(HistoricalTests, TestMetadataListUnitPrices) { {{"dataset", dataset::kGlbxMdp3}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataListUnitPrices(dataset::kGlbxMdp3); const UnitPricesForMode kExp{ @@ -465,7 +470,7 @@ TEST_F(HistoricalTests, TestMetadataGetDatasetRange) { {{"dataset", dataset::kXnasItch}}, kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetDatasetRange(dataset::kXnasItch); EXPECT_EQ(res.start, "2017-05-21T00:00:00.000000000Z"); @@ -483,7 +488,7 @@ TEST_F(HistoricalTests, TestMetadataGetRecordCount) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetRecordCount( dataset::kGlbxMdp3, {"2020-06-06T00:00", "2021-03-02T00:00"}, @@ -502,7 +507,7 @@ TEST_F(HistoricalTests, TestMetadataGetBillableSize_Simple) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetBillableSize( dataset::kGlbxMdp3, {"2020-06-06T00:00", "2021-03-02T00:00"}, kAllSymbols, @@ -522,7 +527,7 @@ TEST_F(HistoricalTests, TestMetadataGetBillableSize_Full) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetBillableSize( dataset::kGlbxMdp3, {"2020-06-06T00:00", "2021-03-02T00:00"}, @@ -541,7 +546,7 @@ TEST_F(HistoricalTests, TestMetadataGetCost_Simple) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetCost( dataset::kGlbxMdp3, {"2020-06-06T00:00", "2021-03-02T00:00"}, @@ -562,7 +567,7 @@ TEST_F(HistoricalTests, TestMetadataGetCost_Full) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.MetadataGetCost( dataset::kGlbxMdp3, {"2020-06-06T00:00", "2021-03-02T00:00"}, @@ -603,7 +608,7 @@ TEST_F(HistoricalTests, TestSymbologyResolve) { kResp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const auto res = target.SymbologyResolve( dataset::kGlbxMdp3, {"ESM2"}, SType::RawSymbol, SType::InstrumentId, @@ -633,7 +638,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_Basic) { TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; std::unique_ptr metadata_ptr; std::vector mbo_records; @@ -668,7 +673,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_NoMetadataCallback) { TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; std::vector mbo_records; target.TimeseriesGetRange(dataset::kGlbxMdp3, @@ -687,7 +692,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_BadRequest) { mock_server_.MockBadRequest("/v0/timeseries.get_range", resp); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; try { target.TimeseriesGetRange( @@ -713,7 +718,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) { TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; ASSERT_THROW( target.TimeseriesGetRange( @@ -726,12 +731,12 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_CallbackException) { std::logic_error); } -TEST_F(HistoricalTests, TestTimeseriesGetRangeCancellation) { +TEST_F(HistoricalTests, TestTimeseriesGetRange_Cancellation) { mock_server_.MockStreamDbn("/v0/timeseries.get_range", {}, TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; std::uint32_t call_count = 0; target.TimeseriesGetRange( @@ -749,6 +754,66 @@ TEST_F(HistoricalTests, TestTimeseriesGetRangeCancellation) { ASSERT_EQ(call_count, 1); } +TEST_F(HistoricalTests, TestTimeseriesGetRange_LargeChunks) { + Mbp1Msg mbp1{ + RecordHeader{sizeof(Mbp1Msg) / kRecordHeaderLengthMultiplier, + RType::Mbp1, + static_cast(Publisher::IfusImpactIfus), + 10005, + {}}}; + constexpr auto kRecordCount = 50'000; + mock_server_.MockStreamDbn("/v0/timeseries.get_range", + {{"dataset", ToString(Dataset::IfusImpact)}}, + Record{&mbp1.hd}, kRecordCount, 75'000); + const auto port = mock_server_.ListenOnThread(); + + databento::Historical target{&logger_, kApiKey, "localhost", + static_cast(port)}; + std::size_t counter = 0; + target.TimeseriesGetRange(ToString(Dataset::IfusImpact), + {"2024-05", "2025-05"}, kAllSymbols, Schema::Mbp1, + [&counter, &mbp1](const Record& record) { + ++counter; + EXPECT_TRUE(record.Holds()); + EXPECT_EQ(record.Get(), mbp1); + return KeepGoing::Continue; + }); + EXPECT_EQ(counter, kRecordCount); +} + +TEST_F(HistoricalTests, TestTimeseriesGetRange_UnreadBytes) { + Mbp1Msg mbp1{ + RecordHeader{sizeof(Mbp1Msg) / kRecordHeaderLengthMultiplier, + RType::Mbp1, + static_cast(Publisher::IfusImpactIfus), + 10005, + {}}}; + constexpr auto kRecordCount = 1'000; + mock_server_.MockStreamDbn("/v0/timeseries.get_range", + {{"dataset", ToString(Dataset::IfusImpact)}}, + Record{&mbp1.hd}, kRecordCount, 20, 75'000); + const auto port = mock_server_.ListenOnThread(); + + logger_ = mock::MockLogReceiver{[](auto count, LogLevel level, + const std::string& msg) { + EXPECT_THAT(msg, testing::EndsWith( + "Partial or incomplete record remaining of 20 bytes")); + }}; + databento::Historical target{&logger_, kApiKey, "localhost", + static_cast(port)}; + std::size_t counter = 0; + target.TimeseriesGetRange(ToString(Dataset::IfusImpact), + {"2024-05", "2025-05"}, kAllSymbols, Schema::Mbp1, + [&counter, &mbp1](const Record& record) { + ++counter; + EXPECT_TRUE(record.Holds()); + EXPECT_EQ(record.Get(), mbp1); + return KeepGoing::Continue; + }); + EXPECT_EQ(counter, kRecordCount); + ASSERT_EQ(logger_.CallCount(), 1); +} + TEST_F(HistoricalTests, TestTimeseriesGetRangeToFile) { mock_server_.MockStreamDbn("/v0/timeseries.get_range", {{"dataset", dataset::kGlbxMdp3}, @@ -762,7 +827,7 @@ TEST_F(HistoricalTests, TestTimeseriesGetRangeToFile) { TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst"); const auto port = mock_server_.ListenOnThread(); - databento::Historical target{logger_.get(), kApiKey, "localhost", + databento::Historical target{&logger_, kApiKey, "localhost", static_cast(port)}; const TempFile temp_file{testing::TempDir() + "/TestTimeseriesGetRangeToFile"}; diff --git a/tests/src/http_client_tests.cpp b/tests/src/http_client_tests.cpp index b0be953..bce9194 100644 --- a/tests/src/http_client_tests.cpp +++ b/tests/src/http_client_tests.cpp @@ -1,12 +1,11 @@ +#include #include #include -#include -#include - #include "databento/detail/http_client.hpp" #include "databento/log.hpp" #include "mock/mock_http_server.hpp" +#include "mock/mock_log_receiver.hpp" namespace databento::detail::tests { class HttpClientTests : public ::testing::Test { @@ -22,16 +21,21 @@ TEST_F(HttpClientTests, TestLogWarnings) { "Warning: Large request"}; mock_server_.MockGetJson("/warn", {}, {}, warnings); const auto port = mock_server_.ListenOnThread(); - HttpClient target{ILogReceiver::Default(), kApiKey, "localhost", + databento::tests::mock::MockLogReceiver mock_logger{ + [](auto call_count, databento::LogLevel level, const std::string& msg) { + EXPECT_EQ(level, LogLevel::Warning); + if (call_count == 0) { + EXPECT_THAT( + msg, + testing::EndsWith( + "Server DeprecationWarning: stype product_id is deprecated")); + } else { + EXPECT_THAT(msg, testing::EndsWith("Server Warning: Large request")); + } + }}; + HttpClient target{&mock_logger, kApiKey, "localhost", static_cast(port)}; - testing::internal::CaptureStderr(); target.GetJson("/warn", {}); - std::clog.flush(); - const std::string output = testing::internal::GetCapturedStderr(); - EXPECT_EQ(std::count(output.begin(), output.end(), '\n'), 2); - EXPECT_NE( - output.find("Server DeprecationWarning: stype product_id is deprecated"), - std::string::npos); - EXPECT_NE(output.find("Server Warning: Large request"), std::string::npos); + ASSERT_EQ(mock_logger.CallCount(), 2); } } // namespace databento::detail::tests diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 8559bd6..7caf7ad 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -22,6 +22,7 @@ #include "databento/record.hpp" #include "databento/symbology.hpp" #include "databento/with_ts_out.hpp" +#include "mock/mock_log_receiver.hpp" #include "mock/mock_lsg_server.hpp" // MockLsgServer namespace databento::tests { @@ -36,9 +37,9 @@ class LiveBlockingTests : public testing::Test { static constexpr auto kKey = "32-character-with-lots-of-filler"; static constexpr auto kLocalhost = "127.0.0.1"; - std::unique_ptr logger_{std::make_unique()}; - LiveBuilder builder_{ - LiveBuilder{}.SetLogReceiver(logger_.get()).SetKey(kKey)}; + mock::MockLogReceiver logger_ = + mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); + LiveBuilder builder_{LiveBuilder{}.SetLogReceiver(&logger_).SetKey(kKey)}; }; TEST_F(LiveBlockingTests, TestAuthentication) { diff --git a/tests/src/live_threaded_tests.cpp b/tests/src/live_threaded_tests.cpp index 30cef92..049e506 100644 --- a/tests/src/live_threaded_tests.cpp +++ b/tests/src/live_threaded_tests.cpp @@ -1,10 +1,10 @@ +#include #include #include #include #include #include -#include #include #include // this_thread #include @@ -21,7 +21,7 @@ #include "databento/record.hpp" #include "databento/symbology.hpp" #include "databento/timeseries.hpp" -#include "gtest/gtest.h" +#include "mock/mock_log_receiver.hpp" #include "mock/mock_lsg_server.hpp" namespace databento::tests { @@ -37,9 +37,9 @@ class LiveThreadedTests : public testing::Test { static constexpr auto kTsOut = false; static constexpr auto kLocalhost = "127.0.0.1"; - std::unique_ptr logger_{std::make_unique()}; - LiveBuilder builder_{ - LiveBuilder{}.SetLogReceiver(logger_.get()).SetKey(kKey)}; + mock::MockLogReceiver logger_ = + mock::MockLogReceiver::AssertNoLogs(LogLevel::Warning); + LiveBuilder builder_{LiveBuilder{}.SetLogReceiver(&logger_).SetKey(kKey)}; }; TEST_F(LiveThreadedTests, TestBasic) { @@ -214,6 +214,14 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) { self.Start(); self.SendRecord(kRec); }}; + logger_ = mock::MockLogReceiver{ + LogLevel::Warning, + [](auto count, databento::LogLevel level, const std::string& msg) { + EXPECT_THAT( + msg, + testing::EndsWith( + "Gateway closed the session. Attempting to restart session.")); + }}; LiveThreaded target = builder_.SetDataset(dataset::kXnasItch) .SetSendTsOut(kTsOut) .SetAddress(kLocalhost, mock_server.Port()) @@ -264,6 +272,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) { EXPECT_EQ(metadata_calls, 2); EXPECT_EQ(exception_calls, 1); EXPECT_EQ(record_calls, 2); + EXPECT_EQ(logger_.CallCount(), 1); } TEST_F(LiveThreadedTests, TestDeadlockPrevention) { @@ -274,7 +283,14 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) { bool should_close{}; std::mutex should_close_mutex; std::condition_variable should_close_cv; - testing::internal::CaptureStderr(); + logger_ = mock::MockLogReceiver{ + LogLevel::Warning, + [](auto count, databento::LogLevel level, const std::string& msg) { + if (count == 0) { + EXPECT_THAT(msg, testing::HasSubstr("which would cause a deadlock")) + << "Got unexpected log message " << level << ": " << msg; + } + }}; const mock::MockLsgServer mock_server{ dataset::kXnasItch, kTsOut, [&kSymbols, &should_close, &should_close_mutex, &should_close_cv, kSchema, @@ -292,8 +308,7 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) { self.Authenticate(); self.Subscribe(kSymbols, kSchema, kSType, true); }}; - LiveThreaded target = builder_.SetLogReceiver(ILogReceiver::Default()) - .SetDataset(dataset::kXnasItch) + LiveThreaded target = builder_.SetDataset(dataset::kXnasItch) .SetSendTsOut(kTsOut) .SetAddress(kLocalhost, mock_server.Port()) .BuildThreaded(); @@ -326,10 +341,7 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) { }; target.Start(metadata_cb, record_cb, exception_cb); target.BlockForStop(); - std::clog.flush(); - const std::string output = testing::internal::GetCapturedStderr(); - EXPECT_NE(output.find("which would cause a deadlock"), std::string::npos) - << "Got unexpected output: " << output; + EXPECT_GE(logger_.CallCount(), 1); } TEST_F(LiveThreadedTests, TestBlockForStopTimeout) { diff --git a/tests/src/log_tests.cpp b/tests/src/log_tests.cpp index ef3f1c8..13d45b0 100644 --- a/tests/src/log_tests.cpp +++ b/tests/src/log_tests.cpp @@ -17,7 +17,7 @@ TEST_F(ConsoleLogReceiverTests, TestOutput) { target_.Receive(LogLevel::Warning, msg); const std::string output = stream_.str(); // ConsoleLogReceiver adds newline - ASSERT_EQ(msg + '\n', output); + ASSERT_EQ("WARN: " + msg + '\n', output); } TEST_F(ConsoleLogReceiverTests, TestFilter) { diff --git a/tests/src/mock_http_server.cpp b/tests/src/mock_http_server.cpp index 55c5b41..d5dc69b 100644 --- a/tests/src/mock_http_server.cpp +++ b/tests/src/mock_http_server.cpp @@ -3,11 +3,18 @@ #include // EXPECT_* #include +#include #include // ifstream #include // streamsize #include // cerr #include +#include "databento/constants.hpp" +#include "databento/dbn.hpp" +#include "databento/dbn_encoder.hpp" +#include "databento/detail/zstd_stream.hpp" +#include "databento/record.hpp" + using databento::tests::mock::MockHttpServer; int MockHttpServer::ListenOnThread() { @@ -84,8 +91,8 @@ void MockHttpServer::MockStreamDbn( input_file.read(buffer.data(), static_cast(size)); // Serve - server_.Get(path, [buffer, kChunkSize, params](const httplib::Request& req, - httplib::Response& resp) { + server_.Get(path, [buffer = std::move(buffer), kChunkSize, params]( + const httplib::Request& req, httplib::Response& resp) { if (!req.has_header("Authorization")) { resp.status = 401; return; @@ -108,6 +115,64 @@ void MockHttpServer::MockStreamDbn( }); } +void MockHttpServer::MockStreamDbn( + const std::string& path, const std::map& params, + Record record, std::size_t count, std::size_t chunk_size) { + MockStreamDbn(path, params, record, count, 0, chunk_size); +} + +void MockHttpServer::MockStreamDbn( + const std::string& path, const std::map& params, + Record record, std::size_t count, std::size_t extra_bytes, + std::size_t chunk_size) { + // Needs to be copy-constructable for use in server + auto buffer = std::make_shared(); + { + detail::ZstdCompressStream zstd_stream{buffer.get()}; + DbnEncoder encoder{Metadata{ + kDbnVersion, + ToString(Dataset::IfusImpact), + {Schema::Mbp1}, + }, + &zstd_stream}; + for (std::size_t i = 0; i < count; ++i) { + encoder.EncodeRecord(record); + } + if (extra_bytes > sizeof(RecordHeader)) { + std::vector empty(extra_bytes - sizeof(RecordHeader)); + // write the header so it looks like the start of a valid record + zstd_stream.WriteAll(reinterpret_cast(&record.Header()), + sizeof(RecordHeader)); + zstd_stream.WriteAll(empty.data(), empty.size()); + } + } + server_.Get(path, [params, buffer, count, record, chunk_size]( + const httplib::Request& req, httplib::Response& resp) { + if (!req.has_header("Authorization")) { + resp.status = 401; + return; + } + CheckParams(params, req); + resp.status = 200; + resp.set_header("Content-Disposition", "attachment; filename=test.dbn.zst"); + resp.set_content_provider( + "application/octet-stream", + [&buffer, chunk_size](const std::size_t offset, + httplib::DataSink& sink) { + if (buffer->ReadCapacity() - offset) { + const auto write_size = + std::min(chunk_size, buffer->ReadCapacity() - offset); + sink.write( + reinterpret_cast(&buffer->ReadBegin()[offset]), + write_size); + } else { + sink.done(); + } + return true; + }); + }); +} + void MockHttpServer::CheckParams( const std::map& params, const httplib::Request& req) { diff --git a/tests/src/mock_io.cpp b/tests/src/mock_io.cpp deleted file mode 100644 index 2199a6e..0000000 --- a/tests/src/mock_io.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "mock/mock_io.hpp" - -#include -#include -#include - -using databento::tests::mock::MockIo; - -void MockIo::WriteAll(const std::byte* buffer, std::size_t length) { - contents_.insert(contents_.end(), buffer, buffer + length); -} - -void MockIo::ReadExact(std::byte* buffer, std::size_t length) { - const auto remaining_bytes = contents_.size() - read_idx_; - if (remaining_bytes < length) { - throw std::runtime_error{"Not enough bytes remaining: expected " + - std::to_string(length) + " got " + - std::to_string(remaining_bytes)}; - } - auto s_length = static_cast(length); - std::copy(contents_.cbegin() + read_idx_, - contents_.cbegin() + read_idx_ + s_length, buffer); - read_idx_ += s_length; -} - -std::size_t MockIo::ReadSome(std::byte* buffer, std::size_t max_length) { - auto read_size = static_cast( - std::min(max_length, contents_.size() - read_idx_)); - std::copy(contents_.cbegin() + read_idx_, - contents_.cbegin() + read_idx_ + read_size, buffer); - read_idx_ += read_size; - return read_size; -} diff --git a/tests/src/pretty_tests.cpp b/tests/src/pretty_tests.cpp new file mode 100644 index 0000000..099bd6d --- /dev/null +++ b/tests/src/pretty_tests.cpp @@ -0,0 +1,114 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "databento/constants.hpp" +#include "databento/datetime.hpp" +#include "databento/pretty.hpp" + +namespace databento::pretty::tests { + +TEST(PrettyTests, TestPrettyPx) { + std::ostringstream ss; + std::vector> cases{ + {-100'000, "-0.000100000"}, + {32'500'000'000, "32.500000000"}, + {101'005'000'000, "101.005000000"}, + {0, "0.000000000"}, + {kUndefPrice, "UNDEF_PRICE"}}; + for (const auto& [num, exp] : cases) { + ss << Px{num}; + ASSERT_EQ(ss.str(), exp); + ss.str(""); + } +} +TEST(PrettyTests, TestPrecision) { + std::ostringstream ss; + std::vector> cases{ + {32'500'000'000, 3, "32.500"}, + {101'005'000'000, 5, "101.00500"}, + {75'000'000, 5, "0.07500"}, + {32'123'456'789, 2, "32.12"}}; + for (const auto& [num, precision, exp] : cases) { + ss << std::setprecision(precision) << Px{num}; + ASSERT_EQ(ss.str(), exp); + ss.str(""); + } +} + +TEST(PrettyTests, TestDefaultFill) { + std::ostringstream ss; + std::vector> cases{ + {32'500'000'000, 4, 3, "32.500", "32.500"}, + {32'500'000'000, 8, 3, " 32.500", "32.500 "}, + {101'005'000'000, 10, 5, " 101.00500", "101.00500 "}, + {75'000'000, 13, 5, " 0.07500", "0.07500 "}, + {32'123'456'789, 7, 2, " 32.12", "32.12 "}, + {32'123'456'789, 16, 5, " 32.12345", "32.12345 "}}; + for (const auto& [num, width, precision, exp_right, exp_left] : cases) { + // Default + ss << std::setw(width) << std::setprecision(precision) << Px{num}; + ASSERT_EQ(ss.str(), exp_right); + ss.str(""); + // Left + ss << std::setw(width) << std::left << std::setprecision(precision) + << Px{num}; + ASSERT_EQ(ss.str(), exp_left); + ss.str(""); + // Right + ss << std::setw(width) << std::right << std::setprecision(precision) + << Px{num}; + ASSERT_EQ(ss.str(), exp_right); + ss.str(""); + } +} + +TEST(PrettyTests, TestZeroFill) { + std::ostringstream ss; + std::vector> cases{ + {32'500'000'000, 4, 3, "32.500", "32.500"}, + {32'500'000'000, 8, 3, "0032.500", "32.50000"}, + {101'005'000'000, 10, 5, "0101.00500", "101.005000"}, + {75'000'000, 13, 5, "0000000.07500", "0.07500000000"}, + {32'123'456'789, 7, 2, "0032.12", "32.1200"}, + {32'123'456'789, 16, 4, "00000000032.1234", "32.1234000000000"}, + }; + for (const auto& [num, width, precision, exp_right, exp_left] : cases) { + // Default + ss << std::setw(width) << std::setfill('0') << std::setprecision(precision) + << Px{num}; + ASSERT_EQ(ss.str(), exp_right); + ss.str(""); + // Left + ss << std::setw(width) << std::left << std::setfill('0') + << std::setprecision(precision) << Px{num}; + ASSERT_EQ(ss.str(), exp_left); + ss.str(""); + // Right + ss << std::setw(width) << std::right << std::setfill('0') + << std::setprecision(precision) << Px{num}; + ASSERT_EQ(ss.str(), exp_right); + ss.str(""); + } +} + +TEST(PrettyTests, TestPrettyTs) { + std::ostringstream ss; + std::vector> cases{ + {0, "1970-01-01T00:00:00.000000000Z"}, + {1, "1970-01-01T00:00:00.000000001Z"}, + {1622838300000000000, "2021-06-04T20:25:00.000000000Z"}, + {kUndefTimestamp - 1, "2554-07-21T23:34:33.709551614Z"}, + {kUndefTimestamp, "UNDEF_TIMESTAMP"}}; + for (const auto& [num, exp] : cases) { + ss << Ts{UnixNanos{std::chrono::nanoseconds{num}}}; + ASSERT_EQ(ss.str(), exp); + ss.str(""); + } +} +} // namespace databento::pretty::tests diff --git a/tests/src/record_tests.cpp b/tests/src/record_tests.cpp index 4576e8f..0a2ea47 100644 --- a/tests/src/record_tests.cpp +++ b/tests/src/record_tests.cpp @@ -188,7 +188,7 @@ TEST(RecordTests, TestInstrumentDefMsgToString) { unit_of_measure_qty = 0.000000008, min_price_increment_amount = 0.000000009, price_ratio = 0.000000010, - strike_price = kUndefPrice, + strike_price = UNDEF_PRICE, raw_instrument_id = 11, leg_price = 0.000000012, leg_delta = 0.000000013, diff --git a/tests/src/zstd_stream_tests.cpp b/tests/src/zstd_stream_tests.cpp index ea7378b..bff6f76 100644 --- a/tests/src/zstd_stream_tests.cpp +++ b/tests/src/zstd_stream_tests.cpp @@ -6,10 +6,10 @@ #include #include "databento/compat.hpp" +#include "databento/detail/buffer.hpp" #include "databento/detail/zstd_stream.hpp" #include "databento/enums.hpp" #include "databento/file_stream.hpp" -#include "mock/mock_io.hpp" namespace databento::detail::tests { TEST(ZstdStreamTests, TestMultiFrameFiles) { @@ -32,7 +32,7 @@ TEST(ZstdStreamTests, TestIdentity) { source_data.emplace_back(i); } auto size = source_data.size() * sizeof(std::int64_t); - databento::tests::mock::MockIo mock_io; + detail::Buffer mock_io; { ZstdCompressStream compressor{&mock_io}; for (auto it = source_data.begin(); it != source_data.end(); it += 100) { @@ -41,8 +41,7 @@ TEST(ZstdStreamTests, TestIdentity) { } } std::vector res(size); - ZstdDecodeStream decode{ - std::make_unique(std::move(mock_io)) + ZstdDecodeStream decode{std::make_unique(std::move(mock_io)) }; decode.ReadExact(res.data(), size);