Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion vine-core/tests/arrow_bridge_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::Arc;

#[test]
fn test_arrow_ipc_serialization_roundtrip() {
// Create a simple RecordBatch directly without CSV conversion
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
Expand Down
90 changes: 24 additions & 66 deletions vine-core/tests/storage_reader_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use vine_core::storage_reader::read_vine_data;
use vine_core::metadata::{Metadata, MetadataField};
use vine_core::vortex_exp::write_vortex_file;
use vine_core::arrow_bridge::vortex_to_arrow;
use tempfile::tempdir;
use std::fs;

Expand All @@ -24,118 +25,97 @@ fn create_test_metadata() -> Metadata {
)
}

/// Helper: add up the row counts of all read results, converting each array to Arrow first
fn total_rows(arrays: &[vortex::ArrayRef]) -> usize {
    let mut row_count = 0;
    for array in arrays {
        let batch = vortex_to_arrow(array, true).expect("Failed to convert");
        row_count += batch.num_rows();
    }
    row_count
}

/// Writes one `.vtx` file with two rows under a single date directory and checks
/// that reading the base path yields one array with two rows and two columns.
#[test]
fn test_read_vine_data_single_file() {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let base_path = temp_dir.path();

    // Create metadata
    let metadata = create_test_metadata();
    let meta_path = base_path.join("vine_meta.json");
    metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

    // Create date directory
    let date_dir = base_path.join("2024-01-15");
    fs::create_dir(&date_dir).expect("Failed to create date dir");

    // Write test data
    let vtx_path = date_dir.join("data_120000_000000.vtx");
    write_vortex_file(&vtx_path, &metadata, &["1,Alice", "2,Bob"])
        .expect("Failed to write vortex file");

    // Read data
    let result = read_vine_data(base_path.to_str().unwrap());
    assert_eq!(result.len(), 1); // 1 file = 1 array
    assert_eq!(total_rows(&result), 2);

    // Verify column structure
    let batch = vortex_to_arrow(&result[0], true).expect("Failed to convert");
    assert_eq!(batch.num_columns(), 2);
}

/// Writes two single-row `.vtx` files into the same date directory and checks
/// that reading the base path yields two arrays holding two rows in total.
#[test]
fn test_read_vine_data_multiple_files() {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let base_path = temp_dir.path();

    // Create metadata
    let metadata = create_test_metadata();
    let meta_path = base_path.join("vine_meta.json");
    metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

    // Create date directory
    let date_dir = base_path.join("2024-01-15");
    fs::create_dir(&date_dir).expect("Failed to create date dir");

    // Write first file
    let vtx_path1 = date_dir.join("data_120000_000000.vtx");
    write_vortex_file(&vtx_path1, &metadata, &["1,Alice"])
        .expect("Failed to write first vortex file");

    // Write second file
    let vtx_path2 = date_dir.join("data_130000_000000.vtx");
    write_vortex_file(&vtx_path2, &metadata, &["2,Bob"])
        .expect("Failed to write second vortex file");

    // Read data
    let result = read_vine_data(base_path.to_str().unwrap());

    assert_eq!(result.len(), 2);
    assert_eq!(total_rows(&result), 2);
}

/// Writes one single-row `.vtx` file into each of two date directories and checks
/// that reading the base path yields two arrays holding two rows in total.
#[test]
fn test_read_vine_data_multiple_dates() {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let base_path = temp_dir.path();

    // Create metadata
    let metadata = create_test_metadata();
    let meta_path = base_path.join("vine_meta.json");
    metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

    // Create first date directory
    let date_dir1 = base_path.join("2024-01-14");
    fs::create_dir(&date_dir1).expect("Failed to create first date dir");
    let vtx_path1 = date_dir1.join("data_120000_000000.vtx");
    write_vortex_file(&vtx_path1, &metadata, &["1,Alice"])
        .expect("Failed to write first vortex file");

    // Create second date directory
    let date_dir2 = base_path.join("2024-01-15");
    fs::create_dir(&date_dir2).expect("Failed to create second date dir");
    let vtx_path2 = date_dir2.join("data_120000_000000.vtx");
    write_vortex_file(&vtx_path2, &metadata, &["2,Bob"])
        .expect("Failed to write second vortex file");

    // Read data; only the array count and total row count are asserted here,
    // not the order in which the date directories are returned.
    let result = read_vine_data(base_path.to_str().unwrap());

    assert_eq!(result.len(), 2);
    assert_eq!(total_rows(&result), 2);
}

