Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,220 @@ pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ff
bucket_offsets,
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Date32Array, FixedSizeBinaryArray, Float32Array, Float64Array,
Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, RecordBatch,
Time32MillisecondArray, Time64MicrosecondArray, TimestampMillisecondArray,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use fcore::record::{ChangeType, ScanRecord, ScanRecords};
use fcore::row::{ColumnarRow, InternalRow};
use std::collections::HashMap;
use std::sync::Arc;

fn make_ffi_datum(datum_type: i32) -> ffi::FfiDatum {
ffi::FfiDatum {
datum_type,
bool_val: false,
i32_val: 0,
i64_val: 0,
f32_val: 0.0,
f64_val: 0.0,
string_val: String::new(),
bytes_val: vec![],
}
}

#[test]
fn ffi_descriptor_to_core_rejects_invalid_type() {
let descriptor = ffi::FfiTableDescriptor {
schema: ffi::FfiSchema {
columns: vec![ffi::FfiColumn {
name: "bad".to_string(),
data_type: 999,
comment: String::new(),
}],
primary_keys: vec![],
},
partition_keys: vec![],
bucket_count: 0,
bucket_keys: vec![],
properties: vec![],
comment: String::new(),
};

let result = ffi_descriptor_to_core(&descriptor);
assert!(result.is_err());
}

#[test]
fn ffi_row_to_core_maps_datum_types() {
let mut fields = Vec::new();

fields.push(make_ffi_datum(DATUM_TYPE_NULL));

let mut bool_datum = make_ffi_datum(DATUM_TYPE_BOOL);
bool_datum.bool_val = true;
fields.push(bool_datum);

let mut int32_datum = make_ffi_datum(DATUM_TYPE_INT32);
int32_datum.i32_val = 11;
fields.push(int32_datum);

let mut int64_datum = make_ffi_datum(DATUM_TYPE_INT64);
int64_datum.i64_val = 22;
fields.push(int64_datum);

let mut f32_datum = make_ffi_datum(DATUM_TYPE_FLOAT32);
f32_datum.f32_val = 1.25;
fields.push(f32_datum);

let mut f64_datum = make_ffi_datum(DATUM_TYPE_FLOAT64);
f64_datum.f64_val = 2.5;
fields.push(f64_datum);

let mut str_datum = make_ffi_datum(DATUM_TYPE_STRING);
str_datum.string_val = "hello".to_string();
fields.push(str_datum);

let mut bytes_datum = make_ffi_datum(DATUM_TYPE_BYTES);
bytes_datum.bytes_val = vec![1, 2, 3];
fields.push(bytes_datum);

let row = ffi::FfiGenericRow { fields };
let core_row = ffi_row_to_core(&row);

assert_eq!(core_row.get_field_count(), 8);
assert!(core_row.is_null_at(0));
assert!(core_row.get_boolean(1));
assert_eq!(core_row.get_int(2), 11);
assert_eq!(core_row.get_long(3), 22);
assert!((core_row.get_float(4) - 1.25).abs() < f32::EPSILON);
assert!((core_row.get_double(5) - 2.5).abs() < f64::EPSILON);
assert_eq!(core_row.get_string(6), "hello");
assert_eq!(core_row.get_bytes(7), &[1, 2, 3]);
}

#[test]
fn core_scan_records_to_ffi_maps_records() {
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![7]))])
.expect("record batch");
let row = ColumnarRow::new(Arc::new(batch));
let record = ScanRecord::new(row, 10, 20, ChangeType::Insert);

let bucket = fcore::metadata::TableBucket::new(1, 2);
let records = ScanRecords::new(HashMap::from([(bucket.clone(), vec![record])]));

let ffi_records = core_scan_records_to_ffi(&records);
assert_eq!(ffi_records.records.len(), 1);
let ffi_record = &ffi_records.records[0];
assert_eq!(ffi_record.bucket_id, 2);
assert_eq!(ffi_record.offset, 10);
assert_eq!(ffi_record.timestamp, 20);
assert_eq!(ffi_record.row.fields.len(), 1);
assert_eq!(ffi_record.row.fields[0].datum_type, DATUM_TYPE_INT32);
assert_eq!(ffi_record.row.fields[0].i32_val, 7);
}

