From 16432971c9d55db1f63fef31af3731ceb5135b61 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 18 Nov 2025 17:02:44 -0800 Subject: [PATCH 1/5] create table exploration --- crates/iceberg/src/arrow/id_assigner.rs | 357 ++++++++++++++++++ crates/iceberg/src/arrow/mod.rs | 1 + crates/iceberg/src/arrow/schema.rs | 26 +- crates/iceberg/src/arrow/value.rs | 4 +- crates/iceberg/src/lib.rs | 6 +- crates/integrations/datafusion/src/schema.rs | 153 ++++++-- .../testdata/schedules/df_test.toml | 8 + .../testdata/slts/df_test/create_table.slt | 44 +++ .../slts/df_test/partitioned_table.slt | 118 ++++++ 9 files changed, 686 insertions(+), 31 deletions(-) create mode 100644 crates/iceberg/src/arrow/id_assigner.rs create mode 100644 crates/sqllogictest/testdata/slts/df_test/create_table.slt create mode 100644 crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt diff --git a/crates/iceberg/src/arrow/id_assigner.rs b/crates/iceberg/src/arrow/id_assigner.rs new file mode 100644 index 0000000000..99a57bb2c6 --- /dev/null +++ b/crates/iceberg/src/arrow/id_assigner.rs @@ -0,0 +1,357 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Arrow schema field ID assignment using breadth-first traversal + +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; + +use super::get_field_doc; +use crate::error::Result; +use crate::spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, +}; +use crate::{Error, ErrorKind}; + +/// Helper for assigning field IDs using breadth-first traversal. +/// +/// This struct implements BFS traversal to assign field IDs level-by-level, +/// similar to how `ReassignFieldIds` works in the spec module. All fields at +/// one level are assigned IDs before descending to nested fields. 
+pub(super) struct ArrowSchemaIdAssigner { + next_id: i32, +} + +impl ArrowSchemaIdAssigner { + pub(super) fn new(start_id: i32) -> Self { + Self { next_id: start_id } + } + + fn next_field_id(&mut self) -> i32 { + let id = self.next_id; + self.next_id += 1; + id + } + + pub(super) fn convert_schema(&mut self, schema: &ArrowSchema) -> Result { + let fields = self.convert_fields(schema.fields())?; + Schema::builder().with_fields(fields).build() + } + + fn convert_fields(&mut self, fields: &Fields) -> Result> { + // First pass: convert all fields at this level and assign IDs + let fields_with_types: Vec<_> = fields + .iter() + .map(|field| { + let id = self.next_field_id(); + let field_type = arrow_type_to_primitive_or_placeholder(field.data_type())?; + Ok((field, id, field_type)) + }) + .collect::>>()?; + + // Second pass: recursively process nested types + fields_with_types + .into_iter() + .map(|(field, id, field_type)| { + let final_type = self.process_nested_type(field.data_type(), field_type)?; + let doc = get_field_doc(field); + Ok(Arc::new(NestedField { + id, + doc, + name: field.name().clone(), + required: !field.is_nullable(), + field_type: Box::new(final_type), + initial_default: None, + write_default: None, + })) + }) + .collect() + } + + fn process_nested_type(&mut self, arrow_type: &DataType, placeholder: Type) -> Result { + match arrow_type { + DataType::Struct(fields) => { + let nested_fields = self.convert_fields(fields)?; + Ok(Type::Struct(StructType::new(nested_fields))) + } + DataType::List(element_field) + | DataType::LargeList(element_field) + | DataType::FixedSizeList(element_field, _) => { + let element_id = self.next_field_id(); + let element_type = + arrow_type_to_primitive_or_placeholder(element_field.data_type())?; + let final_element_type = + self.process_nested_type(element_field.data_type(), element_type)?; + + let doc = get_field_doc(element_field); + let mut element = NestedField::list_element( + element_id, + final_element_type, + !element_field.is_nullable(), + ); + if let Some(doc) = doc { + element = element.with_doc(doc); + } + Ok(Type::List(ListType { + element_field: Arc::new(element), + })) + } + DataType::Map(field, _) => match field.data_type() { + DataType::Struct(fields) if fields.len() == 2 => { + let key_field = &fields[0]; + let value_field = &fields[1]; + + let key_id = self.next_field_id(); + let key_type = arrow_type_to_primitive_or_placeholder(key_field.data_type())?; + let final_key_type = + self.process_nested_type(key_field.data_type(), key_type)?; + + let value_id = self.next_field_id(); + let value_type = + arrow_type_to_primitive_or_placeholder(value_field.data_type())?; + let final_value_type = + self.process_nested_type(value_field.data_type(), value_type)?; + + let key_doc = get_field_doc(key_field); + let mut key = NestedField::map_key_element(key_id, final_key_type); + if let Some(doc) = key_doc { + key = key.with_doc(doc); + } + + let value_doc = get_field_doc(value_field); + let mut value = NestedField::map_value_element( + value_id, + final_value_type, + !value_field.is_nullable(), + ); + if let Some(doc) = value_doc { + value = value.with_doc(doc); + } + + Ok(Type::Map(MapType { + key_field: Arc::new(key), + value_field: Arc::new(value), + })) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have struct type with 2 fields", + )), + }, + _ => Ok(placeholder), // Primitive type, return as-is + } + } +} + +/// Convert Arrow type to Iceberg type for primitives, or return a placeholder for complex types +fn 
arrow_type_to_primitive_or_placeholder(ty: &DataType) -> Result { + match ty { + DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)), + DataType::Int8 | DataType::Int16 | DataType::Int32 => { + Ok(Type::Primitive(PrimitiveType::Int)) + } + DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)), + DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)), + DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), + DataType::UInt64 => Err(Error::new( + ErrorKind::DataInvalid, + "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for full uint64 range.", + )), + DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), + DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), + DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to create decimal type".to_string(), + ) + .with_source(e) + }), + DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), + DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Time)) + } + DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Timestamp)) + } + DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => { + Ok(Type::Primitive(PrimitiveType::TimestampNs)) + } + DataType::Timestamp(unit, Some(zone)) + if unit == &TimeUnit::Microsecond + && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") => + { + Ok(Type::Primitive(PrimitiveType::Timestamptz)) + } + DataType::Timestamp(unit, Some(zone)) + if unit == &TimeUnit::Nanosecond + && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") => + { + Ok(Type::Primitive(PrimitiveType::TimestamptzNs)) + } + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + Ok(Type::Primitive(PrimitiveType::Binary)) + } + DataType::FixedSizeBinary(width) => { + Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) + } + DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => { + Ok(Type::Primitive(PrimitiveType::String)) + } + // For complex types, return a placeholder that will be replaced + DataType::Struct(_) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Map(_, _) => { + Ok(Type::Primitive(PrimitiveType::Boolean)) // Placeholder + } + other => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported Arrow data type: {other}"), + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::DEFAULT_MAP_FIELD_NAME; + + #[test] + fn test_arrow_schema_to_schema_with_assigned_ids() { + // Create an Arrow schema without field IDs (like DataFusion CREATE TABLE) + // Include nested structures to test ID assignment in BFS order + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + // Struct field with nested fields + Field::new( + "address", + DataType::Struct(Fields::from(vec![ + Field::new("street", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + Field::new("zip", DataType::Int32, true), + ])), + true, + ), + // List field + Field::new( + "tags", + DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))), + true, + ), + // Map field + Field::new( + "properties", + DataType::Map( + Arc::new(Field::new( + DEFAULT_MAP_FIELD_NAME, + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", 
DataType::Int32, true), + ])), + false, + )), + false, + ), + false, + ), + Field::new("value", DataType::Float64, false), + ]); + + // Convert to Iceberg schema with auto-assigned IDs + let mut assigner = ArrowSchemaIdAssigner::new(1); + let iceberg_schema = assigner.convert_schema(&arrow_schema).unwrap(); + + // Verify the schema structure + let fields = iceberg_schema.as_struct().fields(); + assert_eq!(fields.len(), 6); + + // BFS ordering: top-level fields get IDs 1-6, then nested fields get IDs 7+ + + // Check field 1: id + assert_eq!(fields[0].id, 1); + assert_eq!(fields[0].name, "id"); + assert!(fields[0].required); + assert!(matches!( + fields[0].field_type.as_ref(), + Type::Primitive(PrimitiveType::Int) + )); + + // Check field 2: name + assert_eq!(fields[1].id, 2); + assert_eq!(fields[1].name, "name"); + assert!(!fields[1].required); + assert!(matches!( + fields[1].field_type.as_ref(), + Type::Primitive(PrimitiveType::String) + )); + + // Check field 3: address (struct with nested fields) + assert_eq!(fields[2].id, 3); + assert_eq!(fields[2].name, "address"); + assert!(!fields[2].required); + if let Type::Struct(struct_type) = fields[2].field_type.as_ref() { + let nested_fields = struct_type.fields(); + assert_eq!(nested_fields.len(), 3); + // Nested field IDs are assigned after all top-level fields (7, 8, 9) + assert_eq!(nested_fields[0].id, 7); + assert_eq!(nested_fields[0].name, "street"); + assert_eq!(nested_fields[1].id, 8); + assert_eq!(nested_fields[1].name, "city"); + assert_eq!(nested_fields[2].id, 9); + assert_eq!(nested_fields[2].name, "zip"); + } else { + panic!("Expected struct type for address field"); + } + + // Check field 4: tags (list) + assert_eq!(fields[3].id, 4); + assert_eq!(fields[3].name, "tags"); + assert!(!fields[3].required); + if let Type::List(list_type) = fields[3].field_type.as_ref() { + // List element ID is assigned after top-level fields + assert_eq!(list_type.element_field.id, 10); + assert!(list_type.element_field.required); + } else { + panic!("Expected list type for tags field"); + } + + // Check field 5: properties (map) + assert_eq!(fields[4].id, 5); + assert_eq!(fields[4].name, "properties"); + assert!(fields[4].required); + if let Type::Map(map_type) = fields[4].field_type.as_ref() { + // Map key and value IDs are assigned after top-level fields + assert_eq!(map_type.key_field.id, 11); + assert_eq!(map_type.value_field.id, 12); + assert!(!map_type.value_field.required); + } else { + panic!("Expected map type for properties field"); + } + + // Check field 6: value + assert_eq!(fields[5].id, 6); + assert_eq!(fields[5].name, "value"); + assert!(fields[5].required); + assert!(matches!( + fields[5].field_type.as_ref(), + Type::Primitive(PrimitiveType::Double) + )); + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..545454887e 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -17,6 +17,7 @@ //! 
Conversion between Iceberg and Arrow schema +mod id_assigner; mod schema; pub use schema::*; diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 4f4f083c73..b08e22549e 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -33,6 +33,7 @@ use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; use uuid::Uuid; +use super::id_assigner::ArrowSchemaIdAssigner; use crate::error::Result; use crate::spec::{ Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, @@ -221,6 +222,17 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { visit_schema(schema, &mut visitor) } +/// Convert Arrow schema to Iceberg schema with auto-assigned field IDs. +/// +/// This function is useful when converting Arrow schemas that don't have field IDs +/// in their metadata (e.g., from DataFusion CREATE TABLE statements). Field IDs +/// are assigned sequentially starting from 1, using breadth-first traversal to assign +/// IDs level by level (all fields at one level before descending to nested fields). +pub fn arrow_schema_to_schema_with_assigned_ids(schema: &ArrowSchema) -> Result { + let mut assigner = ArrowSchemaIdAssigner::new(1); + assigner.convert_schema(schema) +} + /// Convert Arrow type to iceberg type. pub fn arrow_type_to_type(ty: &DataType) -> Result { let mut visitor = ArrowSchemaConverter::new(); @@ -229,7 +241,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result { const ARROW_FIELD_DOC_KEY: &str = "doc"; -pub(super) fn get_field_id(field: &Field) -> Result { +pub(super) fn get_field_id_from_metadata(field: &Field) -> Result { if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { return value.parse::().map_err(|e| { Error::new( @@ -246,7 +258,7 @@ pub(super) fn get_field_id(field: &Field) -> Result { )) } -fn get_field_doc(field: &Field) -> Option { +pub(super) fn get_field_doc(field: &Field) -> Option { if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) { return Some(value.clone()); } @@ -257,7 +269,7 @@ struct ArrowSchemaConverter; impl ArrowSchemaConverter { fn new() -> Self { - Self {} + Self } fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { @@ -265,7 +277,7 @@ impl ArrowSchemaConverter { for i in 0..fields.len() { let field = &fields[i]; let field_type = &field_results[i]; - let id = get_field_id(field)?; + let id = get_field_id_from_metadata(field)?; let doc = get_field_doc(field); let nested_field = NestedField { id, @@ -310,7 +322,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } }; - let id = get_field_id(element_field)?; + let id = get_field_id_from_metadata(element_field)?; let doc = get_field_doc(element_field); let mut element_field = NestedField::list_element(id, value.clone(), !element_field.is_nullable()); @@ -335,7 +347,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { let key_field = &fields[0]; let value_field = &fields[1]; - let key_id = get_field_id(key_field)?; + let key_id = get_field_id_from_metadata(key_field)?; let key_doc = get_field_doc(key_field); let mut key_field = NestedField::map_key_element(key_id, key_value.clone()); if let Some(doc) = key_doc { @@ -343,7 +355,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } let key_field = Arc::new(key_field); - let value_id = get_field_id(value_field)?; + let value_id = get_field_id_from_metadata(value_field)?; let value_doc = get_field_doc(value_field); let mut value_field = 
NestedField::map_value_element( value_id, diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index bc123d99e8..2f53f25c21 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -27,7 +27,7 @@ use arrow_buffer::NullBuffer; use arrow_schema::{DataType, FieldRef}; use uuid::Uuid; -use super::get_field_id; +use super::get_field_id_from_metadata; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, @@ -450,7 +450,7 @@ impl FieldMatchMode { /// Determines if an Arrow field matches an Iceberg field based on the matching mode. pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { match self { - FieldMatchMode::Id => get_field_id(arrow_field) + FieldMatchMode::Id => get_field_id_from_metadata(arrow_field) .map(|id| id == iceberg_field.id) .unwrap_or(false), FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name, diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8d8f40f72d..62afc51c0f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -87,7 +87,11 @@ pub mod expr; pub mod transaction; pub mod transform; -mod runtime; +/// Runtime abstraction for async operations. +/// +/// Provides runtime-agnostic functions for spawning tasks and blocking operations, +/// supporting both tokio and smol runtimes. +pub mod runtime; pub mod arrow; pub(crate) mod delete_file_index; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 31bbdbd67f..74ac6d1eb1 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -24,8 +24,12 @@ use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result as DFResult}; use futures::future::try_join_all; +use iceberg::arrow::arrow_type_to_type; use iceberg::inspect::MetadataTableType; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::runtime::spawn_blocking; +use iceberg::spec::{NestedField, Schema as IcebergSchema}; +use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; +use tokio::sync::RwLock; use crate::table::IcebergTableProvider; use crate::to_datafusion_error; @@ -34,13 +38,42 @@ use crate::to_datafusion_error; /// access to table providers within a specific namespace. #[derive(Debug)] pub(crate) struct IcebergSchemaProvider { + /// Reference to the Iceberg catalog + catalog: Arc, + /// The namespace this schema represents + namespace: NamespaceIdent, /// A `HashMap` where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. - tables: HashMap>, + tables: Arc>>>, } impl IcebergSchemaProvider { + /// Convert an Arrow schema to an Iceberg schema, assigning field IDs automatically. + /// + /// This is needed because DataFusion's CREATE TABLE doesn't include field IDs in the + /// Arrow schema metadata, but Iceberg requires them. We assign sequential IDs starting from 1. 
+ fn arrow_schema_to_iceberg_schema( + arrow_schema: &datafusion::arrow::datatypes::Schema, + ) -> Result { + let mut field_id = 1; + let mut fields = Vec::new(); + + for field in arrow_schema.fields() { + // Use iceberg's arrow_type_to_type for conversion + let iceberg_type = arrow_type_to_type(field.data_type())?; + let nested_field = if field.is_nullable() { + NestedField::optional(field_id, field.name(), iceberg_type) + } else { + NestedField::required(field_id, field.name(), iceberg_type) + }; + fields.push(nested_field.into()); + field_id += 1; + } + + IcebergSchema::builder().with_fields(fields).build() + } + /// Asynchronously tries to construct a new [`IcebergSchemaProvider`] /// using the given client to fetch and initialize table providers for /// the provided namespace in the Iceberg [`Catalog`]. @@ -77,7 +110,11 @@ impl IcebergSchemaProvider { .map(|(name, provider)| (name, Arc::new(provider))) .collect(); - Ok(IcebergSchemaProvider { tables }) + Ok(IcebergSchemaProvider { + catalog: client, + namespace, + tables: Arc::new(RwLock::new(tables)), + }) } } @@ -88,32 +125,45 @@ impl SchemaProvider for IcebergSchemaProvider { } fn table_names(&self) -> Vec { - self.tables - .keys() - .flat_map(|table_name| { - [table_name.clone()] - .into_iter() - .chain(MetadataTableType::all_types().map(|metadata_table_name| { - format!("{}${}", table_name.clone(), metadata_table_name.as_str()) - })) - }) - .collect() + // Try to get a read lock without blocking + // If we can't get it immediately, return empty list + // This is a limitation of the sync API + match self.tables.try_read() { + Ok(tables) => tables + .keys() + .flat_map(|table_name| { + [table_name.clone()] + .into_iter() + .chain(MetadataTableType::all_types().map(|metadata_table_name| { + format!("{}${}", table_name.clone(), metadata_table_name.as_str()) + })) + }) + .collect(), + Err(_) => Vec::new(), + } } fn table_exist(&self, name: &str) -> bool { - if let Some((table_name, metadata_table_name)) = name.split_once('$') { - self.tables.contains_key(table_name) - && MetadataTableType::try_from(metadata_table_name).is_ok() - } else { - self.tables.contains_key(name) + // Try to get a read lock without blocking + match self.tables.try_read() { + Ok(tables) => { + if let Some((table_name, metadata_table_name)) = name.split_once('$') { + tables.contains_key(table_name) + && MetadataTableType::try_from(metadata_table_name).is_ok() + } else { + tables.contains_key(name) + } + } + Err(_) => false, } } async fn table(&self, name: &str) -> DFResult>> { + let tables = self.tables.read().await; if let Some((table_name, metadata_table_name)) = name.split_once('$') { let metadata_table_type = MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?; - if let Some(table) = self.tables.get(table_name) { + if let Some(table) = tables.get(table_name) { let metadata_table = table .metadata_table(metadata_table_type) .await @@ -124,10 +174,71 @@ impl SchemaProvider for IcebergSchemaProvider { } } - Ok(self - .tables + Ok(tables .get(name) .cloned() .map(|t| t as Arc)) } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> DFResult>> { + // Convert DataFusion schema to Iceberg schema + // DataFusion schemas don't have field IDs, so we need to build the schema manually + let df_schema = table.schema(); + let iceberg_schema = Self::arrow_schema_to_iceberg_schema(df_schema.as_ref()) + .map_err(to_datafusion_error)?; + + // Create the table in the Iceberg catalog + let table_creation = TableCreation::builder() + 
.name(name.clone()) + .schema(iceberg_schema) + .build(); + + let catalog = self.catalog.clone(); + let namespace = self.namespace.clone(); + let tables = self.tables.clone(); + let name_clone = name.clone(); + + // Use iceberg's spawn_blocking to handle the async work on a blocking thread pool + // This avoids the "cannot block within async runtime" error and works with both + // tokio and smol runtimes + let result = spawn_blocking(move || { + // Create a new runtime handle to execute the async work + let rt = tokio::runtime::Handle::current(); + rt.block_on(async move { + catalog + .create_table(&namespace, table_creation) + .await + .map_err(to_datafusion_error)?; + + // Create a new table provider using the catalog reference + let table_provider = IcebergTableProvider::try_new( + catalog.clone(), + namespace.clone(), + name_clone.clone(), + ) + .await + .map_err(to_datafusion_error)?; + + // Store the new table provider + let mut tables_guard = tables.write().await; + let old_table = tables_guard.insert(name_clone, Arc::new(table_provider)); + + Ok(old_table.map(|t| t as Arc)) + }) + }); + + // Block on the spawned task to get the result + // This is safe because spawn_blocking moves the blocking to a dedicated thread pool + futures::executor::block_on(result) + } + + fn deregister_table(&self, _name: &str) -> DFResult>> { + Err(DataFusionError::NotImplemented( + "deregister_table is not supported".to_string(), + )) + } } diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index df5e638d5a..9d50902d9d 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -22,6 +22,14 @@ df = { type = "datafusion" } engine = "df" slt = "df_test/show_tables.slt" +[[steps]] +engine = "df" +slt = "df_test/create_table.slt" + [[steps]] engine = "df" slt = "df_test/insert_into.slt" + +[[steps]] +engine = "df" +slt = "df_test/partitioned_table.slt" diff --git a/crates/sqllogictest/testdata/slts/df_test/create_table.slt b/crates/sqllogictest/testdata/slts/df_test/create_table.slt new file mode 100644 index 0000000000..2ab64081f5 --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/create_table.slt @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test CREATE TABLE with simple schema +statement ok +CREATE TABLE default.default.test_create_simple ( + id INT, + name STRING +) + +# Verify table was created and is empty +query IT rowsort +SELECT * FROM default.default.test_create_simple +---- + +# Insert data to verify table works +query I +INSERT INTO default.default.test_create_simple VALUES (1, 'test') +---- +1 + +# Verify the insert worked +query IT rowsort +SELECT * FROM default.default.test_create_simple +---- +1 test + +# Note: CREATE TABLE with PARTITIONED BY is not supported in DataFusion SQL syntax +# Partitioned tables must be created using the Iceberg catalog API directly +# See crates/sqllogictest/src/engine/datafusion.rs for examples diff --git a/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt b/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt new file mode 100644 index 0000000000..6e240c327e --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test partitioned table functionality +# The test_partitioned_table is created programmatically in datafusion.rs +# with partitioning on the 'category' column +# +# Note: This test runs after insert_into.slt, so the table already has data (ids 1-6) + +# Verify the partitioned table has existing data from previous test +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table +---- +1 electronics laptop +2 electronics phone +3 books novel +4 books textbook +5 clothing shirt +6 electronics NULL + +# Test partition pruning - filter by partition column +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE category = 'books' +---- +3 books novel +4 books textbook + +# Test partition pruning with another partition value +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE category = 'electronics' +---- +1 electronics laptop +2 electronics phone +6 electronics NULL + +# Test filtering on non-partition column +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE value = 'laptop' +---- +1 electronics laptop + +# Test combined filter (partition + non-partition column) +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE category = 'books' AND value = 'novel' +---- +3 books novel + +# Test aggregation with partition column +query TI rowsort +SELECT category, COUNT(*) as count FROM default.default.test_partitioned_table GROUP BY category +---- +books 2 +clothing 1 +electronics 3 + +# Insert batch data into multiple partitions (starting from id 7 since 6 exists) +query I +INSERT INTO default.default.test_partitioned_table VALUES + (7, 'electronics', 'tablet'), + (8, 'books', 'magazine'), + (9, 'clothing', 'pants') +---- +3 + +# Verify batch insert worked +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE id > 6 +---- +7 electronics tablet +8 books magazine +9 clothing pants + +# Test NULL handling - verify existing NULL +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE value IS NULL +---- +6 electronics NULL + +# Insert another NULL value +query I +INSERT INTO default.default.test_partitioned_table VALUES (10, 'books', NULL) +---- +1 + +# Verify both NULL values +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE value IS NULL +---- +10 books NULL +6 electronics NULL + +# Final count verification +query I +SELECT COUNT(*) FROM default.default.test_partitioned_table +---- +10 + +# Verify final aggregation by partition +query TI rowsort +SELECT category, COUNT(*) as count FROM default.default.test_partitioned_table GROUP BY category ORDER BY category +---- +books 4 +clothing 2 +electronics 4 From 36bb219a865ca70b65b95a0cae0751e6703ba1d0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 9 Dec 2025 15:13:08 -0800 Subject: [PATCH 2/5] clean up --- Cargo.lock | 1 + Cargo.toml | 1 + crates/iceberg/src/arrow/id_assigner.rs | 4 +- crates/iceberg/src/arrow/schema.rs | 14 +-- crates/iceberg/src/arrow/value.rs | 4 +- crates/iceberg/src/lib.rs | 6 +- crates/integrations/datafusion/Cargo.toml | 1 + crates/integrations/datafusion/src/schema.rs | 116 ++++++------------- 8 files changed, 54 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7c103d279..24139b5967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3565,6 +3565,7 @@ version = "0.7.0" dependencies = [ "anyhow", "async-trait", + "dashmap", "datafusion", "expect-test", "futures", diff --git a/Cargo.toml b/Cargo.toml index 
9904820dea..9d77d1799d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ fs-err = "3.1.0" futures = "0.3" hive_metastore = "0.2.0" home = "=0.5.11" +dashmap = "6" http = "1.2" iceberg = { version = "0.7.0", path = "./crates/iceberg" } iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" } diff --git a/crates/iceberg/src/arrow/id_assigner.rs b/crates/iceberg/src/arrow/id_assigner.rs index 99a57bb2c6..02dacb089d 100644 --- a/crates/iceberg/src/arrow/id_assigner.rs +++ b/crates/iceberg/src/arrow/id_assigner.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use arrow_schema::{DataType, Fields, Schema as ArrowSchema, TimeUnit}; use super::get_field_doc; use crate::error::Result; @@ -229,6 +229,8 @@ fn arrow_type_to_primitive_or_placeholder(ty: &DataType) -> Result { #[cfg(test)] mod tests { + use arrow_schema::Field; + use super::*; use crate::arrow::DEFAULT_MAP_FIELD_NAME; diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index b08e22549e..fd0081ca67 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -241,7 +241,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result { const ARROW_FIELD_DOC_KEY: &str = "doc"; -pub(super) fn get_field_id_from_metadata(field: &Field) -> Result { +pub(super) fn get_field_id(field: &Field) -> Result { if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { return value.parse::().map_err(|e| { Error::new( @@ -258,7 +258,7 @@ pub(super) fn get_field_id_from_metadata(field: &Field) -> Result { )) } -pub(super) fn get_field_doc(field: &Field) -> Option { +fn get_field_doc(field: &Field) -> Option { if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) { return Some(value.clone()); } @@ -269,7 +269,7 @@ struct ArrowSchemaConverter; impl ArrowSchemaConverter { fn new() -> Self { - Self + Self {} } fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { @@ -277,7 +277,7 @@ impl ArrowSchemaConverter { for i in 0..fields.len() { let field = &fields[i]; let field_type = &field_results[i]; - let id = get_field_id_from_metadata(field)?; + let id = get_field_id(field)?; let doc = get_field_doc(field); let nested_field = NestedField { id, @@ -322,7 +322,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } }; - let id = get_field_id_from_metadata(element_field)?; + let id = get_field_id(element_field)?; let doc = get_field_doc(element_field); let mut element_field = NestedField::list_element(id, value.clone(), !element_field.is_nullable()); @@ -347,7 +347,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { let key_field = &fields[0]; let value_field = &fields[1]; - let key_id = get_field_id_from_metadata(key_field)?; + let key_id = get_field_id(key_field)?; let key_doc = get_field_doc(key_field); let mut key_field = NestedField::map_key_element(key_id, key_value.clone()); if let Some(doc) = key_doc { @@ -355,7 +355,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } let key_field = Arc::new(key_field); - let value_id = get_field_id_from_metadata(value_field)?; + let value_id = get_field_id(value_field)?; let value_doc = get_field_doc(value_field); let mut value_field = NestedField::map_value_element( value_id, diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 2f53f25c21..bc123d99e8 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -27,7 +27,7 @@ use 
arrow_buffer::NullBuffer; use arrow_schema::{DataType, FieldRef}; use uuid::Uuid; -use super::get_field_id_from_metadata; +use super::get_field_id; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, @@ -450,7 +450,7 @@ impl FieldMatchMode { /// Determines if an Arrow field matches an Iceberg field based on the matching mode. pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool { match self { - FieldMatchMode::Id => get_field_id_from_metadata(arrow_field) + FieldMatchMode::Id => get_field_id(arrow_field) .map(|id| id == iceberg_field.id) .unwrap_or(false), FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name, diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 62afc51c0f..8d8f40f72d 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -87,11 +87,7 @@ pub mod expr; pub mod transaction; pub mod transform; -/// Runtime abstraction for async operations. -/// -/// Provides runtime-agnostic functions for spawning tasks and blocking operations, -/// supporting both tokio and smol runtimes. -pub mod runtime; +mod runtime; pub mod arrow; pub(crate) mod delete_file_index; diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 0ee1738b4f..fd3e489e4b 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -31,6 +31,7 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +dashmap = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 74ac6d1eb1..6aee71d1ac 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -16,20 +16,17 @@ // under the License. use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use dashmap::DashMap; use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result as DFResult}; use futures::future::try_join_all; -use iceberg::arrow::arrow_type_to_type; +use iceberg::arrow::arrow_schema_to_schema_with_assigned_ids; use iceberg::inspect::MetadataTableType; -use iceberg::runtime::spawn_blocking; -use iceberg::spec::{NestedField, Schema as IcebergSchema}; use iceberg::{Catalog, NamespaceIdent, Result, TableCreation}; -use tokio::sync::RwLock; use crate::table::IcebergTableProvider; use crate::to_datafusion_error; @@ -42,38 +39,13 @@ pub(crate) struct IcebergSchemaProvider { catalog: Arc, /// The namespace this schema represents namespace: NamespaceIdent, - /// A `HashMap` where keys are table names + /// A concurrent map where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. - tables: Arc>>>, + tables: Arc>>, } impl IcebergSchemaProvider { - /// Convert an Arrow schema to an Iceberg schema, assigning field IDs automatically. - /// - /// This is needed because DataFusion's CREATE TABLE doesn't include field IDs in the - /// Arrow schema metadata, but Iceberg requires them. We assign sequential IDs starting from 1. 
- fn arrow_schema_to_iceberg_schema( - arrow_schema: &datafusion::arrow::datatypes::Schema, - ) -> Result { - let mut field_id = 1; - let mut fields = Vec::new(); - - for field in arrow_schema.fields() { - // Use iceberg's arrow_type_to_type for conversion - let iceberg_type = arrow_type_to_type(field.data_type())?; - let nested_field = if field.is_nullable() { - NestedField::optional(field_id, field.name(), iceberg_type) - } else { - NestedField::required(field_id, field.name(), iceberg_type) - }; - fields.push(nested_field.into()); - field_id += 1; - } - - IcebergSchema::builder().with_fields(fields).build() - } - /// Asynchronously tries to construct a new [`IcebergSchemaProvider`] /// using the given client to fetch and initialize table providers for /// the provided namespace in the Iceberg [`Catalog`]. @@ -104,16 +76,15 @@ impl IcebergSchemaProvider { ) .await?; - let tables: HashMap> = table_names - .into_iter() - .zip(providers.into_iter()) - .map(|(name, provider)| (name, Arc::new(provider))) - .collect(); + let tables = DashMap::new(); + for (name, provider) in table_names.into_iter().zip(providers.into_iter()) { + tables.insert(name, Arc::new(provider)); + } Ok(IcebergSchemaProvider { catalog: client, namespace, - tables: Arc::new(RwLock::new(tables)), + tables: Arc::new(tables), }) } } @@ -125,45 +96,35 @@ impl SchemaProvider for IcebergSchemaProvider { } fn table_names(&self) -> Vec { - // Try to get a read lock without blocking - // If we can't get it immediately, return empty list - // This is a limitation of the sync API - match self.tables.try_read() { - Ok(tables) => tables - .keys() - .flat_map(|table_name| { - [table_name.clone()] - .into_iter() - .chain(MetadataTableType::all_types().map(|metadata_table_name| { - format!("{}${}", table_name.clone(), metadata_table_name.as_str()) - })) - }) - .collect(), - Err(_) => Vec::new(), - } + self.tables + .iter() + .flat_map(|entry| { + let table_name = entry.key().clone(); + [table_name.clone()] + .into_iter() + .chain( + MetadataTableType::all_types().map(move |metadata_table_name| { + format!("{}${}", table_name, metadata_table_name.as_str()) + }), + ) + }) + .collect() } fn table_exist(&self, name: &str) -> bool { - // Try to get a read lock without blocking - match self.tables.try_read() { - Ok(tables) => { - if let Some((table_name, metadata_table_name)) = name.split_once('$') { - tables.contains_key(table_name) - && MetadataTableType::try_from(metadata_table_name).is_ok() - } else { - tables.contains_key(name) - } - } - Err(_) => false, + if let Some((table_name, metadata_table_name)) = name.split_once('$') { + self.tables.contains_key(table_name) + && MetadataTableType::try_from(metadata_table_name).is_ok() + } else { + self.tables.contains_key(name) } } async fn table(&self, name: &str) -> DFResult>> { - let tables = self.tables.read().await; if let Some((table_name, metadata_table_name)) = name.split_once('$') { let metadata_table_type = MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?; - if let Some(table) = tables.get(table_name) { + if let Some(table) = self.tables.get(table_name) { let metadata_table = table .metadata_table(metadata_table_type) .await @@ -174,10 +135,10 @@ impl SchemaProvider for IcebergSchemaProvider { } } - Ok(tables + Ok(self + .tables .get(name) - .cloned() - .map(|t| t as Arc)) + .map(|entry| entry.value().clone() as Arc)) } fn register_table( @@ -186,9 +147,9 @@ impl SchemaProvider for IcebergSchemaProvider { table: Arc, ) -> DFResult>> { // Convert DataFusion 
schema to Iceberg schema - // DataFusion schemas don't have field IDs, so we need to build the schema manually + // DataFusion schemas don't have field IDs, so we use the function that assigns them automatically let df_schema = table.schema(); - let iceberg_schema = Self::arrow_schema_to_iceberg_schema(df_schema.as_ref()) + let iceberg_schema = arrow_schema_to_schema_with_assigned_ids(df_schema.as_ref()) .map_err(to_datafusion_error)?; // Create the table in the Iceberg catalog @@ -202,10 +163,9 @@ impl SchemaProvider for IcebergSchemaProvider { let tables = self.tables.clone(); let name_clone = name.clone(); - // Use iceberg's spawn_blocking to handle the async work on a blocking thread pool - // This avoids the "cannot block within async runtime" error and works with both - // tokio and smol runtimes - let result = spawn_blocking(move || { + // Use tokio's spawn_blocking to handle the async work on a blocking thread pool + // This avoids the "cannot block within async runtime" error + let result = tokio::task::spawn_blocking(move || { // Create a new runtime handle to execute the async work let rt = tokio::runtime::Handle::current(); rt.block_on(async move { @@ -224,8 +184,7 @@ impl SchemaProvider for IcebergSchemaProvider { .map_err(to_datafusion_error)?; // Store the new table provider - let mut tables_guard = tables.write().await; - let old_table = tables_guard.insert(name_clone, Arc::new(table_provider)); + let old_table = tables.insert(name_clone, Arc::new(table_provider)); Ok(old_table.map(|t| t as Arc)) }) @@ -234,6 +193,7 @@ impl SchemaProvider for IcebergSchemaProvider { // Block on the spawned task to get the result // This is safe because spawn_blocking moves the blocking to a dedicated thread pool futures::executor::block_on(result) + .map_err(|e| DataFusionError::Execution(format!("Failed to create Iceberg table: {}", e)))? 
} fn deregister_table(&self, _name: &str) -> DFResult>> { From e612910d02232218a4de43af08032c5558e38803 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 9 Dec 2025 16:10:19 -0800 Subject: [PATCH 3/5] update slt for create table --- crates/iceberg/src/arrow/schema.rs | 2 +- crates/sqllogictest/src/engine/datafusion.rs | 35 +----- .../testdata/schedules/df_test.toml | 13 +- .../testdata/slts/df_test/insert_into.slt | 49 +++++--- .../slts/df_test/partitioned_table.slt | 118 ------------------ .../testdata/slts/df_test/show_tables.slt | 9 +- 6 files changed, 49 insertions(+), 177 deletions(-) delete mode 100644 crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index fd0081ca67..ec97f1e846 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -258,7 +258,7 @@ pub(super) fn get_field_id(field: &Field) -> Result { )) } -fn get_field_doc(field: &Field) -> Option { +pub(super) fn get_field_doc(field: &Field) -> Option { if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) { return Some(value.clone()); } diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e3402dfa97..0288a98ebe 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -85,12 +85,12 @@ impl DataFusionEngine { ) .await?; - // Create a test namespace for INSERT INTO tests + // Create a test namespace let namespace = NamespaceIdent::new("default".to_string()); catalog.create_namespace(&namespace, HashMap::new()).await?; - // Create test tables - Self::create_unpartitioned_table(&catalog, &namespace).await?; + // Create partitioned table programmatically (can't be created via SQL yet) + // This table is automatically registered to DataFusion via IcebergCatalogProvider Self::create_partitioned_table(&catalog, &namespace).await?; Ok(Arc::new( @@ -98,35 +98,10 @@ impl DataFusionEngine { )) } - /// Create an unpartitioned test table with id and name columns - /// TODO: this can be removed when we support CREATE TABLE - async fn create_unpartitioned_table( - catalog: &impl Catalog, - namespace: &NamespaceIdent, - ) -> anyhow::Result<()> { - let schema = Schema::builder() - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; - - catalog - .create_table( - namespace, - TableCreation::builder() - .name("test_unpartitioned_table".to_string()) - .schema(schema) - .build(), - ) - .await?; - - Ok(()) - } - /// Create a partitioned test table with id, category, and value columns /// Partitioned by category using identity transform - /// TODO: this can be removed when we support CREATE TABLE + /// Note: Partitioned tables can't be created via SQL yet, so we create them programmatically. + /// The table is automatically registered to DataFusion via IcebergCatalogProvider. 
async fn create_partitioned_table( catalog: &impl Catalog, namespace: &NamespaceIdent, diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index 9d50902d9d..3375fbb5c1 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -18,18 +18,21 @@ [engines] df = { type = "datafusion" } +# Step 1: Create tables first [[steps]] engine = "df" -slt = "df_test/show_tables.slt" +slt = "df_test/create_table.slt" +# Step 2: Verify tables exist [[steps]] engine = "df" -slt = "df_test/create_table.slt" +slt = "df_test/show_tables.slt" +# Step 3: Insert data and verify (covers both unpartitioned and partitioned tables) [[steps]] engine = "df" slt = "df_test/insert_into.slt" -[[steps]] -engine = "df" -slt = "df_test/partitioned_table.slt" +# Note: CREATE EXTERNAL TABLE tests are in: +# crates/integrations/datafusion/src/table/table_provider_factory.rs +# They require absolute paths which sqllogictest doesn't support variable substitution for. diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt index 2ba33afcd1..124ab60458 100644 --- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt +++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt @@ -15,53 +15,62 @@ # specific language governing permissions and limitations # under the License. -# Verify the table is initially empty -query IT rowsort -SELECT * FROM default.default.test_unpartitioned_table ----- +# ============================================ +# Test INSERT INTO for unpartitioned table +# Uses test_create_simple created by create_table.slt +# ============================================ -# Insert a single row and verify the count +# Note: test_create_simple already has (1, 'test') from create_table.slt +# Insert more rows query I -INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice') +INSERT INTO default.default.test_create_simple VALUES (2, 'Alice') ---- 1 # Verify the inserted row query IT rowsort -SELECT * FROM default.default.test_unpartitioned_table +SELECT * FROM default.default.test_create_simple ---- -1 Alice +1 test +2 Alice # Insert multiple rows and verify the count query I -INSERT INTO default.default.test_unpartitioned_table VALUES (2, 'Bob'), (3, 'Charlie') +INSERT INTO default.default.test_create_simple VALUES (3, 'Bob'), (4, 'Charlie') ---- 2 # Verify all rows query IT rowsort -SELECT * FROM default.default.test_unpartitioned_table +SELECT * FROM default.default.test_create_simple ---- -1 Alice -2 Bob -3 Charlie +1 test +2 Alice +3 Bob +4 Charlie # Insert with NULL value and verify the count query I -INSERT INTO default.default.test_unpartitioned_table VALUES (4, NULL) +INSERT INTO default.default.test_create_simple VALUES (5, NULL) ---- 1 # Verify NULL handling query IT rowsort -SELECT * FROM default.default.test_unpartitioned_table +SELECT * FROM default.default.test_create_simple ---- -1 Alice -2 Bob -3 Charlie -4 NULL +1 test +2 Alice +3 Bob +4 Charlie +5 NULL + +# ============================================ +# Test INSERT INTO for partitioned table +# Uses test_partitioned_table +# ============================================ -# Test partitioned table - verify initially empty +# Verify partitioned table is initially empty query ITT rowsort SELECT * FROM default.default.test_partitioned_table ---- diff --git a/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt 
b/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt deleted file mode 100644 index 6e240c327e..0000000000 --- a/crates/sqllogictest/testdata/slts/df_test/partitioned_table.slt +++ /dev/null @@ -1,118 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Test partitioned table functionality -# The test_partitioned_table is created programmatically in datafusion.rs -# with partitioning on the 'category' column -# -# Note: This test runs after insert_into.slt, so the table already has data (ids 1-6) - -# Verify the partitioned table has existing data from previous test -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table ----- -1 electronics laptop -2 electronics phone -3 books novel -4 books textbook -5 clothing shirt -6 electronics NULL - -# Test partition pruning - filter by partition column -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE category = 'books' ----- -3 books novel -4 books textbook - -# Test partition pruning with another partition value -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE category = 'electronics' ----- -1 electronics laptop -2 electronics phone -6 electronics NULL - -# Test filtering on non-partition column -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE value = 'laptop' ----- -1 electronics laptop - -# Test combined filter (partition + non-partition column) -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE category = 'books' AND value = 'novel' ----- -3 books novel - -# Test aggregation with partition column -query TI rowsort -SELECT category, COUNT(*) as count FROM default.default.test_partitioned_table GROUP BY category ----- -books 2 -clothing 1 -electronics 3 - -# Insert batch data into multiple partitions (starting from id 7 since 6 exists) -query I -INSERT INTO default.default.test_partitioned_table VALUES - (7, 'electronics', 'tablet'), - (8, 'books', 'magazine'), - (9, 'clothing', 'pants') ----- -3 - -# Verify batch insert worked -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE id > 6 ----- -7 electronics tablet -8 books magazine -9 clothing pants - -# Test NULL handling - verify existing NULL -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE value IS NULL ----- -6 electronics NULL - -# Insert another NULL value -query I -INSERT INTO default.default.test_partitioned_table VALUES (10, 'books', NULL) ----- -1 - -# Verify both NULL values -query ITT rowsort -SELECT * FROM default.default.test_partitioned_table WHERE value IS NULL ----- -10 books NULL -6 electronics NULL - -# Final count verification -query I -SELECT COUNT(*) FROM default.default.test_partitioned_table ----- -10 - -# Verify final aggregation 
by partition -query TI rowsort -SELECT category, COUNT(*) as count FROM default.default.test_partitioned_table GROUP BY category ORDER BY category ----- -books 4 -clothing 2 -electronics 4 diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index c5da5f6276..0e671506d7 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. +# Verify tables created in create_table.slt and programmatically in datafusion.rs +# - test_create_simple: created via CREATE TABLE in create_table.slt +# - test_partitioned_table: created programmatically (partitioned tables can't be created via SQL yet) query TTTT rowsort SHOW TABLES ---- @@ -25,12 +28,12 @@ datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW +default default test_create_simple BASE TABLE +default default test_create_simple$manifests BASE TABLE +default default test_create_simple$snapshots BASE TABLE default default test_partitioned_table BASE TABLE default default test_partitioned_table$manifests BASE TABLE default default test_partitioned_table$snapshots BASE TABLE -default default test_unpartitioned_table BASE TABLE -default default test_unpartitioned_table$manifests BASE TABLE -default default test_unpartitioned_table$snapshots BASE TABLE default information_schema columns VIEW default information_schema df_settings VIEW default information_schema parameters VIEW From 7db602f8f92ef3f48d363b0bdf6f4e967aad6639 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 9 Dec 2025 16:19:59 -0800 Subject: [PATCH 4/5] support drop table as well --- crates/integrations/datafusion/src/schema.rs | 38 ++++++++++-- .../testdata/schedules/df_test.toml | 5 ++ .../testdata/slts/df_test/drop_table.slt | 62 +++++++++++++++++++ 3 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 crates/sqllogictest/testdata/slts/df_test/drop_table.slt diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 6aee71d1ac..6703020c20 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -196,9 +196,39 @@ impl SchemaProvider for IcebergSchemaProvider { .map_err(|e| DataFusionError::Execution(format!("Failed to create Iceberg table: {}", e)))? } - fn deregister_table(&self, _name: &str) -> DFResult>> { - Err(DataFusionError::NotImplemented( - "deregister_table is not supported".to_string(), - )) + fn deregister_table(&self, name: &str) -> DFResult>> { + // Don't allow dropping metadata tables directly + if name.contains('$') { + return Err(DataFusionError::Plan( + "Cannot drop metadata tables directly. 
Drop the parent table instead.".to_string(), + )); + } + + let catalog = self.catalog.clone(); + let namespace = self.namespace.clone(); + let tables = self.tables.clone(); + let name = name.to_string(); + + let result = tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Handle::current(); + rt.block_on(async move { + let table_ident = + iceberg::TableIdent::new(namespace.clone(), name.clone()); + + // Drop the table from the Iceberg catalog + catalog + .drop_table(&table_ident) + .await + .map_err(to_datafusion_error)?; + + // Remove from local cache and return the old provider + let old_table = tables.remove(&name); + Ok(old_table.map(|(_, provider)| provider as Arc)) + }) + }); + + futures::executor::block_on(result).map_err(|e| { + DataFusionError::Execution(format!("Failed to drop Iceberg table: {}", e)) + })? } } diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index 3375fbb5c1..427e63bd37 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -33,6 +33,11 @@ slt = "df_test/show_tables.slt" engine = "df" slt = "df_test/insert_into.slt" +# Step 4: Test DROP TABLE +[[steps]] +engine = "df" +slt = "df_test/drop_table.slt" + # Note: CREATE EXTERNAL TABLE tests are in: # crates/integrations/datafusion/src/table/table_provider_factory.rs # They require absolute paths which sqllogictest doesn't support variable substitution for. diff --git a/crates/sqllogictest/testdata/slts/df_test/drop_table.slt b/crates/sqllogictest/testdata/slts/df_test/drop_table.slt new file mode 100644 index 0000000000..3f82387b04 --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/drop_table.slt @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test DROP TABLE functionality + +# Drop tables created in create_table.slt and used in insert_into.slt +# - test_create_simple: created via CREATE TABLE +# - test_partitioned_table: created programmatically + +# Drop test_create_simple +statement ok +DROP TABLE default.default.test_create_simple + +# Verify test_create_simple no longer exists (should error) +statement error +SELECT * FROM default.default.test_create_simple + +# Drop test_partitioned_table +statement ok +DROP TABLE default.default.test_partitioned_table + +# Verify test_partitioned_table no longer exists (should error) +statement error +SELECT * FROM default.default.test_partitioned_table + +# Verify tables are gone with SHOW TABLES +# Only information_schema tables should remain in the default catalog +query TTTT rowsort +SHOW TABLES +---- +datafusion information_schema columns VIEW +datafusion information_schema df_settings VIEW +datafusion information_schema parameters VIEW +datafusion information_schema routines VIEW +datafusion information_schema schemata VIEW +datafusion information_schema tables VIEW +datafusion information_schema views VIEW +default information_schema columns VIEW +default information_schema df_settings VIEW +default information_schema parameters VIEW +default information_schema routines VIEW +default information_schema schemata VIEW +default information_schema tables VIEW +default information_schema views VIEW + +# Test DROP TABLE IF EXISTS on non-existent table (should not error) +statement ok +DROP TABLE IF EXISTS default.default.test_create_simple From c1295e7f936ccd1551dee1fe0d40f5df2675f288 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 9 Dec 2025 16:22:40 -0800 Subject: [PATCH 5/5] fix fmt --- crates/integrations/datafusion/src/schema.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 6703020c20..3f76e32327 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -192,8 +192,9 @@ impl SchemaProvider for IcebergSchemaProvider { // Block on the spawned task to get the result // This is safe because spawn_blocking moves the blocking to a dedicated thread pool - futures::executor::block_on(result) - .map_err(|e| DataFusionError::Execution(format!("Failed to create Iceberg table: {}", e)))? + futures::executor::block_on(result).map_err(|e| { + DataFusionError::Execution(format!("Failed to create Iceberg table: {}", e)) + })? } fn deregister_table(&self, name: &str) -> DFResult>> { @@ -212,8 +213,7 @@ impl SchemaProvider for IcebergSchemaProvider { let result = tokio::task::spawn_blocking(move || { let rt = tokio::runtime::Handle::current(); rt.block_on(async move { - let table_ident = - iceberg::TableIdent::new(namespace.clone(), name.clone()); + let table_ident = iceberg::TableIdent::new(namespace.clone(), name.clone()); // Drop the table from the Iceberg catalog catalog
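
Note (not part of the patch): a minimal usage sketch of the `arrow_schema_to_schema_with_assigned_ids` helper added in the first commit and kept through the cleanup, showing the breadth-first ID assignment its doc comment describes. Top-level fields are numbered before any nested field, matching what the id_assigner unit test asserts; the `main` wrapper and the field names here are illustrative only.

use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
use iceberg::arrow::arrow_schema_to_schema_with_assigned_ids;

fn main() -> iceberg::Result<()> {
    // Arrow schema without any PARQUET:field_id metadata, as produced by
    // a DataFusion CREATE TABLE statement.
    let arrow_schema = ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new(
            "point",
            DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Float64, false),
                Field::new("y", DataType::Float64, false),
            ])),
            true,
        ),
    ]);

    // IDs are assigned level by level: id -> 1, point -> 2, then the nested
    // fields point.x -> 3 and point.y -> 4.
    let iceberg_schema = arrow_schema_to_schema_with_assigned_ids(&arrow_schema)?;
    println!("{iceberg_schema:?}");
    Ok(())
}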