diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index 6b48fb72cea71..65ed59ea96992 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -45,9 +45,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", + "getrandom 0.2.11", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.32", ] [[package]] @@ -65,12 +67,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - [[package]] name = "android_system_properties" version = "0.1.5" @@ -114,11 +110,11 @@ dependencies = [ "comfy-table 5.0.1", "csv", "flatbuffers 2.1.2", - "half", + "half 1.8.2", "hex", "indexmap 1.9.3", "lazy_static", - "lexical-core", + "lexical-core 0.8.5", "multiversion", "num", "rand", @@ -129,6 +125,180 @@ dependencies = [ "serde_json", ] +[[package]] +name = "arrow" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "378530e55cd479eda3c14eb345310799717e6f76d0c332041e8487022166b471" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0ab212d2c1886e802f51c5212d78ebbcbb0bec980fff9dadc1eb8d45cd0b738" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num-traits", +] + +[[package]] +name = "arrow-array" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd33d3e92f207444098c75b42de99d329562be0cf686b307b097cc52b4e999e" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half 2.7.1", + "hashbrown 0.17.0", + "num-complex", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-buffer" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6cd424c2693bcdbc150d843dc9d4d137dd2de4782ce6df491ad11a3a0416c0" +dependencies = [ + "bytes", + "half 2.7.1", + "num-bigint", + "num-traits", +] + +[[package]] +name = "arrow-cast" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c5aefb56a2c02e9e2b30746241058b85f8983f0fcff2ba0c6d09006e1cded7f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half 2.7.1", + "lexical-core 1.0.6", + "num-traits", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c88210023a2bfee1896af366309a3028fc3bcbd6515fa29a7990ee1baa08ee0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half 2.7.1", + "num-integer", + "num-traits", +] + +[[package]] +name = "arrow-ipc" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "238438f0834483703d88896db6fe5a7138b2230debc31b34c0336c2996e3c64f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers 25.12.19", +] + +[[package]] +name = "arrow-ord" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bffd8fd2579286a5d63bac898159873e5094a79009940bcb42bbfce4f19f1d0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab5994731204603c73ba69267616c50f80780774c6bb0476f1f830625115e0c" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half 2.7.1", +] + +[[package]] +name = "arrow-schema" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f633dbfdf39c039ada1bf9e34c694816eb71fbb7dc78f613993b7245e078a1ed" + +[[package]] +name = "arrow-select" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cd065c54172ac787cf3f2f8d4107e0d3fdc26edba76fdf4f4cc170258942222" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num-traits", +] + +[[package]] +name = "arrow-string" +version = "58.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29dd7cda3ab9692f43a2e4acc444d760cc17b12bb6d8232ddf64e9bab7c06b42" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num-traits", + "regex", + "regex-syntax 0.8.5", +] + [[package]] name = "async-channel" version = "2.1.1" @@ -186,6 +356,15 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atty" version = "0.2.14" @@ -464,17 +643,16 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ - "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.0", + "windows-link", ] [[package]] @@ -573,6 +751,26 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.11", + "once_cell", + "tiny-keccak", +] + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -658,6 +856,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.6" @@ -694,7 +898,7 @@ name = "cube-ext" version = "1.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ - "arrow", + "arrow 13.0.0", "chrono", "datafusion-common", "datafusion-expr", @@ -773,6 +977,7 @@ name = "cubeorchestrator" version = "0.1.0" dependencies = [ "anyhow", + "arrow 58.3.0", "chrono", "cubeshared", "indexmap 2.14.0", @@ -870,7 +1075,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "async-trait", "chrono", "datafusion-common", @@ -902,7 +1107,7 @@ name = "datafusion-common" version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ - "arrow", + "arrow 13.0.0", "ordered-float 2.10.1", "parquet", "sqlparser", @@ -927,7 +1132,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "datafusion-common", "sqlparser", ] @@ -938,7 +1143,7 @@ version = "7.0.0" source = "git+https://github.com/cube-js/arrow-datafusion.git?rev=1a75802e3ccaf7f4f3a518dd71e1043615e68e0c#1a75802e3ccaf7f4f3a518dd71e1043615e68e0c" dependencies = [ "ahash 0.7.8", - "arrow", + "arrow 13.0.0", "blake2", "blake3", "chrono", @@ -1305,6 +1510,18 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", + "zerocopy 0.8.27", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1771,11 +1988,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float 1.0.6", + "lexical-parse-integer 1.0.6", + "lexical-util 1.0.7", + "lexical-write-float 1.0.6", + "lexical-write-integer 1.0.6", ] [[package]] @@ -1784,21 +2014,40 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer 1.0.6", + "lexical-util 1.0.7", +] + [[package]] name = "lexical-parse-integer" version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util 1.0.7", +] + [[package]] name = "lexical-util" version = "0.8.5" @@ -1808,27 +2057,52 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + [[package]] name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", - "lexical-write-integer", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util 1.0.7", + "lexical-write-integer 1.0.6", +] + [[package]] name = "lexical-write-integer" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", "static_assertions", ] +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util 1.0.7", +] + [[package]] name = "libc" version = "0.2.177" @@ -1932,9 +2206,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.4" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memo-map" @@ -2095,20 +2369,19 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ - "autocfg", "num-integer", "num-traits", ] [[package]] name = "num-complex" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ "num-traits", ] @@ -2121,11 +2394,10 @@ checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] @@ -2154,11 +2426,12 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2247,7 +2520,7 @@ name = "parquet" version = "13.0.0" source = "git+https://github.com/cube-js/arrow-rs.git?rev=64899e4bbf07a111a94ac2084a6b70ae2374e421#64899e4bbf07a111a94ac2084a6b70ae2374e421" dependencies = [ - "arrow", + "arrow 13.0.0", "base64 0.13.1", "byteorder", "chrono", @@ -3477,6 +3750,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -4043,6 +4325,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-sys" version = "0.48.0" @@ -4254,7 +4542,16 @@ version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.32", +] + +[[package]] +name = "zerocopy" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" +dependencies = [ + "zerocopy-derive 0.8.27", ] [[package]] @@ -4268,6 +4565,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/packages/cubejs-backend-native/src/orchestrator.rs b/packages/cubejs-backend-native/src/orchestrator.rs index dd8b0a9994802..cce0910fed1c5 100644 --- a/packages/cubejs-backend-native/src/orchestrator.rs +++ b/packages/cubejs-backend-native/src/orchestrator.rs @@ -132,8 +132,11 @@ impl ResultWrapper { fn db_primitive_to_field_value(value: &DBResponsePrimitive) -> FieldValue<'_> { match value { DBResponsePrimitive::String(s) => FieldValue::String(Cow::Borrowed(s)), - DBResponsePrimitive::Number(n) => FieldValue::Number(*n), + DBResponsePrimitive::Int64(n) => FieldValue::Number(*n as f64), + DBResponsePrimitive::UInt64(n) => FieldValue::Number(*n as f64), + DBResponsePrimitive::Float64(n) => FieldValue::Number(*n), DBResponsePrimitive::Boolean(b) => FieldValue::Bool(*b), + DBResponsePrimitive::Timestamp(_) => FieldValue::String(Cow::Owned(value.to_string())), DBResponsePrimitive::Uncommon(v) => FieldValue::String(Cow::Owned( serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()), )), diff --git a/packages/cubejs-cubestore-driver/codegen/http-command.ts b/packages/cubejs-cubestore-driver/codegen/http-command.ts index eaca7b7120327..439b5b9d95824 100644 --- a/packages/cubejs-cubestore-driver/codegen/http-command.ts +++ b/packages/cubejs-cubestore-driver/codegen/http-command.ts @@ -4,6 +4,7 @@ import { HttpError } from './http-error.js'; import { HttpQuery } from './http-query.js'; +import { HttpQueryResult } from './http-query-result.js'; import { HttpResultSet } from './http-result-set.js'; @@ -11,32 +12,35 @@ export enum HttpCommand { NONE = 0, HttpQuery = 1, HttpResultSet = 2, - HttpError = 3 + HttpError = 3, + HttpQueryResult = 4 } export function unionToHttpCommand( type: HttpCommand, - accessor: (obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null -): HttpError|HttpQuery|HttpResultSet|null { + accessor: (obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null +): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null { switch(HttpCommand[type]) { case 'NONE': return null; case 'HttpQuery': return accessor(new HttpQuery())! as HttpQuery; case 'HttpResultSet': return accessor(new HttpResultSet())! as HttpResultSet; case 'HttpError': return accessor(new HttpError())! as HttpError; + case 'HttpQueryResult': return accessor(new HttpQueryResult())! as HttpQueryResult; default: return null; } } export function unionListToHttpCommand( type: HttpCommand, - accessor: (index: number, obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null, + accessor: (index: number, obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null, index: number -): HttpError|HttpQuery|HttpResultSet|null { +): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null { switch(HttpCommand[type]) { case 'NONE': return null; case 'HttpQuery': return accessor(index, new HttpQuery())! as HttpQuery; case 'HttpResultSet': return accessor(index, new HttpResultSet())! as HttpResultSet; case 'HttpError': return accessor(index, new HttpError())! as HttpError; + case 'HttpQueryResult': return accessor(index, new HttpQueryResult())! as HttpQueryResult; default: return null; } } diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts new file mode 100644 index 0000000000000..4af90e979dd18 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result-arrow.ts @@ -0,0 +1,81 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import * as flatbuffers from 'flatbuffers'; + +export class HttpQueryResultArrow { + bb: flatbuffers.ByteBuffer|null = null; + bb_pos = 0; + __init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResultArrow { + this.bb_pos = i; + this.bb = bb; + return this; +} + +static getRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow { + return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +static getSizePrefixedRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow { + bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH); + return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +data(index: number):number|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0; +} + +dataLength():number { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0; +} + +dataArray():Uint8Array|null { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null; +} + +isLast():boolean { + const offset = this.bb!.__offset(this.bb_pos, 6); + return offset ? !!this.bb!.readInt8(this.bb_pos + offset) : false; +} + +static startHttpQueryResultArrow(builder:flatbuffers.Builder) { + builder.startObject(2); +} + +static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) { + builder.addFieldOffset(0, dataOffset, 0); +} + +static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset { + builder.startVector(1, data.length, 1); + for (let i = data.length - 1; i >= 0; i--) { + builder.addInt8(data[i]!); + } + return builder.endVector(); +} + +static startDataVector(builder:flatbuffers.Builder, numElems:number) { + builder.startVector(1, numElems, 1); +} + +static addIsLast(builder:flatbuffers.Builder, isLast:boolean) { + builder.addFieldInt8(1, +isLast, +false); +} + +static endHttpQueryResultArrow(builder:flatbuffers.Builder):flatbuffers.Offset { + const offset = builder.endObject(); + builder.requiredField(offset, 4) // data + return offset; +} + +static createHttpQueryResultArrow(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset, isLast:boolean):flatbuffers.Offset { + HttpQueryResultArrow.startHttpQueryResultArrow(builder); + HttpQueryResultArrow.addData(builder, dataOffset); + HttpQueryResultArrow.addIsLast(builder, isLast); + return HttpQueryResultArrow.endHttpQueryResultArrow(builder); +} +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts new file mode 100644 index 0000000000000..5ee08968db103 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result-data.ts @@ -0,0 +1,34 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import { HttpQueryResultArrow } from './http-query-result-arrow.js'; + + +export enum HttpQueryResultData { + NONE = 0, + HttpQueryResultArrow = 1 +} + +export function unionToHttpQueryResultData( + type: HttpQueryResultData, + accessor: (obj:HttpQueryResultArrow) => HttpQueryResultArrow|null +): HttpQueryResultArrow|null { + switch(HttpQueryResultData[type]) { + case 'NONE': return null; + case 'HttpQueryResultArrow': return accessor(new HttpQueryResultArrow())! as HttpQueryResultArrow; + default: return null; + } +} + +export function unionListToHttpQueryResultData( + type: HttpQueryResultData, + accessor: (index: number, obj:HttpQueryResultArrow) => HttpQueryResultArrow|null, + index: number +): HttpQueryResultArrow|null { + switch(HttpQueryResultData[type]) { + case 'NONE': return null; + case 'HttpQueryResultArrow': return accessor(index, new HttpQueryResultArrow())! as HttpQueryResultArrow; + default: return null; + } +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query-result.ts b/packages/cubejs-cubestore-driver/codegen/http-query-result.ts new file mode 100644 index 0000000000000..edf026048ddf9 --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/http-query-result.ts @@ -0,0 +1,62 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +import * as flatbuffers from 'flatbuffers'; + +import { HttpQueryResultData, unionToHttpQueryResultData, unionListToHttpQueryResultData } from './http-query-result-data.js'; + + +export class HttpQueryResult { + bb: flatbuffers.ByteBuffer|null = null; + bb_pos = 0; + __init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResult { + this.bb_pos = i; + this.bb = bb; + return this; +} + +static getRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult { + return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +static getSizePrefixedRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult { + bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH); + return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb); +} + +dataType():HttpQueryResultData { + const offset = this.bb!.__offset(this.bb_pos, 4); + return offset ? this.bb!.readUint8(this.bb_pos + offset) : HttpQueryResultData.NONE; +} + +data(obj:any):any|null { + const offset = this.bb!.__offset(this.bb_pos, 6); + return offset ? this.bb!.__union(obj, this.bb_pos + offset) : null; +} + +static startHttpQueryResult(builder:flatbuffers.Builder) { + builder.startObject(2); +} + +static addDataType(builder:flatbuffers.Builder, dataType:HttpQueryResultData) { + builder.addFieldInt8(0, dataType, HttpQueryResultData.NONE); +} + +static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) { + builder.addFieldOffset(1, dataOffset, 0); +} + +static endHttpQueryResult(builder:flatbuffers.Builder):flatbuffers.Offset { + const offset = builder.endObject(); + builder.requiredField(offset, 6) // data + return offset; +} + +static createHttpQueryResult(builder:flatbuffers.Builder, dataType:HttpQueryResultData, dataOffset:flatbuffers.Offset):flatbuffers.Offset { + HttpQueryResult.startHttpQueryResult(builder); + HttpQueryResult.addDataType(builder, dataType); + HttpQueryResult.addData(builder, dataOffset); + return HttpQueryResult.endHttpQueryResult(builder); +} +} diff --git a/packages/cubejs-cubestore-driver/codegen/http-query.ts b/packages/cubejs-cubestore-driver/codegen/http-query.ts index d695cefe2611f..9161d8a274c56 100644 --- a/packages/cubejs-cubestore-driver/codegen/http-query.ts +++ b/packages/cubejs-cubestore-driver/codegen/http-query.ts @@ -6,6 +6,7 @@ import * as flatbuffers from 'flatbuffers'; import { HttpParameter } from './http-parameter.js'; import { HttpTable } from './http-table.js'; +import { QueryResultFormat } from './query-result-format.js'; export class HttpQuery { @@ -60,8 +61,13 @@ parametersLength():number { return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0; } +responseFormat():QueryResultFormat { + const offset = this.bb!.__offset(this.bb_pos, 12); + return offset ? this.bb!.readUint8(this.bb_pos + offset) : QueryResultFormat.Legacy; +} + static startHttpQuery(builder:flatbuffers.Builder) { - builder.startObject(4); + builder.startObject(5); } static addQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset) { @@ -104,17 +110,22 @@ static startParametersVector(builder:flatbuffers.Builder, numElems:number) { builder.startVector(4, numElems, 4); } +static addResponseFormat(builder:flatbuffers.Builder, responseFormat:QueryResultFormat) { + builder.addFieldInt8(4, responseFormat, QueryResultFormat.Legacy); +} + static endHttpQuery(builder:flatbuffers.Builder):flatbuffers.Offset { const offset = builder.endObject(); return offset; } -static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset):flatbuffers.Offset { +static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset, responseFormat:QueryResultFormat):flatbuffers.Offset { HttpQuery.startHttpQuery(builder); HttpQuery.addQuery(builder, queryOffset); HttpQuery.addTraceObj(builder, traceObjOffset); HttpQuery.addInlineTables(builder, inlineTablesOffset); HttpQuery.addParameters(builder, parametersOffset); + HttpQuery.addResponseFormat(builder, responseFormat); return HttpQuery.endHttpQuery(builder); } } diff --git a/packages/cubejs-cubestore-driver/codegen/index.ts b/packages/cubejs-cubestore-driver/codegen/index.ts index 1b52b62c90b56..25ab49a508fbc 100644 --- a/packages/cubejs-cubestore-driver/codegen/index.ts +++ b/packages/cubejs-cubestore-driver/codegen/index.ts @@ -12,9 +12,13 @@ export { HttpMessage } from './http-message.js'; export { HttpParameter } from './http-parameter.js'; export { HttpParameterValue } from './http-parameter-value.js'; export { HttpQuery } from './http-query.js'; +export { HttpQueryResult } from './http-query-result.js'; +export { HttpQueryResultArrow } from './http-query-result-arrow.js'; +export { HttpQueryResultData } from './http-query-result-data.js'; export { HttpResultSet } from './http-result-set.js'; export { HttpRow } from './http-row.js'; export { HttpTable } from './http-table.js'; export { Int64Value } from './int64-value.js'; export { NullValue } from './null-value.js'; +export { QueryResultFormat } from './query-result-format.js'; export { StringValue } from './string-value.js'; diff --git a/packages/cubejs-cubestore-driver/codegen/query-result-format.ts b/packages/cubejs-cubestore-driver/codegen/query-result-format.ts new file mode 100644 index 0000000000000..9f52455cbfb4c --- /dev/null +++ b/packages/cubejs-cubestore-driver/codegen/query-result-format.ts @@ -0,0 +1,8 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */ + +export enum QueryResultFormat { + Legacy = 0, + Arrow = 1 +} diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts index 43dc60eb2456b..9d21478b22710 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts @@ -1,30 +1,38 @@ import { pipeline, Writable } from 'stream'; import { createGzip } from 'zlib'; -import { createWriteStream, createReadStream } from 'fs'; +import { createReadStream, createWriteStream } from 'fs'; import { unlink } from 'fs-extra'; import tempy from 'tempy'; import csvWriter from 'csv-write-stream'; import { BaseDriver, + CreateTableIndex, DownloadTableCSVData, + DownloadTableMemoryData, + DriverInterface, ExternalCreateTableOptions, - DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex, - StreamTableData, - StreamingSourceTableData, + ExternalDriverCompatibilities, + IndexesSQL, QueryOptions, - ExternalDriverCompatibilities, TableStructure, TableColumnQueryResult, + StreamingSourceTableData, + StreamTableData, + TableColumnQueryResult, + TableStructure, } from '@cubejs-backend/base-driver'; import { AsyncDebounce, getEnv, isVersionGte } from '@cubejs-backend/shared'; -import { format as formatSql, escape } from 'sqlstring'; +import { escape, format as formatSql } from 'sqlstring'; import fetch from 'node-fetch'; import { ConnectionConfig } from './types'; import { WebSocketConnection } from './WebSocketConnection'; +import { QueryResultFormat } from '../codegen'; const CubeStoreCapabilityMinVersion = { queueExclusive: '1.6.22', queueExternalId: '1.6.26', sendableParameters: '1.6.38', + // Arrow format + Completed response type + arrowFormat: '1.6.55', } satisfies Record; type CubeStoreCapability = keyof typeof CubeStoreCapabilityMinVersion; @@ -61,6 +69,7 @@ type CreateTableOptions = { type CubeStoreQueryOptions = QueryOptions & { sendParameters?: boolean, + responseFormat?: QueryResultFormat, }; export class CubeStoreDriver extends BaseDriver implements DriverInterface { @@ -98,7 +107,7 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { } public async query(query: string, values: any[], options?: CubeStoreQueryOptions): Promise { - const { inlineTables, sendParameters, ...queryTracingObj } = options ?? {}; + const { inlineTables, sendParameters, responseFormat, ...queryTracingObj } = options ?? {}; if (!sendParameters) { query = formatSql(query, values || []); @@ -106,7 +115,13 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface { const tracingObj = { ...queryTracingObj, instance: getEnv('instanceId') }; - return this.connection.query(query, inlineTables ?? [], tracingObj, sendParameters ? values : undefined); + return this.connection.query(query, sendParameters ? values : [], { + inlineTables: inlineTables ?? [], + queryTracingObj: tracingObj, + responseFormat: responseFormat ?? ( + await this.hasCapability('arrowFormat') ? QueryResultFormat.Arrow : QueryResultFormat.Legacy + ), + }); } public async release() { diff --git a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts index ae08e3f78badb..6e70d08b2378b 100644 --- a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts +++ b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts @@ -18,6 +18,7 @@ import { HttpTable, Int64Value, NullValue, + QueryResultFormat, StringValue, } from '../codegen'; @@ -27,6 +28,14 @@ interface SentMessage { buffer: Uint8Array; } +export type QueryParameter = null | boolean | number | string | Buffer; + +export type WebSocketQueryOptions = { + inlineTables?: InlineTable[]; + queryTracingObj?: any; + responseFormat: QueryResultFormat; +}; + interface CubeStoreWebSocket extends WebSocket { readyPromise: Promise; lastHeartBeat: Date; @@ -282,7 +291,9 @@ export class WebSocketConnection { } } - public async query(query: string, inlineTables: InlineTable[], queryTracingObj?: any, parameters?: any): Promise { + public async query(query: string, parameters: QueryParameter[], options: WebSocketQueryOptions): Promise { + const { inlineTables, queryTracingObj, responseFormat } = options; + const builder = new flatbuffers.Builder(1024); const queryOffset = builder.createString(query); @@ -321,7 +332,7 @@ export class WebSocketConnection { } let parametersOffset: flatbuffers.Offset | null = null; - if (parameters) { + if (parameters.length > 0) { const httpParameterValues: flatbuffers.Offset[] = []; for (const parameter of parameters) { @@ -349,6 +360,8 @@ export class WebSocketConnection { HttpQuery.addParameters(builder, parametersOffset); } + HttpQuery.addResponseFormat(builder, responseFormat); + const httpQueryOffset = HttpQuery.endHttpQuery(builder); const messageId = this.messageCounter++; const connectionIdOffset = builder.createString(this.connectionId); diff --git a/rust/cube/Cargo.lock b/rust/cube/Cargo.lock index 42f0fbffd43f7..e03145010b6d9 100644 --- a/rust/cube/Cargo.lock +++ b/rust/cube/Cargo.lock @@ -824,6 +824,7 @@ name = "cubeorchestrator" version = "0.1.0" dependencies = [ "anyhow", + "arrow", "chrono", "criterion", "cubeshared", diff --git a/rust/cube/cubeorchestrator/Cargo.toml b/rust/cube/cubeorchestrator/Cargo.toml index 20a1971e77d35..3dbde88404ef0 100644 --- a/rust/cube/cubeorchestrator/Cargo.toml +++ b/rust/cube/cubeorchestrator/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +arrow = { version = "58", default-features = false, features = ["ipc"] } chrono = { version = "0.4.31", features = ["serde"] } cubeshared = { path = "../cubeshared" } serde = { version = "1.0.217", features = ["derive", "rc"] } diff --git a/rust/cube/cubeorchestrator/benches/common/mod.rs b/rust/cube/cubeorchestrator/benches/common/mod.rs index 45d8abce374af..6b2aeb0200a70 100644 --- a/rust/cube/cubeorchestrator/benches/common/mod.rs +++ b/rust/cube/cubeorchestrator/benches/common/mod.rs @@ -56,7 +56,7 @@ pub fn build_dataset( members.push(alias.clone()); let mut col = ColumnarArray::with_capacity(row_count); for i in 0..row_count { - col.push(DBResponsePrimitive::Number(((i * (j + 1)) as f64) * 0.5)); + col.push(DBResponsePrimitive::Float64(((i * (j + 1)) as f64) * 0.5)); } columns.push(col); } @@ -78,3 +78,62 @@ pub fn build_dataset( JsRawColumnarData { members, columns } } + +/// Build an Arrow IPC **stream** payload with the same logical data shape as +/// [`build_dataset`]: dimensions as Utf8, measures as Float64, time dimensions +/// as Timestamp(Millisecond). Used to compare Arrow parse throughput against the +/// JSON path. +pub fn build_arrow_ipc( + row_count: usize, + dimensions: &[(String, String)], + measures: &[(String, String)], + time_dims: &[TimeColumn], +) -> Vec { + use arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use arrow::ipc::writer::StreamWriter; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; + + let total_cols = dimensions.len() + measures.len() + time_dims.len(); + let mut fields = Vec::with_capacity(total_cols); + let mut columns: Vec = Vec::with_capacity(total_cols); + + for (j, (_, alias)) in dimensions.iter().enumerate() { + fields.push(Field::new(alias.clone(), DataType::Utf8, false)); + let values: Vec = (0..row_count) + .map(|i| format!("dim_{}_{}", j, i % 1000)) + .collect(); + columns.push(Arc::new(StringArray::from(values))); + } + for (j, (_, alias)) in measures.iter().enumerate() { + fields.push(Field::new(alias.clone(), DataType::Float64, false)); + let values: Vec = (0..row_count) + .map(|i| ((i * (j + 1)) as f64) * 0.5) + .collect(); + columns.push(Arc::new(Float64Array::from(values))); + } + for (j, td) in time_dims.iter().enumerate() { + fields.push(Field::new( + td.alias.clone(), + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + // One day apart, offset per column — arbitrary but realistic spread. + let values: Vec = (0..row_count) + .map(|i| ((i + j) as i64) * 86_400_000) + .collect(); + columns.push(Arc::new(TimestampMillisecondArray::from(values))); + } + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema.clone(), columns).expect("arrow record batch"); + + let mut buf = Vec::new(); + { + let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref()).expect("arrow writer"); + writer.write(&batch).expect("write arrow batch"); + writer.finish().expect("finish arrow stream"); + } + buf +} diff --git a/rust/cube/cubeorchestrator/benches/parser.rs b/rust/cube/cubeorchestrator/benches/parser.rs index eef0a51472e32..7358bd56ed6de 100644 --- a/rust/cube/cubeorchestrator/benches/parser.rs +++ b/rust/cube/cubeorchestrator/benches/parser.rs @@ -4,14 +4,18 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through use cubeorchestrator::query_message_parser::QueryResult; use cubeorchestrator::transport::JsRawColumnarData; use cubeshared::codegen::{ - HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, HttpMessageArgs, HttpResultSet, - HttpResultSetArgs, HttpRow, HttpRowArgs, + HttpColumnValue, HttpColumnValueArgs, HttpCommand, HttpMessage, HttpMessageArgs, + HttpQueryResult, HttpQueryResultArgs, HttpQueryResultArrow, HttpQueryResultArrowArgs, + HttpQueryResultData, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, }; use cubeshared::flatbuffers::FlatBufferBuilder; #[path = "common/mod.rs"] mod common; -use common::{build_dataset, make_member_aliases, split_dim_measure, COLUMN_COUNTS, ROW_COUNTS}; +use common::{ + build_arrow_ipc, build_dataset, make_member_aliases, split_dim_measure, COLUMN_COUNTS, + ROW_COUNTS, +}; /// Build a FlatBuffer `HttpMessage` payload mirroring CubeStore's wire format /// for `from_cubestore_fb` to parse. Cells are 16-character strings to give @@ -95,7 +99,14 @@ fn bench_from_cubestore_fb(c: &mut Criterion) { fn bench_from_js_raw_data(c: &mut Criterion) { let mut group = c.benchmark_group("QueryResult::from_js_raw_data"); - let combos: &[(usize, usize)] = &[(8, 10_000), (16, 10_000), (16, 100_000), (32, 100_000)]; + let combos: &[(usize, usize)] = &[ + (8, 1), + (8, 10), + (8, 10_000), + (16, 10_000), + (16, 100_000), + (32, 100_000), + ]; for &(col_count, row_count) in combos { let (dim_count, measure_count) = split_dim_measure(col_count); @@ -142,5 +153,85 @@ fn bench_from_js_raw_data(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_from_cubestore_fb, bench_from_js_raw_data); +/// Wrap raw Arrow IPC bytes in an `HttpMessage` FlatBuffer carrying +fn build_cubestore_fb_arrow_message(arrow_ipc: &[u8]) -> Vec { + let mut builder = FlatBufferBuilder::new(); + let data_vec = builder.create_vector(arrow_ipc); + let arrow = HttpQueryResultArrow::create( + &mut builder, + &HttpQueryResultArrowArgs { + data: Some(data_vec), + is_last: true, + }, + ); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow.as_union_value()), + }, + ); + let connection_id = builder.create_string("bench_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + builder.finished_data().to_vec() +} + +fn bench_from_cubestore_fb_arrow(c: &mut Criterion) { + let mut group = c.benchmark_group("QueryResult::from_cubestore_fb_arrow"); + + let combos: &[(usize, usize)] = &[ + (8, 1), + (8, 10), + (8, 10_000), + (16, 10_000), + (16, 100_000), + (32, 100_000), + ]; + + for &(col_count, row_count) in combos { + let (dim_count, measure_count) = split_dim_measure(col_count); + let dimensions = make_member_aliases("dim", dim_count); + let measures = make_member_aliases("measure", measure_count); + + let arrow_ipc = build_arrow_ipc(row_count, &dimensions, &measures, &[]); + let payload = build_cubestore_fb_arrow_message(&arrow_ipc); + let payload_len = payload.len(); + + eprintln!( + "from_cubestore_fb_arrow: c{:02}_r{} payload_bytes={}", + col_count, row_count, payload_len + ); + + group.throughput(Throughput::Elements((row_count * col_count) as u64)); + + let id = format!("c{:02}_r{}", col_count, row_count); + // Arrow IPC parse always materializes the QueryResult, so this measures + // the equivalent of from_js_raw_data's `parse_plus_build`. + group.bench_with_input(BenchmarkId::from_parameter(id), &(), |b, _| { + b.iter(|| { + let built = QueryResult::from_cubestore_fb(black_box(&payload)) + .expect("from_cubestore_fb arrow"); + black_box(built); + }); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_from_cubestore_fb, + bench_from_js_raw_data, + bench_from_cubestore_fb_arrow +); criterion_main!(benches); diff --git a/rust/cube/cubeorchestrator/src/query_message_parser.rs b/rust/cube/cubeorchestrator/src/query_message_parser.rs index c3b489b55f209..0ad7bc254946a 100644 --- a/rust/cube/cubeorchestrator/src/query_message_parser.rs +++ b/rust/cube/cubeorchestrator/src/query_message_parser.rs @@ -2,10 +2,22 @@ use crate::{ query_result_transform::{ColumnarArray, DBResponsePrimitive}, transport::JsRawColumnarData, }; -use cubeshared::codegen::{root_as_http_message_with_opts, HttpCommand}; +use arrow::array::{ + Array, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float16Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, + StringArray, StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, + UInt8Array, +}; +use arrow::datatypes::{DataType, TimeUnit}; +use arrow::ipc::reader::StreamReader; +use cubeshared::codegen::{ + root_as_http_message_with_opts, HttpCommand, HttpQueryResultData, HttpResultSet, +}; use cubeshared::flatbuffers::VerifierOptions; use indexmap::IndexMap; use neon::prelude::Finalize; +use std::io::Cursor; #[derive(Debug)] pub enum ParseError { @@ -28,6 +40,8 @@ pub enum ParseError { data_len: usize, }, FlatBufferError(String), + ArrowError(String), + UnsupportedArrowType(String), ErrorMessage(String), } @@ -70,6 +84,10 @@ impl std::fmt::Display for ParseError { members_len, data_len ), ParseError::FlatBufferError(msg) => write!(f, "FlatBuffer parsing error: {}", msg), + ParseError::ArrowError(msg) => write!(f, "Arrow parsing error: {}", msg), + ParseError::UnsupportedArrowType(ty) => { + write!(f, "Unsupported Arrow data type: {}", ty) + } ParseError::ErrorMessage(msg) => write!(f, "Error: {}", msg), } } @@ -150,6 +168,11 @@ impl QueryResult { }) } + pub fn from_js_raw_data(js_raw_data: JsRawColumnarData) -> Result { + let JsRawColumnarData { members, columns } = js_raw_data; + QueryResult::try_new(members, columns) + } + pub fn from_cubestore_fb(msg_data: &[u8]) -> Result { let opts = VerifierOptions { max_tables: 10_000_000, // Support up to 10M tables @@ -173,79 +196,292 @@ impl QueryResult { .command_as_http_result_set() .ok_or(ParseError::EmptyResultSet)?; - let members: Vec = match result_set.columns() { - Some(result_set_columns) => { - if result_set_columns.iter().any(|c| c.is_empty()) { - return Err(ParseError::ColumnNameNotDefined); - } - result_set_columns.iter().map(|c| c.to_owned()).collect() + Self::parse_legacy(result_set) + } + HttpCommand::HttpQueryResult => { + let query_result = http_message + .command_as_http_query_result() + .ok_or(ParseError::EmptyResultSet)?; + + match query_result.data_type() { + HttpQueryResultData::HttpQueryResultArrow => { + let arrow = + query_result + .data_as_http_query_result_arrow() + .ok_or_else(|| { + ParseError::FlatBufferError( + "HttpQueryResult.data is not HttpQueryResultArrow" + .to_string(), + ) + })?; + + Self::from_arrow(arrow.data().bytes()) } - None => Vec::new(), - }; - - let n_cols = members.len(); - let data: Vec = if let Some(result_set_rows) = result_set.rows() { - let row_count = result_set_rows.len(); - let mut data: Vec = (0..n_cols) - .map(|_| ColumnarArray::with_capacity(row_count)) - .collect(); - - for row in result_set_rows.iter() { - let values = row.values().ok_or(ParseError::NullRow)?; - for (col_idx, val) in values.iter().enumerate() { - if col_idx >= n_cols { - break; - } - let cell = match val.string_value() { - Some(s) => DBResponsePrimitive::String(s.to_owned()), - None => DBResponsePrimitive::Null, - }; - data[col_idx].push(cell); - } + // Marker for statements that complete without a result set + // (CREATE TABLE/INSERT, queue/cache writes). Carries no payload, + // so it maps to an empty result, like a zero-column legacy result set. + HttpQueryResultData::HttpQueryResultCompleted => Ok(QueryResult::empty()), + other => Err(ParseError::FlatBufferError(format!( + "Unsupported HttpQueryResult.data type: {:?}", + other + ))), + } + } + _ => Err(ParseError::UnsupportedCommand), + } + } - // Pad short rows with Null to keep all columns aligned. - for col in data.iter_mut().take(n_cols).skip(values.len()) { - col.push(DBResponsePrimitive::Null); - } + fn parse_legacy(result_set: HttpResultSet<'_>) -> Result { + let members: Vec = match result_set.columns() { + Some(result_set_columns) => { + if result_set_columns.iter().any(|c| c.is_empty()) { + return Err(ParseError::ColumnNameNotDefined); + } + result_set_columns.iter().map(|c| c.to_owned()).collect() + } + None => Vec::new(), + }; + + let n_cols = members.len(); + let data: Vec = if let Some(result_set_rows) = result_set.rows() { + let row_count = result_set_rows.len(); + let mut data: Vec = (0..n_cols) + .map(|_| ColumnarArray::with_capacity(row_count)) + .collect(); + + for row in result_set_rows.iter() { + let values = row.values().ok_or(ParseError::NullRow)?; + for (col_idx, val) in values.iter().enumerate() { + if col_idx >= n_cols { + break; } + let cell = match val.string_value() { + Some(s) => DBResponsePrimitive::String(s.to_owned()), + None => DBResponsePrimitive::Null, + }; + data[col_idx].push(cell); + } - data - } else { - (0..n_cols).map(|_| ColumnarArray::new()).collect() - }; + // Pad short rows with Null to keep all columns aligned. + for col in data.iter_mut().take(n_cols).skip(values.len()) { + col.push(DBResponsePrimitive::Null); + } + } + + data + } else { + (0..n_cols).map(|_| ColumnarArray::new()).collect() + }; + + QueryResult::try_new(members, data) + } + + pub(crate) fn from_arrow(bytes: &[u8]) -> Result { + let reader = StreamReader::try_new(Cursor::new(bytes), None) + .map_err(|err| ParseError::ArrowError(err.to_string()))?; + + let schema = reader.schema(); + let members: Vec = schema.fields().iter().map(|f| f.name().clone()).collect(); + let n_cols = members.len(); - QueryResult::try_new(members, data) + let mut columns: Vec> = (0..n_cols).map(|_| Vec::new()).collect(); + + for batch in reader { + let batch = batch.map_err(|err| ParseError::ArrowError(err.to_string()))?; + for (idx, col) in columns.iter_mut().enumerate() { + append_arrow_array(col, batch.column(idx).as_ref())?; } - _ => Err(ParseError::UnsupportedCommand), } + + let data: Vec = columns.into_iter().map(ColumnarArray::from).collect(); + QueryResult::try_new(members, data) } +} - pub fn from_js_raw_data(js_raw_data: JsRawColumnarData) -> Result { - let JsRawColumnarData { members, columns } = js_raw_data; - QueryResult::try_new(members, columns) +/// Append every element of an Arrow `array` to a column accumulator, converting +/// each value to [`DBResponsePrimitive`]. +fn append_arrow_array( + col: &mut Vec, + array: &dyn Array, +) -> Result<(), ParseError> { + let len = array.len(); + col.reserve(len); + + macro_rules! downcast_array_ref { + ($ty:ty) => { + array.as_any().downcast_ref::<$ty>().ok_or_else(|| { + ParseError::ArrowError(format!( + "Failed to downcast Arrow array to {}", + stringify!($ty) + )) + })? + }; + } + + macro_rules! push_int { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Int64(a.value(i) as i64)); + } + } + }}; + } + + macro_rules! push_uint { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::UInt64(a.value(i) as u64)); + } + } + }}; + } + + macro_rules! push_float { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Float64(a.value(i) as f64)); + } + } + }}; + } + + macro_rules! push_str { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::String(a.value(i).to_owned())); + } + } + }}; + } + + macro_rules! push_datetime { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + match a.value_as_datetime(i) { + Some(dt) => col.push(DBResponsePrimitive::Timestamp(dt)), + None => col.push(DBResponsePrimitive::Null), + } + } + } + }}; + } + + macro_rules! push_decimal { + ($ty:ty) => {{ + let a = downcast_array_ref!($ty); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + let s = a.value_as_string(i); + match s.parse::() { + Ok(n) => col.push(DBResponsePrimitive::Float64(n)), + Err(_) => col.push(DBResponsePrimitive::String(s)), + } + } + } + }}; + } + + match array.data_type() { + DataType::Null => { + for _ in 0..len { + col.push(DBResponsePrimitive::Null); + } + } + DataType::Boolean => { + let a = downcast_array_ref!(BooleanArray); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Boolean(a.value(i))); + } + } + } + DataType::Int8 => push_int!(Int8Array), + DataType::Int16 => push_int!(Int16Array), + DataType::Int32 => push_int!(Int32Array), + DataType::Int64 => push_int!(Int64Array), + DataType::UInt8 => push_uint!(UInt8Array), + DataType::UInt16 => push_uint!(UInt16Array), + DataType::UInt32 => push_uint!(UInt32Array), + DataType::UInt64 => push_uint!(UInt64Array), + DataType::Float32 => push_float!(Float32Array), + DataType::Float64 => push_float!(Float64Array), + DataType::Float16 => { + let a = downcast_array_ref!(Float16Array); + for i in 0..len { + if a.is_null(i) { + col.push(DBResponsePrimitive::Null); + } else { + col.push(DBResponsePrimitive::Float64(a.value(i).to_f64())); + } + } + } + DataType::Utf8 => push_str!(StringArray), + DataType::LargeUtf8 => push_str!(LargeStringArray), + DataType::Utf8View => push_str!(StringViewArray), + DataType::Date32 => push_datetime!(Date32Array), + DataType::Date64 => push_datetime!(Date64Array), + DataType::Timestamp(TimeUnit::Second, _) => push_datetime!(TimestampSecondArray), + DataType::Timestamp(TimeUnit::Millisecond, _) => push_datetime!(TimestampMillisecondArray), + DataType::Timestamp(TimeUnit::Microsecond, _) => push_datetime!(TimestampMicrosecondArray), + DataType::Timestamp(TimeUnit::Nanosecond, _) => push_datetime!(TimestampNanosecondArray), + DataType::Decimal128(_, _) => push_decimal!(Decimal128Array), + DataType::Decimal256(_, _) => push_decimal!(Decimal256Array), + other => return Err(ParseError::UnsupportedArrowType(format!("{other:?}"))), } + + Ok(()) } #[cfg(test)] mod tests { use super::*; + use arrow::array::BinaryArray; + use arrow::datatypes::{Field, Schema}; + use arrow::ipc::writer::StreamWriter; + use arrow::record_batch::RecordBatch; use cubeshared::codegen::{ - root_as_http_message_unchecked, HttpColumnValue, HttpColumnValueArgs, HttpCommand, - HttpMessage, HttpMessageArgs, HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, + root_as_http_message_unchecked, HttpColumnValue, HttpColumnValueArgs, HttpMessage, + HttpMessageArgs, HttpQueryResult, HttpQueryResultArgs, HttpQueryResultArrow, + HttpQueryResultArrowArgs, HttpQueryResultCompleted, HttpQueryResultCompletedArgs, + HttpResultSetArgs, HttpRow, HttpRowArgs, }; use cubeshared::flatbuffers::FlatBufferBuilder; + use std::sync::Arc; /// Helper function to create a test HttpMessage with a given number of rows and columns fn create_test_message(num_rows: usize, num_columns: usize) -> Vec { let mut builder = FlatBufferBuilder::new(); - // Create column names let column_names: Vec<_> = (0..num_columns) .map(|i| builder.create_string(&format!("column_{}", i))) .collect(); - // Create rows with values let mut rows_vec = Vec::with_capacity(num_rows); + for row_idx in 0..num_rows { // Create column values for this row let mut values_vec = Vec::with_capacity(num_columns); @@ -282,7 +518,6 @@ mod tests { }, ); - // Create the message let connection_id = builder.create_string("test_connection"); let message = HttpMessage::create( &mut builder, @@ -299,83 +534,82 @@ mod tests { } #[test] - fn test_parse_small_result_set() { - // Small result set should work fine + fn test_parse_small_result_set() -> Result<(), ParseError> { let msg_data = create_test_message(10, 5); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 5); assert_eq!(query_result.row_count, 10); assert_eq!(query_result.data.len(), 5); assert!(query_result.data.iter().all(|c| c.len() == 10)); + + Ok(()) } #[test] - fn test_parse_medium_result_set() { + fn test_parse_medium_result_set() -> Result<(), ParseError> { // Medium result set: 1000 rows, 20 columns let msg_data = create_test_message(1000, 20); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 20); assert_eq!(query_result.row_count, 1000); assert_eq!(query_result.data.len(), 20); assert!(query_result.data.iter().all(|c| c.len() == 1000)); + + Ok(()) } #[test] - fn test_parse_large_result_set() { + fn test_parse_large_result_set() -> Result<(), ParseError> { // Large result set: 10,000 rows, 30 columns // This should start showing verification issues let msg_data = create_test_message(10_000, 30); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 30); assert_eq!(query_result.row_count, 10_000); assert_eq!(query_result.data.len(), 30); assert!(query_result.data.iter().all(|c| c.len() == 10_000)); + + Ok(()) } #[test] - fn test_parse_very_large_result_set() { + fn test_parse_very_large_result_set() -> Result<(), ParseError> { // Very large result set: 33,000 rows, 40 columns let msg_data = create_test_message(33_000, 40); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 40); assert_eq!(query_result.row_count, 33_000); assert_eq!(query_result.data.len(), 40); assert!(query_result.data.iter().all(|c| c.len() == 33_000)); + + Ok(()) } #[test] - fn test_parse_huge_result_set() { + fn test_parse_huge_result_set() -> Result<(), ParseError> { // Huge result set: 50,000 rows, 100 columns let msg_data = create_test_message(50_000, 100); - let result = QueryResult::from_cubestore_fb(&msg_data); - assert!(result.is_ok()); + let query_result = QueryResult::from_cubestore_fb(&msg_data)?; - let query_result = result.unwrap(); assert_eq!(query_result.members.len(), 100); assert_eq!(query_result.row_count, 50_000); assert_eq!(query_result.data.len(), 100); assert!(query_result.data.iter().all(|c| c.len() == 50_000)); + + Ok(()) } #[test] - fn test_compare_with_unchecked_parse() { + fn test_compare_with_unchecked_parse() -> Result<(), ParseError> { // Test to demonstrate that unchecked parsing would work let msg_data = create_test_message(33_000, 40); // Checked version (current implementation) - let checked_result = QueryResult::from_cubestore_fb(&msg_data); + let checked_result = QueryResult::from_cubestore_fb(&msg_data)?; // Try unchecked version to verify the data itself is valid let unchecked_result = unsafe { @@ -398,15 +632,210 @@ mod tests { } }; - assert!(checked_result.is_ok()); + assert_eq!(checked_result.row_count, 33_000); assert!(unchecked_result.is_ok()); + + Ok(()) + } + + fn arrow_ipc_bytes(batch: &RecordBatch) -> Vec { + let mut buf = Vec::new(); + { + let schema = batch.schema(); + let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref()).unwrap(); + writer.write(batch).unwrap(); + writer.finish().unwrap(); + } + buf + } + + #[test] + fn test_from_arrow_basic_types() -> Result<(), ParseError> { + let schema = Arc::new(Schema::new(vec![ + Field::new("city", DataType::Utf8, true), + Field::new("amount", DataType::Float64, true), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + ])); + + let cities = StringArray::from(vec![Some("Berlin"), None, Some("Lisbon")]); + let amounts = Float64Array::from(vec![Some(1.5), Some(2.0), None]); + // 0 -> 1970-01-01T00:00:00.000, 1_000 -> 1970-01-01T00:00:01.000 + let created = TimestampMillisecondArray::from(vec![Some(0i64), None, Some(1_000)]); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(cities), Arc::new(amounts), Arc::new(created)], + ) + .unwrap(); + + let bytes = arrow_ipc_bytes(&batch); + let result = QueryResult::from_arrow(&bytes)?; + + assert_eq!(result.members, vec!["city", "amount", "created_at"]); + assert_eq!(result.row_count, 3); + assert_eq!(result.data.len(), 3); + + assert_eq!( + result.data[0].as_slice(), + &[ + DBResponsePrimitive::String("Berlin".to_string()), + DBResponsePrimitive::Null, + DBResponsePrimitive::String("Lisbon".to_string()), + ] + ); + assert_eq!( + result.data[1].as_slice(), + &[ + DBResponsePrimitive::Float64(1.5), + DBResponsePrimitive::Float64(2.0), + DBResponsePrimitive::Null, + ] + ); + + // Numeric values serialize as JSON strings, matching the legacy result set. + let amounts_json = serde_json::to_value(result.data[1].as_slice()).unwrap(); + assert_eq!(amounts_json[0], "1.5"); + assert_eq!(amounts_json[1], "2"); + assert_eq!(amounts_json[2], serde_json::Value::Null); + + // Timestamps land in the dedicated variant and serialize to the ISO format. + match &result.data[2].as_slice()[0] { + DBResponsePrimitive::Timestamp(_) => {} + other => panic!("expected Timestamp, got {other:?}"), + } + assert_eq!(result.data[2].as_slice()[1], DBResponsePrimitive::Null); + let json = serde_json::to_value(result.data[2].as_slice()).unwrap(); + assert_eq!(json[0], "1970-01-01T00:00:00.000"); + assert_eq!(json[1], serde_json::Value::Null); + assert_eq!(json[2], "1970-01-01T00:00:01.000"); + + Ok(()) } #[test] - fn test_parse_with_custom_verifier_options() { - use cubeshared::codegen::root_as_http_message_with_opts; - use cubeshared::flatbuffers::VerifierOptions; + fn test_from_arrow_unsupported_type() -> Result<(), ParseError> { + let schema = Arc::new(Schema::new(vec![Field::new( + "blob", + DataType::Binary, + false, + )])); + let blobs = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref()]); + let batch = RecordBatch::try_new(schema, vec![Arc::new(blobs)]).unwrap(); + + let bytes = arrow_ipc_bytes(&batch); + let err = QueryResult::from_arrow(&bytes).expect_err("should reject Binary"); + assert!(matches!(err, ParseError::UnsupportedArrowType(_))); + + Ok(()) + } + #[test] + fn test_from_cubestore_fb_arrow_query_result() -> Result<(), ParseError> { + // Arrow IPC stream payload, as CubeStore would emit it. + let schema = Arc::new(Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("amount", DataType::Float64, false), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["Berlin", "Lisbon"])), + Arc::new(Float64Array::from(vec![1.5, 2.0])), + ], + ) + .unwrap(); + let ipc = arrow_ipc_bytes(&batch); + + // Wrap it in a FlatBuffer HttpMessage with the HttpQueryResult command. + let mut builder = FlatBufferBuilder::new(); + let data_vec = builder.create_vector(&ipc); + let arrow = HttpQueryResultArrow::create( + &mut builder, + &HttpQueryResultArrowArgs { + data: Some(data_vec), + is_last: true, + }, + ); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow.as_union_value()), + }, + ); + let connection_id = builder.create_string("test_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + let msg_data = builder.finished_data().to_vec(); + + let result = QueryResult::from_cubestore_fb(&msg_data)?; + assert_eq!(result.members, vec!["city", "amount"]); + assert_eq!(result.row_count, 2); + assert_eq!( + result.data[0].as_slice(), + &[ + DBResponsePrimitive::String("Berlin".to_string()), + DBResponsePrimitive::String("Lisbon".to_string()), + ] + ); + assert_eq!( + result.data[1].as_slice(), + &[ + DBResponsePrimitive::Float64(1.5), + DBResponsePrimitive::Float64(2.0), + ] + ); + + Ok(()) + } + + #[test] + fn test_from_cubestore_fb_completed_query_result() -> Result<(), ParseError> { + let mut builder = FlatBufferBuilder::new(); + let completed = + HttpQueryResultCompleted::create(&mut builder, &HttpQueryResultCompletedArgs {}); + let query_result = HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultCompleted, + data: Some(completed.as_union_value()), + }, + ); + let connection_id = builder.create_string("test_connection"); + let message = HttpMessage::create( + &mut builder, + &HttpMessageArgs { + message_id: 1, + command_type: HttpCommand::HttpQueryResult, + command: Some(query_result.as_union_value()), + connection_id: Some(connection_id), + }, + ); + builder.finish(message, None); + let msg_data = builder.finished_data().to_vec(); + + let result = QueryResult::from_cubestore_fb(&msg_data)?; + assert!(result.members.is_empty()); + assert_eq!(result.row_count, 0); + assert!(result.data.is_empty()); + + Ok(()) + } + + #[test] + fn test_parse_with_custom_verifier_options() -> Result<(), ParseError> { // Test that custom verifier options can handle large datasets let msg_data = create_test_message(33_000, 40); @@ -418,23 +847,21 @@ mod tests { }; // This should succeed with custom options - let result = root_as_http_message_with_opts(&opts, &msg_data); + let http_message = root_as_http_message_with_opts(&opts, &msg_data) + .map_err(|err| ParseError::FlatBufferError(err.to_string()))?; - match result { - Ok(http_message) => match http_message.command_type() { - HttpCommand::HttpResultSet => { - let result_set = http_message.command_as_http_result_set(); - if let Some(rs) = result_set { - if let Some(rows) = rs.rows() { - assert_eq!(rows.len(), 33_000); - } + match http_message.command_type() { + HttpCommand::HttpResultSet => { + let result_set = http_message.command_as_http_result_set(); + if let Some(rs) = result_set { + if let Some(rows) = rs.rows() { + assert_eq!(rows.len(), 33_000); } } - _ => panic!("Wrong command type"), - }, - Err(e) => { - panic!("Failed to parse with custom verifier options: {:?}", e); } + _ => panic!("Wrong command type"), } + + Ok(()) } } diff --git a/rust/cube/cubeorchestrator/src/query_result_transform.rs b/rust/cube/cubeorchestrator/src/query_result_transform.rs index a069d8bda04de..b839848da91ed 100644 --- a/rust/cube/cubeorchestrator/src/query_result_transform.rs +++ b/rust/cube/cubeorchestrator/src/query_result_transform.rs @@ -6,6 +6,7 @@ use crate::{ }, }; use anyhow::{bail, Context, Result}; +use chrono::format::{Fixed, Item, Numeric, Pad}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use indexmap::{Equivalent, IndexMap}; use itertools::multizip; @@ -1152,16 +1153,58 @@ pub struct RequestResultArray { pub results: Vec, } -#[derive(Debug, Clone, Serialize, PartialEq)] -#[serde(untagged)] +#[derive(Debug, Clone, PartialEq)] pub enum DBResponsePrimitive { Null, Boolean(bool), - Number(f64), + Int64(i64), + UInt64(u64), + Float64(f64), String(String), + Timestamp(NaiveDateTime), Uncommon(Value), } +/// `%Y-%m-%dT%H:%M:%S%.3f` +const TIMESTAMP_ITEMS: &[Item<'static>] = &[ + Item::Numeric(Numeric::Year, Pad::Zero), + Item::Literal("-"), + Item::Numeric(Numeric::Month, Pad::Zero), + Item::Literal("-"), + Item::Numeric(Numeric::Day, Pad::Zero), + Item::Literal("T"), + Item::Numeric(Numeric::Hour, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Minute, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Second, Pad::Zero), + // `%.3f`: leading dot followed by 3 fractional-second digits. + Item::Fixed(Fixed::Nanosecond3), +]; + +// Hand-written `Serialize`. Numeric variants (`Int64`/`UInt64`/`Float64`) are +// rendered as JSON strings, not numbers: the legacy CubeStore result set carried +// every value as a string, and downstream consumers (and snapshots) rely on that +// shape. The Arrow path produces real numeric primitives, so we stringify them +// here to keep the JSON output identical to the legacy path. `Timestamp` is also +// string-rendered; `Boolean`/`Null`/`String` map to their JSON-native forms. +impl Serialize for DBResponsePrimitive { + fn serialize(&self, serializer: S) -> Result { + match self { + DBResponsePrimitive::Null => serializer.serialize_none(), + DBResponsePrimitive::Boolean(b) => serializer.serialize_bool(*b), + DBResponsePrimitive::Int64(n) => serializer.collect_str(n), + DBResponsePrimitive::UInt64(n) => serializer.collect_str(n), + DBResponsePrimitive::Float64(n) => serializer.collect_str(n), + DBResponsePrimitive::String(s) => serializer.serialize_str(s), + DBResponsePrimitive::Timestamp(dt) => { + serializer.collect_str(&dt.format_with_items(TIMESTAMP_ITEMS.iter())) + } + DBResponsePrimitive::Uncommon(v) => v.serialize(serializer), + } + } +} + // Hand-written `Deserialize` that avoids serde's untagged-enum buffering. impl<'de> Deserialize<'de> for DBResponsePrimitive { fn deserialize(deserializer: D) -> Result @@ -1182,23 +1225,35 @@ impl<'de> Deserialize<'de> for DBResponsePrimitive { } fn visit_i64(self, v: i64) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + Ok(DBResponsePrimitive::Int64(v)) } fn visit_i128(self, v: i128) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + if let Ok(n) = i64::try_from(v) { + Ok(DBResponsePrimitive::Int64(n)) + } else if let Ok(n) = u64::try_from(v) { + Ok(DBResponsePrimitive::UInt64(n)) + } else { + Err(E::custom(format!("integer {v} out of range for i64/u64"))) + } } fn visit_u64(self, v: u64) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + Ok(DBResponsePrimitive::UInt64(v)) } fn visit_u128(self, v: u128) -> Result { - Ok(DBResponsePrimitive::Number(v as f64)) + if let Ok(n) = i64::try_from(v) { + Ok(DBResponsePrimitive::Int64(n)) + } else if let Ok(n) = u64::try_from(v) { + Ok(DBResponsePrimitive::UInt64(n)) + } else { + Err(E::custom(format!("integer {v} out of range for i64/u64"))) + } } fn visit_f64(self, v: f64) -> Result { - Ok(DBResponsePrimitive::Number(v)) + Ok(DBResponsePrimitive::Float64(v)) } fn visit_str(self, v: &str) -> Result { @@ -1254,12 +1309,18 @@ impl Display for DBResponsePrimitive { let str = match self { DBResponsePrimitive::Null => "null".to_string(), DBResponsePrimitive::Boolean(b) => b.to_string(), - DBResponsePrimitive::Number(n) => n.to_string(), + DBResponsePrimitive::Int64(n) => n.to_string(), + DBResponsePrimitive::UInt64(n) => n.to_string(), + DBResponsePrimitive::Float64(n) => n.to_string(), DBResponsePrimitive::String(s) => s.clone(), + DBResponsePrimitive::Timestamp(dt) => { + dt.format_with_items(TIMESTAMP_ITEMS.iter()).to_string() + } DBResponsePrimitive::Uncommon(v) => { serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()) } }; + write!(f, "{}", str) } } @@ -1322,6 +1383,15 @@ mod tests { use serde_json::from_str; use std::{fmt, sync::LazyLock}; + /// Guards the in-memory size of the per-cell primitive. It is materialized + /// once per cell across every parse/transform path, so an accidental growth + /// (e.g. a fat new variant) would regress memory and throughput for large + /// result sets. Bump this deliberately if the layout must change. + #[test] + fn test_db_response_primitive_size() { + assert_eq!(std::mem::size_of::(), 32); + } + type TestSuiteData = HashMap; #[derive(Clone, Deserialize)]