From 2c00b996e41bdaa8e5e2f708ee7e8b89d75fc37d Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Mon, 15 Jun 2026 00:52:45 +0800 Subject: [PATCH] feat: add Mosaic data file reader Signed-off-by: QuakeWang --- .github/workflows/ci.yml | 6 +- crates/integrations/datafusion/Cargo.toml | 1 + crates/paimon/Cargo.toml | 2 + crates/paimon/src/arrow/format/mod.rs | 22 +- crates/paimon/src/arrow/format/mosaic.rs | 573 ++++++++++++++++++++++ 5 files changed, 600 insertions(+), 4 deletions(-) create mode 100644 crates/paimon/src/arrow/format/mosaic.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 038ece72..3fd54d19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,7 +58,7 @@ jobs: run: cargo fmt --all -- --check - name: Clippy - run: cargo clippy --all-targets --workspace --features fulltext,vortex -- -D warnings + run: cargo clippy --all-targets --workspace --features fulltext,vortex,mosaic -- -D warnings build: runs-on: ${{ matrix.os }} @@ -71,7 +71,7 @@ jobs: steps: - uses: actions/checkout@v6 - name: Build - run: cargo build --features fulltext,vortex + run: cargo build --features fulltext,vortex,mosaic unit: runs-on: ${{ matrix.os }} @@ -85,7 +85,7 @@ jobs: - uses: actions/checkout@v6 - name: Test - run: cargo test -p paimon --all-targets --features fulltext,vortex + run: cargo test -p paimon --all-targets --features fulltext,vortex,mosaic env: RUST_LOG: DEBUG RUST_BACKTRACE: full diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 3e7392de..a0a9bf54 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -29,6 +29,7 @@ keywords = ["paimon", "datafusion", "integrations"] [features] fulltext = ["paimon/fulltext"] +mosaic = ["paimon/mosaic"] vortex = ["paimon/vortex"] [dependencies] diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 0741a50c..2a45ff55 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -42,6 +42,7 @@ storage-all = [ "storage-hdfs", ] fulltext = ["tantivy", "tempfile"] +mosaic = ["dep:paimon-mosaic-core"] vortex = ["dep:vortex"] storage-memory = ["opendal/services-memory"] @@ -101,6 +102,7 @@ uuid = { version = "1", features = ["v4"] } urlencoding = "2.1" tantivy = { version = "0.22", optional = true } tempfile = { version = "3", optional = true } +paimon-mosaic-core = { version = "0.1.0", optional = true } vortex = { version = "0.68", features = ["tokio"], optional = true } libloading = "0.9" # Keep CI on the dependency set that passed before unicode-segmentation 1.13.3. diff --git a/crates/paimon/src/arrow/format/mod.rs b/crates/paimon/src/arrow/format/mod.rs index 17552e1b..955e96af 100644 --- a/crates/paimon/src/arrow/format/mod.rs +++ b/crates/paimon/src/arrow/format/mod.rs @@ -17,6 +17,8 @@ mod avro; pub(crate) mod blob; +#[cfg(feature = "mosaic")] +mod mosaic; mod orc; mod parquet; #[cfg(feature = "vortex")] @@ -105,18 +107,36 @@ pub(crate) fn create_format_reader( } else if lower.ends_with(".avro") { Ok(Box::new(avro::AvroFormatReader)) } else { + #[cfg(feature = "mosaic")] + if lower.ends_with(".mosaic") { + return Ok(Box::new(mosaic::MosaicFormatReader)); + } #[cfg(feature = "vortex")] if lower.ends_with(".vortex") { return Ok(Box::new(vortex::VortexFormatReader)); } Err(Error::Unsupported { message: format!( - "unsupported file format: expected .parquet, .blob, .orc, or .avro, got: {path}" + "unsupported file format: expected {}, got: {path}", + supported_read_formats().join(", ") ), }) } } +fn supported_read_formats() -> Vec<&'static str> { + vec![ + ".parquet", + ".blob", + ".orc", + ".avro", + #[cfg(feature = "mosaic")] + ".mosaic", + #[cfg(feature = "vortex")] + ".vortex", + ] +} + /// Create a format writer that streams directly to storage. pub(crate) async fn create_format_writer( output: &OutputFile, diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs new file mode 100644 index 00000000..b7e3c47a --- /dev/null +++ b/crates/paimon/src/arrow/format/mosaic.rs @@ -0,0 +1,573 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FilePredicates, FormatFileReader}; +use crate::arrow::build_target_arrow_schema; +use crate::io::FileRead; +use crate::spec::DataField; +use crate::table::{ArrowRecordBatchStream, RowRange}; +use crate::Error; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; +use arrow_schema::{DataType as ArrowDataType, SchemaRef, TimeUnit}; +use async_stream::try_stream; +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess}; +use std::io; + +pub(crate) struct MosaicFormatReader; + +const DEFAULT_BATCH_SIZE: usize = 8192; + +#[async_trait] +impl FormatFileReader for MosaicFormatReader { + async fn read_batch_stream( + &self, + reader: Box, + file_size: u64, + read_fields: &[DataField], + _predicates: Option<&FilePredicates>, + batch_size: Option, + row_selection: Option>, + ) -> crate::Result { + // Mosaic predicates are currently residual; callers must re-check them for exact filtering. + let target_schema = build_target_arrow_schema(read_fields)?; + validate_mosaic_schema(&target_schema)?; + + let file_bytes = reader.read(0..file_size).await?; + let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size) + .map_err(mosaic_read_error)?; + let projected_names = read_fields + .iter() + .map(|field| field.name().to_string()) + .collect::>(); + let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + + Ok(try_stream! { + let mut row_group_start = 0usize; + for row_group_index in 0..mosaic_reader.num_row_groups() { + let row_group_rows = mosaic_reader + .row_group_num_rows(row_group_index) + .map_err(mosaic_read_error)?; + let selected_indices = selected_indices_for_row_group( + row_group_rows, + row_group_start, + row_selection.as_deref(), + )?; + row_group_start = row_group_start + .checked_add(row_group_rows) + .ok_or_else(|| Error::DataInvalid { + message: "Mosaic row group row count overflow".to_string(), + source: None, + })?; + + if let Some(indices) = selected_indices.as_ref() { + if indices.is_empty() { + continue; + } + } + + let mut row_group_reader = if projected_names.is_empty() { + mosaic_reader + .row_group_reader_by_names(row_group_index, &[]) + .map_err(mosaic_read_error)? + } else { + let names = projected_names + .iter() + .map(String::as_str) + .collect::>(); + mosaic_reader + .row_group_reader_by_names(row_group_index, &names) + .map_err(mosaic_read_error)? + }; + + let batch = row_group_reader + .read_columns() + .map_err(mosaic_read_error)?; + let batch = take_rows(batch, selected_indices.as_ref(), &target_schema)?; + for chunk in split_batch(batch, batch_size) { + yield chunk; + } + } + } + .boxed()) + } +} + +#[derive(Clone)] +struct MemoryInputFile { + data: Bytes, +} + +impl MemoryInputFile { + fn new(data: Bytes) -> Self { + Self { data } + } +} + +impl InputFile for MemoryInputFile { + fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> { + let offset = usize::try_from(offset).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "mosaic read offset exceeds usize", + ) + })?; + let end = offset.checked_add(buf.len()).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "mosaic read range overflows") + })?; + let src = self.data.get(offset..end).ok_or_else(|| { + io::Error::new( + io::ErrorKind::UnexpectedEof, + "mosaic read range exceeds file size", + ) + })?; + buf.copy_from_slice(src); + Ok(()) + } +} + +fn validate_mosaic_schema(schema: &SchemaRef) -> crate::Result<()> { + for field in schema.fields() { + validate_mosaic_arrow_type(field.data_type()).map_err(|message| Error::Unsupported { + message: format!( + "Mosaic format does not support column '{}' with type {:?}: {message}", + field.name(), + field.data_type() + ), + })?; + } + Ok(()) +} + +fn validate_mosaic_arrow_type(data_type: &ArrowDataType) -> Result<(), String> { + match data_type { + ArrowDataType::Boolean + | ArrowDataType::Int8 + | ArrowDataType::Int16 + | ArrowDataType::Int32 + | ArrowDataType::Int64 + | ArrowDataType::Float32 + | ArrowDataType::Float64 + | ArrowDataType::Date32 + | ArrowDataType::Utf8 + | ArrowDataType::Binary => Ok(()), + ArrowDataType::Time32(TimeUnit::Millisecond) => Ok(()), + ArrowDataType::Decimal128(precision, _) => { + if *precision == 0 || *precision > 38 { + Err(format!( + "Decimal precision must be in 1..=38, got {precision}" + )) + } else { + Ok(()) + } + } + ArrowDataType::Timestamp( + TimeUnit::Millisecond | TimeUnit::Microsecond | TimeUnit::Nanosecond, + _, + ) => Ok(()), + ArrowDataType::Struct(fields) if is_timestamp_nanos_struct(fields) => Ok(()), + other => Err(format!("unsupported Arrow type {other:?}")), + } +} + +fn is_timestamp_nanos_struct(fields: &arrow_schema::Fields) -> bool { + fields.len() == 2 + && fields[0].name() == "millis" + && *fields[0].data_type() == ArrowDataType::Int64 + && fields[1].name() == "nanos_of_milli" + && *fields[1].data_type() == ArrowDataType::Int32 +} + +fn selected_indices_for_row_group( + row_group_rows: usize, + row_group_start: usize, + row_selection: Option<&[RowRange]>, +) -> crate::Result> { + let Some(row_selection) = row_selection else { + return Ok(None); + }; + + let row_group_end = + row_group_start + .checked_add(row_group_rows) + .ok_or_else(|| Error::DataInvalid { + message: "Mosaic row group row range overflow".to_string(), + source: None, + })?; + + let mut indices = Vec::new(); + for range in row_selection { + let from = usize::try_from(range.from()).map_err(|e| Error::DataInvalid { + message: format!( + "Invalid negative Mosaic row selection start: {}", + range.from() + ), + source: Some(Box::new(e)), + })?; + let to_inclusive = usize::try_from(range.to()).map_err(|e| Error::DataInvalid { + message: format!("Invalid negative Mosaic row selection end: {}", range.to()), + source: Some(Box::new(e)), + })?; + let to = to_inclusive + .checked_add(1) + .ok_or_else(|| Error::DataInvalid { + message: "Mosaic row selection end overflows".to_string(), + source: None, + })?; + let start = from.max(row_group_start); + let end = to.min(row_group_end); + if start >= end { + continue; + } + indices.extend((start - row_group_start..end - row_group_start).map(|idx| idx as u64)); + } + + Ok(Some(UInt64Array::from(indices))) +} + +fn take_rows( + batch: RecordBatch, + indices: Option<&UInt64Array>, + target_schema: &SchemaRef, +) -> crate::Result { + let Some(indices) = indices else { + return ensure_schema(batch, target_schema); + }; + + if batch.num_columns() == 0 { + return RecordBatch::try_new_with_options( + target_schema.clone(), + Vec::new(), + &RecordBatchOptions::new().with_row_count(Some(indices.len())), + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build empty Mosaic RecordBatch: {e}"), + source: Some(Box::new(e)), + }); + } + + let columns = batch + .columns() + .iter() + .map(|column| { + arrow_select::take::take(column.as_ref(), indices, None).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to apply Mosaic row selection: {e}"), + source: Some(Box::new(e)), + } + }) + }) + .collect::>>()?; + + RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError { + message: format!("Failed to build Mosaic RecordBatch: {e}"), + source: Some(Box::new(e)), + }) +} + +fn ensure_schema(batch: RecordBatch, target_schema: &SchemaRef) -> crate::Result { + if batch.schema().as_ref() == target_schema.as_ref() { + return Ok(batch); + } + + if batch.num_columns() == 0 { + return RecordBatch::try_new_with_options( + target_schema.clone(), + Vec::new(), + &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), + ) + .map_err(|e| Error::UnexpectedError { + message: format!("Failed to build empty Mosaic RecordBatch: {e}"), + source: Some(Box::new(e)), + }); + } + + RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to align Mosaic RecordBatch schema: {e}"), + source: Some(Box::new(e)), + } + }) +} + +fn split_batch(batch: RecordBatch, batch_size: usize) -> Vec { + if batch_size == 0 || batch.num_rows() <= batch_size { + return vec![batch]; + } + + let mut batches = Vec::new(); + let mut offset = 0; + while offset < batch.num_rows() { + let len = batch_size.min(batch.num_rows() - offset); + batches.push(batch.slice(offset, len)); + offset += len; + } + batches +} + +fn mosaic_read_error(error: io::Error) -> Error { + Error::DataInvalid { + message: format!("Failed to read Mosaic file: {error}"), + source: Some(Box::new(error)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::format::FormatFileReader; + use crate::spec::{ArrayType, DataType, IntType, RowType, VarCharType}; + use arrow_array::{Array, Int32Array, StringArray}; + use arrow_schema::{DataType as ArrowDataType, Field, Schema}; + use bytes::Bytes; + use futures::TryStreamExt; + use paimon_mosaic_core::spec::COMPRESSION_NONE; + use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions}; + use std::ops::Range; + use std::sync::Arc; + + struct TestFileRead { + data: Bytes, + } + + #[async_trait] + impl FileRead for TestFileRead { + async fn read(&self, range: Range) -> crate::Result { + let start = usize::try_from(range.start).unwrap(); + let end = usize::try_from(range.end).unwrap(); + Ok(self.data.slice(start..end)) + } + } + + struct MemOutputFile { + data: Vec, + } + + impl MemOutputFile { + fn new() -> Self { + Self { data: Vec::new() } + } + } + + impl OutputFile for MemOutputFile { + fn write(&mut self, data: &[u8]) -> io::Result<()> { + self.data.extend_from_slice(data); + Ok(()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + + fn pos(&self) -> u64 { + self.data.len() as u64 + } + } + + fn data_fields() -> Vec { + vec![ + DataField::new( + 0, + "id".to_string(), + DataType::Int(IntType::with_nullable(false)), + ), + DataField::new( + 1, + "name".to_string(), + DataType::VarChar(VarCharType::with_nullable(true, 20).unwrap()), + ), + DataField::new( + 2, + "score".to_string(), + DataType::Int(IntType::with_nullable(true)), + ), + ] + } + + fn arrow_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", ArrowDataType::Int32, false), + Field::new("name", ArrowDataType::Utf8, true), + Field::new("score", ArrowDataType::Int32, true), + ])) + } + + fn sample_batch() -> RecordBatch { + RecordBatch::try_new( + arrow_schema(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])), + ], + ) + .unwrap() + } + + fn write_mosaic(batch: &RecordBatch) -> Bytes { + let out = MemOutputFile::new(); + let mut writer = MosaicWriter::new( + out, + batch.schema().as_ref(), + WriterOptions { + compression: COMPRESSION_NONE, + num_buckets: 2, + row_group_max_size: u64::MAX, + ..Default::default() + }, + ) + .unwrap(); + writer.write_batch(batch).unwrap(); + writer.close().unwrap(); + Bytes::from(writer.output().data.to_vec()) + } + + async fn read_batches( + data: Bytes, + read_fields: &[DataField], + row_selection: Option>, + ) -> crate::Result> { + let file_size = data.len() as u64; + MosaicFormatReader + .read_batch_stream( + Box::new(TestFileRead { data }), + file_size, + read_fields, + None, + None, + row_selection, + ) + .await? + .try_collect() + .await + } + + #[tokio::test] + async fn test_read_basic_mosaic_file() { + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &data_fields(), None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 5); + assert_eq!(batches[0].schema().fields().len(), 3); + let ids = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.value(0), 1); + assert_eq!(ids.value(4), 5); + } + + #[tokio::test] + async fn test_read_projection_order() { + let fields = data_fields(); + let projected = vec![fields[2].clone(), fields[0].clone()]; + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &projected, None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].schema().field(0).name(), "score"); + assert_eq!(batches[0].schema().field(1).name(), "id"); + let scores = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(scores.value(2), 30); + } + + #[tokio::test] + async fn test_read_empty_projection() { + let data = write_mosaic(&sample_batch()); + let batches = read_batches(data, &[], None).await.unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_columns(), 0); + assert_eq!(batches[0].num_rows(), 5); + } + + #[tokio::test] + async fn test_read_row_selection() { + let fields = data_fields(); + let data = write_mosaic(&sample_batch()); + let batches = read_batches( + data, + &fields, + Some(vec![RowRange::new(1, 2), RowRange::new(4, 4)]), + ) + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 3); + let ids = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), &[2, 3, 5]); + } + + #[tokio::test] + async fn test_unsupported_type_returns_error() { + let unsupported = vec![DataField::new( + 0, + "items".to_string(), + DataType::Array(ArrayType::new(DataType::Int(IntType::new()))), + )]; + let result = MosaicFormatReader + .read_batch_stream( + Box::new(TestFileRead { data: Bytes::new() }), + 0, + &unsupported, + None, + None, + None, + ) + .await; + let err = match result { + Ok(_) => panic!("expected unsupported Mosaic type error"), + Err(err) => err, + }; + + assert!( + matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'items'")) + ); + } + + #[test] + fn test_validate_row_type_as_unsupported() { + let unsupported = vec![DataField::new( + 0, + "nested".to_string(), + DataType::Row(RowType::new(vec![DataField::new( + 1, + "v".to_string(), + DataType::Int(IntType::new()), + )])), + )]; + let schema = build_target_arrow_schema(&unsupported).unwrap(); + let err = validate_mosaic_schema(&schema).unwrap_err(); + + assert!( + matches!(err, Error::Unsupported { message } if message.contains("Mosaic format does not support column 'nested'")) + ); + } +}