/// Saves only the metadata file (no date directories) and checks that the
/// reader returns an empty result.
#[test]
fn test_read_vine_data_empty_directory() {
    let workspace = tempdir().expect("Failed to create temp dir");
    let root = workspace.path();

    // Only the metadata file exists under the base path
    let metadata_file = root.join("vine_meta.json");
    create_test_metadata()
        .save(metadata_file.to_str().unwrap())
        .expect("Failed to save metadata");

    // Nothing else was written, so there is nothing to read
    let root_str = root.to_str().unwrap();
    let arrays = read_vine_data(root_str);
    assert!(arrays.is_empty());
}

Expand All @@ -144,10 +124,7 @@ fn test_read_vine_data_missing_metadata() {
let temp_dir = tempdir().expect("Failed to create temp dir");
let base_path = temp_dir.path();

// Don't create metadata file
let result = read_vine_data(base_path.to_str().unwrap());

// Should return empty vector on error
assert!(result.is_empty());
}

Expand All @@ -156,61 +133,42 @@ fn test_read_vine_data_ignores_non_vtx_files() {
let temp_dir = tempdir().expect("Failed to create temp dir");
let base_path = temp_dir.path();

// Create metadata
let metadata = create_test_metadata();
let meta_path = base_path.join("vine_meta.json");
metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

// Create date directory
let date_dir = base_path.join("2024-01-15");
fs::create_dir(&date_dir).expect("Failed to create date dir");

// Write vtx file
let csv_rows = vec!["1,Alice".to_string()];
let csv_rows_refs: Vec<&str> = csv_rows.iter().map(|s| s.as_str()).collect();
let vtx_path = date_dir.join("data_120000_000000.vtx");
write_vortex_file(&vtx_path, &metadata, &csv_rows_refs)
write_vortex_file(&vtx_path, &metadata, &["1,Alice"])
.expect("Failed to write vortex file");

// Create non-vtx file
let txt_path = date_dir.join("README.txt");
fs::write(&txt_path, "This should be ignored").expect("Failed to write txt file");

// Read data
let result = read_vine_data(base_path.to_str().unwrap());

// Should only read the .vtx file
assert_eq!(result.len(), 1);
assert_eq!(result[0], "1,Alice");
assert_eq!(total_rows(&result), 1);
}

/// Creates one date-named directory containing a single-row `.vtx` file next to
/// an empty directory named `not-a-date`, and checks that reading the base path
/// yields exactly one array with one row.
#[test]
fn test_read_vine_data_ignores_invalid_date_directories() {
    let temp_dir = tempdir().expect("Failed to create temp dir");
    let base_path = temp_dir.path();

    // Create metadata
    let metadata = create_test_metadata();
    let meta_path = base_path.join("vine_meta.json");
    metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

    // Create valid date directory
    let valid_date_dir = base_path.join("2024-01-15");
    fs::create_dir(&valid_date_dir).expect("Failed to create valid date dir");
    let vtx_path = valid_date_dir.join("data_120000_000000.vtx");
    write_vortex_file(&vtx_path, &metadata, &["1,Alice"])
        .expect("Failed to write vortex file");

    // Create invalid date directory
    let invalid_date_dir = base_path.join("not-a-date");
    fs::create_dir(&invalid_date_dir).expect("Failed to create invalid date dir");

    // Read data
    let result = read_vine_data(base_path.to_str().unwrap());

    // Should only read from valid date directory
    assert_eq!(result.len(), 1);
    assert_eq!(total_rows(&result), 1);
}
27 changes: 17 additions & 10 deletions vine-core/tests/streaming_writer_v2_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use vine_core::streaming_writer_v2::StreamingWriterV2;
use vine_core::writer_config::WriterConfig;
use vine_core::metadata::{Metadata, MetadataField};
use vine_core::vortex_exp::build_struct_array;
use tempfile::tempdir;
use chrono::Local;