#[test]
fn core_row_to_ffi_fields_maps_arrow_types() {
let schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Boolean, false),
Field::new("i32", DataType::Int32, false),
Field::new("i64", DataType::Int64, false),
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
Field::new("s", DataType::Utf8, false),
Field::new("bin", DataType::Binary, false),
Field::new("fix", DataType::FixedSizeBinary(2), false),
Field::new("date", DataType::Date32, false),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("t32", DataType::Time32(TimeUnit::Millisecond), false),
Field::new("t64", DataType::Time64(TimeUnit::Microsecond), false),
Field::new("ls", DataType::LargeUtf8, false),
Field::new("lb", DataType::LargeBinary, false),
]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(BooleanArray::from(vec![true])),
Arc::new(Int32Array::from(vec![1])),
Arc::new(Int64Array::from(vec![2])),
Arc::new(Float32Array::from(vec![1.5])),
Arc::new(Float64Array::from(vec![2.5])),
Arc::new(arrow::array::StringArray::from(vec!["text"])),
Arc::new(BinaryArray::from(vec![b"bin".as_slice()])),
Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![Some(b"ab".as_slice())].into_iter(),
2,
)
.expect("fixed array"),
),
Arc::new(Date32Array::from(vec![3])),
Arc::new(TimestampMillisecondArray::from(vec![30])),
Arc::new(Time32MillisecondArray::from(vec![10])),
Arc::new(Time64MicrosecondArray::from(vec![20])),
Arc::new(LargeStringArray::from(vec!["large"])),
Arc::new(LargeBinaryArray::from(vec![b"big".as_slice()])),
],
)
.expect("record batch");

let row = ColumnarRow::new(Arc::new(batch));
let fields = core_row_to_ffi_fields(&row);
assert_eq!(fields.len(), 14);

assert_eq!(fields[0].datum_type, DATUM_TYPE_BOOL);
assert!(fields[0].bool_val);

assert_eq!(fields[1].datum_type, DATUM_TYPE_INT32);
assert_eq!(fields[1].i32_val, 1);

assert_eq!(fields[2].datum_type, DATUM_TYPE_INT64);
assert_eq!(fields[2].i64_val, 2);

assert_eq!(fields[3].datum_type, DATUM_TYPE_FLOAT32);
assert!((fields[3].f32_val - 1.5).abs() < f32::EPSILON);

assert_eq!(fields[4].datum_type, DATUM_TYPE_FLOAT64);
assert!((fields[4].f64_val - 2.5).abs() < f64::EPSILON);

assert_eq!(fields[5].datum_type, DATUM_TYPE_STRING);
assert_eq!(fields[5].string_val, "text");

assert_eq!(fields[6].datum_type, DATUM_TYPE_BYTES);
assert_eq!(fields[6].bytes_val, b"bin");

assert_eq!(fields[7].datum_type, DATUM_TYPE_BYTES);
assert_eq!(fields[7].bytes_val, b"ab");

assert_eq!(fields[8].datum_type, DATUM_TYPE_INT32);
assert_eq!(fields[8].i32_val, 3);

assert_eq!(fields[9].datum_type, DATUM_TYPE_INT64);
assert_eq!(fields[9].i64_val, 30);

assert_eq!(fields[10].datum_type, DATUM_TYPE_INT32);
assert_eq!(fields[10].i32_val, 10);

assert_eq!(fields[11].datum_type, DATUM_TYPE_INT64);
assert_eq!(fields[11].i64_val, 20);

assert_eq!(fields[12].datum_type, DATUM_TYPE_STRING);
assert_eq!(fields[12].string_val, "large");

assert_eq!(fields[13].datum_type, DATUM_TYPE_BYTES);
assert_eq!(fields[13].bytes_val, b"big");
}
}
Loading