From 51ff73a4e3ba216eab4d8a6b1b189ec610ffc9aa Mon Sep 17 00:00:00 2001 From: Sofiane Soufi Date: Sat, 21 Feb 2026 20:18:15 -0500 Subject: [PATCH] feat: add Dynamic column support for Proton native read path --- README.md | 1 + timeplus/CMakeLists.txt | 3 + timeplus/client.cpp | 82 ++++- timeplus/columns/dynamic.cpp | 610 +++++++++++++++++++++++++++++++++ timeplus/columns/dynamic.h | 75 ++++ timeplus/columns/factory.cpp | 27 ++ timeplus/columns/itemview.cpp | 1 + timeplus/types/type_parser.cpp | 1 + timeplus/types/types.cpp | 28 +- timeplus/types/types.h | 21 +- ut/CMakeLists.txt | 1 + ut/CreateColumnByType_ut.cpp | 26 +- ut/column_dynamic_ut.cpp | 271 +++++++++++++++ ut/itemview_ut.cpp | 2 +- ut/roundtrip_tests.cpp | 70 ++++ ut/type_parser_ut.cpp | 27 ++ ut/types_ut.cpp | 12 +- 17 files changed, 1234 insertions(+), 24 deletions(-) create mode 100644 timeplus/columns/dynamic.cpp create mode 100644 timeplus/columns/dynamic.h create mode 100644 ut/column_dynamic_ut.cpp diff --git a/README.md b/README.md index b7619e0..27cf5ac 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ You can run DDL, streaming queries, or data ingestion with this C++ client. Both * datetime, datetime64 * datetime([timezone]), datetime64(N, [timezone]) * decimal32, decimal64, decimal128, decimal256 +* dynamic, dynamic(max_types=N) * enum8, enum16 * fixed_string(N) * float32, float64 diff --git a/timeplus/CMakeLists.txt b/timeplus/CMakeLists.txt index fa5a608..e95d2d2 100644 --- a/timeplus/CMakeLists.txt +++ b/timeplus/CMakeLists.txt @@ -11,6 +11,7 @@ SET ( timeplus-cpp-lib-src columns/column.cpp columns/date.cpp columns/decimal.cpp + columns/dynamic.cpp columns/enum.cpp columns/factory.cpp columns/geo.cpp @@ -56,6 +57,7 @@ SET ( timeplus-cpp-lib-src columns/column.h columns/date.h columns/decimal.h + columns/dynamic.h columns/enum.h columns/factory.h columns/geo.h @@ -199,6 +201,7 @@ INSTALL(FILES columns/array.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/column.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/date.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/decimal.h DESTINATION include/timeplus/columns/) +INSTALL(FILES columns/dynamic.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/enum.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/factory.h DESTINATION include/timeplus/columns/) INSTALL(FILES columns/geo.h DESTINATION include/timeplus/columns/) diff --git a/timeplus/client.cpp b/timeplus/client.cpp index 4522502..f926b34 100644 --- a/timeplus/client.cpp +++ b/timeplus/client.cpp @@ -609,6 +609,31 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { return true; } + case ServerCodes::Totals: + case ServerCodes::Extremes: { + // These packets carry an additional block payload. + // Parse and discard it to keep protocol stream aligned. + if constexpr (DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { + if (!WireFormat::SkipString(*input_)) { + return false; + } + } + + Block ignored_block; + if (compression_ == CompressionState::Enable) { + CompressedInput compressed(input_.get()); + if (!ReadBlock(compressed, &ignored_block)) { + return false; + } + } else { + if (!ReadBlock(*input_, &ignored_block)) { + return false; + } + } + + return true; + } + default: throw UnimplementedError("unimplemented " + std::to_string((int)packet_type)); break; @@ -618,24 +643,49 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { bool Client::Impl::ReadBlock(InputStream& input, Block* block) { // Additional information about block. if constexpr (DMBS_PROTOCOL_REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { - uint64_t num; BlockInfo info; + while (true) { + uint64_t field_num = 0; + if (!WireFormat::ReadUInt64(input, &field_num)) { + return false; + } - // BlockInfo - if (!WireFormat::ReadUInt64(input, &num)) { - return false; - } - if (!WireFormat::ReadFixed(input, &info.is_overflows)) { - return false; - } - if (!WireFormat::ReadUInt64(input, &num)) { - return false; - } - if (!WireFormat::ReadFixed(input, &info.bucket_num)) { - return false; - } - if (!WireFormat::ReadUInt64(input, &num)) { - return false; + if (field_num == 0) { + break; + } + + switch (field_num) { + case 1: { + if (!WireFormat::ReadFixed(input, &info.is_overflows)) { + return false; + } + break; + } + case 2: { + if (!WireFormat::ReadFixed(input, &info.bucket_num)) { + return false; + } + break; + } + // Proton internal block-info fields. + case 100: { + uint64_t ignored_flags = 0; + if (!WireFormat::ReadFixed(input, &ignored_flags)) { + return false; + } + break; + } + case 101: + case 102: { + int64_t ignored_internal = 0; + if (!WireFormat::ReadFixed(input, &ignored_internal)) { + return false; + } + break; + } + default: + throw ProtocolError("Unknown block info field number: " + std::to_string(field_num)); + } } block->SetInfo(std::move(info)); diff --git a/timeplus/columns/dynamic.cpp b/timeplus/columns/dynamic.cpp new file mode 100644 index 0000000..360f389 --- /dev/null +++ b/timeplus/columns/dynamic.cpp @@ -0,0 +1,610 @@ +#include "dynamic.h" + +#include "factory.h" +#include "string.h" + +#include "../base/wire_format.h" +#include "../exceptions.h" + +#include +#include +#include + +namespace timeplus { +namespace { + +constexpr uint8_t COMPACT_GRANULE_PLAIN = 0; +constexpr uint8_t COMPACT_GRANULE_SINGLE_DISCRIMINATOR = 1; + +} // namespace + +ColumnDynamic::ColumnDynamic(size_t max_dynamic_types) + : Column(Type::CreateDynamic(max_dynamic_types)) + , max_dynamic_types_(max_dynamic_types) + , structure_version_(STRUCTURE_VERSION_V1) + , discriminator_mode_(DISCRIMINATOR_MODE_BASIC) + , shared_variant_discriminator_(0) +{ + variant_type_names_.push_back(SHARED_VARIANT_TYPE_NAME); + variant_columns_.push_back(std::make_shared()); +} + +void ColumnDynamic::Append(ColumnRef column) { + auto other = column->As(); + if (!other) { + return; + } + + if (!IsCompatibleLayout(*other)) { + throw ValidationError("can't append dynamic columns with different layout"); + } + + if (other->Size() == 0) { + return; + } + + std::vector base_offsets(variant_columns_.size(), 0); + for (size_t i = 0; i < variant_columns_.size(); ++i) { + base_offsets[i] = variant_columns_[i]->Size(); + variant_columns_[i]->Append(other->variant_columns_[i]); + } + + discriminators_.reserve(discriminators_.size() + other->discriminators_.size()); + offsets_.reserve(offsets_.size() + other->offsets_.size()); + for (size_t i = 0; i < other->Size(); ++i) { + const auto discriminator = other->discriminators_[i]; + discriminators_.push_back(discriminator); + + if (discriminator == NULL_DISCRIMINATOR) { + offsets_.push_back(0); + } else { + offsets_.push_back(base_offsets[discriminator] + other->offsets_[i]); + } + } +} + +void ColumnDynamic::Reserve(size_t new_cap) { + discriminators_.reserve(new_cap); + offsets_.reserve(new_cap); + for (auto& variant_column : variant_columns_) { + variant_column->Reserve(new_cap); + } +} + +bool ColumnDynamic::LoadPrefix(InputStream* input, size_t rows) { + uint64_t structure_version = 0; + if (!WireFormat::ReadFixed(*input, &structure_version)) { + return false; + } + + if (structure_version != STRUCTURE_VERSION_V1 && structure_version != STRUCTURE_VERSION_V2) { + throw ProtocolError("Invalid dynamic structure serialization version: " + std::to_string(structure_version)); + } + + if (structure_version == STRUCTURE_VERSION_V1) { + uint64_t ignored = 0; + if (!WireFormat::ReadUInt64(*input, &ignored)) { + return false; + } + } + + uint64_t num_dynamic_types = 0; + if (!WireFormat::ReadUInt64(*input, &num_dynamic_types)) { + return false; + } + + if (num_dynamic_types > Type::MAX_DYNAMIC_TYPES_LIMIT) { + throw ProtocolError("Dynamic type count is out of range: " + std::to_string(num_dynamic_types)); + } + + struct VariantEntry { + std::string type_name; + ColumnRef column; + }; + + std::vector variants; + variants.reserve(static_cast(num_dynamic_types) + 1); + + for (size_t i = 0; i < static_cast(num_dynamic_types); ++i) { + std::string type_name; + if (!WireFormat::ReadString(*input, &type_name)) { + return false; + } + + if (type_name == SHARED_VARIANT_TYPE_NAME) { + throw ProtocolError("Unexpected dynamic variant type name: " + type_name); + } + + auto variant_column = CreateColumnByType(type_name); + if (!variant_column) { + throw UnimplementedError("unsupported dynamic variant type: " + type_name); + } + + variants.push_back({type_name, variant_column}); + } + + variants.push_back({SHARED_VARIANT_TYPE_NAME, std::make_shared()}); + + // Proton serializes Dynamic discriminators in Variant global order, where + // variants are sorted lexicographically by type name. + std::sort(variants.begin(), variants.end(), [](const VariantEntry& lhs, const VariantEntry& rhs) { + return lhs.type_name < rhs.type_name; + }); + + std::vector variant_type_names; + std::vector variant_columns; + variant_type_names.reserve(variants.size()); + variant_columns.reserve(variants.size()); + + size_t shared_variant_discriminator = std::numeric_limits::max(); + for (size_t i = 0; i < variants.size(); ++i) { + variant_type_names.push_back(std::move(variants[i].type_name)); + variant_columns.push_back(std::move(variants[i].column)); + if (variant_type_names.back() == SHARED_VARIANT_TYPE_NAME) { + shared_variant_discriminator = i; + } + } + + uint64_t discriminator_mode = 0; + if (!WireFormat::ReadFixed(*input, &discriminator_mode)) { + return false; + } + + if (discriminator_mode != DISCRIMINATOR_MODE_BASIC + && discriminator_mode != DISCRIMINATOR_MODE_COMPACT) { + throw ProtocolError("Invalid dynamic discriminator serialization mode: " + std::to_string(discriminator_mode)); + } + + if (shared_variant_discriminator == std::numeric_limits::max()) { + throw ProtocolError("Dynamic shared variant is missing in serialization"); + } + + if (!variant_columns[shared_variant_discriminator]->As()) { + throw ProtocolError("Dynamic shared variant must use string serialization"); + } + + for (auto& variant_column : variant_columns) { + if (!variant_column->LoadPrefix(input, rows)) { + return false; + } + } + + structure_version_ = structure_version; + discriminator_mode_ = discriminator_mode; + shared_variant_discriminator_ = shared_variant_discriminator; + variant_type_names_ = std::move(variant_type_names); + variant_columns_ = std::move(variant_columns); + + discriminators_.clear(); + offsets_.clear(); + return true; +} + +bool ColumnDynamic::LoadBody(InputStream* input, size_t rows) { + ValidateSerializedState(); + + std::vector new_discriminators; + if (!ReadDiscriminators(input, rows, &new_discriminators)) { + return false; + } + + std::vector variant_sizes(variant_columns_.size(), 0); + std::vector new_offsets(rows, 0); + + for (size_t row = 0; row < rows; ++row) { + const auto discriminator = new_discriminators[row]; + if (discriminator == NULL_DISCRIMINATOR) { + continue; + } + + if (discriminator >= variant_columns_.size()) { + throw ProtocolError("Invalid dynamic discriminator: " + std::to_string(discriminator)); + } + + new_offsets[row] = variant_sizes[discriminator]; + ++variant_sizes[discriminator]; + } + + for (auto& variant_column : variant_columns_) { + variant_column->Clear(); + } + + for (size_t i = 0; i < variant_columns_.size(); ++i) { + if (variant_sizes[i] == 0) { + continue; + } + + if (!variant_columns_[i]->LoadBody(input, variant_sizes[i])) { + return false; + } + } + + discriminators_ = std::move(new_discriminators); + offsets_ = std::move(new_offsets); + return true; +} + +void ColumnDynamic::SavePrefix(OutputStream* output) { + ValidateSerializedState(); + + const auto num_dynamic_types = static_cast(variant_columns_.size() - 1); + + // Use V1 for better compatibility with old server/client revisions. + WireFormat::WriteFixed(*output, static_cast(STRUCTURE_VERSION_V1)); + WireFormat::WriteUInt64(*output, num_dynamic_types); + WireFormat::WriteUInt64(*output, num_dynamic_types); + + for (size_t i = 0; i < variant_type_names_.size(); ++i) { + if (i == shared_variant_discriminator_) { + continue; + } + + WireFormat::WriteString(*output, variant_type_names_[i]); + } + + WireFormat::WriteFixed(*output, discriminator_mode_); + + for (auto& variant_column : variant_columns_) { + variant_column->SavePrefix(output); + } +} + +void ColumnDynamic::SaveBody(OutputStream* output) { + ValidateSerializedState(); + + std::vector variant_sizes(variant_columns_.size(), 0); + for (size_t row = 0; row < Size(); ++row) { + const auto discriminator = discriminators_[row]; + if (discriminator == NULL_DISCRIMINATOR) { + continue; + } + + if (discriminator >= variant_columns_.size()) { + throw ValidationError("Invalid dynamic discriminator value: " + std::to_string(discriminator)); + } + + if (offsets_[row] != variant_sizes[discriminator]) { + throw ValidationError("Invalid dynamic row offset for discriminator: " + std::to_string(discriminator)); + } + + ++variant_sizes[discriminator]; + } + + for (size_t i = 0; i < variant_columns_.size(); ++i) { + if (variant_columns_[i]->Size() != variant_sizes[i]) { + throw ValidationError("Invalid dynamic variant size for type: " + variant_type_names_[i]); + } + } + + WriteDiscriminators(output); + + for (size_t i = 0; i < variant_columns_.size(); ++i) { + if (variant_sizes[i] == 0) { + continue; + } + + variant_columns_[i]->SaveBody(output); + } +} + +void ColumnDynamic::Clear() { + discriminators_.clear(); + offsets_.clear(); + for (auto& variant_column : variant_columns_) { + variant_column->Clear(); + } +} + +size_t ColumnDynamic::Size() const { + return discriminators_.size(); +} + +ColumnRef ColumnDynamic::Slice(size_t begin, size_t len) const { + if (len && begin + len > Size()) { + throw ValidationError("Slice indexes are out of bounds"); + } + + auto result = CloneStructureEmpty(); + if (len == 0) { + return result; + } + + result->discriminators_.reserve(len); + result->offsets_.reserve(len); + + std::vector variant_sizes(variant_columns_.size(), 0); + std::vector first_offsets(variant_columns_.size(), std::numeric_limits::max()); + + for (size_t row = begin; row < begin + len; ++row) { + const auto discriminator = discriminators_[row]; + + result->discriminators_.push_back(discriminator); + + if (discriminator == NULL_DISCRIMINATOR) { + result->offsets_.push_back(0); + continue; + } + + if (discriminator >= variant_columns_.size()) { + throw ValidationError("Invalid dynamic discriminator value: " + std::to_string(discriminator)); + } + + if (first_offsets[discriminator] == std::numeric_limits::max()) { + first_offsets[discriminator] = offsets_[row]; + } + + result->offsets_.push_back(variant_sizes[discriminator]); + ++variant_sizes[discriminator]; + } + + for (size_t i = 0; i < variant_columns_.size(); ++i) { + if (variant_sizes[i] == 0) { + continue; + } + + result->variant_columns_[i]->Append(variant_columns_[i]->Slice(first_offsets[i], variant_sizes[i])); + } + + return result; +} + +ColumnRef ColumnDynamic::CloneEmpty() const { + return CloneStructureEmpty(); +} + +void ColumnDynamic::Swap(Column& other) { + auto& col = dynamic_cast(other); + if (!IsCompatibleLayout(col)) { + throw ValidationError("can't swap dynamic columns with different layout"); + } + + discriminators_.swap(col.discriminators_); + offsets_.swap(col.offsets_); + variant_columns_.swap(col.variant_columns_); + std::swap(shared_variant_discriminator_, col.shared_variant_discriminator_); +} + +uint8_t ColumnDynamic::GetDiscriminator(size_t row) const { + if (row >= Size()) { + throw ValidationError("Index is out of bounds: " + std::to_string(row)); + } + + return discriminators_[row]; +} + +size_t ColumnDynamic::GetVariantOffset(size_t row) const { + if (row >= Size()) { + throw ValidationError("Index is out of bounds: " + std::to_string(row)); + } + + return offsets_[row]; +} + +bool ColumnDynamic::IsNull(size_t row) const { + return GetDiscriminator(row) == NULL_DISCRIMINATOR; +} + +bool ColumnDynamic::IsSharedVariant(size_t row) const { + return !IsNull(row) && GetDiscriminator(row) == GetSharedVariantDiscriminator(); +} + +size_t ColumnDynamic::GetSharedVariantDiscriminator() const { + if (variant_columns_.empty() || shared_variant_discriminator_ >= variant_columns_.size()) { + throw ValidationError("Dynamic column has no variant columns"); + } + + return shared_variant_discriminator_; +} + +const std::vector& ColumnDynamic::GetVariantTypeNames() const { + return variant_type_names_; +} + +ColumnRef ColumnDynamic::GetVariantColumn(size_t discriminator) const { + if (discriminator >= variant_columns_.size()) { + throw ValidationError("Dynamic variant index is out of bounds: " + std::to_string(discriminator)); + } + + return variant_columns_[discriminator]; +} + +bool ColumnDynamic::IsCompatibleLayout(const ColumnDynamic& other) const { + if (!Type()->IsEqual(other.Type())) { + return false; + } + + if (structure_version_ != other.structure_version_ || discriminator_mode_ != other.discriminator_mode_) { + return false; + } + + if (variant_type_names_ != other.variant_type_names_) { + return false; + } + + if (shared_variant_discriminator_ != other.shared_variant_discriminator_) { + return false; + } + + if (variant_columns_.size() != other.variant_columns_.size()) { + return false; + } + + for (size_t i = 0; i < variant_columns_.size(); ++i) { + if (!variant_columns_[i]->Type()->IsEqual(other.variant_columns_[i]->Type())) { + return false; + } + } + + return true; +} + +std::shared_ptr ColumnDynamic::CloneStructureEmpty() const { + auto result = std::make_shared(max_dynamic_types_); + + result->structure_version_ = structure_version_; + result->discriminator_mode_ = discriminator_mode_; + result->shared_variant_discriminator_ = shared_variant_discriminator_; + result->variant_type_names_ = variant_type_names_; + + result->variant_columns_.clear(); + result->variant_columns_.reserve(variant_columns_.size()); + for (const auto& variant_column : variant_columns_) { + result->variant_columns_.push_back(variant_column->CloneEmpty()); + } + + return result; +} + +bool ColumnDynamic::ReadDiscriminators(InputStream* input, size_t rows, std::vector* discriminators) const { + switch (discriminator_mode_) { + case DISCRIMINATOR_MODE_BASIC: + return ReadDiscriminatorsBasic(input, rows, discriminators); + case DISCRIMINATOR_MODE_COMPACT: + return ReadDiscriminatorsCompact(input, rows, discriminators); + default: + throw ProtocolError("Invalid dynamic discriminator serialization mode: " + std::to_string(discriminator_mode_)); + } +} + +bool ColumnDynamic::ReadDiscriminatorsBasic(InputStream* input, size_t rows, std::vector* discriminators) { + discriminators->assign(rows, 0); + + for (size_t i = 0; i < rows; ++i) { + if (!WireFormat::ReadFixed(*input, &(*discriminators)[i])) { + return false; + } + } + + return true; +} + +bool ColumnDynamic::ReadDiscriminatorsCompact(InputStream* input, size_t rows, std::vector* discriminators) { + discriminators->clear(); + discriminators->reserve(rows); + + size_t read_rows = 0; + while (read_rows < rows) { + uint64_t granule_rows = 0; + if (!WireFormat::ReadUInt64(*input, &granule_rows)) { + return false; + } + + if (granule_rows == 0) { + throw ProtocolError("Invalid compact discriminator granule size: 0"); + } + + if (read_rows + granule_rows > rows) { + throw ProtocolError("Invalid compact discriminator granule size: " + std::to_string(granule_rows)); + } + + uint8_t granule_format = 0; + if (!WireFormat::ReadFixed(*input, &granule_format)) { + return false; + } + + if (granule_format == COMPACT_GRANULE_PLAIN) { + for (size_t i = 0; i < static_cast(granule_rows); ++i) { + uint8_t discriminator = 0; + if (!WireFormat::ReadFixed(*input, &discriminator)) { + return false; + } + discriminators->push_back(discriminator); + } + } else if (granule_format == COMPACT_GRANULE_SINGLE_DISCRIMINATOR) { + uint8_t discriminator = 0; + if (!WireFormat::ReadFixed(*input, &discriminator)) { + return false; + } + + discriminators->insert( + discriminators->end(), + static_cast(granule_rows), + discriminator); + } else { + throw ProtocolError("Invalid compact discriminator granule format: " + std::to_string(granule_format)); + } + + read_rows += static_cast(granule_rows); + } + + return true; +} + +void ColumnDynamic::WriteDiscriminators(OutputStream* output) const { + switch (discriminator_mode_) { + case DISCRIMINATOR_MODE_BASIC: + WriteDiscriminatorsBasic(output); + return; + case DISCRIMINATOR_MODE_COMPACT: + WriteDiscriminatorsCompact(output); + return; + default: + throw ValidationError("Invalid dynamic discriminator serialization mode: " + std::to_string(discriminator_mode_)); + } +} + +void ColumnDynamic::WriteDiscriminatorsBasic(OutputStream* output) const { + for (const auto discriminator : discriminators_) { + WireFormat::WriteFixed(*output, discriminator); + } +} + +void ColumnDynamic::WriteDiscriminatorsCompact(OutputStream* output) const { + if (discriminators_.empty()) { + return; + } + + WireFormat::WriteUInt64(*output, discriminators_.size()); + + const auto first_discriminator = discriminators_[0]; + bool same_discriminator = true; + for (const auto discriminator : discriminators_) { + if (discriminator != first_discriminator) { + same_discriminator = false; + break; + } + } + + if (same_discriminator) { + WireFormat::WriteFixed(*output, static_cast(COMPACT_GRANULE_SINGLE_DISCRIMINATOR)); + WireFormat::WriteFixed(*output, first_discriminator); + return; + } + + WireFormat::WriteFixed(*output, static_cast(COMPACT_GRANULE_PLAIN)); + for (const auto discriminator : discriminators_) { + WireFormat::WriteFixed(*output, discriminator); + } +} + +void ColumnDynamic::ValidateSerializedState() const { + if (max_dynamic_types_ > Type::MAX_DYNAMIC_TYPES_LIMIT) { + throw ValidationError("dynamic max_types is out of range"); + } + + if (variant_type_names_.size() != variant_columns_.size()) { + throw ValidationError("dynamic variant names and columns count mismatch"); + } + + if (variant_columns_.empty()) { + throw ValidationError("dynamic column has no variants"); + } + + if (shared_variant_discriminator_ >= variant_type_names_.size()) { + throw ValidationError("dynamic shared variant index is out of bounds"); + } + + if (variant_type_names_[shared_variant_discriminator_] != SHARED_VARIANT_TYPE_NAME) { + throw ValidationError("dynamic shared variant is missing"); + } + + if (!variant_columns_[shared_variant_discriminator_]->As()) { + throw ValidationError("dynamic shared variant must be string column"); + } + + if (discriminators_.size() != offsets_.size()) { + throw ValidationError("dynamic discriminators and offsets size mismatch"); + } +} + +} // namespace timeplus diff --git a/timeplus/columns/dynamic.h b/timeplus/columns/dynamic.h new file mode 100644 index 0000000..af02d24 --- /dev/null +++ b/timeplus/columns/dynamic.h @@ -0,0 +1,75 @@ +#pragma once + +#include "column.h" + +#include +#include +#include +#include + +namespace timeplus { + +class ColumnDynamic : public Column { +public: + static constexpr uint8_t NULL_DISCRIMINATOR = 0xFF; + static constexpr uint64_t STRUCTURE_VERSION_V1 = 1; + static constexpr uint64_t STRUCTURE_VERSION_V2 = 2; + static constexpr uint64_t DISCRIMINATOR_MODE_BASIC = 0; + static constexpr uint64_t DISCRIMINATOR_MODE_COMPACT = 1; + + explicit ColumnDynamic(size_t max_dynamic_types = Type::DEFAULT_DYNAMIC_MAX_TYPES); + + void Append(ColumnRef column) override; + void Reserve(size_t new_cap) override; + + bool LoadPrefix(InputStream* input, size_t rows) override; + bool LoadBody(InputStream* input, size_t rows) override; + + void SavePrefix(OutputStream* output) override; + void SaveBody(OutputStream* output) override; + + void Clear() override; + size_t Size() const override; + + ColumnRef Slice(size_t begin, size_t len) const override; + ColumnRef CloneEmpty() const override; + void Swap(Column& other) override; + + uint8_t GetDiscriminator(size_t row) const; + size_t GetVariantOffset(size_t row) const; + bool IsNull(size_t row) const; + bool IsSharedVariant(size_t row) const; + size_t GetSharedVariantDiscriminator() const; + + const std::vector& GetVariantTypeNames() const; + ColumnRef GetVariantColumn(size_t discriminator) const; + +private: + static constexpr const char* SHARED_VARIANT_TYPE_NAME = "shared_variant"; + + bool IsCompatibleLayout(const ColumnDynamic& other) const; + std::shared_ptr CloneStructureEmpty() const; + + bool ReadDiscriminators(InputStream* input, size_t rows, std::vector* discriminators) const; + static bool ReadDiscriminatorsBasic(InputStream* input, size_t rows, std::vector* discriminators); + static bool ReadDiscriminatorsCompact(InputStream* input, size_t rows, std::vector* discriminators); + + void WriteDiscriminators(OutputStream* output) const; + void WriteDiscriminatorsBasic(OutputStream* output) const; + void WriteDiscriminatorsCompact(OutputStream* output) const; + + void ValidateSerializedState() const; + + size_t max_dynamic_types_; + uint64_t structure_version_; + uint64_t discriminator_mode_; + size_t shared_variant_discriminator_; + + std::vector variant_type_names_; + std::vector variant_columns_; + + std::vector discriminators_; + std::vector offsets_; +}; + +} // namespace timeplus diff --git a/timeplus/columns/factory.cpp b/timeplus/columns/factory.cpp index f0a8f1c..ecfa5f6 100644 --- a/timeplus/columns/factory.cpp +++ b/timeplus/columns/factory.cpp @@ -3,6 +3,7 @@ #include "array.h" #include "date.h" #include "decimal.h" +#include "dynamic.h" #include "enum.h" #include "geo.h" #include "ip4.h" @@ -43,6 +44,29 @@ const auto& GetASTChildElement(const TypeAst & ast, int position) { return ast.elements[static_cast(position)]; } +size_t ParseDynamicMaxTypes(const TypeAst& ast) { + if (ast.elements.empty()) { + return Type::DEFAULT_DYNAMIC_MAX_TYPES; + } + + if (ast.elements.size() != 2) { + throw ValidationError(ast.name + " content is not correct"); + } + + const auto& key = GetASTChildElement(ast, 0); + const auto& value = GetASTChildElement(ast, 1); + + if (key.meta != TypeAst::Terminal || key.name != "max_types" || value.meta != TypeAst::Number) { + throw ValidationError(ast.name + " content is not correct"); + } + + if (value.value < 0 || value.value > static_cast(Type::MAX_DYNAMIC_TYPES_LIMIT)) { + throw ValidationError("dynamic max_types is out of range"); + } + + return static_cast(value.value); +} + static ColumnRef CreateTerminalColumn(const TypeAst& ast) { switch (ast.code) { case Type::Void: @@ -123,6 +147,9 @@ static ColumnRef CreateTerminalColumn(const TypeAst& ast) { case Type::UUID: return std::make_shared(); + case Type::Dynamic: + return std::make_shared(ParseDynamicMaxTypes(ast)); + case Type::Point: return std::make_shared(); diff --git a/timeplus/columns/itemview.cpp b/timeplus/columns/itemview.cpp index f932dca..78108c6 100644 --- a/timeplus/columns/itemview.cpp +++ b/timeplus/columns/itemview.cpp @@ -78,6 +78,7 @@ void ItemView::ValidateData(Type::Code type, DataType data) { case Type::Code::Tuple: case Type::Code::LowCardinality: case Type::Code::Map: + case Type::Code::Dynamic: throw AssertionError("Unsupported type in ItemView: " + std::string(Type::TypeName(type))); case Type::Code::IPv6: diff --git a/timeplus/types/type_parser.cpp b/timeplus/types/type_parser.cpp index c177e89..4f5a726 100644 --- a/timeplus/types/type_parser.cpp +++ b/timeplus/types/type_parser.cpp @@ -64,6 +64,7 @@ static const std::unordered_map kTypeCode = { { "decimal256", Type::Decimal256 }, { "low_cardinality", Type::LowCardinality }, { "map", Type::Map }, + { "dynamic", Type::Dynamic }, { "point", Type::Point }, { "ring", Type::Ring }, { "polygon", Type::Polygon }, diff --git a/timeplus/types/types.cpp b/timeplus/types/types.cpp index f3c3b7d..374b82f 100644 --- a/timeplus/types/types.cpp +++ b/timeplus/types/types.cpp @@ -48,6 +48,7 @@ const char* Type::TypeName(Type::Code code) { case Type::Code::DateTime64: return "datetime64"; case Type::Code::Date32: return "date32"; case Type::Code::Map: return "map"; + case Type::Code::Dynamic: return "dynamic"; case Type::Code::Point: return "point"; case Type::Code::Ring: return "ring"; case Type::Code::Polygon: return "polygon"; @@ -113,6 +114,8 @@ std::string Type::GetName() const { return As()->GetName(); case Map: return As()->GetName(); + case Dynamic: + return As()->GetName(); } // XXX: NOT REACHED! @@ -166,7 +169,8 @@ uint64_t Type::GetTypeUniqueId() const { case Decimal128: case Decimal256: case LowCardinality: - case Map: { + case Map: + case Dynamic: { // For complex types, exact unique ID depends on nested types and/or parameters, // the easiest way is to lazy-compute unique ID from name once. // Here we do not care if multiple threads are computing value simultaneosly since it is both: @@ -257,6 +261,10 @@ TypeRef Type::CreateMap(TypeRef key_type, TypeRef value_type) { return std::make_shared(key_type, value_type); } +TypeRef Type::CreateDynamic(size_t max_dynamic_types) { + return std::make_shared(max_dynamic_types); +} + TypeRef Type::CreatePoint() { return TypeRef(new Type(Type::Point)); } @@ -465,4 +473,22 @@ std::string MapType::GetName() const { return std::string("map(") + key_type_->GetName() + ", " +value_type_->GetName() + ")"; } +/// class DynamicType +DynamicType::DynamicType(size_t max_dynamic_types) + : Type(Dynamic) + , max_dynamic_types_(max_dynamic_types) +{ + if (max_dynamic_types_ > Type::MAX_DYNAMIC_TYPES_LIMIT) { + throw ValidationError("dynamic max_types is > " + std::to_string(Type::MAX_DYNAMIC_TYPES_LIMIT)); + } +} + +std::string DynamicType::GetName() const { + if (max_dynamic_types_ == Type::DEFAULT_DYNAMIC_MAX_TYPES) { + return "dynamic"; + } + + return "dynamic(max_types=" + std::to_string(max_dynamic_types_) + ")"; +} + } // namespace timeplus diff --git a/timeplus/types/types.h b/timeplus/types/types.h index cd5733c..8da56bb 100644 --- a/timeplus/types/types.h +++ b/timeplus/types/types.h @@ -64,7 +64,8 @@ class Type { UInt128, Int256, UInt256, - Decimal256 + Decimal256, + Dynamic }; using EnumItem = std::pair; @@ -102,6 +103,10 @@ class Type { /// Simple name, doesn't depend on parameters and\or nested types, caller MUST NOT free returned value. static const char* TypeName(Code); +public: + static constexpr size_t DEFAULT_DYNAMIC_MAX_TYPES = 32; + static constexpr size_t MAX_DYNAMIC_TYPES_LIMIT = 254; + public: static TypeRef CreateArray(TypeRef item_type); @@ -142,6 +147,8 @@ class Type { static TypeRef CreateMap(TypeRef key_type, TypeRef value_type); + static TypeRef CreateDynamic(size_t max_dynamic_types = DEFAULT_DYNAMIC_MAX_TYPES); + static TypeRef CreatePoint(); static TypeRef CreateRing(); @@ -324,6 +331,18 @@ class MapType : public Type { TypeRef value_type_; }; +class DynamicType : public Type { +public: + explicit DynamicType(size_t max_dynamic_types); + + std::string GetName() const; + + inline size_t GetMaxDynamicTypes() const { return max_dynamic_types_; } + +private: + const size_t max_dynamic_types_; +}; + template <> inline TypeRef Type::CreateSimple() { return TypeRef(new Type(Int8)); diff --git a/ut/CMakeLists.txt b/ut/CMakeLists.txt index b638ecc..c67972f 100644 --- a/ut/CMakeLists.txt +++ b/ut/CMakeLists.txt @@ -21,6 +21,7 @@ SET ( timeplus-cpp-ut-src connection_failed_client_test.cpp array_of_low_cardinality_tests.cpp CreateColumnByType_ut.cpp + column_dynamic_ut.cpp Column_ut.cpp roundtrip_column.cpp roundtrip_tests.cpp diff --git a/ut/CreateColumnByType_ut.cpp b/ut/CreateColumnByType_ut.cpp index 92ca577..2978249 100644 --- a/ut/CreateColumnByType_ut.cpp +++ b/ut/CreateColumnByType_ut.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -55,6 +56,28 @@ TEST(CreateColumnByType, AggregateFunction) { EXPECT_EQ(nullptr, CreateColumnByType("aggregate_function(argMax, fixed_string(10), datetime64(3, 'UTC'))")); } +TEST(CreateColumnByType, Dynamic) { + auto dynamic_default = CreateColumnByType("dynamic"); + ASSERT_NE(nullptr, dynamic_default); + ASSERT_EQ(Type::Dynamic, dynamic_default->GetType().GetCode()); + ASSERT_EQ("dynamic", dynamic_default->GetType().GetName()); + ASSERT_NE(nullptr, dynamic_default->As()); + + auto dynamic_max_types = CreateColumnByType("dynamic(max_types=5)"); + ASSERT_NE(nullptr, dynamic_max_types); + ASSERT_EQ(Type::Dynamic, dynamic_max_types->GetType().GetCode()); + ASSERT_EQ("dynamic(max_types=5)", dynamic_max_types->GetType().GetName()); + ASSERT_NE(nullptr, dynamic_max_types->As()); +} + +TEST(CreateColumnByType, DynamicInvalidArguments) { + EXPECT_THROW(CreateColumnByType("dynamic(max_types=255)"), ValidationError); + EXPECT_THROW(CreateColumnByType("dynamic(max_types=-1)"), ValidationError); + EXPECT_THROW(CreateColumnByType("dynamic(max_types='5')"), ValidationError); + EXPECT_THROW(CreateColumnByType("dynamic(max_type=5)"), ValidationError); + EXPECT_THROW(CreateColumnByType("dynamic(max_types=5, foo=1)"), ValidationError); +} + class CreateColumnByTypeWithName : public ::testing::TestWithParam {}; @@ -75,13 +98,14 @@ TEST_P(CreateColumnByTypeWithName, CreateColumnByType) INSTANTIATE_TEST_SUITE_P(Basic, CreateColumnByTypeWithName, ::testing::Values( "int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64", - "string", "date", "datetime", + "string", "date", "datetime", "dynamic", "uuid", "int128" )); INSTANTIATE_TEST_SUITE_P(Parametrized, CreateColumnByTypeWithName, ::testing::Values( "fixed_string(0)", "fixed_string(10000)", "datetime('UTC')", "datetime64(3, 'UTC')", + "dynamic(max_types=5)", "decimal(9,3)", "decimal(18,3)", "enum8('ONE' = 1, 'TWO' = 2)", "enum16('ONE' = 1, 'TWO' = 2, 'THREE' = 3, 'FOUR' = 4)" diff --git a/ut/column_dynamic_ut.cpp b/ut/column_dynamic_ut.cpp new file mode 100644 index 0000000..809f25c --- /dev/null +++ b/ut/column_dynamic_ut.cpp @@ -0,0 +1,271 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace { + +using namespace timeplus; + +std::string BuildDynamicSerializedData( + uint64_t structure_version, + const std::vector& variant_type_names, + uint64_t discriminator_mode, + const std::vector& discriminators, + const std::vector>& variant_values) { + std::array buffer{}; + ArrayOutput output(buffer.data(), buffer.size()); + + const auto num_dynamic_types = static_cast(variant_type_names.size()); + + WireFormat::WriteFixed(output, structure_version); + if (structure_version == ColumnDynamic::STRUCTURE_VERSION_V1) { + WireFormat::WriteUInt64(output, num_dynamic_types); + } + + WireFormat::WriteUInt64(output, num_dynamic_types); + for (const auto& variant_type_name : variant_type_names) { + WireFormat::WriteString(output, variant_type_name); + } + + WireFormat::WriteFixed(output, discriminator_mode); + + std::vector source_variant_names = variant_type_names; + source_variant_names.push_back("shared_variant"); + EXPECT_EQ(source_variant_names.size(), variant_values.size()); + + std::vector sorted_indices(source_variant_names.size()); + for (size_t i = 0; i < sorted_indices.size(); ++i) { + sorted_indices[i] = i; + } + + std::sort(sorted_indices.begin(), sorted_indices.end(), [&](size_t lhs, size_t rhs) { + return source_variant_names[lhs] < source_variant_names[rhs]; + }); + + std::vector source_to_serialized_discriminator(source_variant_names.size(), 0); + for (size_t serialized_discriminator = 0; serialized_discriminator < sorted_indices.size(); ++serialized_discriminator) { + source_to_serialized_discriminator[sorted_indices[serialized_discriminator]] = + static_cast(serialized_discriminator); + } + + auto map_discriminator = [&](uint8_t discriminator) { + if (discriminator == ColumnDynamic::NULL_DISCRIMINATOR) { + return discriminator; + } + + if (discriminator >= source_to_serialized_discriminator.size()) { + return discriminator; + } + + return source_to_serialized_discriminator[discriminator]; + }; + + if (discriminator_mode == ColumnDynamic::DISCRIMINATOR_MODE_BASIC) { + for (const auto discriminator : discriminators) { + WireFormat::WriteFixed(output, map_discriminator(discriminator)); + } + } else { + WireFormat::WriteUInt64(output, discriminators.size()); + WireFormat::WriteFixed(output, static_cast(0)); + for (const auto discriminator : discriminators) { + WireFormat::WriteFixed(output, map_discriminator(discriminator)); + } + } + + for (const auto variant_index : sorted_indices) { + for (const auto& value : variant_values[variant_index]) { + WireFormat::WriteString(output, value); + } + } + + return std::string(buffer.data(), output.Size()); +} + +std::shared_ptr LoadDynamicColumn(const std::string& serialized_data, size_t rows, size_t max_types = 1) { + auto col = std::make_shared(max_types); + ArrayInput input(serialized_data.data(), serialized_data.size()); + EXPECT_TRUE(col->Load(&input, rows)); + return col; +} + +} // namespace + +TEST(ColumnDynamic, LoadV1Basic) { + const std::string shared_0 = std::string("\x2b\x03", 2); + const std::string shared_1 = std::string("\x2b\x04\xff", 3); + + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V1, + {"string"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {0, ColumnDynamic::NULL_DISCRIMINATOR, 1, 1}, + { + {"alpha"}, + {shared_0, shared_1}, + }); + + auto col = LoadDynamicColumn(serialized, 4); + + ASSERT_EQ(4u, col->Size()); + ASSERT_EQ(2u, col->GetVariantTypeNames().size()); + EXPECT_EQ("shared_variant", col->GetVariantTypeNames()[0]); + EXPECT_EQ("string", col->GetVariantTypeNames()[1]); + + const auto shared_discriminator = static_cast(col->GetSharedVariantDiscriminator()); + const auto string_discriminator = static_cast(shared_discriminator == 0 ? 1 : 0); + + EXPECT_EQ(string_discriminator, col->GetDiscriminator(0)); + EXPECT_EQ(ColumnDynamic::NULL_DISCRIMINATOR, col->GetDiscriminator(1)); + EXPECT_EQ(shared_discriminator, col->GetDiscriminator(2)); + EXPECT_EQ(shared_discriminator, col->GetDiscriminator(3)); + + EXPECT_FALSE(col->IsNull(0)); + EXPECT_TRUE(col->IsNull(1)); + EXPECT_TRUE(col->IsSharedVariant(2)); + EXPECT_TRUE(col->IsSharedVariant(3)); + + auto string_variant = col->GetVariantColumn(string_discriminator)->As(); + auto shared_variant = col->GetVariantColumn(shared_discriminator)->As(); + + ASSERT_NE(nullptr, string_variant); + ASSERT_NE(nullptr, shared_variant); + ASSERT_EQ(1u, string_variant->Size()); + ASSERT_EQ(2u, shared_variant->Size()); + EXPECT_EQ("alpha", string_variant->At(0)); + EXPECT_EQ(shared_0, shared_variant->At(0)); + EXPECT_EQ(shared_1, shared_variant->At(1)); +} + +TEST(ColumnDynamic, LoadV2Basic) { + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V2, + {"string"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {1, 0, ColumnDynamic::NULL_DISCRIMINATOR}, + { + {"from_variant"}, + {std::string("\x2b\x07\x01", 3)}, + }); + + auto col = LoadDynamicColumn(serialized, 3); + + ASSERT_EQ(3u, col->Size()); + const auto shared_discriminator = static_cast(col->GetSharedVariantDiscriminator()); + const auto string_discriminator = static_cast(shared_discriminator == 0 ? 1 : 0); + + EXPECT_EQ(shared_discriminator, col->GetDiscriminator(0)); + EXPECT_EQ(string_discriminator, col->GetDiscriminator(1)); + EXPECT_TRUE(col->IsSharedVariant(0)); + EXPECT_FALSE(col->IsSharedVariant(1)); + EXPECT_TRUE(col->IsNull(2)); +} + +TEST(ColumnDynamic, SaveAfterLoadPreservesBytes) { + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V1, + {"string"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {0, ColumnDynamic::NULL_DISCRIMINATOR, 1, 1}, + { + {"alpha"}, + { + std::string("\x2b\x03", 2), + std::string("\x2b\x04\xff", 3), + }, + }); + + auto col = LoadDynamicColumn(serialized, 4); + + std::array buffer{}; + ArrayOutput output(buffer.data(), buffer.size()); + col->Save(&output); + + const std::string roundtripped(buffer.data(), output.Size()); + EXPECT_EQ(serialized, roundtripped); +} + +TEST(ColumnDynamic, SliceAndAppendPreserveLayout) { + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V1, + {"string"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {0, ColumnDynamic::NULL_DISCRIMINATOR, 1, 1}, + { + {"alpha"}, + { + std::string("\x2b\x03", 2), + std::string("\x2b\x04\xff", 3), + }, + }); + + auto col = LoadDynamicColumn(serialized, 4); + + auto first = col->Slice(0, 2)->As(); + auto second = col->Slice(2, 2)->As(); + + ASSERT_NE(nullptr, first); + ASSERT_NE(nullptr, second); + + first->Append(second); + + ASSERT_EQ(4u, first->Size()); + const auto shared_discriminator = static_cast(first->GetSharedVariantDiscriminator()); + const auto string_discriminator = static_cast(shared_discriminator == 0 ? 1 : 0); + + EXPECT_EQ(string_discriminator, first->GetDiscriminator(0)); + EXPECT_EQ(ColumnDynamic::NULL_DISCRIMINATOR, first->GetDiscriminator(1)); + EXPECT_EQ(shared_discriminator, first->GetDiscriminator(2)); + EXPECT_EQ(shared_discriminator, first->GetDiscriminator(3)); + + auto string_variant = first->GetVariantColumn(string_discriminator)->As(); + auto shared_variant = first->GetVariantColumn(shared_discriminator)->As(); + ASSERT_NE(nullptr, string_variant); + ASSERT_NE(nullptr, shared_variant); + + EXPECT_EQ(1u, string_variant->Size()); + EXPECT_EQ(2u, shared_variant->Size()); + EXPECT_EQ(std::string("\x2b\x03", 2), shared_variant->At(0)); + EXPECT_EQ(std::string("\x2b\x04\xff", 3), shared_variant->At(1)); +} + +TEST(ColumnDynamic, RejectInvalidDiscriminator) { + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V1, + {"string"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {5}, + { + {}, + {}, + }); + + auto col = std::make_shared(1); + ArrayInput input(serialized.data(), serialized.size()); + EXPECT_THROW(col->Load(&input, 1), ProtocolError); +} + +TEST(ColumnDynamic, RejectUnsupportedVariantType) { + const auto serialized = BuildDynamicSerializedData( + ColumnDynamic::STRUCTURE_VERSION_V1, + {"variant(uint8, string)"}, + ColumnDynamic::DISCRIMINATOR_MODE_BASIC, + {}, + { + {}, + {}, + }); + + auto col = std::make_shared(1); + ArrayInput input(serialized.data(), serialized.size()); + EXPECT_THROW(col->Load(&input, 0), UnimplementedError); +} diff --git a/ut/itemview_ut.cpp b/ut/itemview_ut.cpp index 66f822e..7bc339c 100644 --- a/ut/itemview_ut.cpp +++ b/ut/itemview_ut.cpp @@ -81,6 +81,7 @@ TEST(ItemView, ErrorTypes) { EXPECT_ITEMVIEW_ERROR(Type::Code::Tuple, int); EXPECT_ITEMVIEW_ERROR(Type::Code::LowCardinality, int); EXPECT_ITEMVIEW_ERROR(Type::Code::Map, int); + EXPECT_ITEMVIEW_ERROR(Type::Code::Dynamic, int); } TEST(ItemView, TypeSizeMismatch) { @@ -186,4 +187,3 @@ TEST(ItemView, TypeSizeMismatch) { EXPECT_ITEMVIEW_ERROR(Type::Code::Enum16, int64_t); EXPECT_ITEMVIEW_ERROR(Type::Code::Enum16, Int128); } - diff --git a/ut/roundtrip_tests.cpp b/ut/roundtrip_tests.cpp index e80558d..0006845 100644 --- a/ut/roundtrip_tests.cpp +++ b/ut/roundtrip_tests.cpp @@ -1,4 +1,5 @@ #include +#include #include "utils.h" #include "roundtrip_column.h" @@ -127,6 +128,75 @@ TEST_P(RoundtripCase, MapUUID_Tuple_String_Array_Uint64) { EXPECT_TRUE(CompareRecursive(*map, *result_typed)); } +TEST_P(RoundtripCase, DynamicReadPathSharedVariant) { + if (GetParam().compression_method != CompressionMethod::None) { + GTEST_SKIP() << "Skipping compressed variant: Proton 3.x server uses compression framing unsupported by this client yet."; + } + + client_->Execute("DROP TEMPORARY STREAM IF EXISTS temporary_dynamic_read_path;"); + + struct DynamicDialect { + const char* type_name; + bool use_cast_as; + }; + + const DynamicDialect dialects[] = { + {"dynamic(max_types=1)", false}, + {"dynamic(max_types=1)", true}, + }; + + auto make_cast_expr = [](const std::string& value_expr, const DynamicDialect& dialect) { + if (dialect.use_cast_as) { + return "CAST(" + value_expr + " AS " + dialect.type_name + ")"; + } + + return "CAST(" + value_expr + ", '" + std::string(dialect.type_name) + "')"; + }; + + bool configured = false; + std::string last_error_message; + for (const auto& dialect : dialects) { + client_->Execute("DROP TEMPORARY STREAM IF EXISTS temporary_dynamic_read_path;"); + + try { + client_->Execute("CREATE TEMPORARY STREAM temporary_dynamic_read_path (id uint32, d " + std::string(dialect.type_name) + ") ENGINE = Memory;"); + client_->Execute("INSERT INTO temporary_dynamic_read_path SELECT 0, " + make_cast_expr("42", dialect)); + client_->Execute("INSERT INTO temporary_dynamic_read_path SELECT 1, " + make_cast_expr("'hello'", dialect)); + client_->Execute("INSERT INTO temporary_dynamic_read_path SELECT 2, " + make_cast_expr("43", dialect)); + configured = true; + break; + } catch (const std::exception& e) { + last_error_message = e.what(); + } + } + + if (!configured) { + GTEST_SKIP() << "Server doesn't support Dynamic integration test query: " << last_error_message; + } + + size_t total_rows = 0; + size_t shared_rows = 0; + client_->Select("SELECT d FROM temporary_dynamic_read_path ORDER BY id", [&total_rows, &shared_rows](const Block& block) { + if (block.GetRowCount() == 0) { + return; + } + + ASSERT_EQ(1u, block.GetColumnCount()); + auto col = block[0]->As(); + ASSERT_NE(nullptr, col); + + total_rows += col->Size(); + for (size_t i = 0; i < col->Size(); ++i) { + if (col->IsSharedVariant(i)) { + ++shared_rows; + } + } + }); + + EXPECT_EQ(3u, total_rows); + EXPECT_GT(shared_rows, 0u); +} + /// Geometric tests are not supported in Proton #ifdef GEOMETRIC_TESTS_ENABLED diff --git a/ut/type_parser_ut.cpp b/ut/type_parser_ut.cpp index f8267b3..9132a3c 100644 --- a/ut/type_parser_ut.cpp +++ b/ut/type_parser_ut.cpp @@ -240,6 +240,33 @@ TEST(TypeParserCase, ParseMap) { ASSERT_EQ(ast.elements[1].name, "string"); } +TEST(TypeParserCase, ParseDynamic) { + TypeAst ast; + ASSERT_TRUE(TypeParser("dynamic").Parse(&ast)); + ASSERT_EQ(ast.meta, TypeAst::Terminal); + ASSERT_EQ(ast.name, "dynamic"); + ASSERT_EQ(ast.code, Type::Dynamic); + ASSERT_EQ(ast.elements.size(), 0u); +} + +TEST(TypeParserCase, ParseDynamicWithMaxTypes) { + TypeAst ast; + ASSERT_TRUE(TypeParser("dynamic(max_types=5)").Parse(&ast)); + ASSERT_EQ(ast.meta, TypeAst::Terminal); + ASSERT_EQ(ast.name, "dynamic"); + ASSERT_EQ(ast.code, Type::Dynamic); + ASSERT_EQ(ast.elements.size(), 2u); + ASSERT_EQ(ast.elements[0].meta, TypeAst::Terminal); + ASSERT_EQ(ast.elements[0].name, "max_types"); + ASSERT_EQ(ast.elements[1].meta, TypeAst::Number); + ASSERT_EQ(ast.elements[1].value, 5); +} + +TEST(TypeParserCase, ParseDynamicMalformed) { + TypeAst ast; + EXPECT_FALSE(TypeParser("dynamic(max_types=5").Parse(&ast)); +} + TEST(TypeParser, EmptyName) { { TypeAst ast; diff --git a/ut/types_ut.cpp b/ut/types_ut.cpp index b2ee3c1..9706e53 100644 --- a/ut/types_ut.cpp +++ b/ut/types_ut.cpp @@ -34,6 +34,8 @@ TEST(TypesCase, TypeName) { ); ASSERT_EQ(Type::CreateMap(Type::CreateSimple(), Type::CreateString())->GetName(), "map(int32, string)"); + ASSERT_EQ(Type::CreateDynamic()->GetName(), "dynamic"); + ASSERT_EQ(Type::CreateDynamic(5)->GetName(), "dynamic(max_types=5)"); } TEST(TypesCase, NullableType) { @@ -97,14 +99,14 @@ TEST(TypesCase, IsEqual) { "array(uint8)", "array(string)", "array(nullable(low_cardinality(fixed_string(10000))))", - "array(enum8('ONE' = 1, 'TWO' = 2))" + "array(enum8('ONE' = 1, 'TWO' = 2))", "tuple(string, int8, date, datetime)", "nullable(tuple(string, int8, date, datetime))", "array(nullable(tuple(string, int8, date, datetime)))", "array(array(nullable(tuple(string, int8, date, datetime))))", "array(array(array(nullable(tuple(string, int8, date, datetime)))))", - "array(array(array(array(nullable(tuple(string, int8, date, datetime('UTC')))))))" - "array(array(array(array(nullable(tuple(string, int8, date, datetime('UTC'), tuple(low_cardinality(String), enum8('READ'=1, 'WRITE'=0))))))))", + "array(array(array(array(nullable(tuple(string, int8, date, datetime('UTC')))))))", + "array(array(array(array(nullable(tuple(string, int8, date, datetime('UTC'), tuple(low_cardinality(string), enum8('READ'=1, 'WRITE'=0))))))))", "map(string, int8)", "map(string, tuple(string, int8, date, datetime))", "map(uuid, array(tuple(string, int8, date, datetime)))", @@ -113,7 +115,9 @@ TEST(TypesCase, IsEqual) { "point", "ring", "polygon", - "multi_polygon" + "multi_polygon", + "dynamic", + "dynamic(max_types=5)" }; // Check that Type::IsEqual returns true only if: