Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
394 changes: 351 additions & 43 deletions packages/cubejs-backend-native/Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion packages/cubejs-backend-native/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ impl ResultWrapper {
fn db_primitive_to_field_value(value: &DBResponsePrimitive) -> FieldValue<'_> {
match value {
DBResponsePrimitive::String(s) => FieldValue::String(Cow::Borrowed(s)),
DBResponsePrimitive::Number(n) => FieldValue::Number(*n),
DBResponsePrimitive::Int64(n) => FieldValue::Number(*n as f64),
DBResponsePrimitive::UInt64(n) => FieldValue::Number(*n as f64),
DBResponsePrimitive::Float64(n) => FieldValue::Number(*n),
DBResponsePrimitive::Boolean(b) => FieldValue::Bool(*b),
DBResponsePrimitive::Timestamp(_) => FieldValue::String(Cow::Owned(value.to_string())),
Comment thread
claude[bot] marked this conversation as resolved.
DBResponsePrimitive::Uncommon(v) => FieldValue::String(Cow::Owned(
serde_json::to_string(&v).unwrap_or_else(|_| v.to_string()),
)),
Expand Down
14 changes: 9 additions & 5 deletions packages/cubejs-cubestore-driver/codegen/http-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,43 @@

import { HttpError } from './http-error.js';
import { HttpQuery } from './http-query.js';
import { HttpQueryResult } from './http-query-result.js';
import { HttpResultSet } from './http-result-set.js';


export enum HttpCommand {
NONE = 0,
HttpQuery = 1,
HttpResultSet = 2,
HttpError = 3
HttpError = 3,
HttpQueryResult = 4
}

export function unionToHttpCommand(
type: HttpCommand,
accessor: (obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null
): HttpError|HttpQuery|HttpResultSet|null {
accessor: (obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null
): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null {
switch(HttpCommand[type]) {
case 'NONE': return null;
case 'HttpQuery': return accessor(new HttpQuery())! as HttpQuery;
case 'HttpResultSet': return accessor(new HttpResultSet())! as HttpResultSet;
case 'HttpError': return accessor(new HttpError())! as HttpError;
case 'HttpQueryResult': return accessor(new HttpQueryResult())! as HttpQueryResult;
default: return null;
}
}

export function unionListToHttpCommand(
type: HttpCommand,
accessor: (index: number, obj:HttpError|HttpQuery|HttpResultSet) => HttpError|HttpQuery|HttpResultSet|null,
accessor: (index: number, obj:HttpError|HttpQuery|HttpQueryResult|HttpResultSet) => HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null,
index: number
): HttpError|HttpQuery|HttpResultSet|null {
): HttpError|HttpQuery|HttpQueryResult|HttpResultSet|null {
switch(HttpCommand[type]) {
case 'NONE': return null;
case 'HttpQuery': return accessor(index, new HttpQuery())! as HttpQuery;
case 'HttpResultSet': return accessor(index, new HttpResultSet())! as HttpResultSet;
case 'HttpError': return accessor(index, new HttpError())! as HttpError;
case 'HttpQueryResult': return accessor(index, new HttpQueryResult())! as HttpQueryResult;
default: return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// automatically generated by the FlatBuffers compiler, do not modify

/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */

import * as flatbuffers from 'flatbuffers';

export class HttpQueryResultArrow {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResultArrow {
this.bb_pos = i;
this.bb = bb;
return this;
}

static getRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow {
return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}

static getSizePrefixedRootAsHttpQueryResultArrow(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResultArrow):HttpQueryResultArrow {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new HttpQueryResultArrow()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}

data(index: number):number|null {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint8(this.bb!.__vector(this.bb_pos + offset) + index) : 0;
}

dataLength():number {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}

dataArray():Uint8Array|null {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? new Uint8Array(this.bb!.bytes().buffer, this.bb!.bytes().byteOffset + this.bb!.__vector(this.bb_pos + offset), this.bb!.__vector_len(this.bb_pos + offset)) : null;
}

isLast():boolean {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? !!this.bb!.readInt8(this.bb_pos + offset) : false;
}

static startHttpQueryResultArrow(builder:flatbuffers.Builder) {
builder.startObject(2);
}

static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) {
builder.addFieldOffset(0, dataOffset, 0);
}

static createDataVector(builder:flatbuffers.Builder, data:number[]|Uint8Array):flatbuffers.Offset {
builder.startVector(1, data.length, 1);
for (let i = data.length - 1; i >= 0; i--) {
builder.addInt8(data[i]!);
}
return builder.endVector();
}

static startDataVector(builder:flatbuffers.Builder, numElems:number) {
builder.startVector(1, numElems, 1);
}

static addIsLast(builder:flatbuffers.Builder, isLast:boolean) {
builder.addFieldInt8(1, +isLast, +false);
}

static endHttpQueryResultArrow(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
builder.requiredField(offset, 4) // data
return offset;
}

static createHttpQueryResultArrow(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset, isLast:boolean):flatbuffers.Offset {
HttpQueryResultArrow.startHttpQueryResultArrow(builder);
HttpQueryResultArrow.addData(builder, dataOffset);
HttpQueryResultArrow.addIsLast(builder, isLast);
return HttpQueryResultArrow.endHttpQueryResultArrow(builder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// automatically generated by the FlatBuffers compiler, do not modify

/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */

import { HttpQueryResultArrow } from './http-query-result-arrow.js';


export enum HttpQueryResultData {
NONE = 0,
HttpQueryResultArrow = 1
}

export function unionToHttpQueryResultData(
type: HttpQueryResultData,
accessor: (obj:HttpQueryResultArrow) => HttpQueryResultArrow|null
): HttpQueryResultArrow|null {
switch(HttpQueryResultData[type]) {
case 'NONE': return null;
case 'HttpQueryResultArrow': return accessor(new HttpQueryResultArrow())! as HttpQueryResultArrow;
default: return null;
}
}

export function unionListToHttpQueryResultData(
type: HttpQueryResultData,
accessor: (index: number, obj:HttpQueryResultArrow) => HttpQueryResultArrow|null,
index: number
): HttpQueryResultArrow|null {
switch(HttpQueryResultData[type]) {
case 'NONE': return null;
case 'HttpQueryResultArrow': return accessor(index, new HttpQueryResultArrow())! as HttpQueryResultArrow;
default: return null;
}
}
62 changes: 62 additions & 0 deletions packages/cubejs-cubestore-driver/codegen/http-query-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// automatically generated by the FlatBuffers compiler, do not modify

/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */

import * as flatbuffers from 'flatbuffers';

import { HttpQueryResultData, unionToHttpQueryResultData, unionListToHttpQueryResultData } from './http-query-result-data.js';


export class HttpQueryResult {
bb: flatbuffers.ByteBuffer|null = null;
bb_pos = 0;
__init(i:number, bb:flatbuffers.ByteBuffer):HttpQueryResult {
this.bb_pos = i;
this.bb = bb;
return this;
}

static getRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult {
return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}

static getSizePrefixedRootAsHttpQueryResult(bb:flatbuffers.ByteBuffer, obj?:HttpQueryResult):HttpQueryResult {
bb.setPosition(bb.position() + flatbuffers.SIZE_PREFIX_LENGTH);
return (obj || new HttpQueryResult()).__init(bb.readInt32(bb.position()) + bb.position(), bb);
}

dataType():HttpQueryResultData {
const offset = this.bb!.__offset(this.bb_pos, 4);
return offset ? this.bb!.readUint8(this.bb_pos + offset) : HttpQueryResultData.NONE;
}

data<T extends flatbuffers.Table>(obj:any):any|null {
const offset = this.bb!.__offset(this.bb_pos, 6);
return offset ? this.bb!.__union(obj, this.bb_pos + offset) : null;
}

static startHttpQueryResult(builder:flatbuffers.Builder) {
builder.startObject(2);
}

static addDataType(builder:flatbuffers.Builder, dataType:HttpQueryResultData) {
builder.addFieldInt8(0, dataType, HttpQueryResultData.NONE);
}

static addData(builder:flatbuffers.Builder, dataOffset:flatbuffers.Offset) {
builder.addFieldOffset(1, dataOffset, 0);
}

static endHttpQueryResult(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
builder.requiredField(offset, 6) // data
return offset;
}

static createHttpQueryResult(builder:flatbuffers.Builder, dataType:HttpQueryResultData, dataOffset:flatbuffers.Offset):flatbuffers.Offset {
HttpQueryResult.startHttpQueryResult(builder);
HttpQueryResult.addDataType(builder, dataType);
HttpQueryResult.addData(builder, dataOffset);
return HttpQueryResult.endHttpQueryResult(builder);
}
}
15 changes: 13 additions & 2 deletions packages/cubejs-cubestore-driver/codegen/http-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as flatbuffers from 'flatbuffers';

import { HttpParameter } from './http-parameter.js';
import { HttpTable } from './http-table.js';
import { QueryResultFormat } from './query-result-format.js';


export class HttpQuery {
Expand Down Expand Up @@ -60,8 +61,13 @@ parametersLength():number {
return offset ? this.bb!.__vector_len(this.bb_pos + offset) : 0;
}

responseFormat():QueryResultFormat {
const offset = this.bb!.__offset(this.bb_pos, 12);
return offset ? this.bb!.readUint8(this.bb_pos + offset) : QueryResultFormat.Legacy;
}

static startHttpQuery(builder:flatbuffers.Builder) {
builder.startObject(4);
builder.startObject(5);
}

static addQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset) {
Expand Down Expand Up @@ -104,17 +110,22 @@ static startParametersVector(builder:flatbuffers.Builder, numElems:number) {
builder.startVector(4, numElems, 4);
}

static addResponseFormat(builder:flatbuffers.Builder, responseFormat:QueryResultFormat) {
builder.addFieldInt8(4, responseFormat, QueryResultFormat.Legacy);
}

static endHttpQuery(builder:flatbuffers.Builder):flatbuffers.Offset {
const offset = builder.endObject();
return offset;
}

static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset):flatbuffers.Offset {
static createHttpQuery(builder:flatbuffers.Builder, queryOffset:flatbuffers.Offset, traceObjOffset:flatbuffers.Offset, inlineTablesOffset:flatbuffers.Offset, parametersOffset:flatbuffers.Offset, responseFormat:QueryResultFormat):flatbuffers.Offset {
HttpQuery.startHttpQuery(builder);
HttpQuery.addQuery(builder, queryOffset);
HttpQuery.addTraceObj(builder, traceObjOffset);
HttpQuery.addInlineTables(builder, inlineTablesOffset);
HttpQuery.addParameters(builder, parametersOffset);
HttpQuery.addResponseFormat(builder, responseFormat);
return HttpQuery.endHttpQuery(builder);
}
}
4 changes: 4 additions & 0 deletions packages/cubejs-cubestore-driver/codegen/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ export { HttpMessage } from './http-message.js';
export { HttpParameter } from './http-parameter.js';
export { HttpParameterValue } from './http-parameter-value.js';
export { HttpQuery } from './http-query.js';
export { HttpQueryResult } from './http-query-result.js';
export { HttpQueryResultArrow } from './http-query-result-arrow.js';
export { HttpQueryResultData } from './http-query-result-data.js';
export { HttpResultSet } from './http-result-set.js';
export { HttpRow } from './http-row.js';
export { HttpTable } from './http-table.js';
export { Int64Value } from './int64-value.js';
export { NullValue } from './null-value.js';
export { QueryResultFormat } from './query-result-format.js';
export { StringValue } from './string-value.js';
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// automatically generated by the FlatBuffers compiler, do not modify

/* eslint-disable @typescript-eslint/no-unused-vars, @typescript-eslint/no-explicit-any, @typescript-eslint/no-non-null-assertion */

export enum QueryResultFormat {
Legacy = 0,
Arrow = 1
}
31 changes: 23 additions & 8 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { pipeline, Writable } from 'stream';
import { createGzip } from 'zlib';
import { createWriteStream, createReadStream } from 'fs';
import { createReadStream, createWriteStream } from 'fs';
import { unlink } from 'fs-extra';
import tempy from 'tempy';
import csvWriter from 'csv-write-stream';
import {
BaseDriver,
CreateTableIndex,
DownloadTableCSVData,
DownloadTableMemoryData,
DriverInterface,
ExternalCreateTableOptions,
DownloadTableMemoryData, DriverInterface, IndexesSQL, CreateTableIndex,
StreamTableData,
StreamingSourceTableData,
ExternalDriverCompatibilities,
IndexesSQL,
QueryOptions,
ExternalDriverCompatibilities, TableStructure, TableColumnQueryResult,
StreamingSourceTableData,
StreamTableData,
TableColumnQueryResult,
TableStructure,
} from '@cubejs-backend/base-driver';
import { AsyncDebounce, getEnv, isVersionGte } from '@cubejs-backend/shared';
import { format as formatSql, escape } from 'sqlstring';
import { escape, format as formatSql } from 'sqlstring';
import fetch from 'node-fetch';

import { ConnectionConfig } from './types';
import { WebSocketConnection } from './WebSocketConnection';
import { QueryResultFormat } from '../codegen';

const CubeStoreCapabilityMinVersion = {
queueExclusive: '1.6.22',
queueExternalId: '1.6.26',
sendableParameters: '1.6.38',
// Arrow format + Completed response type
arrowFormat: '1.6.55',
} satisfies Record<string, string>;
type CubeStoreCapability = keyof typeof CubeStoreCapabilityMinVersion;

Expand Down Expand Up @@ -61,6 +69,7 @@ type CreateTableOptions = {

type CubeStoreQueryOptions = QueryOptions & {
sendParameters?: boolean,
responseFormat?: QueryResultFormat,
};

export class CubeStoreDriver extends BaseDriver implements DriverInterface {
Expand Down Expand Up @@ -98,15 +107,21 @@ export class CubeStoreDriver extends BaseDriver implements DriverInterface {
}

public async query<R = any>(query: string, values: any[], options?: CubeStoreQueryOptions): Promise<R[]> {
const { inlineTables, sendParameters, ...queryTracingObj } = options ?? {};
const { inlineTables, sendParameters, responseFormat, ...queryTracingObj } = options ?? {};

if (!sendParameters) {
query = formatSql(query, values || []);
}

const tracingObj = { ...queryTracingObj, instance: getEnv('instanceId') };

return this.connection.query(query, inlineTables ?? [], tracingObj, sendParameters ? values : undefined);
return this.connection.query(query, sendParameters ? values : [], {
inlineTables: inlineTables ?? [],
queryTracingObj: tracingObj,
responseFormat: responseFormat ?? (
await this.hasCapability('arrowFormat') ? QueryResultFormat.Arrow : QueryResultFormat.Legacy
),
});
Comment thread
ovr marked this conversation as resolved.
}

public async release() {
Expand Down
Loading
Loading