From cba7fa694058af4f3745626d00295124411ce17b Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 14:28:55 +0200 Subject: [PATCH 01/13] feat(cubeorchestrator): Use Arrow format for CubeStore response format Add QueryResult::from_arrow to decode Arrow IPC stream payloads into the columnar QueryResult, and route the HttpQueryResult (Arrow) command in from_cubestore_fb to it. Arrow temporal columns map to a new dedicated DBResponsePrimitive::Timestamp variant that serializes to the existing ISO-8601 millisecond format. --- packages/cubejs-backend-native/Cargo.lock | 394 ++++++++++++++++-- .../cubejs-backend-native/src/orchestrator.rs | 1 + .../codegen/http-command.ts | 14 +- .../codegen/http-query-result-arrow.ts | 81 ++++ .../codegen/http-query-result-data.ts | 34 ++ .../codegen/http-query-result.ts | 62 +++ .../codegen/http-query.ts | 15 +- .../cubejs-cubestore-driver/codegen/index.ts | 4 + .../codegen/query-result-format.ts | 8 + .../src/CubeStoreDriver.ts | 18 +- .../src/WebSocketConnection.ts | 23 +- rust/cube/Cargo.lock | 1 + rust/cube/cubeorchestrator/Cargo.toml | 1 + .../cubeorchestrator/benches/common/mod.rs | 59 +++ rust/cube/cubeorchestrator/benches/parser.rs | 62 ++- .../src/query_message_parser.rs | 351 ++++++++++++++++ .../src/query_result_transform.rs | 33 +- 17 files changed, 1103 insertions(+), 58 deletions(-) create mode 100644 packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts create mode 100644 packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts create mode 100644 packages/cubejs-cubestore-driver/codegen/http-query-result.ts create mode 100644 packages/cubejs-cubestore-driver/codegen/query-result-format.ts diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index 6b48fb72cea71..65ed59ea96992 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -45,9 +45,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", + "getrandom 0.2.11", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.32", ] [[package]] @@ -65,12 +67,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -114,11 +110,11 @@ dependencies = [ "comfy-table 5.0.1", "csv", "flatbuffers 2.1.2", - "half", + "half 1.8.2", "hex", "indexmap 1.9.3", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "multiversion", "num", "rand", @@ -129,6 +125,180 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num-traits", +] + +[[package]] +name = "arrow-array" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.7.1", + "hashbrown 0.17.0", + "num-complex", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-buffer" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" +dependencies = [ + "bytes", + "half 2.7.1", + "num-bigint", + "num-traits", +] + +[[package]] +name = "arrow-cast" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half 2.7.1", + "lexical-core 1.0.6", + "num-traits", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.7.1", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-ipc" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers 25.12.19", +] + +[[package]] +name = "arrow-ord" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half 2.7.1", +] + +[[package]] +name = "arrow-schema" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" + +[[package]] +name = "arrow-select" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num-traits", +] + +[[package]] +name = "arrow-string" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num-traits", + "regex", + "regex-syntax 0.8.5", +] + [[package]] name = "async-channel" version = "2.1.1" @@ -186,6 +356,15 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -464,17 +643,16 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ - "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.0", + "windows-link", ] [[package]] @@ -573,6 +751,26 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.11", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -658,6 +856,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.6" @@ -694,7 +898,7 @@ name = "cube-ext" version = "1.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ - "arrow", + "arrow 13.0.0", "chrono", "datafusion-common", "datafusion-expr", @@ -773,6 +977,7 @@ name = "cubeorchestrator" version = "0.1.0" dependencies = [ "anyhow", + "arrow 58.3.0", "chrono", "cubeshared", "indexmap 2.14.0", @@ -870,7 +1075,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "async-trait", "chrono", "datafusion-common", @@ -902,7 +1107,7 @@ name = "datafusion-common" version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ - "arrow", + "arrow 13.0.0", "ordered-float 2.10.1", "parquet", "sqlparser", @@ -927,7 +1132,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "datafusion-common", "sqlparser", ] @@ -938,7 +1143,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "blake2", "blake3", "chrono", @@ -1305,6 +1510,18 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", + "zerocopy 0.8.27", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1771,11 +1988,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float 1.0.6", + "lexical-parse-integer 1.0.6", + "lexical-util 1.0.7", + "lexical-write-float 1.0.6", + "lexical-write-integer 1.0.6", ] [[package]] @@ -1784,21 +2014,40 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer 1.0.6", + "lexical-util 1.0.7", +] + [[package]] name = "lexical-parse-integer" version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util 1.0.7", +] + [[package]] name = "lexical-util" version = "0.8.5" @@ -1808,27 +2057,52 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + [[package]] name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", - "lexical-write-integer", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util 1.0.7", + "lexical-write-integer 1.0.6", +] + [[package]] name = "lexical-write-integer" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util 1.0.7", +] + [[package]] name = "libc" version = "0.2.177" @@ -1932,9 +2206,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.4" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memo-map" @@ -2095,20 +2369,19 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ - "autocfg", "num-integer", "num-traits", ] [[package]] name = "num-complex" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ "num-traits", ] @@ -2121,11 +2394,10 @@ checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] @@ -2154,11 +2426,12 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2247,7 +2520,7 @@ name = "parquet" version = "13.0.0" source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ - "arrow", + "arrow 13.0.0", "base64 0.13.1", "byteorder", "chrono", @@ -3477,6 +3750,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -4043,6 +4325,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.48.0" @@ -4254,7 +4542,16 @@ version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.32", +] + +[[package]] +name = "zerocopy" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" +dependencies = [ + "zerocopy-derive 0.8.27", ] [[package]] @@ -4268,6 +4565,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/packages/cubejs-backend-native/src/orchestrator.rs b/packages/cubejs-backend-native/src/orchestrator.rs index dd8b0a9994802..c1faf324f0ef9 100644 --- a/packages/cubejs-backend-native/src/orchestrator.rs +++ b/packages/cubejs-backend-native/src/orchestrator.rs @@ -137,6 +137,7 @@ fn db_primitive_to_field_value(value: &DBResponsePrimitive) -> FieldValue<'_> { DBResponsePrimitive::Uncommon(v) => FieldValue::String(Cow::Owned( serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()), )), + DBResponsePrimitive::Timestamp(_) => FieldValue::String(Cow::Owned(value.to_string())), DBResponsePrimitive::Null => FieldValue::Null, } } diff --git a/packages/cubejs-cubestore-driver/codegen/http-command.ts b/packages/cubejs-cubestore-driver/codegen/http-command.ts index eaca7b7120327..439b5b9d95824 100644 --- a/packages/cubejs-cubestore-driver/codegen/http-command.ts +++ b/packages/cubejs-cubestore-driver/codegen/http-command.ts @@ -4,6 +4,7 @@ import { HttpError } from './http-error.js'; import { HttpQuery } from './http-query.js'; +import { HttpQueryResult } from './http-query-result.js'; import { HttpResultSet } from './http-result-set.js'; @@ -11,32 +12,35 @@ export enum HttpCommand { NONE = 0, HttpQuery = 1, HttpResultSet = 2, - HttpError = 3 + HttpError = 3, + HttpQueryResult = 4 } export function unionToHttpCommand( type: HttpCommand, - accessor: (obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null -): HttpError|HttpQuery|HttpResultSet|null { + accessor: (obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null +): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null { switch(HttpCommand[type]) { case 'NONE': return null; case 'HttpQuery': return accessor(new HttpQuery())! as HttpQuery; case 'HttpResultSet': return accessor(new HttpResultSet())! as HttpResultSet; case 'HttpError': return accessor(new HttpError())! as HttpError; + case 'HttpQueryResult': return accessor(new HttpQueryResult())! as HttpQueryResult; default: return null; } } export function unionListToHttpCommand( type: HttpCommand, - accessor: (index: number, obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null, + accessor: (index: number, obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null, index: number -): HttpError|HttpQuery|HttpResultSet|null { +): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null { switch(HttpCommand[type]) { case 'NONE': return null; case 'HttpQuery': return accessor(index, new HttpQuery())! as HttpQuery; case 'HttpResultSet': return accessor(index, new HttpResultSet())! as HttpResultSet; case 'HttpError': return accessor(index, new HttpError())! as HttpError; + case 'HttpQueryResult': return accessor(index, new HttpQueryResult())! as HttpQueryResult; default: return null; } } diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts new file mode 100644 index 0000000000000..4af90e979dd18 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts @@ -0,0 +1,81 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import * as flatbuffers from 'flatbuffers'; + +export class HttpQueryResultArrow { + bb: flatbuffers.ByteBuffer|null = null; + bb_pos = 0; + __init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResultArrow { + this.bb_pos = i; + this.bb = bb; + return this; +} + +static getRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow { + return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +static getSizePrefixedRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow { + bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH); + return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +data(index: number):number|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0; +} + +dataLength():number { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0; +} + +dataArray():Uint8Array|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null; +} + +isLast():boolean { + const offset = this.bb!.__offset(this.bb_pos, 6); + return offset ? !!this.bb!.readInt8(this.bb_pos + offset) : false; +} + +static startHttpQueryResultArrow(builder:flatbuffers.Builder) { + builder.startObject(2); +} + +static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) { + builder.addFieldOffset(0, dataOffset, 0); +} + +static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset { + builder.startVector(1, data.length, 1); + for (let i = data.length - 1; i >= 0; i--) { + builder.addInt8(data[i]!); + } + return builder.endVector(); +} + +static startDataVector(builder:flatbuffers.Builder, numElems:number) { + builder.startVector(1, numElems, 1); +} + +static addIsLast(builder:flatbuffers.Builder, isLast:boolean) { + builder.addFieldInt8(1, +isLast, +false); +} + +static endHttpQueryResultArrow(builder:flatbuffers.Builder):flatbuffers.Offset { + const offset = builder.endObject(); + builder.requiredField(offset, 4) // data + return offset; +} + +static createHttpQueryResultArrow(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset, isLast:boolean):flatbuffers.Offset { + HttpQueryResultArrow.startHttpQueryResultArrow(builder); + HttpQueryResultArrow.addData(builder, dataOffset); + HttpQueryResultArrow.addIsLast(builder, isLast); + return HttpQueryResultArrow.endHttpQueryResultArrow(builder); +} +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts new file mode 100644 index 0000000000000..5ee08968db103 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts @@ -0,0 +1,34 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import { HttpQueryResultArrow } from './http-query-result-arrow.js'; + + +export enum HttpQueryResultData { + NONE = 0, + HttpQueryResultArrow = 1 +} + +export function unionToHttpQueryResultData( + type: HttpQueryResultData, + accessor: (obj:HttpQueryResultArrow) => HttpQueryResultArrow|null +): HttpQueryResultArrow|null { + switch(HttpQueryResultData[type]) { + case 'NONE': return null; + case 'HttpQueryResultArrow': return accessor(new HttpQueryResultArrow())! as HttpQueryResultArrow; + default: return null; + } +} + +export function unionListToHttpQueryResultData( + type: HttpQueryResultData, + accessor: (index: number, obj:HttpQueryResultArrow) => HttpQueryResultArrow|null, + index: number +): HttpQueryResultArrow|null { + switch(HttpQueryResultData[type]) { + case 'NONE': return null; + case 'HttpQueryResultArrow': return accessor(index, new HttpQueryResultArrow())! as HttpQueryResultArrow; + default: return null; + } +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result.ts new file mode 100644 index 0000000000000..edf026048ddf9 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result.ts @@ -0,0 +1,62 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import * as flatbuffers from 'flatbuffers'; + +import { HttpQueryResultData, unionToHttpQueryResultData, unionListToHttpQueryResultData } from './http-query-result-data.js'; + + +export class HttpQueryResult { + bb: flatbuffers.ByteBuffer|null = null; + bb_pos = 0; + __init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResult { + this.bb_pos = i; + this.bb = bb; + return this; +} + +static getRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult { + return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +static getSizePrefixedRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult { + bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH); + return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +dataType():HttpQueryResultData { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.readUint8(this.bb_pos + offset) : HttpQueryResultData.NONE; +} + +data(obj:any):any|null { + const offset = this.bb!.__offset(this.bb_pos, 6); + return offset ? this.bb!.__union(obj, this.bb_pos + offset) : null; +} + +static startHttpQueryResult(builder:flatbuffers.Builder) { + builder.startObject(2); +} + +static addDataType(builder:flatbuffers.Builder, dataType:HttpQueryResultData) { + builder.addFieldInt8(0, dataType, HttpQueryResultData.NONE); +} + +static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) { + builder.addFieldOffset(1, dataOffset, 0); +} + +static endHttpQueryResult(builder:flatbuffers.Builder):flatbuffers.Offset { + const offset = builder.endObject(); + builder.requiredField(offset, 6) // data + return offset; +} + +static createHttpQueryResult(builder:flatbuffers.Builder, dataType:HttpQueryResultData, dataOffset:flatbuffers.Offset):flatbuffers.Offset { + HttpQueryResult.startHttpQueryResult(builder); + HttpQueryResult.addDataType(builder, dataType); + HttpQueryResult.addData(builder, dataOffset); + return HttpQueryResult.endHttpQueryResult(builder); +} +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query.ts b/packages/cubejs-cubestore-driver/codegen/http-query.ts index d695cefe2611f..9161d8a274c56 100644 --- a/packages/cubejs-cubestore-driver/codegen/http-query.ts +++ b/packages/cubejs-cubestore-driver/codegen/http-query.ts @@ -6,6 +6,7 @@ import * as flatbuffers from 'flatbuffers'; import { HttpParameter } from './http-parameter.js'; import { HttpTable } from './http-table.js'; +import { QueryResultFormat } from './query-result-format.js'; export class HttpQuery { @@ -60,8 +61,13 @@ parametersLength():number { return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0; } +responseFormat():QueryResultFormat { + const offset = this.bb!.__offset(this.bb_pos, 12); + return offset ? this.bb!.readUint8(this.bb_pos + offset) : QueryResultFormat.Legacy; +} + static startHttpQuery(builder:flatbuffers.Builder) { - builder.startObject(4); + builder.startObject(5); } static addQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset) { @@ -104,17 +110,22 @@ static startParametersVector(builder:flatbuffers.Builder, numElems:number) { builder.startVector(4, numElems, 4); } +static addResponseFormat(builder:flatbuffers.Builder, responseFormat:QueryResultFormat) { + builder.addFieldInt8(4, responseFormat, QueryResultFormat.Legacy); +} + static endHttpQuery(builder:flatbuffers.Builder):flatbuffers.Offset { const offset = builder.endObject(); return offset; } -static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset):flatbuffers.Offset { +static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset, responseFormat:QueryResultFormat):flatbuffers.Offset { HttpQuery.startHttpQuery(builder); HttpQuery.addQuery(builder, queryOffset); HttpQuery.addTraceObj(builder, traceObjOffset); HttpQuery.addInlineTables(builder, inlineTablesOffset); HttpQuery.addParameters(builder, parametersOffset); + HttpQuery.addResponseFormat(builder, responseFormat); return HttpQuery.endHttpQuery(builder); } } diff --git a/packages/cubejs-cubestore-driver/codegen/index.ts b/packages/cubejs-cubestore-driver/codegen/index.ts index 1b52b62c90b56..25ab49a508fbc 100644 --- a/packages/cubejs-cubestore-driver/codegen/index.ts +++ b/packages/cubejs-cubestore-driver/codegen/index.ts @@ -12,9 +12,13 @@ export { HttpMessage } from './http-message.js'; export { HttpParameter } from './http-parameter.js'; export { HttpParameterValue } from './http-parameter-value.js'; export { HttpQuery } from './http-query.js'; +export { HttpQueryResult } from './http-query-result.js'; +export { HttpQueryResultArrow } from './http-query-result-arrow.js'; +export { HttpQueryResultData } from './http-query-result-data.js'; export { HttpResultSet } from './http-result-set.js'; export { HttpRow } from './http-row.js'; export { HttpTable } from './http-table.js'; export { Int64Value } from './int64-value.js'; export { NullValue } from './null-value.js'; +export { QueryResultFormat } from './query-result-format.js'; export { StringValue } from './string-value.js'; diff --git a/packages/cubejs-cubestore-driver/codegen/query-result-format.ts b/packages/cubejs-cubestore-driver/codegen/query-result-format.ts new file mode 100644 index 0000000000000..9f52455cbfb4c --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/query-result-format.ts @@ -0,0 +1,8 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +export enum QueryResultFormat { + Legacy = 0, + Arrow = 1 +} diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 43dc60eb2456b..27a05ab6df37f 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -20,6 +20,7 @@ import fetch from 'node-fetch'; import { ConnectionConfig } from './types'; import { WebSocketConnection } from './WebSocketConnection'; +import { QueryResultFormat } from '../codegen'; const CubeStoreCapabilityMinVersion = { queueExclusive: '1.6.22', @@ -87,6 +88,17 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { this.connection = new WebSocketConnection(`${this.baseUrl}/ws`); } + private static toResponseFormat(format: string): QueryResultFormat { + switch (format) { + case 'arrow': + return QueryResultFormat.Arrow; + case 'legacy': + return QueryResultFormat.Legacy; + default: + throw new Error(`Unsupported CubeStore response format: ${format}. Expected 'arrow' or 'legacy'.`); + } + } + public async hasCapability(capability: CubeStoreCapability): Promise { const minVersion = CubeStoreCapabilityMinVersion[capability]; @@ -106,7 +118,11 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { const tracingObj = { ...queryTracingObj, instance: getEnv('instanceId') }; - return this.connection.query(query, inlineTables ?? [], tracingObj, sendParameters ? values : undefined); + return this.connection.query(query, sendParameters ? values : [], { + inlineTables: inlineTables ?? [], + queryTracingObj: tracingObj, + responseFormat: CubeStoreDriver.toResponseFormat(this.config.responseFormat), + }); } public async release() { diff --git a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts index ae08e3f78badb..238f14b7528a5 100644 --- a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts +++ b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts @@ -18,6 +18,7 @@ import { HttpTable, Int64Value, NullValue, + QueryResultFormat, StringValue, } from '../codegen'; @@ -27,6 +28,14 @@ interface SentMessage { buffer: Uint8Array; } +export type QueryParameter = null | boolean | number | string | Buffer; + +export type WebSocketQueryOptions = { + inlineTables?: InlineTable[]; + queryTracingObj?: any; + responseFormat?: QueryResultFormat; +}; + interface CubeStoreWebSocket extends WebSocket { readyPromise: Promise; lastHeartBeat: Date; @@ -282,7 +291,15 @@ export class WebSocketConnection { } } - public async query(query: string, inlineTables: InlineTable[], queryTracingObj?: any, parameters?: any): Promise { + public async query(query: string, parameters: QueryParameter[], options: WebSocketQueryOptions = {}): Promise { + const { + inlineTables, + queryTracingObj, + // Why, it's not a breaking change: + // An old Cube Store will ignore this option and will continue to serve results in Legacy format + responseFormat = QueryResultFormat.Arrow + } = options; + const builder = new flatbuffers.Builder(1024); const queryOffset = builder.createString(query); @@ -321,7 +338,7 @@ export class WebSocketConnection { } let parametersOffset: flatbuffers.Offset | null = null; - if (parameters) { + if (parameters.length > 0) { const httpParameterValues: flatbuffers.Offset[] = []; for (const parameter of parameters) { @@ -349,6 +366,8 @@ export class WebSocketConnection { HttpQuery.addParameters(builder, parametersOffset); } + HttpQuery.addResponseFormat(builder, responseFormat); + const httpQueryOffset = HttpQuery.endHttpQuery(builder); const messageId = this.messageCounter++; const connectionIdOffset = builder.createString(this.connectionId); diff --git a/rust/cube/Cargo.lock b/rust/cube/Cargo.lock index 42f0fbffd43f7..e03145010b6d9 100644 --- a/rust/cube/Cargo.lock +++ b/rust/cube/Cargo.lock @@ -824,6 +824,7 @@ name = "cubeorchestrator" version = "0.1.0" dependencies = [ "anyhow", + "arrow", "chrono", "criterion", "cubeshared", diff --git a/rust/cube/cubeorchestrator/Cargo.toml b/rust/cube/cubeorchestrator/Cargo.toml index 20a1971e77d35..3dbde88404ef0 100644 --- a/rust/cube/cubeorchestrator/Cargo.toml +++ b/rust/cube/cubeorchestrator/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +arrow = { version = "58", default-features = false, features = ["ipc"] } chrono = { version = "0.4.31", features = ["serde"] } cubeshared = { path = "../cubeshared" } serde = { version = "1.0.217", features = ["derive", "rc"] } diff --git a/rust/cube/cubeorchestrator/benches/common/mod.rs b/rust/cube/cubeorchestrator/benches/common/mod.rs index 45d8abce374af..a8246889c55a7 100644 --- a/rust/cube/cubeorchestrator/benches/common/mod.rs +++ b/rust/cube/cubeorchestrator/benches/common/mod.rs @@ -78,3 +78,62 @@ pub fn build_dataset( JsRawColumnarData { members, columns } } + +/// Build an Arrow IPC **stream** payload with the same logical data shape as +/// [`build_dataset`]: dimensions as Utf8, measures as Float64, time dimensions +/// as Timestamp(Millisecond). Used to compare Arrow parse throughput against the +/// JSON path. +pub fn build_arrow_ipc( + row_count: usize, + dimensions: &[(String, String)], + measures: &[(String, String)], + time_dims: &[TimeColumn], +) -> Vec { + use arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use arrow::ipc::writer::StreamWriter; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + let total_cols = dimensions.len() + measures.len() + time_dims.len(); + let mut fields = Vec::with_capacity(total_cols); + let mut columns: Vec = Vec::with_capacity(total_cols); + + for (j, (_, alias)) in dimensions.iter().enumerate() { + fields.push(Field::new(alias.clone(), DataType::Utf8, false)); + let values: Vec = (0..row_count) + .map(|i| format!("dim_{}_{}", j, i % 1000)) + .collect(); + columns.push(Arc::new(StringArray::from(values))); + } + for (j, (_, alias)) in measures.iter().enumerate() { + fields.push(Field::new(alias.clone(), DataType::Float64, false)); + let values: Vec = (0..row_count) + .map(|i| ((i * (j + 1)) as f64) * 0.5) + .collect(); + columns.push(Arc::new(Float64Array::from(values))); + } + for (j, td) in time_dims.iter().enumerate() { + fields.push(Field::new( + td.alias.clone(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + // One day apart, offset per column — arbitrary but realistic spread. + let values: Vec = (0..row_count) + .map(|i| ((i + j) as i64) * 86_400_000) + .collect(); + columns.push(Arc::new(TimestampMillisecondArray::from(values))); + } + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema.clone(), columns).expect("arrow record batch"); + + let mut buf = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref()).expect("arrow writer"); + writer.write(&batch).expect("write arrow batch"); + writer.finish().expect("finish arrow stream"); + } + buf +} diff --git a/rust/cube/cubeorchestrator/benches/parser.rs b/rust/cube/cubeorchestrator/benches/parser.rs index eef0a51472e32..a66a82ff39dde 100644 --- a/rust/cube/cubeorchestrator/benches/parser.rs +++ b/rust/cube/cubeorchestrator/benches/parser.rs @@ -11,7 +11,10 @@ use cubeshared::flatbuffers::FlatBufferBuilder; #[path = "common/mod.rs"] mod common; -use common::{build_dataset, make_member_aliases, split_dim_measure, COLUMN_COUNTS, ROW_COUNTS}; +use common::{ + build_arrow_ipc, build_dataset, make_member_aliases, split_dim_measure, COLUMN_COUNTS, + ROW_COUNTS, +}; /// Build a FlatBuffer `HttpMessage` payload mirroring CubeStore's wire format /// for `from_cubestore_fb` to parse. Cells are 16-character strings to give @@ -95,7 +98,14 @@ fn bench_from_cubestore_fb(c: &mut Criterion) { fn bench_from_js_raw_data(c: &mut Criterion) { let mut group = c.benchmark_group("QueryResult::from_js_raw_data"); - let combos: &[(usize, usize)] = &[(8, 10_000), (16, 10_000), (16, 100_000), (32, 100_000)]; + let combos: &[(usize, usize)] = &[ + (8, 1), + (8, 10), + (8, 10_000), + (16, 10_000), + (16, 100_000), + (32, 100_000), + ]; for &(col_count, row_count) in combos { let (dim_count, measure_count) = split_dim_measure(col_count); @@ -142,5 +152,51 @@ fn bench_from_js_raw_data(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_from_cubestore_fb, bench_from_js_raw_data); +fn bench_from_arrow(c: &mut Criterion) { + let mut group = c.benchmark_group("QueryResult::from_arrow"); + + let combos: &[(usize, usize)] = &[ + (8, 1), + (8, 10), + (8, 10_000), + (16, 10_000), + (16, 100_000), + (32, 100_000), + ]; + + for &(col_count, row_count) in combos { + let (dim_count, measure_count) = split_dim_measure(col_count); + let dimensions = make_member_aliases("dim", dim_count); + let measures = make_member_aliases("measure", measure_count); + + let payload = build_arrow_ipc(row_count, &dimensions, &measures, &[]); + let payload_len = payload.len(); + + eprintln!( + "from_arrow: c{:02}_r{} payload_bytes={}", + col_count, row_count, payload_len + ); + + group.throughput(Throughput::Elements((row_count * col_count) as u64)); + + let id = format!("c{:02}_r{}", col_count, row_count); + // Arrow IPC parse always materializes the QueryResult, so this measures + // the equivalent of from_js_raw_data's `parse_plus_build`. + group.bench_with_input(BenchmarkId::from_parameter(id), &(), |b, _| { + b.iter(|| { + let built = QueryResult::from_arrow(black_box(&payload)).expect("from_arrow"); + black_box(built); + }); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_from_cubestore_fb, + bench_from_js_raw_data, + bench_from_arrow +); criterion_main!(benches); diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index c3b489b55f209..c248300b25d8c 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -2,10 +2,20 @@ use crate::{ query_result_transform::{ColumnarArray, DBResponsePrimitive}, transport::JsRawColumnarData, }; +use arrow::array::{ + Array, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float16Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, + StringArray, StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, +}; +use arrow::datatypes::{DataType, TimeUnit}; +use arrow::ipc::reader::StreamReader; use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand}; use cubeshared::flatbuffers::VerifierOptions; use indexmap::IndexMap; use neon::prelude::Finalize; +use std::io::Cursor; #[derive(Debug)] pub enum ParseError { @@ -28,6 +38,8 @@ pub enum ParseError { data_len: usize, }, FlatBufferError(String), + ArrowError(String), + UnsupportedArrowType(String), ErrorMessage(String), } @@ -70,6 +82,10 @@ impl std::fmt::Display for ParseError { members_len, data_len ), ParseError::FlatBufferError(msg) => write!(f, "FlatBuffer parsing error: {}", msg), + ParseError::ArrowError(msg) => write!(f, "Arrow parsing error: {}", msg), + ParseError::UnsupportedArrowType(ty) => { + write!(f, "Unsupported Arrow data type: {}", ty) + } ParseError::ErrorMessage(msg) => write!(f, "Error: {}", msg), } } @@ -216,6 +232,20 @@ impl QueryResult { QueryResult::try_new(members, data) } + HttpCommand::HttpQueryResult => { + let query_result = http_message + .command_as_http_query_result() + .ok_or(ParseError::EmptyResultSet)?; + let arrow = query_result + .data_as_http_query_result_arrow() + .ok_or_else(|| { + ParseError::FlatBufferError( + "HttpQueryResult.data is not HttpQueryResultArrow".to_string(), + ) + })?; + + Self::from_arrow(arrow.data().bytes()) + } _ => Err(ParseError::UnsupportedCommand), } } @@ -224,6 +254,155 @@ impl QueryResult { let JsRawColumnarData { members, columns } = js_raw_data; QueryResult::try_new(members, columns) } + + /// Parse an Arrow IPC **stream** payload into the columnar [`QueryResult`]. + /// + /// Member names come from the schema field names; each Arrow column is + /// converted element-wise into [`DBResponsePrimitive`] (see + /// [`append_arrow_array`]). Multiple record batches are concatenated so a + /// streamed result with several batches yields one column per field. + pub fn from_arrow(bytes: &[u8]) -> Result { + let reader = StreamReader::try_new(Cursor::new(bytes), None) + .map_err(|err| ParseError::ArrowError(err.to_string()))?; + + let schema = reader.schema(); + let members: Vec = schema.fields().iter().map(|f| f.name().clone()).collect(); + let n_cols = members.len(); + + let mut columns: Vec> = (0..n_cols).map(|_| Vec::new()).collect(); + + for batch in reader { + let batch = batch.map_err(|err| ParseError::ArrowError(err.to_string()))?; + for (idx, col) in columns.iter_mut().enumerate() { + append_arrow_array(col, batch.column(idx).as_ref())?; + } + } + + let data: Vec = columns.into_iter().map(ColumnarArray::from).collect(); + QueryResult::try_new(members, data) + } +} + +/// Append every element of an Arrow `array` to a column accumulator, converting +/// each value to [`DBResponsePrimitive`]. Temporal types become +/// [`DBResponsePrimitive::Timestamp`]; integers/floats/decimals become `Number`. +fn append_arrow_array( + col: &mut Vec, + array: &dyn Array, +) -> Result<(), ParseError> { + let len = array.len(); + col.reserve(len); + + macro_rules! push_numeric { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Number(a.value(i) as f64)); + } + } + }}; + } + + macro_rules! push_str { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::String(a.value(i).to_owned())); + } + } + }}; + } + + macro_rules! push_datetime { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + match a.value_as_datetime(i) { + Some(dt) => col.push(DBResponsePrimitive::Timestamp(dt)), + None => col.push(DBResponsePrimitive::Null), + } + } + } + }}; + } + + macro_rules! push_decimal { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + let s = a.value_as_string(i); + match s.parse::() { + Ok(n) => col.push(DBResponsePrimitive::Number(n)), + Err(_) => col.push(DBResponsePrimitive::String(s)), + } + } + } + }}; + } + + match array.data_type() { + DataType::Null => { + for _ in 0..len { + col.push(DBResponsePrimitive::Null); + } + } + DataType::Boolean => { + let a = array.as_any().downcast_ref::().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Boolean(a.value(i))); + } + } + } + DataType::Int8 => push_numeric!(Int8Array), + DataType::Int16 => push_numeric!(Int16Array), + DataType::Int32 => push_numeric!(Int32Array), + DataType::Int64 => push_numeric!(Int64Array), + DataType::UInt8 => push_numeric!(UInt8Array), + DataType::UInt16 => push_numeric!(UInt16Array), + DataType::UInt32 => push_numeric!(UInt32Array), + DataType::UInt64 => push_numeric!(UInt64Array), + DataType::Float32 => push_numeric!(Float32Array), + DataType::Float64 => push_numeric!(Float64Array), + DataType::Float16 => { + let a = array.as_any().downcast_ref::().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Number(a.value(i).to_f64())); + } + } + } + DataType::Utf8 => push_str!(StringArray), + DataType::LargeUtf8 => push_str!(LargeStringArray), + DataType::Utf8View => push_str!(StringViewArray), + DataType::Date32 => push_datetime!(Date32Array), + DataType::Date64 => push_datetime!(Date64Array), + DataType::Timestamp(TimeUnit::Second, _) => push_datetime!(TimestampSecondArray), + DataType::Timestamp(TimeUnit::Millisecond, _) => push_datetime!(TimestampMillisecondArray), + DataType::Timestamp(TimeUnit::Microsecond, _) => push_datetime!(TimestampMicrosecondArray), + DataType::Timestamp(TimeUnit::Nanosecond, _) => push_datetime!(TimestampNanosecondArray), + DataType::Decimal128(_, _) => push_decimal!(Decimal128Array), + DataType::Decimal256(_, _) => push_decimal!(Decimal256Array), + other => return Err(ParseError::UnsupportedArrowType(format!("{other:?}"))), + } + + Ok(()) } #[cfg(test)] @@ -402,6 +581,178 @@ mod tests { assert!(unchecked_result.is_ok()); } + fn arrow_ipc_bytes(batch: &arrow::record_batch::RecordBatch) -> Vec { + use arrow::ipc::writer::StreamWriter; + let mut buf = Vec::new(); + { + let schema = batch.schema(); + let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref()).unwrap(); + writer.write(batch).unwrap(); + writer.finish().unwrap(); + } + buf + } + + #[test] + fn test_from_arrow_basic_types() { + use arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new("city", DataType::Utf8, true), + Field::new("amount", DataType::Float64, true), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + ])); + + let cities = StringArray::from(vec![Some("Berlin"), None, Some("Lisbon")]); + let amounts = Float64Array::from(vec![Some(1.5), Some(2.0), None]); + // 0 -> 1970-01-01T00:00:00.000, 1_000 -> 1970-01-01T00:00:01.000 + let created = TimestampMillisecondArray::from(vec![Some(0i64), None, Some(1_000)]); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(cities), Arc::new(amounts), Arc::new(created)], + ) + .unwrap(); + + let bytes = arrow_ipc_bytes(&batch); + let result = QueryResult::from_arrow(&bytes).expect("from_arrow"); + + assert_eq!(result.members, vec!["city", "amount", "created_at"]); + assert_eq!(result.row_count, 3); + assert_eq!(result.data.len(), 3); + + assert_eq!( + result.data[0].as_slice(), + &[ + DBResponsePrimitive::String("Berlin".to_string()), + DBResponsePrimitive::Null, + DBResponsePrimitive::String("Lisbon".to_string()), + ] + ); + assert_eq!( + result.data[1].as_slice(), + &[ + DBResponsePrimitive::Number(1.5), + DBResponsePrimitive::Number(2.0), + DBResponsePrimitive::Null, + ] + ); + + // Timestamps land in the dedicated variant and serialize to the ISO format. + match &result.data[2].as_slice()[0] { + DBResponsePrimitive::Timestamp(_) => {} + other => panic!("expected Timestamp, got {other:?}"), + } + assert_eq!(result.data[2].as_slice()[1], DBResponsePrimitive::Null); + let json = serde_json::to_value(result.data[2].as_slice()).unwrap(); + assert_eq!(json[0], "1970-01-01T00:00:00.000"); + assert_eq!(json[1], serde_json::Value::Null); + assert_eq!(json[2], "1970-01-01T00:00:01.000"); + } + + #[test] + fn test_from_arrow_unsupported_type() { + use arrow::array::BinaryArray; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![Field::new( + "blob", + DataType::Binary, + false, + )])); + let blobs = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref()]); + let batch = RecordBatch::try_new(schema, vec![Arc::new(blobs)]).unwrap(); + + let bytes = arrow_ipc_bytes(&batch); + let err = QueryResult::from_arrow(&bytes).expect_err("should reject Binary"); + assert!(matches!(err, ParseError::UnsupportedArrowType(_))); + } + + #[test] + fn test_from_cubestore_fb_arrow_query_result() { + use arrow::array::{Float64Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use cubeshared::codegen::{ + HttpMessage, HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, + HttpQueryResultArrow, HttpQueryResultArrowArgs, HttpQueryResultData, + }; + use cubeshared::flatbuffers::FlatBufferBuilder; + use std::sync::Arc; + + // Arrow IPC stream payload, as CubeStore would emit it. + let schema = Arc::new(Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("amount", DataType::Float64, false), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["Berlin", "Lisbon"])), + Arc::new(Float64Array::from(vec![1.5, 2.0])), + ], + ) + .unwrap(); + let ipc = arrow_ipc_bytes(&batch); + + // Wrap it in a FlatBuffer HttpMessage with the HttpQueryResult command. + let mut builder = FlatBufferBuilder::new(); + let data_vec = builder.create_vector(&ipc); + let arrow = HttpQueryResultArrow::create( + &mut builder, + &HttpQueryResultArrowArgs { + data: Some(data_vec), + is_last: true, + }, + ); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow.as_union_value()), + }, + ); + let connection_id = builder.create_string("test_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + let msg_data = builder.finished_data().to_vec(); + + let result = QueryResult::from_cubestore_fb(&msg_data).expect("from_cubestore_fb arrow"); + assert_eq!(result.members, vec!["city", "amount"]); + assert_eq!(result.row_count, 2); + assert_eq!( + result.data[0].as_slice(), + &[ + DBResponsePrimitive::String("Berlin".to_string()), + DBResponsePrimitive::String("Lisbon".to_string()), + ] + ); + assert_eq!( + result.data[1].as_slice(), + &[ + DBResponsePrimitive::Number(1.5), + DBResponsePrimitive::Number(2.0), + ] + ); + } + #[test] fn test_parse_with_custom_verifier_options() { use cubeshared::codegen::root_as_http_message_with_opts; diff --git a/rust/cube/cubeorchestrator/src/query_result_transform.rs b/rust/cube/cubeorchestrator/src/query_result_transform.rs index a069d8bda04de..064e15d8a25fd 100644 --- a/rust/cube/cubeorchestrator/src/query_result_transform.rs +++ b/rust/cube/cubeorchestrator/src/query_result_transform.rs @@ -1152,16 +1152,35 @@ pub struct RequestResultArray { pub results: Vec, } -#[derive(Debug, Clone, Serialize, PartialEq)] -#[serde(untagged)] +#[derive(Debug, Clone, PartialEq)] pub enum DBResponsePrimitive { Null, Boolean(bool), Number(f64), String(String), + Timestamp(NaiveDateTime), Uncommon(Value), } +const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f"; + +// Hand-written `Serialize` mirroring serde's untagged behavior for the JSON-native +// variants, plus a string rendering for `Timestamp`. +impl Serialize for DBResponsePrimitive { + fn serialize(&self, serializer: S) -> Result { + match self { + DBResponsePrimitive::Null => serializer.serialize_none(), + DBResponsePrimitive::Boolean(b) => serializer.serialize_bool(*b), + DBResponsePrimitive::Number(n) => serializer.serialize_f64(*n), + DBResponsePrimitive::String(s) => serializer.serialize_str(s), + DBResponsePrimitive::Timestamp(dt) => { + serializer.serialize_str(&dt.format(TIMESTAMP_FORMAT).to_string()) + } + DBResponsePrimitive::Uncommon(v) => v.serialize(serializer), + } + } +} + // Hand-written `Deserialize` that avoids serde's untagged-enum buffering. impl<'de> Deserialize<'de> for DBResponsePrimitive { fn deserialize(deserializer: D) -> Result @@ -1256,6 +1275,7 @@ impl Display for DBResponsePrimitive { DBResponsePrimitive::Boolean(b) => b.to_string(), DBResponsePrimitive::Number(n) => n.to_string(), DBResponsePrimitive::String(s) => s.clone(), + DBResponsePrimitive::Timestamp(dt) => dt.format(TIMESTAMP_FORMAT).to_string(), DBResponsePrimitive::Uncommon(v) => { serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()) } @@ -1322,6 +1342,15 @@ mod tests { use serde_json::from_str; use std::{fmt, sync::LazyLock}; + /// Guards the in-memory size of the per-cell primitive. It is materialized + /// once per cell across every parse/transform path, so an accidental growth + /// (e.g. a fat new variant) would regress memory and throughput for large + /// result sets. Bump this deliberately if the layout must change. + #[test] + fn test_db_response_primitive_size() { + assert_eq!(std::mem::size_of::(), 32); + } + type TestSuiteData = HashMap; #[derive(Clone, Deserialize)] From 74746ffd057a108ea479aae1e71e87e1ef907285 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 16:28:38 +0200 Subject: [PATCH 02/13] chore: up code quality --- .../src/query_message_parser.rs | 104 +++++++++--------- 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index c248300b25d8c..21895e90b0891 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -11,7 +11,7 @@ use arrow::array::{ }; use arrow::datatypes::{DataType, TimeUnit}; use arrow::ipc::reader::StreamReader; -use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand}; +use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand, HttpResultSet}; use cubeshared::flatbuffers::VerifierOptions; use indexmap::IndexMap; use neon::prelude::Finalize; @@ -166,6 +166,11 @@ impl QueryResult { }) } + pub fn from_js_raw_data(js_raw_data: JsRawColumnarData) -> Result { + let JsRawColumnarData { members, columns } = js_raw_data; + QueryResult::try_new(members, columns) + } + pub fn from_cubestore_fb(msg_data: &[u8]) -> Result { let opts = VerifierOptions { max_tables: 10_000_000, // Support up to 10M tables @@ -189,48 +194,7 @@ impl QueryResult { .command_as_http_result_set() .ok_or(ParseError::EmptyResultSet)?; - let members: Vec = match result_set.columns() { - Some(result_set_columns) => { - if result_set_columns.iter().any(|c| c.is_empty()) { - return Err(ParseError::ColumnNameNotDefined); - } - result_set_columns.iter().map(|c| c.to_owned()).collect() - } - None => Vec::new(), - }; - - let n_cols = members.len(); - let data: Vec = if let Some(result_set_rows) = result_set.rows() { - let row_count = result_set_rows.len(); - let mut data: Vec = (0..n_cols) - .map(|_| ColumnarArray::with_capacity(row_count)) - .collect(); - - for row in result_set_rows.iter() { - let values = row.values().ok_or(ParseError::NullRow)?; - for (col_idx, val) in values.iter().enumerate() { - if col_idx >= n_cols { - break; - } - let cell = match val.string_value() { - Some(s) => DBResponsePrimitive::String(s.to_owned()), - None => DBResponsePrimitive::Null, - }; - data[col_idx].push(cell); - } - - // Pad short rows with Null to keep all columns aligned. - for col in data.iter_mut().take(n_cols).skip(values.len()) { - col.push(DBResponsePrimitive::Null); - } - } - - data - } else { - (0..n_cols).map(|_| ColumnarArray::new()).collect() - }; - - QueryResult::try_new(members, data) + Self::parse_legacy(result_set) } HttpCommand::HttpQueryResult => { let query_result = http_message @@ -250,18 +214,52 @@ impl QueryResult { } } - pub fn from_js_raw_data(js_raw_data: JsRawColumnarData) -> Result { - let JsRawColumnarData { members, columns } = js_raw_data; - QueryResult::try_new(members, columns) + fn parse_legacy(result_set: HttpResultSet<'_>) -> Result { + let members: Vec = match result_set.columns() { + Some(result_set_columns) => { + if result_set_columns.iter().any(|c| c.is_empty()) { + return Err(ParseError::ColumnNameNotDefined); + } + result_set_columns.iter().map(|c| c.to_owned()).collect() + } + None => Vec::new(), + }; + + let n_cols = members.len(); + let data: Vec = if let Some(result_set_rows) = result_set.rows() { + let row_count = result_set_rows.len(); + let mut data: Vec = (0..n_cols) + .map(|_| ColumnarArray::with_capacity(row_count)) + .collect(); + + for row in result_set_rows.iter() { + let values = row.values().ok_or(ParseError::NullRow)?; + for (col_idx, val) in values.iter().enumerate() { + if col_idx >= n_cols { + break; + } + let cell = match val.string_value() { + Some(s) => DBResponsePrimitive::String(s.to_owned()), + None => DBResponsePrimitive::Null, + }; + data[col_idx].push(cell); + } + + // Pad short rows with Null to keep all columns aligned. + for col in data.iter_mut().take(n_cols).skip(values.len()) { + col.push(DBResponsePrimitive::Null); + } + } + + data + } else { + (0..n_cols).map(|_| ColumnarArray::new()).collect() + }; + + QueryResult::try_new(members, data) } - /// Parse an Arrow IPC **stream** payload into the columnar [`QueryResult`]. - /// - /// Member names come from the schema field names; each Arrow column is - /// converted element-wise into [`DBResponsePrimitive`] (see - /// [`append_arrow_array`]). Multiple record batches are concatenated so a - /// streamed result with several batches yields one column per field. - pub fn from_arrow(bytes: &[u8]) -> Result { + fn from_arrow(bytes: &[u8]) -> Result { let reader = StreamReader::try_new(Cursor::new(bytes), None) .map_err(|err| ParseError::ArrowError(err.to_string()))?; From 6e7ac8ad27f3d4ec73f5bfe2e66642f505debca8 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 16:34:35 +0200 Subject: [PATCH 03/13] chore: up code quality --- .../src/CubeStoreDriver.ts | 41 +++++++++---------- .../src/WebSocketConnection.ts | 12 ++---- 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 27a05ab6df37f..47551cbca73fa 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -1,26 +1,31 @@ import { pipeline, Writable } from 'stream'; import { createGzip } from 'zlib'; -import { createWriteStream, createReadStream } from 'fs'; +import { createReadStream, createWriteStream } from 'fs'; import { unlink } from 'fs-extra'; import tempy from 'tempy'; import csvWriter from 'csv-write-stream'; import { BaseDriver, + CreateTableIndex, DownloadTableCSVData, + DownloadTableMemoryData, + DriverInterface, ExternalCreateTableOptions, - DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex, - StreamTableData, - StreamingSourceTableData, + ExternalDriverCompatibilities, + IndexesSQL, QueryOptions, - ExternalDriverCompatibilities, TableStructure, TableColumnQueryResult, + StreamingSourceTableData, + StreamTableData, + TableColumnQueryResult, + TableStructure, } from '@cubejs-backend/base-driver'; import { AsyncDebounce, getEnv, isVersionGte } from '@cubejs-backend/shared'; -import { format as formatSql, escape } from 'sqlstring'; +import { escape, format as formatSql } from 'sqlstring'; import fetch from 'node-fetch'; -import { ConnectionConfig } from './types'; -import { WebSocketConnection } from './WebSocketConnection'; -import { QueryResultFormat } from '../codegen'; +import {ConnectionConfig} from './types'; +import {WebSocketConnection} from './WebSocketConnection'; +import {QueryResultFormat} from '../codegen'; const CubeStoreCapabilityMinVersion = { queueExclusive: '1.6.22', @@ -62,6 +67,7 @@ type CreateTableOptions = { type CubeStoreQueryOptions = QueryOptions & { sendParameters?: boolean, + responseFormat?: QueryResultFormat, }; export class CubeStoreDriver extends BaseDriver implements DriverInterface { @@ -88,17 +94,6 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { this.connection = new WebSocketConnection(`${this.baseUrl}/ws`); } - private static toResponseFormat(format: string): QueryResultFormat { - switch (format) { - case 'arrow': - return QueryResultFormat.Arrow; - case 'legacy': - return QueryResultFormat.Legacy; - default: - throw new Error(`Unsupported CubeStore response format: ${format}. Expected 'arrow' or 'legacy'.`); - } - } - public async hasCapability(capability: CubeStoreCapability): Promise { const minVersion = CubeStoreCapabilityMinVersion[capability]; @@ -110,7 +105,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { } public async query(query: string, values: any[], options?: CubeStoreQueryOptions): Promise { - const { inlineTables, sendParameters, ...queryTracingObj } = options ?? {}; + const { inlineTables, sendParameters, responseFormat, ...queryTracingObj } = options ?? {}; if (!sendParameters) { query = formatSql(query, values || []); @@ -121,7 +116,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { return this.connection.query(query, sendParameters ? values : [], { inlineTables: inlineTables ?? [], queryTracingObj: tracingObj, - responseFormat: CubeStoreDriver.toResponseFormat(this.config.responseFormat), + // Arrow is a default format for Cube Store, but + // an old Cube Store will ignore this option and will continue to serve results in Legacy format + responseFormat: responseFormat || QueryResultFormat.Arrow, }); } diff --git a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts index 238f14b7528a5..6e70d08b2378b 100644 --- a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts +++ b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts @@ -33,7 +33,7 @@ export type QueryParameter = null | boolean | number | string | Buffer; export type WebSocketQueryOptions = { inlineTables?: InlineTable[]; queryTracingObj?: any; - responseFormat?: QueryResultFormat; + responseFormat: QueryResultFormat; }; interface CubeStoreWebSocket extends WebSocket { @@ -291,14 +291,8 @@ export class WebSocketConnection { } } - public async query(query: string, parameters: QueryParameter[], options: WebSocketQueryOptions = {}): Promise { - const { - inlineTables, - queryTracingObj, - // Why, it's not a breaking change: - // An old Cube Store will ignore this option and will continue to serve results in Legacy format - responseFormat = QueryResultFormat.Arrow - } = options; + public async query(query: string, parameters: QueryParameter[], options: WebSocketQueryOptions): Promise { + const { inlineTables, queryTracingObj, responseFormat } = options; const builder = new flatbuffers.Builder(1024); const queryOffset = builder.createString(query); From d6d691fef612022149e858ed0d66419d6f0806d4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 17:16:36 +0200 Subject: [PATCH 04/13] refactor(cubeorchestrator): split DBResponsePrimitive numeric variants --- .../cubejs-backend-native/src/orchestrator.rs | 6 +- .../cubeorchestrator/benches/common/mod.rs | 2 +- .../src/query_message_parser.rs | 70 ++++++++++++------- .../src/query_result_transform.rs | 35 +++++++--- 4 files changed, 78 insertions(+), 35 deletions(-) diff --git a/packages/cubejs-backend-native/src/orchestrator.rs b/packages/cubejs-backend-native/src/orchestrator.rs index c1faf324f0ef9..cce0910fed1c5 100644 --- a/packages/cubejs-backend-native/src/orchestrator.rs +++ b/packages/cubejs-backend-native/src/orchestrator.rs @@ -132,12 +132,14 @@ impl ResultWrapper { fn db_primitive_to_field_value(value: &DBResponsePrimitive) -> FieldValue<'_> { match value { DBResponsePrimitive::String(s) => FieldValue::String(Cow::Borrowed(s)), - DBResponsePrimitive::Number(n) => FieldValue::Number(*n), + DBResponsePrimitive::Int64(n) => FieldValue::Number(*n as f64), + DBResponsePrimitive::UInt64(n) => FieldValue::Number(*n as f64), + DBResponsePrimitive::Float64(n) => FieldValue::Number(*n), DBResponsePrimitive::Boolean(b) => FieldValue::Bool(*b), + DBResponsePrimitive::Timestamp(_) => FieldValue::String(Cow::Owned(value.to_string())), DBResponsePrimitive::Uncommon(v) => FieldValue::String(Cow::Owned( serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()), )), - DBResponsePrimitive::Timestamp(_) => FieldValue::String(Cow::Owned(value.to_string())), DBResponsePrimitive::Null => FieldValue::Null, } } diff --git a/rust/cube/cubeorchestrator/benches/common/mod.rs b/rust/cube/cubeorchestrator/benches/common/mod.rs index a8246889c55a7..6b2aeb0200a70 100644 --- a/rust/cube/cubeorchestrator/benches/common/mod.rs +++ b/rust/cube/cubeorchestrator/benches/common/mod.rs @@ -56,7 +56,7 @@ pub fn build_dataset( members.push(alias.clone()); let mut col = ColumnarArray::with_capacity(row_count); for i in 0..row_count { - col.push(DBResponsePrimitive::Number(((i * (j + 1)) as f64) * 0.5)); + col.push(DBResponsePrimitive::Float64(((i * (j + 1)) as f64) * 0.5)); } columns.push(col); } diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index 21895e90b0891..787ba7697dc9b 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -282,8 +282,7 @@ impl QueryResult { } /// Append every element of an Arrow `array` to a column accumulator, converting -/// each value to [`DBResponsePrimitive`]. Temporal types become -/// [`DBResponsePrimitive::Timestamp`]; integers/floats/decimals become `Number`. +/// each value to [`DBResponsePrimitive`]. fn append_arrow_array( col: &mut Vec, array: &dyn Array, @@ -291,14 +290,40 @@ fn append_arrow_array( let len = array.len(); col.reserve(len); - macro_rules! push_numeric { + macro_rules! push_int { ($ty:ty) => {{ let a = array.as_any().downcast_ref::<$ty>().unwrap(); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); } else { - col.push(DBResponsePrimitive::Number(a.value(i) as f64)); + col.push(DBResponsePrimitive::Int64(a.value(i) as i64)); + } + } + }}; + } + + macro_rules! push_uint { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::UInt64(a.value(i) as u64)); + } + } + }}; + } + + macro_rules! push_float { + ($ty:ty) => {{ + let a = array.as_any().downcast_ref::<$ty>().unwrap(); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Float64(a.value(i) as f64)); } } }}; @@ -342,7 +367,7 @@ fn append_arrow_array( } else { let s = a.value_as_string(i); match s.parse::() { - Ok(n) => col.push(DBResponsePrimitive::Number(n)), + Ok(n) => col.push(DBResponsePrimitive::Float64(n)), Err(_) => col.push(DBResponsePrimitive::String(s)), } } @@ -366,23 +391,23 @@ fn append_arrow_array( } } } - DataType::Int8 => push_numeric!(Int8Array), - DataType::Int16 => push_numeric!(Int16Array), - DataType::Int32 => push_numeric!(Int32Array), - DataType::Int64 => push_numeric!(Int64Array), - DataType::UInt8 => push_numeric!(UInt8Array), - DataType::UInt16 => push_numeric!(UInt16Array), - DataType::UInt32 => push_numeric!(UInt32Array), - DataType::UInt64 => push_numeric!(UInt64Array), - DataType::Float32 => push_numeric!(Float32Array), - DataType::Float64 => push_numeric!(Float64Array), + DataType::Int8 => push_int!(Int8Array), + DataType::Int16 => push_int!(Int16Array), + DataType::Int32 => push_int!(Int32Array), + DataType::Int64 => push_int!(Int64Array), + DataType::UInt8 => push_uint!(UInt8Array), + DataType::UInt16 => push_uint!(UInt16Array), + DataType::UInt32 => push_uint!(UInt32Array), + DataType::UInt64 => push_uint!(UInt64Array), + DataType::Float32 => push_float!(Float32Array), + DataType::Float64 => push_float!(Float64Array), DataType::Float16 => { let a = array.as_any().downcast_ref::().unwrap(); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); } else { - col.push(DBResponsePrimitive::Number(a.value(i).to_f64())); + col.push(DBResponsePrimitive::Float64(a.value(i).to_f64())); } } } @@ -416,13 +441,12 @@ mod tests { fn create_test_message(num_rows: usize, num_columns: usize) -> Vec { let mut builder = FlatBufferBuilder::new(); - // Create column names let column_names: Vec<_> = (0..num_columns) .map(|i| builder.create_string(&format!("column_{}", i))) .collect(); - // Create rows with values let mut rows_vec = Vec::with_capacity(num_rows); + for row_idx in 0..num_rows { // Create column values for this row let mut values_vec = Vec::with_capacity(num_columns); @@ -459,7 +483,6 @@ mod tests { }, ); - // Create the message let connection_id = builder.create_string("test_connection"); let message = HttpMessage::create( &mut builder, @@ -477,7 +500,6 @@ mod tests { #[test] fn test_parse_small_result_set() { - // Small result set should work fine let msg_data = create_test_message(10, 5); let result = QueryResult::from_cubestore_fb(&msg_data); assert!(result.is_ok()); @@ -637,8 +659,8 @@ mod tests { assert_eq!( result.data[1].as_slice(), &[ - DBResponsePrimitive::Number(1.5), - DBResponsePrimitive::Number(2.0), + DBResponsePrimitive::Float64(1.5), + DBResponsePrimitive::Float64(2.0), DBResponsePrimitive::Null, ] ); @@ -745,8 +767,8 @@ mod tests { assert_eq!( result.data[1].as_slice(), &[ - DBResponsePrimitive::Number(1.5), - DBResponsePrimitive::Number(2.0), + DBResponsePrimitive::Float64(1.5), + DBResponsePrimitive::Float64(2.0), ] ); } diff --git a/rust/cube/cubeorchestrator/src/query_result_transform.rs b/rust/cube/cubeorchestrator/src/query_result_transform.rs index 064e15d8a25fd..94fe31ada09bc 100644 --- a/rust/cube/cubeorchestrator/src/query_result_transform.rs +++ b/rust/cube/cubeorchestrator/src/query_result_transform.rs @@ -1156,7 +1156,9 @@ pub struct RequestResultArray { pub enum DBResponsePrimitive { Null, Boolean(bool), - Number(f64), + Int64(i64), + UInt64(u64), + Float64(f64), String(String), Timestamp(NaiveDateTime), Uncommon(Value), @@ -1171,7 +1173,9 @@ impl Serialize for DBResponsePrimitive { match self { DBResponsePrimitive::Null => serializer.serialize_none(), DBResponsePrimitive::Boolean(b) => serializer.serialize_bool(*b), - DBResponsePrimitive::Number(n) => serializer.serialize_f64(*n), + DBResponsePrimitive::Int64(n) => serializer.serialize_i64(*n), + DBResponsePrimitive::UInt64(n) => serializer.serialize_u64(*n), + DBResponsePrimitive::Float64(n) => serializer.serialize_f64(*n), DBResponsePrimitive::String(s) => serializer.serialize_str(s), DBResponsePrimitive::Timestamp(dt) => { serializer.serialize_str(&dt.format(TIMESTAMP_FORMAT).to_string()) @@ -1201,23 +1205,35 @@ impl<'de> Deserialize<'de> for DBResponsePrimitive { } fn visit_i64(self, v: i64) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + Ok(DBResponsePrimitive::Int64(v)) } fn visit_i128(self, v: i128) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + if let Ok(n) = i64::try_from(v) { + Ok(DBResponsePrimitive::Int64(n)) + } else if let Ok(n) = u64::try_from(v) { + Ok(DBResponsePrimitive::UInt64(n)) + } else { + Err(E::custom(format!("integer {v} out of range for i64/u64"))) + } } fn visit_u64(self, v: u64) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + Ok(DBResponsePrimitive::UInt64(v)) } fn visit_u128(self, v: u128) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + if let Ok(n) = i64::try_from(v) { + Ok(DBResponsePrimitive::Int64(n)) + } else if let Ok(n) = u64::try_from(v) { + Ok(DBResponsePrimitive::UInt64(n)) + } else { + Err(E::custom(format!("integer {v} out of range for i64/u64"))) + } } fn visit_f64(self, v: f64) -> Result { - Ok(DBResponsePrimitive::Number(v)) + Ok(DBResponsePrimitive::Float64(v)) } fn visit_str(self, v: &str) -> Result { @@ -1273,13 +1289,16 @@ impl Display for DBResponsePrimitive { let str = match self { DBResponsePrimitive::Null => "null".to_string(), DBResponsePrimitive::Boolean(b) => b.to_string(), - DBResponsePrimitive::Number(n) => n.to_string(), + DBResponsePrimitive::Int64(n) => n.to_string(), + DBResponsePrimitive::UInt64(n) => n.to_string(), + DBResponsePrimitive::Float64(n) => n.to_string(), DBResponsePrimitive::String(s) => s.clone(), DBResponsePrimitive::Timestamp(dt) => dt.format(TIMESTAMP_FORMAT).to_string(), DBResponsePrimitive::Uncommon(v) => { serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()) } }; + write!(f, "{}", str) } } From 1968731a9dc5549770fe625513970aa8e2f42e75 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 17:41:01 +0200 Subject: [PATCH 05/13] refactor(cubeorchestrator): return error on Arrow array downcast mismatch --- .../src/query_message_parser.rs | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index 787ba7697dc9b..ab3c73f219fca 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -290,9 +290,20 @@ fn append_arrow_array( let len = array.len(); col.reserve(len); + macro_rules! downcast_array_ref { + ($ty:ty) => { + array.as_any().downcast_ref::<$ty>().ok_or_else(|| { + ParseError::ArrowError(format!( + "Failed to downcast Arrow array to {}", + stringify!($ty) + )) + })? + }; + } + macro_rules! push_int { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -305,7 +316,7 @@ fn append_arrow_array( macro_rules! push_uint { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -318,7 +329,7 @@ fn append_arrow_array( macro_rules! push_float { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -331,7 +342,7 @@ fn append_arrow_array( macro_rules! push_str { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -344,7 +355,7 @@ fn append_arrow_array( macro_rules! push_datetime { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -360,7 +371,7 @@ fn append_arrow_array( macro_rules! push_decimal { ($ty:ty) => {{ - let a = array.as_any().downcast_ref::<$ty>().unwrap(); + let a = downcast_array_ref!($ty); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -382,7 +393,7 @@ fn append_arrow_array( } } DataType::Boolean => { - let a = array.as_any().downcast_ref::().unwrap(); + let a = downcast_array_ref!(BooleanArray); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); @@ -402,7 +413,7 @@ fn append_arrow_array( DataType::Float32 => push_float!(Float32Array), DataType::Float64 => push_float!(Float64Array), DataType::Float16 => { - let a = array.as_any().downcast_ref::().unwrap(); + let a = downcast_array_ref!(Float16Array); for i in 0..len { if a.is_null(i) { col.push(DBResponsePrimitive::Null); From aa68bc7d1b8594085ee32492df93466ae4d2420a Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 27 May 2026 18:47:21 +0200 Subject: [PATCH 06/13] perf: optimize timestamp format --- .../src/query_result_transform.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_result_transform.rs b/rust/cube/cubeorchestrator/src/query_result_transform.rs index 94fe31ada09bc..c5ae7de6cd079 100644 --- a/rust/cube/cubeorchestrator/src/query_result_transform.rs +++ b/rust/cube/cubeorchestrator/src/query_result_transform.rs @@ -6,6 +6,7 @@ use crate::{ }, }; use anyhow::{bail, Context, Result}; +use chrono::format::{Fixed, Item, Numeric, Pad}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use indexmap::{Equivalent, IndexMap}; use itertools::multizip; @@ -1164,7 +1165,22 @@ pub enum DBResponsePrimitive { Uncommon(Value), } -const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f"; +/// `%Y-%m-%dT%H:%M:%S%.3f` +const TIMESTAMP_ITEMS: &[Item<'static>] = &[ + Item::Numeric(Numeric::Year, Pad::Zero), + Item::Literal("-"), + Item::Numeric(Numeric::Month, Pad::Zero), + Item::Literal("-"), + Item::Numeric(Numeric::Day, Pad::Zero), + Item::Literal("T"), + Item::Numeric(Numeric::Hour, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Minute, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Second, Pad::Zero), + // `%.3f`: leading dot followed by 3 fractional-second digits. + Item::Fixed(Fixed::Nanosecond3), +]; // Hand-written `Serialize` mirroring serde's untagged behavior for the JSON-native // variants, plus a string rendering for `Timestamp`. @@ -1178,7 +1194,7 @@ impl Serialize for DBResponsePrimitive { DBResponsePrimitive::Float64(n) => serializer.serialize_f64(*n), DBResponsePrimitive::String(s) => serializer.serialize_str(s), DBResponsePrimitive::Timestamp(dt) => { - serializer.serialize_str(&dt.format(TIMESTAMP_FORMAT).to_string()) + serializer.collect_str(&dt.format_with_items(TIMESTAMP_ITEMS.iter())) } DBResponsePrimitive::Uncommon(v) => v.serialize(serializer), } @@ -1293,7 +1309,9 @@ impl Display for DBResponsePrimitive { DBResponsePrimitive::UInt64(n) => n.to_string(), DBResponsePrimitive::Float64(n) => n.to_string(), DBResponsePrimitive::String(s) => s.clone(), - DBResponsePrimitive::Timestamp(dt) => dt.format(TIMESTAMP_FORMAT).to_string(), + DBResponsePrimitive::Timestamp(dt) => { + dt.format_with_items(TIMESTAMP_ITEMS.iter()).to_string() + } DBResponsePrimitive::Uncommon(v) => { serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()) } From 008f5eafe9e88758a0420cf80e0b999ec928515a Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 28 May 2026 13:09:32 +0200 Subject: [PATCH 07/13] Update packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> --- packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 47551cbca73fa..9ac1fba4812dc 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -118,7 +118,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { queryTracingObj: tracingObj, // Arrow is a default format for Cube Store, but // an old Cube Store will ignore this option and will continue to serve results in Legacy format - responseFormat: responseFormat || QueryResultFormat.Arrow, + responseFormat: responseFormat ?? QueryResultFormat.Arrow, }); } From 607e0d7dae5e169c8e6d8609a7152c159449af86 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 28 May 2026 13:15:25 +0200 Subject: [PATCH 08/13] refactor(cubeorchestrator): bench Arrow path through from_cubestore_fb --- rust/cube/cubeorchestrator/benches/parser.rs | 51 ++++++++++++++++--- .../src/query_message_parser.rs | 2 +- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/rust/cube/cubeorchestrator/benches/parser.rs b/rust/cube/cubeorchestrator/benches/parser.rs index a66a82ff39dde..7358bd56ed6de 100644 --- a/rust/cube/cubeorchestrator/benches/parser.rs +++ b/rust/cube/cubeorchestrator/benches/parser.rs @@ -4,8 +4,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through use cubeorchestrator::query_message_parser::QueryResult; use cubeorchestrator::transport::JsRawColumnarData; use cubeshared::codegen::{ - HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, HttpMessageArgs, HttpResultSet, - HttpResultSetArgs, HttpRow, HttpRowArgs, + HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, HttpMessageArgs, + HttpQueryResult, HttpQueryResultArgs, HttpQueryResultArrow, HttpQueryResultArrowArgs, + HttpQueryResultData, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, }; use cubeshared::flatbuffers::FlatBufferBuilder; @@ -152,8 +153,40 @@ fn bench_from_js_raw_data(c: &mut Criterion) { group.finish(); } -fn bench_from_arrow(c: &mut Criterion) { - let mut group = c.benchmark_group("QueryResult::from_arrow"); +/// Wrap raw Arrow IPC bytes in an `HttpMessage` FlatBuffer carrying +fn build_cubestore_fb_arrow_message(arrow_ipc: &[u8]) -> Vec { + let mut builder = FlatBufferBuilder::new(); + let data_vec = builder.create_vector(arrow_ipc); + let arrow = HttpQueryResultArrow::create( + &mut builder, + &HttpQueryResultArrowArgs { + data: Some(data_vec), + is_last: true, + }, + ); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow.as_union_value()), + }, + ); + let connection_id = builder.create_string("bench_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + builder.finished_data().to_vec() +} + +fn bench_from_cubestore_fb_arrow(c: &mut Criterion) { + let mut group = c.benchmark_group("QueryResult::from_cubestore_fb_arrow"); let combos: &[(usize, usize)] = &[ (8, 1), @@ -169,11 +202,12 @@ fn bench_from_arrow(c: &mut Criterion) { let dimensions = make_member_aliases("dim", dim_count); let measures = make_member_aliases("measure", measure_count); - let payload = build_arrow_ipc(row_count, &dimensions, &measures, &[]); + let arrow_ipc = build_arrow_ipc(row_count, &dimensions, &measures, &[]); + let payload = build_cubestore_fb_arrow_message(&arrow_ipc); let payload_len = payload.len(); eprintln!( - "from_arrow: c{:02}_r{} payload_bytes={}", + "from_cubestore_fb_arrow: c{:02}_r{} payload_bytes={}", col_count, row_count, payload_len ); @@ -184,7 +218,8 @@ fn bench_from_arrow(c: &mut Criterion) { // the equivalent of from_js_raw_data's `parse_plus_build`. group.bench_with_input(BenchmarkId::from_parameter(id), &(), |b, _| { b.iter(|| { - let built = QueryResult::from_arrow(black_box(&payload)).expect("from_arrow"); + let built = QueryResult::from_cubestore_fb(black_box(&payload)) + .expect("from_cubestore_fb arrow"); black_box(built); }); }); @@ -197,6 +232,6 @@ criterion_group!( benches, bench_from_cubestore_fb, bench_from_js_raw_data, - bench_from_arrow + bench_from_cubestore_fb_arrow ); criterion_main!(benches); diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index ab3c73f219fca..e89ea7478b22c 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -259,7 +259,7 @@ impl QueryResult { QueryResult::try_new(members, data) } - fn from_arrow(bytes: &[u8]) -> Result { + pub(crate) fn from_arrow(bytes: &[u8]) -> Result { let reader = StreamReader::try_new(Cursor::new(bytes), None) .map_err(|err| ParseError::ArrowError(err.to_string()))?; From 8f0a966ae038917737c07284ccd6db809a7007d4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Thu, 28 May 2026 13:23:10 +0200 Subject: [PATCH 09/13] chore: lint --- packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 9ac1fba4812dc..b80c356664d0e 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -23,9 +23,9 @@ import { AsyncDebounce, getEnv, isVersionGte } from '@cubejs-backend/shared'; import { escape, format as formatSql } from 'sqlstring'; import fetch from 'node-fetch'; -import {ConnectionConfig} from './types'; -import {WebSocketConnection} from './WebSocketConnection'; -import {QueryResultFormat} from '../codegen'; +import { ConnectionConfig } from './types'; +import { WebSocketConnection } from './WebSocketConnection'; +import { QueryResultFormat } from '../codegen'; const CubeStoreCapabilityMinVersion = { queueExclusive: '1.6.22', From 6bb58b6efb007f376d922b7e9cb2007032259b8d Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 5 Jun 2026 15:42:00 +0200 Subject: [PATCH 10/13] feat(cubeorchestrator): support parsing Completed query result --- .../src/query_message_parser.rs | 77 ++++++++++++++++--- 1 file changed, 67 insertions(+), 10 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index e89ea7478b22c..2edf9a9782574 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -11,7 +11,9 @@ use arrow::array::{ }; use arrow::datatypes::{DataType, TimeUnit}; use arrow::ipc::reader::StreamReader; -use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand, HttpResultSet}; +use cubeshared::codegen::{ + root_as_http_message_with_opts, HttpCommand, HttpQueryResultData, HttpResultSet, +}; use cubeshared::flatbuffers::VerifierOptions; use indexmap::IndexMap; use neon::prelude::Finalize; @@ -200,15 +202,30 @@ impl QueryResult { let query_result = http_message .command_as_http_query_result() .ok_or(ParseError::EmptyResultSet)?; - let arrow = query_result - .data_as_http_query_result_arrow() - .ok_or_else(|| { - ParseError::FlatBufferError( - "HttpQueryResult.data is not HttpQueryResultArrow".to_string(), - ) - })?; - - Self::from_arrow(arrow.data().bytes()) + + match query_result.data_type() { + HttpQueryResultData::HttpQueryResultArrow => { + let arrow = + query_result + .data_as_http_query_result_arrow() + .ok_or_else(|| { + ParseError::FlatBufferError( + "HttpQueryResult.data is not HttpQueryResultArrow" + .to_string(), + ) + })?; + + Self::from_arrow(arrow.data().bytes()) + } + // Marker for statements that complete without a result set + // (CREATE TABLE/INSERT, queue/cache writes). Carries no payload, + // so it maps to an empty result, like a zero-column legacy result set. + HttpQueryResultData::HttpQueryResultCompleted => Ok(QueryResult::empty()), + other => Err(ParseError::FlatBufferError(format!( + "Unsupported HttpQueryResult.data type: {:?}", + other + ))), + } } _ => Err(ParseError::UnsupportedCommand), } @@ -784,6 +801,46 @@ mod tests { ); } + #[test] + fn test_from_cubestore_fb_completed_query_result() { + use cubeshared::codegen::{ + HttpMessage, HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, + HttpQueryResultCompleted, HttpQueryResultCompletedArgs, HttpQueryResultData, + }; + use cubeshared::flatbuffers::FlatBufferBuilder; + + // A statement that completes without a result set (e.g. CREATE TABLE/INSERT) + // is reported as an empty HttpQueryResultCompleted marker. + let mut builder = FlatBufferBuilder::new(); + let completed = + HttpQueryResultCompleted::create(&mut builder, &HttpQueryResultCompletedArgs {}); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultCompleted, + data: Some(completed.as_union_value()), + }, + ); + let connection_id = builder.create_string("test_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + let msg_data = builder.finished_data().to_vec(); + + let result = + QueryResult::from_cubestore_fb(&msg_data).expect("from_cubestore_fb completed"); + assert!(result.members.is_empty()); + assert_eq!(result.row_count, 0); + assert!(result.data.is_empty()); + } + #[test] fn test_parse_with_custom_verifier_options() { use cubeshared::codegen::root_as_http_message_with_opts; From fcc3dab9e2c4c5a71f6680ec8ebed9f96a0a00fb Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 5 Jun 2026 15:43:03 +0200 Subject: [PATCH 11/13] chore: use arrow format only with new cube store --- packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index b80c356664d0e..9d21478b22710 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -31,6 +31,8 @@ const CubeStoreCapabilityMinVersion = { queueExclusive: '1.6.22', queueExternalId: '1.6.26', sendableParameters: '1.6.38', + // Arrow format + Completed response type + arrowFormat: '1.6.55', } satisfies Record; type CubeStoreCapability = keyof typeof CubeStoreCapabilityMinVersion; @@ -116,9 +118,9 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { return this.connection.query(query, sendParameters ? values : [], { inlineTables: inlineTables ?? [], queryTracingObj: tracingObj, - // Arrow is a default format for Cube Store, but - // an old Cube Store will ignore this option and will continue to serve results in Legacy format - responseFormat: responseFormat ?? QueryResultFormat.Arrow, + responseFormat: responseFormat ?? ( + await this.hasCapability('arrowFormat') ? QueryResultFormat.Arrow : QueryResultFormat.Legacy + ), }); } From bcda3ca88605e445fabcc767ae35c244f821bc5e Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 5 Jun 2026 15:58:40 +0200 Subject: [PATCH 12/13] chore: some love --- .../src/query_message_parser.rs | 142 ++++++++---------- 1 file changed, 62 insertions(+), 80 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index 2edf9a9782574..c74631503e2f9 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -459,11 +459,18 @@ fn append_arrow_array( #[cfg(test)] mod tests { use super::*; + use arrow::array::BinaryArray; + use arrow::datatypes::{Field, Schema}; + use arrow::ipc::writer::StreamWriter; + use arrow::record_batch::RecordBatch; use cubeshared::codegen::{ - root_as_http_message_unchecked, HttpColumnValue, HttpColumnValueArgs, HttpCommand, - HttpMessage, HttpMessageArgs, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, + root_as_http_message_unchecked, HttpColumnValue, HttpColumnValueArgs, HttpMessage, + HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, HttpQueryResultArrow, + HttpQueryResultArrowArgs, HttpQueryResultCompleted, HttpQueryResultCompletedArgs, + HttpResultSetArgs, HttpRow, HttpRowArgs, }; use cubeshared::flatbuffers::FlatBufferBuilder; + use std::sync::Arc; /// Helper function to create a test HttpMessage with a given number of rows and columns fn create_test_message(num_rows: usize, num_columns: usize) -> Vec { @@ -527,82 +534,82 @@ mod tests { } #[test] - fn test_parse_small_result_set() { + fn test_parse_small_result_set() -> Result<(), ParseError> { let msg_data = create_test_message(10, 5); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 5); assert_eq!(query_result.row_count, 10); assert_eq!(query_result.data.len(), 5); assert!(query_result.data.iter().all(|c| c.len() == 10)); + + Ok(()) } #[test] - fn test_parse_medium_result_set() { + fn test_parse_medium_result_set() -> Result<(), ParseError> { // Medium result set: 1000 rows, 20 columns let msg_data = create_test_message(1000, 20); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 20); assert_eq!(query_result.row_count, 1000); assert_eq!(query_result.data.len(), 20); assert!(query_result.data.iter().all(|c| c.len() == 1000)); + + Ok(()) } #[test] - fn test_parse_large_result_set() { + fn test_parse_large_result_set() -> Result<(), ParseError> { // Large result set: 10,000 rows, 30 columns // This should start showing verification issues let msg_data = create_test_message(10_000, 30); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 30); assert_eq!(query_result.row_count, 10_000); assert_eq!(query_result.data.len(), 30); assert!(query_result.data.iter().all(|c| c.len() == 10_000)); + + Ok(()) } #[test] - fn test_parse_very_large_result_set() { + fn test_parse_very_large_result_set() -> Result<(), ParseError> { // Very large result set: 33,000 rows, 40 columns let msg_data = create_test_message(33_000, 40); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 40); assert_eq!(query_result.row_count, 33_000); assert_eq!(query_result.data.len(), 40); assert!(query_result.data.iter().all(|c| c.len() == 33_000)); + + Ok(()) } #[test] - fn test_parse_huge_result_set() { + fn test_parse_huge_result_set() -> Result<(), ParseError> { // Huge result set: 50,000 rows, 100 columns let msg_data = create_test_message(50_000, 100); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 100); assert_eq!(query_result.row_count, 50_000); assert_eq!(query_result.data.len(), 100); assert!(query_result.data.iter().all(|c| c.len() == 50_000)); + + Ok(()) } #[test] - fn test_compare_with_unchecked_parse() { + fn test_compare_with_unchecked_parse() -> Result<(), ParseError> { // Test to demonstrate that unchecked parsing would work let msg_data = create_test_message(33_000, 40); // Checked version (current implementation) - let checked_result = QueryResult::from_cubestore_fb(&msg_data); + let checked_result = QueryResult::from_cubestore_fb(&msg_data)?; // Try unchecked version to verify the data itself is valid let unchecked_result = unsafe { @@ -625,12 +632,13 @@ mod tests { } }; - assert!(checked_result.is_ok()); + assert_eq!(checked_result.row_count, 33_000); assert!(unchecked_result.is_ok()); + + Ok(()) } - fn arrow_ipc_bytes(batch: &arrow::record_batch::RecordBatch) -> Vec { - use arrow::ipc::writer::StreamWriter; + fn arrow_ipc_bytes(batch: &RecordBatch) -> Vec { let mut buf = Vec::new(); { let schema = batch.schema(); @@ -642,12 +650,7 @@ mod tests { } #[test] - fn test_from_arrow_basic_types() { - use arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; - use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; - use arrow::record_batch::RecordBatch; - use std::sync::Arc; - + fn test_from_arrow_basic_types() -> Result<(), ParseError> { let schema = Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, true), Field::new("amount", DataType::Float64, true), @@ -670,7 +673,7 @@ mod tests { .unwrap(); let bytes = arrow_ipc_bytes(&batch); - let result = QueryResult::from_arrow(&bytes).expect("from_arrow"); + let result = QueryResult::from_arrow(&bytes)?; assert_eq!(result.members, vec!["city", "amount", "created_at"]); assert_eq!(result.row_count, 3); @@ -703,15 +706,12 @@ mod tests { assert_eq!(json[0], "1970-01-01T00:00:00.000"); assert_eq!(json[1], serde_json::Value::Null); assert_eq!(json[2], "1970-01-01T00:00:01.000"); + + Ok(()) } #[test] - fn test_from_arrow_unsupported_type() { - use arrow::array::BinaryArray; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use std::sync::Arc; - + fn test_from_arrow_unsupported_type() -> Result<(), ParseError> { let schema = Arc::new(Schema::new(vec![Field::new( "blob", DataType::Binary, @@ -723,20 +723,12 @@ mod tests { let bytes = arrow_ipc_bytes(&batch); let err = QueryResult::from_arrow(&bytes).expect_err("should reject Binary"); assert!(matches!(err, ParseError::UnsupportedArrowType(_))); + + Ok(()) } #[test] - fn test_from_cubestore_fb_arrow_query_result() { - use arrow::array::{Float64Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema}; - use arrow::record_batch::RecordBatch; - use cubeshared::codegen::{ - HttpMessage, HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, - HttpQueryResultArrow, HttpQueryResultArrowArgs, HttpQueryResultData, - }; - use cubeshared::flatbuffers::FlatBufferBuilder; - use std::sync::Arc; - + fn test_from_cubestore_fb_arrow_query_result() -> Result<(), ParseError> { // Arrow IPC stream payload, as CubeStore would emit it. let schema = Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, false), @@ -782,7 +774,7 @@ mod tests { builder.finish(message, None); let msg_data = builder.finished_data().to_vec(); - let result = QueryResult::from_cubestore_fb(&msg_data).expect("from_cubestore_fb arrow"); + let result = QueryResult::from_cubestore_fb(&msg_data)?; assert_eq!(result.members, vec!["city", "amount"]); assert_eq!(result.row_count, 2); assert_eq!( @@ -799,18 +791,12 @@ mod tests { DBResponsePrimitive::Float64(2.0), ] ); + + Ok(()) } #[test] - fn test_from_cubestore_fb_completed_query_result() { - use cubeshared::codegen::{ - HttpMessage, HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, - HttpQueryResultCompleted, HttpQueryResultCompletedArgs, HttpQueryResultData, - }; - use cubeshared::flatbuffers::FlatBufferBuilder; - - // A statement that completes without a result set (e.g. CREATE TABLE/INSERT) - // is reported as an empty HttpQueryResultCompleted marker. + fn test_from_cubestore_fb_completed_query_result() -> Result<(), ParseError> { let mut builder = FlatBufferBuilder::new(); let completed = HttpQueryResultCompleted::create(&mut builder, &HttpQueryResultCompletedArgs {}); @@ -834,18 +820,16 @@ mod tests { builder.finish(message, None); let msg_data = builder.finished_data().to_vec(); - let result = - QueryResult::from_cubestore_fb(&msg_data).expect("from_cubestore_fb completed"); + let result = QueryResult::from_cubestore_fb(&msg_data)?; assert!(result.members.is_empty()); assert_eq!(result.row_count, 0); assert!(result.data.is_empty()); + + Ok(()) } #[test] - fn test_parse_with_custom_verifier_options() { - use cubeshared::codegen::root_as_http_message_with_opts; - use cubeshared::flatbuffers::VerifierOptions; - + fn test_parse_with_custom_verifier_options() -> Result<(), ParseError> { // Test that custom verifier options can handle large datasets let msg_data = create_test_message(33_000, 40); @@ -857,23 +841,21 @@ mod tests { }; // This should succeed with custom options - let result = root_as_http_message_with_opts(&opts, &msg_data); + let http_message = root_as_http_message_with_opts(&opts, &msg_data) + .map_err(|err| ParseError::FlatBufferError(err.to_string()))?; - match result { - Ok(http_message) => match http_message.command_type() { - HttpCommand::HttpResultSet => { - let result_set = http_message.command_as_http_result_set(); - if let Some(rs) = result_set { - if let Some(rows) = rs.rows() { - assert_eq!(rows.len(), 33_000); - } + match http_message.command_type() { + HttpCommand::HttpResultSet => { + let result_set = http_message.command_as_http_result_set(); + if let Some(rs) = result_set { + if let Some(rows) = rs.rows() { + assert_eq!(rows.len(), 33_000); } } - _ => panic!("Wrong command type"), - }, - Err(e) => { - panic!("Failed to parse with custom verifier options: {:?}", e); } + _ => panic!("Wrong command type"), } + + Ok(()) } } From a8f79950fb238910e554fb0997ccb18ae3eb3b40 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Fri, 5 Jun 2026 16:31:56 +0200 Subject: [PATCH 13/13] fix(cubeorchestrator): serialize numeric query results as JSON strings The legacy CubeStore result set carried every value as a string, so numbers always serialized to JSON strings. The Arrow path produces real Int64/UInt64/Float64 primitives that serialized to JSON numbers, which changed the wire shape and broke pre-aggregation snapshots (e.g. "5" -> 5, "0.6666666666666666" -> 0.6666666666666666). Render the numeric DBResponsePrimitive variants via collect_str so the Arrow path matches the legacy string output. f64 Display is the shortest round-tripping form, so values are stable. --- .../cubeorchestrator/src/query_message_parser.rs | 6 ++++++ .../cubeorchestrator/src/query_result_transform.rs | 14 +++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index c74631503e2f9..0ad7bc254946a 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -696,6 +696,12 @@ mod tests { ] ); + // Numeric values serialize as JSON strings, matching the legacy result set. + let amounts_json = serde_json::to_value(result.data[1].as_slice()).unwrap(); + assert_eq!(amounts_json[0], "1.5"); + assert_eq!(amounts_json[1], "2"); + assert_eq!(amounts_json[2], serde_json::Value::Null); + // Timestamps land in the dedicated variant and serialize to the ISO format. match &result.data[2].as_slice()[0] { DBResponsePrimitive::Timestamp(_) => {} diff --git a/rust/cube/cubeorchestrator/src/query_result_transform.rs b/rust/cube/cubeorchestrator/src/query_result_transform.rs index c5ae7de6cd079..b839848da91ed 100644 --- a/rust/cube/cubeorchestrator/src/query_result_transform.rs +++ b/rust/cube/cubeorchestrator/src/query_result_transform.rs @@ -1182,16 +1182,20 @@ const TIMESTAMP_ITEMS: &[Item<'static>] = &[ Item::Fixed(Fixed::Nanosecond3), ]; -// Hand-written `Serialize` mirroring serde's untagged behavior for the JSON-native -// variants, plus a string rendering for `Timestamp`. +// Hand-written `Serialize`. Numeric variants (`Int64`/`UInt64`/`Float64`) are +// rendered as JSON strings, not numbers: the legacy CubeStore result set carried +// every value as a string, and downstream consumers (and snapshots) rely on that +// shape. The Arrow path produces real numeric primitives, so we stringify them +// here to keep the JSON output identical to the legacy path. `Timestamp` is also +// string-rendered; `Boolean`/`Null`/`String` map to their JSON-native forms. impl Serialize for DBResponsePrimitive { fn serialize(&self, serializer: S) -> Result { match self { DBResponsePrimitive::Null => serializer.serialize_none(), DBResponsePrimitive::Boolean(b) => serializer.serialize_bool(*b), - DBResponsePrimitive::Int64(n) => serializer.serialize_i64(*n), - DBResponsePrimitive::UInt64(n) => serializer.serialize_u64(*n), - DBResponsePrimitive::Float64(n) => serializer.serialize_f64(*n), + DBResponsePrimitive::Int64(n) => serializer.collect_str(n), + DBResponsePrimitive::UInt64(n) => serializer.collect_str(n), + DBResponsePrimitive::Float64(n) => serializer.collect_str(n), DBResponsePrimitive::String(s) => serializer.serialize_str(s), DBResponsePrimitive::Timestamp(dt) => { serializer.collect_str(&dt.format_with_items(TIMESTAMP_ITEMS.iter()))