Expand All @@ -24,6 +25,11 @@ fn create_test_metadata() -> Metadata {
)
}

/// Helper: turn comma-separated rows into a Vortex array for the given metadata,
/// panicking with "Failed to build test array" if construction fails
fn build_test_array(metadata: &Metadata, rows: &[&str]) -> vortex::ArrayRef {
    let built = build_struct_array(metadata, rows);
    built.expect("Failed to build test array")
}

#[test]
fn test_streaming_writer_v2_basic() {
let temp_dir = tempdir().expect("Failed to create temp dir");
Expand All @@ -37,15 +43,17 @@ fn test_streaming_writer_v2_basic() {
.expect("Failed to create writer");

// Write and accumulate
writer.write_batch(&["1,Alice", "2,Bob"]).expect("Write failed");
let array1 = build_test_array(&metadata, &["1,Alice", "2,Bob"]);
writer.write_batch(&array1).expect("Write failed");
assert_eq!(writer.buffered_rows(), 2);
assert_eq!(writer.buffered_chunks(), 1);

writer.write_batch(&["3,Charlie"]).expect("Write failed");
let array2 = build_test_array(&metadata, &["3,Charlie"]);
writer.write_batch(&array2).expect("Write failed");
assert_eq!(writer.buffered_rows(), 3);
assert_eq!(writer.buffered_chunks(), 2);

// Flush - should write to file and return summary
// Flush
let summary = writer.flush().expect("Flush failed");
assert!(summary.is_some(), "Should return flush summary");
let summary = summary.unwrap();
Expand All @@ -59,7 +67,8 @@ fn test_streaming_writer_v2_basic() {
assert!(writer.bytes_written() > 0);

// Write more (new file)
writer.write_batch(&["4,Diana"]).expect("Write failed");
let array3 = build_test_array(&metadata, &["4,Diana"]);
writer.write_batch(&array3).expect("Write failed");
writer.close().expect("Close failed");

// Verify files
Expand All @@ -85,19 +94,18 @@ fn test_auto_flush() {
let metadata = create_test_metadata();
metadata.save(meta_path.to_str().unwrap()).expect("Failed to save metadata");

// Create writer with small max_rows_per_file
let mut config = WriterConfig::default();
config.max_rows_per_file = 5;

let mut writer = StreamingWriterV2::with_config(path.to_path_buf(), config)
.expect("Failed to create writer");

// Write 3 rows (no flush yet)
writer.write_batch(&["1,A", "2,B", "3,C"]).expect("Write failed");
let array1 = build_test_array(&metadata, &["1,A", "2,B", "3,C"]);
writer.write_batch(&array1).expect("Write failed");
assert_eq!(writer.buffered_rows(), 3);

// Write 3 more rows (3+3 > 5, so flushes first 3 data, then add 3)
writer.write_batch(&["4,D", "5,E", "6,F"]).expect("Write failed");
let array2 = build_test_array(&metadata, &["4,D", "5,E", "6,F"]);
writer.write_batch(&array2).expect("Write failed");
assert_eq!(writer.buffered_rows(), 3);

writer.close().expect("Close failed");
Expand All @@ -115,7 +123,6 @@ fn test_empty_flush() {
let mut writer = StreamingWriterV2::new(path.to_path_buf())
.expect("Failed to create writer");

// Flush without writing should return None
let summary = writer.flush().expect("Flush should succeed");
assert!(summary.is_none(), "Empty flush should return None");
assert_eq!(writer.bytes_written(), 0);
Expand Down
Loading
Loading