diff --git a/CHANGELOG.md b/CHANGELOG.md index be0ec50..44db12f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.33.0 - 2025-04-15 + +### Enhancements +- Added `id` field to `LiveSubscription` requests, which will be used for improved error + messages +- Removed Windows-only `dirent` dependency + +### Breaking changes +- Changed `DbnDecoder`, `FileStream`, `IReadable`, `IWritable` to work on `byte`s + ## 0.32.1 - 2025-04-07 ### Bug fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index 0723cbe..e91eccb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 3.14) # Project details # -project("databento" VERSION 0.32.1 LANGUAGES CXX) +project("databento" VERSION 0.33.0 LANGUAGES CXX) string(TOUPPER ${PROJECT_NAME} PROJECT_NAME_UPPERCASE) # @@ -225,18 +225,6 @@ else() add_system_include_property(date) endif() -# -# Platform-specific dependencies -# -if(WIN32) - find_path( - DIRENT_INCLUDE_DIR "dirent.h" - PATHS "${_VCPKG_INSTALLED_DIR}/${VCPKG_TARGET_TRIPLET}/include" - REQUIRED - ) - target_include_directories(${PROJECT_NAME} PRIVATE ${DIRENT_INCLUDE_DIR}) -endif() - target_link_libraries( ${PROJECT_NAME} PUBLIC diff --git a/README.md b/README.md index 6c7cac1..deb1cf5 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ You'll need to ensure the following dependencies are installed: - [nlohmann\_json (header-only)](https://github.com/nlohmann/json) - [cpp-httplib (header-only)](https://github.com/yhirose/cpp-httplib) - [date (header-only)](https://github.com/HowardHinnant/date) -- [dirent (Windows-only, header-only)](https://github.com/tronkko/dirent) By default, date, cpp-httplib and nlohmann\_json are downloaded by CMake as part of the build process. If you would like to use a local version of these libraries, enable the CMake flag diff --git a/include/databento/constants.hpp b/include/databento/constants.hpp index ff99a6e..52eaae5 100644 --- a/include/databento/constants.hpp +++ b/include/databento/constants.hpp @@ -27,8 +27,8 @@ static constexpr auto kSymbolCstrLen = 71; // The multiplier for converting the `length` field in `RecordHeader` to bytes. static constexpr std::size_t kRecordHeaderLengthMultiplier = 4; -// This is not necessarily a comprehensive list of available datasets. Please -// use `Historical.MetadataListDatasets` to retrieve an up-to-date list. +// This is not a comprehensive list of datasets, for that see the `Dataset` +// enum. namespace dataset { // The dataset code for Databento Equities Basic. static constexpr auto kDbeqBasic = "DBEQ.BASIC"; diff --git a/include/databento/dbn_decoder.hpp b/include/databento/dbn_decoder.hpp index 422a28a..c714be2 100644 --- a/include/databento/dbn_decoder.hpp +++ b/include/databento/dbn_decoder.hpp @@ -25,15 +25,15 @@ class DbnDecoder { VersionUpgradePolicy upgrade_policy); static std::pair DecodeMetadataVersionAndSize( - const std::uint8_t* buffer, std::size_t size); + const std::byte* buffer, std::size_t size); static Metadata DecodeMetadataFields(std::uint8_t version, - const std::vector& buffer); + const std::vector& buffer); // Decodes a record possibly applying upgrading the data according to the // given version and upgrade policy. If an upgrade is applied, // compat_buffer is modified. static Record DecodeRecordCompat( std::uint8_t version, VersionUpgradePolicy upgrade_policy, bool ts_out, - std::array* compat_buffer, Record rec); + std::array* compat_buffer, Record rec); // Should be called exactly once. Metadata DecodeMetadata(); @@ -44,19 +44,19 @@ class DbnDecoder { private: static std::string DecodeSymbol( std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it); + std::vector::const_iterator& buffer_it); static std::vector DecodeRepeatedSymbol( std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); + std::vector::const_iterator& buffer_it, + std::vector::const_iterator buffer_end_it); static std::vector DecodeSymbolMappings( std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); + std::vector::const_iterator& buffer_it, + std::vector::const_iterator buffer_end_it); static SymbolMapping DecodeSymbolMapping( std::size_t symbol_cstr_len, - std::vector::const_iterator& buffer_it, - std::vector::const_iterator buffer_end_it); + std::vector::const_iterator& buffer_it, + std::vector::const_iterator buffer_end_it); bool DetectCompression(); std::size_t FillBuffer(); std::size_t GetReadBufferSize() const; @@ -67,11 +67,10 @@ class DbnDecoder { VersionUpgradePolicy upgrade_policy_; bool ts_out_{}; std::unique_ptr input_; - std::vector read_buffer_; + std::vector read_buffer_; std::size_t buffer_idx_{}; // Must be 8-byte aligned for records - alignas( - RecordHeader) std::array compat_buffer_{}; + alignas(RecordHeader) std::array compat_buffer_{}; Record current_record_{nullptr}; }; } // namespace databento diff --git a/include/databento/detail/json_helpers.hpp b/include/databento/detail/json_helpers.hpp index 243f204..e54dad2 100644 --- a/include/databento/detail/json_helpers.hpp +++ b/include/databento/detail/json_helpers.hpp @@ -5,6 +5,7 @@ #include // multimap #include +#include #include #include "databento/datetime.hpp" // UnixNanos @@ -22,7 +23,7 @@ void SetIfNotEmpty(httplib::Params* params, const std::string& key, const std::vector& states); template -void SetIfPositive(httplib::Params* params, const std::string& key, +void SetIfPositive(httplib::Params* params, std::string_view key, const T value) { if (value > 0) { params->emplace(key, std::to_string(value)); @@ -31,30 +32,31 @@ void SetIfPositive(httplib::Params* params, const std::string& key, template <> inline void SetIfPositive( - httplib::Params* params, const std::string& key, + httplib::Params* params, std::string_view key, const databento::UnixNanos value) { if (value.time_since_epoch().count()) { params->emplace(key, databento::ToString(value)); } } -const nlohmann::json& CheckedAt(const std::string& endpoint, +const nlohmann::json& CheckedAt(std::string_view endpoint, const nlohmann::json& json, - const std::string& key); + std::string_view key); template -T FromCheckedAtString(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +T FromCheckedAtString(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (!val_json.is_string()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " string", val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " string", val_json); } return databento::FromString(val_json); } template -T FromCheckedAtStringOrNull(const std::string& endpoint, - const nlohmann::json& json, const std::string& key, +T FromCheckedAtStringOrNull(std::string_view endpoint, + const nlohmann::json& json, std::string_view key, T null_value) { const auto& val_json = CheckedAt(endpoint, json, key); if (val_json.is_null()) { @@ -63,35 +65,34 @@ T FromCheckedAtStringOrNull(const std::string& endpoint, if (val_json.is_string()) { return databento::FromString(val_json); } - throw JsonResponseError::TypeMismatch(endpoint, key + " null or string", - val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " null or string", val_json); } template -T ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +T ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -bool ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +bool ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -std::string ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +std::string ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -std::uint64_t ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +std::uint64_t ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -std::uint16_t ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +std::uint16_t ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -double ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key); +double ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key); template <> -std::vector ParseAt(const std::string& endpoint, +std::vector ParseAt(std::string_view endpoint, const nlohmann::json& json, - const std::string& key); + std::string_view key); template <> -date::year_month_day ParseAt(const std::string& endpoint, - const nlohmann::json& json, - const std::string& key); +date::year_month_day ParseAt(std::string_view endpoint, + const nlohmann::json& json, std::string_view key); } // namespace databento::detail diff --git a/include/databento/detail/shared_channel.hpp b/include/databento/detail/shared_channel.hpp index 77080d6..ae02819 100644 --- a/include/databento/detail/shared_channel.hpp +++ b/include/databento/detail/shared_channel.hpp @@ -1,7 +1,6 @@ #pragma once -#include // size_t -#include // uint8_t +#include // byte, size_t #include // shared_ptr #include "databento/ireadable.hpp" @@ -13,14 +12,14 @@ class SharedChannel : public IReadable { SharedChannel(); // Write `data` of `length` bytes to the channel. - void Write(const std::uint8_t* data, std::size_t length); + void Write(const std::byte* data, std::size_t length); // Signal the end of input. void Finish(); // Read exactly `length` bytes. - void ReadExact(std::uint8_t* buffer, std::size_t length) override; + void ReadExact(std::byte* buffer, std::size_t length) override; // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::uint8_t* buffer, std::size_t length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t length) override; private: class Channel; diff --git a/include/databento/detail/tcp_client.hpp b/include/databento/detail/tcp_client.hpp index f20d544..1060691 100644 --- a/include/databento/detail/tcp_client.hpp +++ b/include/databento/detail/tcp_client.hpp @@ -1,8 +1,10 @@ #pragma once #include // milliseconds +#include #include #include +#include #include "databento/detail/scoped_fd.hpp" // ScopedFd @@ -29,13 +31,13 @@ class TcpClient { TcpClient(const std::string& gateway, std::uint16_t port, RetryConf retry_conf); - void WriteAll(const std::string& str); - void WriteAll(const char* buffer, std::size_t size); - void ReadExact(char* buffer, std::size_t size); - Result ReadSome(char* buffer, std::size_t max_size); + void WriteAll(std::string_view str); + void WriteAll(const std::byte* buffer, std::size_t size); + void ReadExact(std::byte* buffer, std::size_t size); + Result ReadSome(std::byte* buffer, std::size_t max_size); // Passing a timeout of 0 will block until data is available of the socket is // closed, the same behavior as the Read overload without a timeout. - Result ReadSome(char* buffer, std::size_t max_size, + Result ReadSome(std::byte* buffer, std::size_t max_size, std::chrono::milliseconds timeout); // Closes the socket. void Close(); diff --git a/include/databento/detail/zstd_stream.hpp b/include/databento/detail/zstd_stream.hpp index e2db4a9..56da12b 100644 --- a/include/databento/detail/zstd_stream.hpp +++ b/include/databento/detail/zstd_stream.hpp @@ -3,7 +3,6 @@ #include #include // size_t -#include // uint8_t #include // unique_ptr #include @@ -16,19 +15,19 @@ class ZstdDecodeStream : public IReadable { public: explicit ZstdDecodeStream(std::unique_ptr input); ZstdDecodeStream(std::unique_ptr input, - std::vector&& in_buffer); + std::vector&& in_buffer); // Read exactly `length` bytes into `buffer`. - void ReadExact(std::uint8_t* buffer, std::size_t length) override; + void ReadExact(std::byte* buffer, std::size_t length) override; // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; private: std::unique_ptr input_; std::unique_ptr z_dstream_; std::size_t read_suggestion_; - std::vector in_buffer_; + std::vector in_buffer_; ZSTD_inBuffer z_in_buffer_; }; @@ -42,15 +41,15 @@ class ZstdCompressStream : public IWritable { ZstdCompressStream& operator=(ZstdCompressStream&&) = delete; ~ZstdCompressStream() override; - void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + void WriteAll(const std::byte* buffer, std::size_t length) override; private: ILogReceiver* log_receiver_; IWritable* output_; std::unique_ptr z_cstream_; - std::vector in_buffer_; + std::vector in_buffer_; ZSTD_inBuffer z_in_buffer_; std::size_t in_size_; - std::vector out_buffer_; + std::vector out_buffer_; }; } // namespace databento::detail diff --git a/include/databento/exceptions.hpp b/include/databento/exceptions.hpp index 32aa7c6..d0851c2 100644 --- a/include/databento/exceptions.hpp +++ b/include/databento/exceptions.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include // move namespace databento { @@ -32,7 +33,7 @@ class HttpRequestError : public Exception { httplib::Error ErrorCode() const { return error_code_; } private: - static std::string BuildMessage(const std::string& request_path, + static std::string BuildMessage(std::string_view request_path, httplib::Error error_code); const std::string request_path_; @@ -55,9 +56,9 @@ class HttpResponseError : public Exception { const std::string& ResponseBody() const { return response_body_; } private: - static std::string BuildMessage(const std::string& request_path, + static std::string BuildMessage(std::string_view request_path, std::int32_t status_code, - const std::string& response_body); + std::string_view response_body); const std::string request_path_; // int32 is the representation used by httplib @@ -95,9 +96,9 @@ class InvalidArgumentError : public Exception { const std::string& Details() const { return details_; } private: - static std::string BuildMessage(const std::string& method_name, - const std::string& param_name, - const std::string& details); + static std::string BuildMessage(std::string_view method_name, + std::string_view param_name, + std::string_view details); const std::string method_name_; const std::string param_name_; @@ -108,15 +109,14 @@ class InvalidArgumentError : public Exception { class JsonResponseError : public Exception { public: static JsonResponseError ParseError( - const std::string& path, - const nlohmann::detail::parse_error& parse_error); - static JsonResponseError MissingKey(const std::string& method_name, + std::string_view path, const nlohmann::detail::parse_error& parse_error); + static JsonResponseError MissingKey(std::string_view method_name, const nlohmann::json& key); - static JsonResponseError TypeMismatch(const std::string& method_name, - const std::string& expected_type_name, + static JsonResponseError TypeMismatch(std::string_view method_name, + std::string_view expected_type_name, const nlohmann::json& json); - static JsonResponseError TypeMismatch(const std::string& method_name, - const std::string& expected_type_name, + static JsonResponseError TypeMismatch(std::string_view method_name, + std::string_view expected_type_name, const nlohmann::json& key, const nlohmann::json& value); @@ -138,7 +138,7 @@ class LiveApiError : public Exception { public: explicit LiveApiError(std::string message) : Exception{std::move(message)} {} - static LiveApiError UnexpectedMsg(const std::string& message, - const std::string& response); + static LiveApiError UnexpectedMsg(std::string_view message, + std::string_view response); }; } // namespace databento diff --git a/include/databento/file_stream.hpp b/include/databento/file_stream.hpp index 422ff00..a4480c6 100644 --- a/include/databento/file_stream.hpp +++ b/include/databento/file_stream.hpp @@ -1,7 +1,6 @@ #pragma once -#include // size_t -#include // uint8_t +#include // byte, size_t #include // ifstream, ofstream #include @@ -14,10 +13,10 @@ class InFileStream : public IReadable { explicit InFileStream(const std::string& file_path); // Read exactly `length` bytes into `buffer`. - void ReadExact(std::uint8_t* buffer, std::size_t length) override; + void ReadExact(std::byte* buffer, std::size_t length) override; // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; private: std::ifstream stream_; @@ -27,7 +26,7 @@ class OutFileStream : public IWritable { public: explicit OutFileStream(const std::string& file_path); - void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + void WriteAll(const std::byte* buffer, std::size_t length) override; private: std::ofstream stream_; diff --git a/include/databento/ireadable.hpp b/include/databento/ireadable.hpp index 0532405..0c242a1 100644 --- a/include/databento/ireadable.hpp +++ b/include/databento/ireadable.hpp @@ -1,7 +1,6 @@ #pragma once -#include // size_t -#include // uint8_t +#include // byte, size_t namespace databento { // An abstract class for readable objects to allow for runtime polymorphism @@ -11,10 +10,9 @@ class IReadable { virtual ~IReadable() = default; // Read exactly `length` bytes into `buffer`. - virtual void ReadExact(std::uint8_t* buffer, std::size_t length) = 0; + virtual void ReadExact(std::byte* buffer, std::size_t length) = 0; // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - virtual std::size_t ReadSome(std::uint8_t* buffer, - std::size_t max_length) = 0; + virtual std::size_t ReadSome(std::byte* buffer, std::size_t max_length) = 0; }; } // namespace databento diff --git a/include/databento/iwritable.hpp b/include/databento/iwritable.hpp index 9f51805..c1961c5 100644 --- a/include/databento/iwritable.hpp +++ b/include/databento/iwritable.hpp @@ -1,7 +1,6 @@ #pragma once -#include // size_t -#include // uint8_t +#include // byte, size_t namespace databento { // An abstract class for writable objects to allow for runtime polymorphism @@ -10,6 +9,6 @@ class IWritable { public: virtual ~IWritable() = default; - virtual void WriteAll(const std::uint8_t* buffer, std::size_t length) = 0; + virtual void WriteAll(const std::byte* buffer, std::size_t length) = 0; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 1039a5a..5227563 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -2,8 +2,10 @@ #include #include // milliseconds +#include #include #include +#include #include // pair #include @@ -93,10 +95,11 @@ class LiveBlocking { std::string DetermineGateway() const; std::uint64_t Authenticate(); std::string DecodeChallenge(); - std::string GenerateCramReply(const std::string& challenge_key); - std::string EncodeAuthReq(const std::string& auth); + std::string GenerateCramReply(std::string_view challenge_key); + std::string EncodeAuthReq(std::string_view auth); std::uint64_t DecodeAuthResp(); - void Subscribe(const std::string& sub_msg, + void IncrementSubCounter(); + void Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot); detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout); RecordHeader* BufferRecordHeader(); @@ -113,14 +116,14 @@ class LiveBlocking { VersionUpgradePolicy upgrade_policy_; std::chrono::seconds heartbeat_interval_; detail::TcpClient client_; + std::uint32_t sub_counter_{}; std::vector subscriptions_; // Must be 8-byte aligned for records - alignas(RecordHeader) std::array read_buffer_{}; + alignas(RecordHeader) std::array read_buffer_{}; std::size_t buffer_size_{}; std::size_t buffer_idx_{}; // Must be 8-byte aligned for records - alignas( - RecordHeader) std::array compat_buffer_{}; + alignas(RecordHeader) std::array compat_buffer_{}; std::uint64_t session_id_; Record current_record_{nullptr}; }; diff --git a/include/databento/live_subscription.hpp b/include/databento/live_subscription.hpp index 7d7fb4c..50d7313 100644 --- a/include/databento/live_subscription.hpp +++ b/include/databento/live_subscription.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -17,5 +18,6 @@ struct LiveSubscription { Schema schema; SType stype_in; Start start; + std::uint32_t id{}; }; } // namespace databento diff --git a/include/databento/record.hpp b/include/databento/record.hpp index 83f083a..a526e75 100644 --- a/include/databento/record.hpp +++ b/include/databento/record.hpp @@ -226,7 +226,8 @@ static_assert(sizeof(Mbp10Msg) == sizeof(TradeMsg) + sizeof(BidAskPair) * 10, struct BboMsg { static bool HasRType(RType rtype) { switch (rtype) { - case RType::Bbo1S: // fallthrough + case RType::Bbo1S: + [[fallthrough]]; case RType::Bbo1M: return true; default: @@ -256,7 +257,8 @@ static_assert(sizeof(BboMsg) == sizeof(Mbp1Msg), "BboMsg size must match Rust"); struct Cmbp1Msg { static bool HasRType(RType rtype) { switch (rtype) { - case RType::Cmbp1: // fallthrough + case RType::Cmbp1: + [[fallthrough]]; case RType::Tcbbo: return true; default: @@ -287,8 +289,9 @@ static_assert(sizeof(Cmbp1Msg) == struct CbboMsg { static bool HasRType(RType rtype) { switch (rtype) { - case RType::Cbbo1S: // fallthrough - case RType::Cbbo1M: // fallthrough + case RType::Cbbo1S: + [[fallthrough]]; + case RType::Cbbo1M: return true; default: return false; @@ -324,10 +327,14 @@ static_assert(sizeof(CbboMsg) == struct OhlcvMsg { static bool HasRType(RType rtype) { switch (rtype) { - case RType::OhlcvDeprecated: // fallthrough - case RType::Ohlcv1S: // fallthrough - case RType::Ohlcv1M: // fallthrough - case RType::Ohlcv1H: // fallthrough + case RType::OhlcvDeprecated: + [[fallthrough]]; + case RType::Ohlcv1S: + [[fallthrough]]; + case RType::Ohlcv1M: + [[fallthrough]]; + case RType::Ohlcv1H: + [[fallthrough]]; case RType::Ohlcv1D: return true; default: diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index eb882c5..35b290a 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.32.1 +pkgver=0.33.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') diff --git a/src/dbn_decoder.cpp b/src/dbn_decoder.cpp index f6afa73..dbd18ed 100644 --- a/src/dbn_decoder.cpp +++ b/src/dbn_decoder.cpp @@ -20,27 +20,27 @@ using databento::DbnDecoder; namespace { template -T Consume(std::vector::const_iterator& byte_it) { +T Consume(std::vector::const_iterator& byte_it) { const auto res = *reinterpret_cast(&*byte_it); byte_it += sizeof(T); return res; } template <> -std::uint8_t Consume(std::vector::const_iterator& byte_it) { +std::uint8_t Consume(std::vector::const_iterator& byte_it) { const auto res = *byte_it; byte_it += 1; - return res; + return static_cast(res); } -const char* Consume(std::vector::const_iterator& byte_it, +const char* Consume(std::vector::const_iterator& byte_it, const std::ptrdiff_t num_bytes) { const auto* pos = &*byte_it; byte_it += num_bytes; return reinterpret_cast(pos); } -std::string Consume(std::vector::const_iterator& byte_it, +std::string Consume(std::vector::const_iterator& byte_it, const std::ptrdiff_t num_bytes, const char* context) { const auto cstr = Consume(byte_it, num_bytes); // strnlen isn't portable @@ -87,7 +87,7 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, input_ = std::make_unique( std::move(input_), std::move(read_buffer_)); // Reinitialize buffer and get it into the same state as uncompressed input - read_buffer_ = std::vector(); + read_buffer_ = std::vector(); read_buffer_.reserve(kBufferCapacity); read_buffer_.resize(kMagicSize); input_->ReadExact(read_buffer_.data(), kMagicSize); @@ -99,14 +99,14 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver, } std::pair DbnDecoder::DecodeMetadataVersionAndSize( - const std::uint8_t* buffer, std::size_t size) { + const std::byte* buffer, std::size_t size) { if (size < 8) { throw DbnResponseError{"Buffer too small to decode version and size"}; } if (std::strncmp(reinterpret_cast(buffer), kDbnPrefix, 3) != 0) { throw DbnResponseError{"Missing DBN prefix"}; } - const auto version = buffer[3]; + const auto version = static_cast(buffer[3]); const auto frame_size = *reinterpret_cast(&buffer[4]); if (frame_size < kFixedMetadataLen) { throw DbnResponseError{ @@ -116,7 +116,7 @@ std::pair DbnDecoder::DecodeMetadataVersionAndSize( } databento::Metadata DbnDecoder::DecodeMetadataFields( - std::uint8_t version, const std::vector& buffer) { + std::uint8_t version, const std::vector& buffer) { Metadata res; res.version = version; if (res.version > kDbnVersion) { @@ -205,17 +205,16 @@ databento::Metadata DbnDecoder::DecodeMetadata() { namespace { template databento::Record UpgradeRecord( - bool ts_out, - std::array* compat_buffer, + bool ts_out, std::array* compat_buffer, databento::Record rec) { if (ts_out) { const auto orig = rec.Get>(); const databento::WithTsOut v2{orig.rec.ToV2(), orig.ts_out}; - const auto v2_ptr = reinterpret_cast(&v2); + const auto v2_ptr = reinterpret_cast(&v2); std::copy(v2_ptr, v2_ptr + v2.rec.hd.Size(), compat_buffer->data()); } else { const auto v2 = rec.Get().ToV2(); - const auto v2_ptr = reinterpret_cast(&v2); + const auto v2_ptr = reinterpret_cast(&v2); std::copy(v2_ptr, v2_ptr + v2.hd.Size(), compat_buffer->data()); } return databento::Record{ @@ -225,7 +224,7 @@ databento::Record UpgradeRecord( databento::Record DbnDecoder::DecodeRecordCompat( std::uint8_t version, VersionUpgradePolicy upgrade_policy, bool ts_out, - std::array* compat_buffer, Record rec) { + std::array* compat_buffer, Record rec) { if (version == 1 && upgrade_policy == VersionUpgradePolicy::UpgradeToV2) { if (rec.RType() == RType::InstrumentDef) { return UpgradeRecord( @@ -319,15 +318,15 @@ bool DbnDecoder::DetectCompression() { std::string DbnDecoder::DecodeSymbol( std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it) { + std::vector::const_iterator& read_buffer_it) { return Consume(read_buffer_it, static_cast(symbol_cstr_len), "symbol"); } std::vector DbnDecoder::DecodeRepeatedSymbol( std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { + std::vector::const_iterator& read_buffer_it, + std::vector::const_iterator read_buffer_end_it) { if (read_buffer_it + sizeof(std::uint32_t) > read_buffer_end_it) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing symbol"}; @@ -348,8 +347,8 @@ std::vector DbnDecoder::DecodeRepeatedSymbol( std::vector DbnDecoder::DecodeSymbolMappings( std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { + std::vector::const_iterator& read_buffer_it, + std::vector::const_iterator read_buffer_end_it) { if (read_buffer_it + sizeof(std::uint32_t) > read_buffer_end_it) { throw DbnResponseError{ "Unexpected end of metadata buffer while parsing mappings"}; @@ -366,8 +365,8 @@ std::vector DbnDecoder::DecodeSymbolMappings( databento::SymbolMapping DbnDecoder::DecodeSymbolMapping( std::size_t symbol_cstr_len, - std::vector::const_iterator& read_buffer_it, - std::vector::const_iterator read_buffer_end_it) { + std::vector::const_iterator& read_buffer_it, + std::vector::const_iterator read_buffer_end_it) { const auto min_symbol_mapping_encoded_len = static_cast(symbol_cstr_len + sizeof(std::uint32_t)); const auto mapping_encoded_len = sizeof(std::uint32_t) * 2 + symbol_cstr_len; diff --git a/src/dbn_encoder.cpp b/src/dbn_encoder.cpp index e3cd5ca..e4ad52a 100644 --- a/src/dbn_encoder.cpp +++ b/src/dbn_encoder.cpp @@ -19,7 +19,7 @@ using databento::DbnEncoder; namespace { void EncodeChars(const char* bytes, std::size_t length, databento::IWritable* output) { - output->WriteAll(reinterpret_cast(bytes), length); + output->WriteAll(reinterpret_cast(bytes), length); } void EncodeFixedLenCStr(std::size_t cstr_len, const std::string& str, @@ -31,17 +31,16 @@ void EncodeFixedLenCStr(std::size_t cstr_len, const std::string& str, std::string{"String is too long to encode, maximum length of "} + std::to_string(cstr_len - 1)}; } - output->WriteAll(reinterpret_cast(str.data()), + output->WriteAll(reinterpret_cast(str.data()), str.length()); // Null padding - std::vector filler(cstr_len - str.length()); + std::vector filler(cstr_len - str.length()); output->WriteAll(filler.data(), filler.size()); } template void EncodeAsBytes(T bytes, databento::IWritable* output) { - output->WriteAll(reinterpret_cast(&bytes), - sizeof(bytes)); + output->WriteAll(reinterpret_cast(&bytes), sizeof(bytes)); } void EncodeDate(date::year_month_day date, databento::IWritable* output) { @@ -122,8 +121,7 @@ void DbnEncoder::EncodeMetadata(const Metadata& metadata, IWritable* output) { // padding + schema definition length auto reserved_length = version == 1 ? kMetadataReservedLenV1 : kMetadataReservedLen; - const std::vector padding(reserved_length + - sizeof(std::uint32_t)); + const std::vector padding(reserved_length + sizeof(std::uint32_t)); output->WriteAll(padding.data(), padding.size()); // variable-length data @@ -135,7 +133,7 @@ void DbnEncoder::EncodeMetadata(const Metadata& metadata, IWritable* output) { } void DbnEncoder::EncodeRecord(const Record& record, IWritable* output) { - output->WriteAll(reinterpret_cast(&record.Header()), + output->WriteAll(reinterpret_cast(&record.Header()), record.Size()); } diff --git a/src/detail/json_helpers.cpp b/src/detail/json_helpers.cpp index fccd624..7abae76 100644 --- a/src/detail/json_helpers.cpp +++ b/src/detail/json_helpers.cpp @@ -2,6 +2,7 @@ #include // accumulate #include // istringstream +#include namespace databento::detail { void SetIfNotEmpty(httplib::Params* params, const std::string& key, @@ -25,9 +26,9 @@ void SetIfNotEmpty(httplib::Params* params, const std::string& key, } } -const nlohmann::json& CheckedAt(const std::string& endpoint, +const nlohmann::json& CheckedAt(std::string_view endpoint, const nlohmann::json& json, - const std::string& key) { + std::string_view key) { if (json.contains(key)) { return json.at(key); } @@ -35,88 +36,91 @@ const nlohmann::json& CheckedAt(const std::string& endpoint, } template <> -bool ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +bool ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (!val_json.is_boolean()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " bool", val_json); + throw JsonResponseError::TypeMismatch(endpoint, std::string{key} + " bool", + val_json); } return val_json; } template <> -std::string ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +std::string ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (val_json.is_null()) { return {}; } if (!val_json.is_string()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " string", val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " string", val_json); } return val_json; } template <> -std::uint64_t ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +std::uint64_t ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (val_json.is_null()) { return 0; } if (!val_json.is_number_unsigned()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " unsigned number", - val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " unsigned number", val_json); } return val_json; } template <> -std::uint16_t ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +std::uint16_t ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (val_json.is_null()) { return 0; } if (!val_json.is_number_unsigned()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " unsigned number", - val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " unsigned number", val_json); } return val_json; } template <> -double ParseAt(const std::string& endpoint, const nlohmann::json& json, - const std::string& key) { +double ParseAt(std::string_view endpoint, const nlohmann::json& json, + std::string_view key) { const auto& val_json = CheckedAt(endpoint, json, key); if (val_json.is_null()) { return 0; } if (!val_json.is_number()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " number", val_json); + throw JsonResponseError::TypeMismatch( + endpoint, std::string{key} + " number", val_json); } return val_json; } template <> -std::vector ParseAt(const std::string& endpoint, +std::vector ParseAt(std::string_view endpoint, const nlohmann::json& json, - const std::string& key) { + std::string_view key) { const auto& symbols_json = CheckedAt(endpoint, json, key); // if there's only one symbol, it returns a string not an array if (symbols_json.is_string()) { return {symbols_json}; } if (!symbols_json.is_array()) { - throw JsonResponseError::TypeMismatch(endpoint, key + " array", json); + throw JsonResponseError::TypeMismatch(endpoint, std::string{key} + " array", + json); } return {symbols_json.begin(), symbols_json.end()}; } template <> -date::year_month_day ParseAt(const std::string& endpoint, - const nlohmann::json& json, - const std::string& key) { +date::year_month_day ParseAt(std::string_view endpoint, + const nlohmann::json& json, std::string_view key) { std::string raw_start = detail::CheckedAt(endpoint, json, key); std::istringstream start_stream{raw_start}; date::year_month_day start; diff --git a/src/detail/shared_channel.cpp b/src/detail/shared_channel.cpp index d5f1dc4..a3d027b 100644 --- a/src/detail/shared_channel.cpp +++ b/src/detail/shared_channel.cpp @@ -17,13 +17,13 @@ class SharedChannel::Channel { Channel& operator=(Channel&&) = delete; ~Channel(); - void Write(const std::uint8_t* data, std::size_t length); + void Write(const std::byte* data, std::size_t length); void Finish(); // Read exactly `length` bytes - void ReadExact(std::uint8_t* buffer, std::size_t length); + void ReadExact(std::byte* buffer, std::size_t length); // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. - std::size_t ReadSome(std::uint8_t* buffer, std::size_t length); + std::size_t ReadSome(std::byte* buffer, std::size_t length); private: std::size_t Size(); @@ -41,27 +41,25 @@ using databento::detail::SharedChannel; SharedChannel::SharedChannel() : channel_{std::make_shared()} {} -void SharedChannel::Write(const std::uint8_t* data, std::size_t length) { +void SharedChannel::Write(const std::byte* data, std::size_t length) { channel_->Write(data, length); } void SharedChannel::Finish() { channel_->Finish(); } -void SharedChannel::ReadExact(std::uint8_t* buffer, std::size_t length) { +void SharedChannel::ReadExact(std::byte* buffer, std::size_t length) { channel_->ReadExact(buffer, length); } // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. -std::size_t SharedChannel::ReadSome(std::uint8_t* buffer, - std::size_t max_length) { +std::size_t SharedChannel::ReadSome(std::byte* buffer, std::size_t max_length) { return channel_->ReadSome(buffer, max_length); } SharedChannel::Channel::~Channel() { Finish(); } -void SharedChannel::Channel::Write(const std::uint8_t* data, - std::size_t length) { +void SharedChannel::Channel::Write(const std::byte* data, std::size_t length) { const std::lock_guard lock{mutex_}; stream_.write(reinterpret_cast(data), static_cast(length)); @@ -74,8 +72,7 @@ void SharedChannel::Channel::Finish() { cv_.notify_one(); } -void SharedChannel::Channel::ReadExact(std::uint8_t* buffer, - std::size_t length) { +void SharedChannel::Channel::ReadExact(std::byte* buffer, std::size_t length) { std::unique_lock lock{mutex_}; cv_.wait(lock, [this, length] { return Size() >= length || is_finished_; }); if (Size() < length) { @@ -88,7 +85,7 @@ void SharedChannel::Channel::ReadExact(std::uint8_t* buffer, static_cast(length)); } -std::size_t SharedChannel::Channel::ReadSome(std::uint8_t* buffer, +std::size_t SharedChannel::Channel::ReadSome(std::byte* buffer, std::size_t length) { std::unique_lock lock{mutex_}; cv_.wait(lock, [this] { return Size() > 0 || is_finished_; }); diff --git a/src/detail/tcp_client.cpp b/src/detail/tcp_client.cpp index 4633f44..75522bc 100644 --- a/src/detail/tcp_client.cpp +++ b/src/detail/tcp_client.cpp @@ -38,13 +38,14 @@ TcpClient::TcpClient(const std::string& gateway, std::uint16_t port, RetryConf retry_conf) : socket_{InitSocket(gateway, port, retry_conf)} {} -void TcpClient::WriteAll(const std::string& str) { - WriteAll(str.c_str(), str.length()); +void TcpClient::WriteAll(std::string_view str) { + WriteAll(reinterpret_cast(str.data()), str.length()); } -void TcpClient::WriteAll(const char* buffer, std::size_t size) { +void TcpClient::WriteAll(const std::byte* buffer, std::size_t size) { do { - const ::ssize_t res = ::send(socket_.Get(), buffer, size, {}); + const ::ssize_t res = + ::send(socket_.Get(), reinterpret_cast(buffer), size, {}); if (res < 0) { throw TcpError{::GetErrNo(), "Error writing to socket"}; } @@ -53,15 +54,17 @@ void TcpClient::WriteAll(const char* buffer, std::size_t size) { } while (size > 0); } -void TcpClient::ReadExact(char* buffer, std::size_t size) { - const ::ssize_t res = ::recv(socket_.Get(), buffer, size, MSG_WAITALL); +void TcpClient::ReadExact(std::byte* buffer, std::size_t size) { + const ::ssize_t res = + ::recv(socket_.Get(), reinterpret_cast(buffer), size, MSG_WAITALL); if (res != static_cast<::ssize_t>(size)) { throw TcpError{::GetErrNo(), "Error reading from socket"}; } } -TcpClient::Result TcpClient::ReadSome(char* buffer, std::size_t max_size) { - const ::ssize_t res = ::recv(socket_.Get(), buffer, max_size, {}); +TcpClient::Result TcpClient::ReadSome(std::byte* buffer, std::size_t max_size) { + const ::ssize_t res = + ::recv(socket_.Get(), reinterpret_cast(buffer), max_size, {}); if (res < 0) { throw TcpError{::GetErrNo(), "Error reading from socket"}; } @@ -69,7 +72,7 @@ TcpClient::Result TcpClient::ReadSome(char* buffer, std::size_t max_size) { res == 0 ? Status::Closed : Status::Ok}; } -TcpClient::Result TcpClient::ReadSome(char* buffer, std::size_t max_size, +TcpClient::Result TcpClient::ReadSome(std::byte* buffer, std::size_t max_size, std::chrono::milliseconds timeout) { pollfd fds{socket_.Get(), POLLIN, {}}; // passing a timeout of -1 blocks indefinitely, which is the equivalent of diff --git a/src/detail/zstd_stream.cpp b/src/detail/zstd_stream.cpp index 7021d75..234ad52 100644 --- a/src/detail/zstd_stream.cpp +++ b/src/detail/zstd_stream.cpp @@ -13,14 +13,14 @@ ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input) : ZstdDecodeStream{std::move(input), {}} {} ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input, - std::vector&& in_buffer) + std::vector&& in_buffer) : input_{std::move(input)}, z_dstream_{::ZSTD_createDStream(), ::ZSTD_freeDStream}, read_suggestion_{::ZSTD_initDStream(z_dstream_.get())}, in_buffer_{std::move(in_buffer)}, z_in_buffer_{in_buffer_.data(), in_buffer_.size(), 0} {} -void ZstdDecodeStream::ReadExact(std::uint8_t* buffer, std::size_t length) { +void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) { std::size_t size{}; do { size += ReadSome(&buffer[size], length - size); @@ -34,7 +34,7 @@ void ZstdDecodeStream::ReadExact(std::uint8_t* buffer, std::size_t length) { } } -std::size_t ZstdDecodeStream::ReadSome(std::uint8_t* buffer, +std::size_t ZstdDecodeStream::ReadSome(std::byte* buffer, std::size_t max_length) { ZSTD_outBuffer z_out_buffer{buffer, max_length, 0}; std::size_t read_size = 0; @@ -110,8 +110,7 @@ ZstdCompressStream::~ZstdCompressStream() { } } -void ZstdCompressStream::WriteAll(const std::uint8_t* buffer, - std::size_t length) { +void ZstdCompressStream::WriteAll(const std::byte* buffer, std::size_t length) { in_buffer_.insert(in_buffer_.end(), buffer, buffer + length); z_in_buffer_ = {in_buffer_.data(), in_buffer_.size(), 0}; // Wait for sufficient data before compressing diff --git a/src/exceptions.cpp b/src/exceptions.cpp index ac6a435..fc20ac8 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -10,7 +10,7 @@ using databento::HttpRequestError; -std::string HttpRequestError::BuildMessage(const std::string& request_path, +std::string HttpRequestError::BuildMessage(std::string_view request_path, httplib::Error error_code) { std::ostringstream err_msg; err_msg << "Request to " << request_path << " failed with " << error_code; @@ -32,9 +32,9 @@ std::string TcpError::BuildMessage(int err_num, std::string message) { using databento::HttpResponseError; -std::string HttpResponseError::BuildMessage(const std::string& request_path, +std::string HttpResponseError::BuildMessage(std::string_view request_path, std::int32_t status_code, - const std::string& response_body) { + std::string_view response_body) { std::ostringstream err_msg; err_msg << "Received an error response from request to " << request_path << " with status " << status_code << " and body '" << response_body @@ -44,9 +44,9 @@ std::string HttpResponseError::BuildMessage(const std::string& request_path, using databento::InvalidArgumentError; -std::string InvalidArgumentError::BuildMessage(const std::string& method_name, - const std::string& param_name, - const std::string& details) { +std::string InvalidArgumentError::BuildMessage(std::string_view method_name, + std::string_view param_name, + std::string_view details) { std::ostringstream err_msg; err_msg << "Invalid argument '" << param_name << "' to " << method_name << ' ' << details; @@ -56,7 +56,7 @@ std::string InvalidArgumentError::BuildMessage(const std::string& method_name, using databento::JsonResponseError; JsonResponseError JsonResponseError::ParseError( - const std::string& method_name, + std::string_view method_name, const nlohmann::json::parse_error& parse_error) { std::ostringstream err_msg; err_msg << "Error parsing JSON response to " << method_name << ' ' @@ -64,7 +64,7 @@ JsonResponseError JsonResponseError::ParseError( return JsonResponseError{err_msg.str()}; } -JsonResponseError JsonResponseError::MissingKey(const std::string& path, +JsonResponseError JsonResponseError::MissingKey(std::string_view path, const nlohmann::json& key) { std::ostringstream err_msg; err_msg << "Missing key '" << key << "' in response for " << path; @@ -72,7 +72,7 @@ JsonResponseError JsonResponseError::MissingKey(const std::string& path, } JsonResponseError JsonResponseError::TypeMismatch( - const std::string& method_name, const std::string& expected_type_name, + std::string_view method_name, std::string_view expected_type_name, const nlohmann::json& json) { std::ostringstream err_msg; err_msg << "Expected JSON " << expected_type_name << " response for " @@ -81,7 +81,7 @@ JsonResponseError JsonResponseError::TypeMismatch( } JsonResponseError JsonResponseError::TypeMismatch( - const std::string& method_name, const std::string& expected_type_name, + std::string_view method_name, std::string_view expected_type_name, const nlohmann::json& key, const nlohmann::json& value) { std::ostringstream err_msg; err_msg << "Expected " << expected_type_name @@ -92,8 +92,8 @@ JsonResponseError JsonResponseError::TypeMismatch( using databento::LiveApiError; -LiveApiError LiveApiError::UnexpectedMsg(const std::string& message, - const std::string& response) { +LiveApiError LiveApiError::UnexpectedMsg(std::string_view message, + std::string_view response) { std::ostringstream err_msg; err_msg << message << " with response '" << response << '\''; return LiveApiError{err_msg.str()}; diff --git a/src/file_stream.cpp b/src/file_stream.cpp index 042ef9a..063ecb8 100644 --- a/src/file_stream.cpp +++ b/src/file_stream.cpp @@ -15,7 +15,7 @@ InFileStream::InFileStream(const std::string& file_path) } } -void InFileStream::ReadExact(std::uint8_t* buffer, std::size_t length) { +void InFileStream::ReadExact(std::byte* buffer, std::size_t length) { const auto size = ReadSome(buffer, length); if (size != length) { std::ostringstream err_msg; @@ -25,8 +25,7 @@ void InFileStream::ReadExact(std::uint8_t* buffer, std::size_t length) { } } -std::size_t InFileStream::ReadSome(std::uint8_t* buffer, - std::size_t max_length) { +std::size_t InFileStream::ReadSome(std::byte* buffer, std::size_t max_length) { stream_.read(reinterpret_cast(buffer), static_cast(max_length)); return static_cast(stream_.gcount()); @@ -42,7 +41,7 @@ OutFileStream::OutFileStream(const std::string& file_path) } } -void OutFileStream::WriteAll(const std::uint8_t* buffer, std::size_t length) { +void OutFileStream::WriteAll(const std::byte* buffer, std::size_t length) { stream_.write(reinterpret_cast(buffer), static_cast(length)); } diff --git a/src/historical.cpp b/src/historical.cpp index a78aa2e..0e289ed 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -1,6 +1,5 @@ #include "databento/historical.hpp" -#include // closedir, opendir #include #include @@ -9,16 +8,12 @@ #include // size_t #include // get_env #include // exception, exception_ptr -#include // back_inserter -#include // unique_ptr +#include +#include // back_inserter #include +#include #include // move -#include "databento/file_stream.hpp" -#ifdef _WIN32 -#include // _mkdir -#endif - #include "databento/constants.hpp" #include "databento/datetime.hpp" #include "databento/dbn_decoder.hpp" @@ -28,6 +23,7 @@ #include "databento/detail/shared_channel.hpp" #include "databento/enums.hpp" #include "databento/exceptions.hpp" // Exception, JsonResponseError +#include "databento/file_stream.hpp" #include "databento/log.hpp" #include "databento/metadata.hpp" #include "databento/timeseries.hpp" @@ -112,24 +108,18 @@ databento::BatchJob Parse(const std::string& endpoint, return res; } -void TryCreateDir(const std::string& dir_name) { +void TryCreateDir(const std::filesystem::path& dir_name) { + using namespace std::string_literals; if (dir_name.empty()) { return; } - const std::unique_ptr dir{::opendir(dir_name.c_str()), - &::closedir}; - if (dir == nullptr) { - const int ret = -#ifdef _WIN32 - ::_mkdir(dir_name.c_str()); -#else - ::mkdir(dir_name.c_str(), 0777); -#endif - if (ret != 0) { - throw databento::Exception{std::string{"Unable to create directory "} + - dir_name + ": " + ::strerror(errno)}; - } + std::error_code ec{}; + if (std::filesystem::create_directory(dir_name, ec) || !ec) { + // Successfully created directory or it already exists + return; } + throw databento::Exception{"Unable to create directory "s + + dir_name.generic_string() + ": " + ec.message()}; } std::string PathJoin(const std::string& dir, const std::string& path) { @@ -160,7 +150,6 @@ Historical::Historical(ILogReceiver* log_receiver, std::string key, static const std::string kBatchSubmitJobEndpoint = "Historical::BatchSubmitJob"; databento::BatchJob Historical::BatchSubmitJob( - const std::string& dataset, const std::vector& symbols, Schema schema, const DateTimeRange& datetime_range) { return this->BatchSubmitJob(dataset, symbols, schema, datetime_range, @@ -342,7 +331,7 @@ void Historical::StreamToFile(const std::string& url_path, OutFileStream out_file{file_path}; this->client_.GetRawStream( url_path, params, [&out_file](const char* data, std::size_t length) { - out_file.WriteAll(reinterpret_cast(data), length); + out_file.WriteAll(reinterpret_cast(data), length); return true; }); } @@ -868,23 +857,23 @@ void Historical::TimeseriesGetRange(const HttplibParams& params, std::atomic should_continue{true}; detail::SharedChannel channel; std::exception_ptr exception_ptr{}; - detail::ScopedThread stream{[this, &channel, &exception_ptr, ¶ms, - &should_continue] { - try { - this->client_.GetRawStream( - kTimeseriesGetRangePath, params, - [channel, &should_continue](const char* data, - std::size_t length) mutable { - channel.Write(reinterpret_cast(data), length); - return should_continue.load(); - }); - channel.Finish(); - } catch (const std::exception&) { - channel.Finish(); - // rethrowing here will cause the process to be terminated - exception_ptr = std::current_exception(); - } - }}; + detail::ScopedThread stream{ + [this, &channel, &exception_ptr, ¶ms, &should_continue] { + try { + this->client_.GetRawStream( + kTimeseriesGetRangePath, params, + [channel, &should_continue](const char* data, + std::size_t length) mutable { + channel.Write(reinterpret_cast(data), length); + return should_continue.load(); + }); + channel.Finish(); + } catch (const std::exception&) { + channel.Finish(); + // rethrowing here will cause the process to be terminated + exception_ptr = std::current_exception(); + } + }}; try { DbnDecoder dbn_decoder{log_receiver_, channel}; Metadata metadata = dbn_decoder.DecodeMetadata(); diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index dd547fe..19a83e2 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -8,6 +8,7 @@ #include // ptrdiff_t #include #include // hex, setfill, setw +#include #include #include @@ -66,45 +67,49 @@ void LiveBlocking::Subscribe(const std::vector& symbols, void LiveBlocking::Subscribe(const std::vector& symbols, Schema schema, SType stype_in, UnixNanos start) { + IncrementSubCounter(); std::ostringstream sub_msg; sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in) - << "|start=" << start.time_since_epoch().count(); + << "|start=" << start.time_since_epoch().count() + << "|id=" << std::to_string(sub_counter_); Subscribe(sub_msg.str(), symbols, false); subscriptions_.emplace_back( - LiveSubscription{symbols, schema, stype_in, start}); + LiveSubscription{symbols, schema, stype_in, start, sub_counter_}); } void LiveBlocking::Subscribe(const std::vector& symbols, Schema schema, SType stype_in, const std::string& start) { + IncrementSubCounter(); std::ostringstream sub_msg; - sub_msg << "schema=" << ToString(schema) - << "|stype_in=" << ToString(stype_in); + sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in) + << "|id=" << std::to_string(sub_counter_); if (!start.empty()) { sub_msg << "|start=" << start; } Subscribe(sub_msg.str(), symbols, false); if (start.empty()) { - subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in, - LiveSubscription::NoStart{}}); + subscriptions_.emplace_back(LiveSubscription{ + symbols, schema, stype_in, LiveSubscription::NoStart{}, sub_counter_}); } else { subscriptions_.emplace_back( - LiveSubscription{symbols, schema, stype_in, start}); + LiveSubscription{symbols, schema, stype_in, start, sub_counter_}); } } void LiveBlocking::SubscribeWithSnapshot( const std::vector& symbols, Schema schema, SType stype_in) { + IncrementSubCounter(); std::ostringstream sub_msg; - sub_msg << "schema=" << ToString(schema) - << "|stype_in=" << ToString(stype_in); + sub_msg << "schema=" << ToString(schema) << "|stype_in=" << ToString(stype_in) + << "|id=" << std::to_string(sub_counter_); Subscribe(sub_msg.str(), symbols, true); - subscriptions_.emplace_back(LiveSubscription{symbols, schema, stype_in, - LiveSubscription::Snapshot{}}); + subscriptions_.emplace_back(LiveSubscription{ + symbols, schema, stype_in, LiveSubscription::Snapshot{}, sub_counter_}); } -void LiveBlocking::Subscribe(const std::string& sub_msg, +void LiveBlocking::Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot) { static constexpr auto kMethodName = "Live::Subscribe"; @@ -134,10 +139,9 @@ databento::Metadata LiveBlocking::Start() { client_.WriteAll("start_session\n"); client_.ReadExact(read_buffer_.data(), kMetadataPreludeSize); const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( - reinterpret_cast(read_buffer_.data()), - kMetadataPreludeSize); - std::vector meta_buffer(size); - client_.ReadExact(reinterpret_cast(meta_buffer.data()), size); + read_buffer_.data(), kMetadataPreludeSize); + std::vector meta_buffer(size); + client_.ReadExact(meta_buffer.data(), size); auto metadata = DbnDecoder::DecodeMetadataFields(version, meta_buffer); version_ = metadata.version; metadata.Upgrade(upgrade_policy_); @@ -182,6 +186,7 @@ void LiveBlocking::Stop() { client_.Close(); } void LiveBlocking::Reconnect() { log_receiver_->Receive(LogLevel::Info, "Reconnecting"); client_ = detail::TcpClient{gateway_, port_}; + sub_counter_ = 0; session_id_ = this->Authenticate(); } @@ -191,9 +196,11 @@ void LiveBlocking::Resubscribe() { std::holds_alternative(subscription.start)) { subscription.start = LiveSubscription::NoStart{}; } + sub_counter_ = std::max(sub_counter_, subscription.id); std::ostringstream sub_msg; sub_msg << "schema=" << ToString(subscription.schema) - << "|stype_in=" << ToString(subscription.stype_in); + << "|stype_in=" << ToString(subscription.stype_in) + << "|id=" << std::to_string(sub_counter_); Subscribe( sub_msg.str(), subscription.symbols, std::holds_alternative(subscription.start)); @@ -207,7 +214,8 @@ std::string LiveBlocking::DecodeChallenge() { throw LiveApiError{"Gateway closed socket during authentication"}; } // first line is version - std::string response{read_buffer_.data(), buffer_size_}; + std::string response{reinterpret_cast(read_buffer_.data()), + buffer_size_}; { std::ostringstream log_ss; log_ss << "[LiveBlocking::DecodeChallenge] Challenge: " << response; @@ -231,7 +239,8 @@ std::string LiveBlocking::DecodeChallenge() { if (buffer_size_ == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } - response = {read_buffer_.data(), buffer_size_}; + response = {reinterpret_cast(read_buffer_.data()), + buffer_size_}; next_nl_pos = response.find('\n', find_start); } const auto challenge_line = @@ -274,10 +283,10 @@ std::uint64_t LiveBlocking::Authenticate() { return session_id; } -std::string LiveBlocking::GenerateCramReply(const std::string& challenge_key) { +std::string LiveBlocking::GenerateCramReply(std::string_view challenge_key) { std::array sha{}; const unsigned char* sha_res = - ::SHA256(reinterpret_cast(challenge_key.c_str()), + ::SHA256(reinterpret_cast(challenge_key.data()), challenge_key.size(), sha.data()); if (sha_res == nullptr) { throw LiveApiError{"Unable to generate SHA 256"}; @@ -292,7 +301,7 @@ std::string LiveBlocking::GenerateCramReply(const std::string& challenge_key) { return auth_stream.str(); } -std::string LiveBlocking::EncodeAuthReq(const std::string& auth) { +std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { std::ostringstream req_stream; req_stream << "auth=" << auth << "|dataset=" << dataset_ << "|encoding=dbn|" << "ts_out=" << send_ts_out_ << "|client=C++ " DATABENTO_VERSION; @@ -305,7 +314,7 @@ std::string LiveBlocking::EncodeAuthReq(const std::string& auth) { std::uint64_t LiveBlocking::DecodeAuthResp() { // handle split packet read - std::array::const_iterator nl_it; + std::array::const_iterator nl_it; buffer_size_ = 0; do { buffer_idx_ = buffer_size_; @@ -320,9 +329,12 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { } buffer_size_ += read_size; nl_it = std::find(read_buffer_.begin() + buffer_idx_, - read_buffer_.begin() + buffer_size_, '\n'); + read_buffer_.begin() + buffer_size_, + static_cast('\n')); } while (nl_it == read_buffer_.end()); - const std::string response{read_buffer_.cbegin(), nl_it}; + const std::string response{ + reinterpret_cast(read_buffer_.data()), + static_cast(nl_it - read_buffer_.cbegin())}; { std::ostringstream log_ss; log_ss << "[LiveBlocking::DecodeAuthResp] Authentication response: " @@ -377,6 +389,16 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { return session_id; } +void LiveBlocking::IncrementSubCounter() { + if (sub_counter_ == std::numeric_limits::max()) { + log_receiver_->Receive( + LogLevel::Warning, + "[LiveBlocking::Subscribe] Exhausted all subscription IDs"); + } else { + ++sub_counter_; + } +} + databento::detail::TcpClient::Result LiveBlocking::FillBuffer( std::chrono::milliseconds timeout) { // Shift data forward diff --git a/src/record.cpp b/src/record.cpp index e0b0946..c2c6b0a 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -34,9 +34,12 @@ std::size_t Record::SizeOfSchema(const Schema schema) { case Schema::Tbbo: { return sizeof(Mbp1Msg); } - case Schema::Ohlcv1S: // fallthrough - case Schema::Ohlcv1M: // fallthrough - case Schema::Ohlcv1H: // fallthrough + case Schema::Ohlcv1S: + [[fallthrough]]; + case Schema::Ohlcv1M: + [[fallthrough]]; + case Schema::Ohlcv1H: + [[fallthrough]]; case Schema::Ohlcv1D: { return sizeof(OhlcvMsg); } diff --git a/src/stream_op_helper.hpp b/src/stream_op_helper.hpp index d298e09..bc5bc4e 100644 --- a/src/stream_op_helper.hpp +++ b/src/stream_op_helper.hpp @@ -8,6 +8,7 @@ #include // ostream #include // stringstream #include +#include #include // move #include "databento/datetime.hpp" // TimeDeltaNanos, UnixNanos @@ -30,6 +31,8 @@ class StreamOpHelper { void FmtToStream(const std::string& val) { stream_ << std::quoted(val); } + void FmtToStream(std::string_view val) { stream_ << std::quoted(val); } + void FmtToStream(const bool& val) { // otherwise bool is formatted (1|0) stream_ << std::boolalpha << val; @@ -62,7 +65,7 @@ class StreamOpHelper { } public: - StreamOpHelper(std::ostream& stream, const std::string& type_name, + StreamOpHelper(std::ostream& stream, std::string_view type_name, std::string spacer, std::string indent) : stream_{stream}, spacer_{std::move(spacer)}, @@ -75,7 +78,7 @@ class StreamOpHelper { } template - StreamOpHelper& AddField(const std::string& field_name, const T& field_val) { + StreamOpHelper& AddField(std::string_view field_name, const T& field_val) { if (!is_first_) { stream_ << ','; } diff --git a/src/symbology.cpp b/src/symbology.cpp index 33b3687..4414d30 100644 --- a/src/symbology.cpp +++ b/src/symbology.cpp @@ -5,6 +5,7 @@ #include // accumulate #include #include +#include #include "databento/exceptions.hpp" // InvalidArgumentError #include "stream_op_helper.hpp" // StreamOpBuilder diff --git a/tests/include/mock/mock_io.hpp b/tests/include/mock/mock_io.hpp index b42f108..0b2befa 100644 --- a/tests/include/mock/mock_io.hpp +++ b/tests/include/mock/mock_io.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include "databento/ireadable.hpp" @@ -10,16 +9,16 @@ namespace databento::tests::mock { class MockIo : public databento::IWritable, public databento::IReadable { public: - void WriteAll(const std::uint8_t* buffer, std::size_t length); + void WriteAll(const std::byte* buffer, std::size_t length); - void ReadExact(std::uint8_t* buffer, std::size_t length); + void ReadExact(std::byte* buffer, std::size_t length); - std::size_t ReadSome(std::uint8_t* buffer, std::size_t max_length); + std::size_t ReadSome(std::byte* buffer, std::size_t max_length); - const std::vector& GetContents() const { return contents_; } + const std::vector& GetContents() const { return contents_; } private: - std::vector contents_; + std::vector contents_; std::ptrdiff_t read_idx_{0}; }; } // namespace databento::tests::mock diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index 4433ea6..7b7a65a 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -13,6 +13,7 @@ using ssize_t = SSIZE_T; #include #include +#include #include // function #include #include @@ -29,7 +30,7 @@ class SocketStream : public databento::IWritable { public: explicit SocketStream(detail::Socket socket) : socket_{socket} {} - void WriteAll(const std::uint8_t* buffer, std::size_t length) override; + void WriteAll(const std::byte* buffer, std::size_t length) override; ::ssize_t LastWriteSize() const { return last_write_size_; } private: diff --git a/tests/src/dbn_decoder_tests.cpp b/tests/src/dbn_decoder_tests.cpp index 0d12508..80973c0 100644 --- a/tests/src/dbn_decoder_tests.cpp +++ b/tests/src/dbn_decoder_tests.cpp @@ -61,8 +61,7 @@ class DbnDecoderTests : public testing::Test { std::vector buffer(size); input_file.read(buffer.data(), static_cast(size)); ASSERT_EQ(input_file.gcount(), size); - channel_.Write(reinterpret_cast(buffer.data()), - size); + channel_.Write(reinterpret_cast(buffer.data()), size); channel_.Finish(); }}; channel_target_ = std::make_unique( @@ -163,7 +162,7 @@ TEST_F(DbnDecoderTests, TestUpgradeSymbolMappingWithTsOut) { {}}; WithTsOut orig{ sym_map, UnixNanos{std::chrono::system_clock::now()}}; - std::array compat_buffer{}; + std::array compat_buffer{}; const auto res = DbnDecoder::DecodeRecordCompat(1, VersionUpgradePolicy::UpgradeToV2, true, &compat_buffer, Record{&orig.rec.hd}); @@ -182,7 +181,7 @@ TEST_F(DbnDecoderTests, TestUpgradeSymbolMappingWithTsOut) { // `length` properly set EXPECT_EQ(upgraded.rec.hd.Size(), sizeof(upgraded)); // used compat buffer - EXPECT_EQ(reinterpret_cast(&upgraded), + EXPECT_EQ(reinterpret_cast(&upgraded), compat_buffer.data()); } @@ -204,7 +203,7 @@ TEST_F(DbnDecoderTests, TestUpgradeMbp1WithTsOut) { {}, {}}, {std::chrono::system_clock::now()}}; - std::array compat_buffer{}; + std::array compat_buffer{}; const auto res = DbnDecoder::DecodeRecordCompat(1, VersionUpgradePolicy::UpgradeToV2, true, &compat_buffer, Record{&orig.rec.hd}); diff --git a/tests/src/file_stream_tests.cpp b/tests/src/file_stream_tests.cpp index 9558fdf..d50e9c0 100644 --- a/tests/src/file_stream_tests.cpp +++ b/tests/src/file_stream_tests.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include "databento/exceptions.hpp" #include "databento/file_stream.hpp" @@ -11,7 +11,7 @@ TEST(InFileStreamTests, TestReadExactInsufficient) { const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; InFileStream target{file_path}; - std::vector buffer(1024); // File is less than 1KiB + std::vector buffer(1024); // File is less than 1KiB try { target.ReadExact(buffer.data(), buffer.size()); FAIL() << "Expected throw"; @@ -25,11 +25,12 @@ TEST(InFileStreamTests, TestReadSomeLessThanMax) { const std::string file_path = TEST_BUILD_DIR "/data/test_data.ohlcv-1d.v1.dbn"; InFileStream target{file_path}; - std::vector buffer(1024); // File is less than 1KiB + std::vector buffer(1024); // File is less than 1KiB const auto read_size = target.ReadSome(buffer.data(), buffer.size()); ASSERT_GT(read_size, 0); - ASSERT_TRUE(std::any_of(buffer.cbegin(), buffer.cend(), - [](std::uint8_t byte) { return byte != 0; })); + ASSERT_TRUE(std::any_of(buffer.cbegin(), buffer.cend(), [](std::byte byte) { + return std::to_integer(byte) != 0; + })); } TEST(OutFileStreamTests, TestWriteAllCanBeRead) { @@ -38,12 +39,12 @@ TEST(OutFileStreamTests, TestWriteAllCanBeRead) { ASSERT_FALSE(temp_file.Exists()); { OutFileStream target{temp_file.Path()}; - target.WriteAll(reinterpret_cast(data), 8); + target.WriteAll(reinterpret_cast(data), 8); } ASSERT_TRUE(temp_file.Exists()); InFileStream input{temp_file.Path()}; std::vector buf(9); - input.ReadExact(reinterpret_cast(buf.data()), 8); + input.ReadExact(reinterpret_cast(buf.data()), 8); ASSERT_STREQ(buf.data(), data); } } // namespace databento::tests diff --git a/tests/src/mock_io.cpp b/tests/src/mock_io.cpp index 6cb9cbd..2199a6e 100644 --- a/tests/src/mock_io.cpp +++ b/tests/src/mock_io.cpp @@ -6,11 +6,11 @@ using databento::tests::mock::MockIo; -void MockIo::WriteAll(const std::uint8_t* buffer, std::size_t length) { +void MockIo::WriteAll(const std::byte* buffer, std::size_t length) { contents_.insert(contents_.end(), buffer, buffer + length); } -void MockIo::ReadExact(std::uint8_t* buffer, std::size_t length) { +void MockIo::ReadExact(std::byte* buffer, std::size_t length) { const auto remaining_bytes = contents_.size() - read_idx_; if (remaining_bytes < length) { throw std::runtime_error{"Not enough bytes remaining: expected " + @@ -23,7 +23,7 @@ void MockIo::ReadExact(std::uint8_t* buffer, std::size_t length) { read_idx_ += s_length; } -std::size_t MockIo::ReadSome(std::uint8_t* buffer, std::size_t max_length) { +std::size_t MockIo::ReadSome(std::byte* buffer, std::size_t max_length) { auto read_size = static_cast( std::min(max_length, contents_.size() - read_idx_)); std::copy(contents_.cbegin() + read_idx_, diff --git a/tests/src/mock_lsg_server.cpp b/tests/src/mock_lsg_server.cpp index dabeff3..c497769 100644 --- a/tests/src/mock_lsg_server.cpp +++ b/tests/src/mock_lsg_server.cpp @@ -9,7 +9,6 @@ #include // SHA256_DIGEST_LENGTH #include -#include #include "databento/compat.hpp" #include "databento/constants.hpp" @@ -22,7 +21,7 @@ using databento::tests::mock::SocketStream; -void SocketStream::WriteAll(const std::uint8_t* buffer, std::size_t length) { +void SocketStream::WriteAll(const std::byte* buffer, std::size_t length) { // MSG_NOSIGNAL doesn't exist on Windows, but also isn't necessary #ifdef _WIN32 constexpr int MSG_NOSIGNAL = {}; @@ -136,6 +135,7 @@ void MockLsgServer::SubscribeWithSnapshot( EXPECT_NE(received.find(std::string{"stype_in="} + ToString(stype)), std::string::npos); EXPECT_EQ(received.find("start="), std::string::npos); + EXPECT_NE(received.find("id="), std::string::npos); EXPECT_NE(received.find("snapshot=1"), std::string::npos); } @@ -156,6 +156,7 @@ void MockLsgServer::Subscribe(const std::vector& symbols, } else { EXPECT_NE(received.find(std::string{"start="} + start), std::string::npos); } + EXPECT_NE(received.find("id="), std::string::npos); EXPECT_NE(received.find("snapshot=0"), std::string::npos); } diff --git a/tests/src/shared_channel_tests.cpp b/tests/src/shared_channel_tests.cpp index 7e46f61..e8d7390 100644 --- a/tests/src/shared_channel_tests.cpp +++ b/tests/src/shared_channel_tests.cpp @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include #include @@ -16,7 +16,7 @@ class SharedChannelTests : public testing::Test { protected: void Write(const std::vector& inputs) { for (const auto& input : inputs) { - target_.Write(reinterpret_cast(input.data()), + target_.Write(reinterpret_cast(input.data()), input.size()); std::this_thread::sleep_for(std::chrono::milliseconds{10}); } @@ -30,7 +30,7 @@ class SharedChannelTests : public testing::Test { TEST_F(SharedChannelTests, TestReadExact) { write_thread_ = ScopedThread{ [this] { this->Write({"parse", "stream", "tests", "end"}); }}; - std::array buffer{}; + std::array buffer{}; target_.ReadExact(buffer.data(), 3); EXPECT_STREQ(reinterpret_cast(buffer.data()), "par"); target_.ReadExact(buffer.data(), 8); @@ -43,7 +43,7 @@ TEST_F(SharedChannelTests, TestReadExact) { TEST_F(SharedChannelTests, TestReadExactAfterFinished) { // write on same thread, so all reading happens after writing this->Write({"parse", "exact"}); - std::array buffer{}; + std::array buffer{}; target_.ReadExact(buffer.data(), 7); EXPECT_STREQ(reinterpret_cast(buffer.data()), "parseex"); // reset buffer @@ -53,12 +53,12 @@ TEST_F(SharedChannelTests, TestReadExactAfterFinished) { } TEST_F(SharedChannelTests, TestInterleavedReadsAndWrites) { - std::array buffer{}; - target_.Write(reinterpret_cast("hello"), 5); + std::array buffer{}; + target_.Write(reinterpret_cast("hello"), 5); ASSERT_EQ(target_.ReadSome(buffer.data(), buffer.size()), 5); EXPECT_STREQ(reinterpret_cast(buffer.data()), "hello"); buffer = {}; - target_.Write(reinterpret_cast("longer message"), 14); + target_.Write(reinterpret_cast("longer message"), 14); target_.Finish(); target_.ReadSome(buffer.data(), 6); target_.ReadSome(&buffer[6], 1); @@ -69,7 +69,7 @@ TEST_F(SharedChannelTests, TestInterleavedReadsAndWrites) { TEST_F(SharedChannelTests, TestReadSome) { write_thread_ = ScopedThread{ [this] { this->Write({"parse", "stream", "tests", "some", "last"}); }}; - std::array buffer{}; + std::array buffer{}; std::string res; // -1 to keep last null byte while (res.size() < 23) { diff --git a/tests/src/tcp_client_tests.cpp b/tests/src/tcp_client_tests.cpp index 8b97577..b74b1dc 100644 --- a/tests/src/tcp_client_tests.cpp +++ b/tests/src/tcp_client_tests.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -30,7 +31,7 @@ TEST_F(TcpClientTests, TestWriteAllString) { TEST_F(TcpClientTests, TestWriteAllCStr) { const std::string msg = "testing 1, 2, 3"; - target_.WriteAll(msg.c_str(), msg.length()); + target_.WriteAll(msg); ASSERT_EQ(mock_server_.AwaitReceived(), msg); } @@ -39,11 +40,11 @@ TEST_F(TcpClientTests, TestReadExact) { mock_server_.SetSend(kSendData); target_.WriteAll("start"); - std::array buffer{0}; + std::array buffer{}; ASSERT_EQ(buffer.size() - 1, kSendData.size()); target_.ReadExact(buffer.data(), buffer.size() - 1); - ASSERT_STREQ(buffer.data(), kSendData.c_str()); + ASSERT_STREQ(reinterpret_cast(buffer.data()), kSendData.c_str()); } TEST_F(TcpClientTests, TestFullReadSome) { @@ -52,11 +53,11 @@ TEST_F(TcpClientTests, TestFullReadSome) { // server does one write than reads target_.WriteAll("start"); - std::array buffer{0}; + std::array buffer{}; // - 1 to leave NUL byte const auto res = target_.ReadSome(buffer.data(), buffer.size() - 1); - EXPECT_STREQ(buffer.data(), kSendData.c_str()); + EXPECT_STREQ(reinterpret_cast(buffer.data()), kSendData.c_str()); EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); EXPECT_EQ(res.read_size, kSendData.length()); EXPECT_EQ(res.read_size, buffer.size() - 1); @@ -68,10 +69,10 @@ TEST_F(TcpClientTests, TestPartialReadSome) { // server does one write than reads target_.WriteAll("start"); - std::array buffer{0}; + std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size()); - EXPECT_STREQ(buffer.data(), kSendData.c_str()); + EXPECT_STREQ(reinterpret_cast(buffer.data()), kSendData.c_str()); EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); EXPECT_EQ(res.read_size, kSendData.length()); } @@ -80,7 +81,7 @@ TEST_F(TcpClientTests, TestReadSomeClose) { // server does one write than reads target_.WriteAll("start"); - std::array buffer{0}; + std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size()); EXPECT_EQ(res.status, detail::TcpClient::Status::Closed); EXPECT_EQ(res.read_size, 0); @@ -107,7 +108,7 @@ TEST_F(TcpClientTests, TestReadSomeTimeout) { }}; target_ = {"127.0.0.1", mock_server.Port()}; - std::array buffer{0}; + std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size(), std::chrono::milliseconds{5}); { @@ -128,7 +129,7 @@ TEST_F(TcpClientTests, TestReadCloseNoTimeout) { constexpr std::chrono::milliseconds kTimeout{5}; - std::array buffer{0}; + std::array buffer{}; const auto start = std::chrono::steady_clock::now(); // server closing the connection should cause `ReadSome` to return // immediately, not wait for the timeout @@ -145,7 +146,7 @@ TEST_F(TcpClientTests, ReadAfterClose) { // server does one write than reads target_.WriteAll("start"); - std::array buffer{0}; + std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size()); EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); EXPECT_GT(res.read_size, 0); diff --git a/tests/src/zstd_stream_tests.cpp b/tests/src/zstd_stream_tests.cpp index 7695458..3f2d966 100644 --- a/tests/src/zstd_stream_tests.cpp +++ b/tests/src/zstd_stream_tests.cpp @@ -21,8 +21,7 @@ TEST(ZstdStreamTests, TestMultiFrameFiles) { std::make_unique(file_path)}; for (std::size_t i = 0; i < kRecordCount; ++i) { databento::InstrumentDefMsgV1 def_msg; - target.ReadExact(reinterpret_cast(&def_msg), - sizeof(def_msg)); + target.ReadExact(reinterpret_cast(&def_msg), sizeof(def_msg)); EXPECT_EQ(def_msg.hd.rtype, databento::rtype::InstrumentDef); } } @@ -37,11 +36,11 @@ TEST(ZstdStreamTests, TestIdentity) { { ZstdCompressStream compressor{&mock_io}; for (auto it = source_data.begin(); it != source_data.end(); it += 100) { - compressor.WriteAll(reinterpret_cast(&*it), + compressor.WriteAll(reinterpret_cast(&*it), 100 * sizeof(std::int64_t)); } } - std::vector res(size); + std::vector res(size); ZstdDecodeStream decode{ std::make_unique(std::move(mock_io)) diff --git a/vcpkg.json b/vcpkg.json index 7199325..76d51c5 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -2,10 +2,6 @@ "$schema": "https://raw.githubusercontent.com/microsoft/vcpkg-tool/main/docs/vcpkg.schema.json", "name": "databento", "dependencies": [ - { - "name": "dirent", - "platform": "windows" - }, "openssl", "zstd" ],