diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index feec1281..c6a652cf 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -3182,7 +3182,7 @@ mod tests { assert_eq!(identifier.object(), "t1"); assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::AddColumn { field_name, .. } if field_name == "age") + matches!(&changes[0], SchemaChange::AddColumn { field_names, .. } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3206,10 +3206,10 @@ mod tests { assert!(matches!( &changes[0], SchemaChange::AddColumn { - field_name, + field_names, data_type, .. - } if field_name == "payload" && matches!(data_type, PaimonDataType::Blob(_)) + } if field_names.first().map(String::as_str) == Some("payload") && matches!(data_type, PaimonDataType::Blob(_)) )); } else { panic!("expected AlterTable call"); @@ -3231,7 +3231,7 @@ mod tests { if let CatalogCall::AlterTable { changes, .. } = &calls[0] { assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::DropColumn { field_name } if field_name == "age") + matches!(&changes[0], SchemaChange::DropColumn { field_names } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3254,8 +3254,8 @@ mod tests { assert_eq!(changes.len(), 1); assert!(matches!( &changes[0], - SchemaChange::RenameColumn { field_name, new_name } - if field_name == "old_name" && new_name == "new_name" + SchemaChange::RenameColumn { field_names, new_name } + if field_names.first().map(String::as_str) == Some("old_name") && new_name == "new_name" )); } else { panic!("expected AlterTable call"); diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index 7deed175..d4f0c1da 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -480,23 +480,18 @@ async fn test_alter_table_add_column() { .await .unwrap(); - // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect an error - let result = sql_context + sql_context .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT") - .await; + .await + .expect("ALTER TABLE ADD COLUMN should succeed"); - // FileSystemCatalog does not support AddColumn schema change yet - assert!( - result.is_err(), - "ALTER TABLE ADD COLUMN should fail because AddColumn is not yet supported" - ); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("not yet implemented") - || err_msg.contains("Unsupported") - || err_msg.contains("not yet supported"), - "Error should indicate alter_table is not implemented, got: {err_msg}" - ); + // The new column is appended to the table schema. + let table = catalog + .get_table(&Identifier::new("mydb", "alter_test")) + .await + .unwrap(); + let names: Vec<&str> = table.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); } #[tokio::test] diff --git a/crates/paimon/src/api/api_request.rs b/crates/paimon/src/api/api_request.rs index 33a52fd7..28e4bacf 100644 --- a/crates/paimon/src/api/api_request.rs +++ b/crates/paimon/src/api/api_request.rs @@ -22,7 +22,10 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::{catalog::Identifier, spec::Schema}; +use crate::{ + catalog::Identifier, + spec::{Schema, SchemaChange}, +}; /// Request to create a new database. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -95,6 +98,23 @@ impl CreateTableRequest { } } +/// Request to alter a table's schema. +/// +/// Wire-compatible with Java Paimon's `AlterTableRequest` (`{"changes": [...]}`). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AlterTableRequest { + /// The ordered list of schema changes to apply. + pub changes: Vec, +} + +impl AlterTableRequest { + /// Create a new AlterTableRequest. + pub fn new(changes: Vec) -> Self { + Self { changes } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs index 8584cb08..7672b9a7 100644 --- a/crates/paimon/src/api/mod.rs +++ b/crates/paimon/src/api/mod.rs @@ -31,7 +31,8 @@ mod api_response; // Re-export request types pub use api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; // Re-export response types diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 5333fcae..8721f48c 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -25,11 +25,12 @@ use std::collections::HashMap; use crate::api::rest_client::HttpClient; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot}; +use crate::spec::{Partition, PartitionStatistics, Schema, SchemaChange, Snapshot}; use crate::Result; use super::api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; use super::api_response::{ ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, @@ -343,6 +344,21 @@ impl RESTApi { Ok(()) } + /// Alter a table's schema by applying a list of schema changes. + pub async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ) -> Result<()> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.table(database, table); + let request = AlterTableRequest::new(changes); + let _resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(()) + } + /// Get table information. pub async fn get_table(&self, identifier: &Identifier) -> Result { let database = identifier.database(); diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 7f18f952..cedc6251 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -439,11 +439,29 @@ impl Catalog for FileSystemCatalog { full_name: identifier.full_name(), })?; - let new_schema = current.apply_changes(changes)?; + let new_schema = current + .apply_changes(changes) + .map_err(|e| fill_table_name(e, identifier))?; self.save_table_schema(&table_path, &new_schema).await } } +/// `TableSchema::apply_changes` returns column errors without a table name; +/// fill in the identifier's full name so the message identifies the table. +fn fill_table_name(err: Error, identifier: &Identifier) -> Error { + match err { + Error::ColumnNotExist { column, .. } => Error::ColumnNotExist { + full_name: identifier.full_name(), + column, + }, + Error::ColumnAlreadyExist { column, .. } => Error::ColumnAlreadyExist { + full_name: identifier.full_name(), + column, + }, + other => other, + } +} + #[cfg(test)] #[cfg(not(windows))] // Skip on Windows due to path compatibility issues mod tests { @@ -728,4 +746,452 @@ mod tests { ); } } + + use crate::spec::{ + ColumnMove, DataField, DataType, IntType, RowType, SchemaChange, VarCharType, + }; + + /// Two-column table (id INT, name VARCHAR) used by the alter-table tests. + fn two_column_schema() -> Schema { + Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .build() + .unwrap() + } + + async fn setup_table(catalog: &FileSystemCatalog, schema: Schema) -> Identifier { + catalog + .create_database("db", false, HashMap::new()) + .await + .unwrap(); + let id = Identifier::new("db", "t"); + catalog.create_table(&id, schema, false).await.unwrap(); + id + } + + #[tokio::test] + async fn test_alter_table_column_changes() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a column at the end; it must take highest_field_id + 1. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "age".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); + let age = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert_eq!(age.id(), 2, "new column gets highest_field_id + 1"); + assert_eq!(ts.id(), 1, "schema version bumped"); + + // Add a column moved to the front. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column_with_description_and_column_move( + "rowkey".to_string(), + DataType::Int(IntType::new()), + "primary".to_string(), + ColumnMove::move_first("rowkey".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + assert_eq!(ts.fields()[0].name(), "rowkey"); + assert_eq!(ts.fields()[0].description(), Some("primary")); + + // Converting nullable `id` to NOT NULL below is rejected by default; + // allow it explicitly. The flag is read from the pre-alter options, so + // it must be set in a separate alter. + catalog + .alter_table( + &id, + vec![SchemaChange::set_option( + "alter-column-null-to-not-null.disabled".to_string(), + "false".to_string(), + )], + false, + ) + .await + .unwrap(); + + // Rename, update comment, update type, update nullability, drop. + catalog + .alter_table( + &id, + vec![ + SchemaChange::rename_column("name".to_string(), "full_name".to_string()), + SchemaChange::update_column_comment("id".to_string(), "the id".to_string()), + SchemaChange::update_column_type( + "age".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + ), + SchemaChange::update_column_nullability("id".to_string(), false), + SchemaChange::drop_column("rowkey".to_string()), + ], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "full_name", "age"]); + let id_field = ts.fields().iter().find(|f| f.name() == "id").unwrap(); + assert_eq!(id_field.description(), Some("the id")); + assert!(!id_field.data_type().is_nullable()); + let age_field = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert!(matches!(age_field.data_type(), DataType::BigInt(_))); + } + + #[tokio::test] + async fn test_alter_table_reposition_column() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Move `name` before `id`. + catalog + .alter_table( + &id, + vec![SchemaChange::update_column_position( + ColumnMove::move_first("name".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let names: Vec<&str> = ts.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["name", "id"]); + } + + #[tokio::test] + async fn test_alter_table_errors() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a duplicate column -> ColumnAlreadyExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "name".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap_err(); + assert!( + matches!(err, Error::ColumnAlreadyExist { .. }), + "got {err:?}" + ); + + // Drop a missing column -> ColumnNotExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("ghost".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::ColumnNotExist { .. }), "got {err:?}"); + + // Altering a missing table: ignored vs error. + let missing = Identifier::new("db", "nope"); + catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + true, + ) + .await + .unwrap(); + let err = catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::TableNotExist { .. }), "got {err:?}"); + } + + #[tokio::test] + async fn test_alter_table_drop_primary_key_column_rejected() { + let (_tmp, catalog) = create_test_catalog(); + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .primary_key(["id"]) + .build() + .unwrap(); + let id = setup_table(&catalog, schema).await; + + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("id".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::Unsupported { .. }), "got {err:?}"); + } + + #[tokio::test] + async fn test_alter_table_add_not_null_column_rejected() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + let err = catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "age".to_string(), + DataType::Int(IntType::with_nullable(false)), + )], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("cannot specify NOT NULL"), + "got {err:?}" + ); + } + + /// Collect the IDs of all fields nested inside row types. + fn nested_row_field_ids(data_type: &DataType, ids: &mut Vec) { + if let DataType::Row(row) = data_type { + for f in row.fields() { + ids.push(f.id()); + nested_row_field_ids(f.data_type(), ids); + } + } + } + + #[tokio::test] + async fn test_alter_table_add_nested_column_reassigns_field_ids() { + let (_tmp, catalog) = create_test_catalog(); + // Existing columns take IDs 0 and 1. + let id = setup_table(&catalog, two_column_schema()).await; + + // Nested field IDs as requested by the caller deliberately collide + // with the existing columns; they must all be reassigned. + let nested = DataType::Row(RowType::new(vec![ + DataField::new(0, "a".to_string(), DataType::Int(IntType::new())), + DataField::new( + 1, + "b".to_string(), + DataType::Row(RowType::new(vec![DataField::new( + 2, + "c".to_string(), + DataType::Int(IntType::new()), + )])), + ), + ])); + catalog + .alter_table( + &id, + vec![SchemaChange::add_column("s".to_string(), nested)], + false, + ) + .await + .unwrap(); + + let table = catalog.get_table(&id).await.unwrap(); + let ts = table.schema(); + let s = ts.fields().iter().find(|f| f.name() == "s").unwrap(); + assert_eq!(s.id(), 2, "top-level column takes highest_field_id + 1"); + let mut ids = Vec::new(); + nested_row_field_ids(s.data_type(), &mut ids); + ids.sort_unstable(); + assert_eq!(ids, vec![3, 4, 5], "nested IDs are fresh and unique"); + assert_eq!(ts.highest_field_id(), 5); + } + + #[tokio::test] + async fn test_alter_table_drop_all_columns_rejected() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + let err = catalog + .alter_table( + &id, + vec![ + SchemaChange::drop_column("id".to_string()), + SchemaChange::drop_column("name".to_string()), + ], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Cannot drop all fields"), + "got {err:?}" + ); + } + + /// Partitioned primary-key table: dt VARCHAR (partition), id INT, v INT, + /// primary key (dt, id). + fn partitioned_pk_schema() -> Schema { + Schema::builder() + .column("dt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("v", DataType::Int(IntType::new())) + .partition_keys(["dt"]) + .primary_key(["dt", "id"]) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_alter_table_key_column_guards() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, partitioned_pk_schema()).await; + + // Renaming a partition column is rejected. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::rename_column( + "dt".to_string(), + "day".to_string(), + )], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Cannot rename partition column"), + "got {err:?}" + ); + + // Updating the type of a partition column is rejected. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::update_column_type( + "dt".to_string(), + DataType::VarChar(VarCharType::string_type()), + )], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Cannot update partition column"), + "got {err:?}" + ); + + // Updating the type of a primary key column is rejected. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::update_column_type( + "id".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + )], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string().contains("Cannot update primary key"), + "got {err:?}" + ); + + // Making a primary key column nullable is rejected ... + let err = catalog + .alter_table( + &id, + vec![SchemaChange::update_column_nullability( + "id".to_string(), + true, + )], + false, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("Cannot change nullability of primary key"), + "got {err:?}" + ); + + // ... while NOT NULL stays allowed (it is already non-nullable). + catalog + .alter_table( + &id, + vec![SchemaChange::update_column_nullability( + "id".to_string(), + false, + )], + false, + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_alter_table_rename_primary_key_column_propagates() { + let (_tmp, catalog) = create_test_catalog(); + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .column("v", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket-key", "id") + .option("sequence.field", "v") + .build() + .unwrap(); + let id = setup_table(&catalog, schema).await; + + catalog + .alter_table( + &id, + vec![ + SchemaChange::rename_column("id".to_string(), "user_id".to_string()), + SchemaChange::rename_column("v".to_string(), "ver".to_string()), + ], + false, + ) + .await + .unwrap(); + + let table = catalog.get_table(&id).await.unwrap(); + let ts = table.schema(); + assert_eq!(ts.primary_keys(), ["user_id".to_string()]); + assert_eq!( + ts.options().get("bucket-key").map(String::as_str), + Some("user_id") + ); + assert_eq!( + ts.options().get("sequence.field").map(String::as_str), + Some("ver") + ); + } } diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs b/crates/paimon/src/catalog/rest/rest_catalog.rs index 09b93547..5ff5d6b0 100644 --- a/crates/paimon/src/catalog/rest/rest_catalog.rs +++ b/crates/paimon/src/catalog/rest/rest_catalog.rs @@ -330,13 +330,17 @@ impl Catalog for RESTCatalog { async fn alter_table( &self, - _identifier: &Identifier, - _changes: Vec, - _ignore_if_not_exists: bool, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, ) -> Result<()> { - // TODO: Implement alter_table when RESTApi supports it - Err(Error::Unsupported { - message: "Alter table is not yet implemented for REST catalog".to_string(), + let result = self + .api + .alter_table(identifier, changes) + .await + .map_err(|e| map_rest_error_for_table(e, identifier)); + ignore_error_if(result, |e| { + ignore_if_not_exists && matches!(e, Error::TableNotExist { .. }) }) } diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 2a8b686d..f5cde553 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -24,7 +24,7 @@ const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size"; const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost"; const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name"; const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name"; -const BUCKET_KEY_OPTION: &str = "bucket-key"; +pub(crate) const BUCKET_KEY_OPTION: &str = "bucket-key"; const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type"; const BUCKET_OPTION: &str = "bucket"; const DEFAULT_BUCKET: i32 = -1; @@ -46,7 +46,10 @@ const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression"; const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; -const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; +pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; +pub(crate) const DISABLE_EXPLICIT_TYPE_CASTING_OPTION: &str = "disable-explicit-type-casting"; +pub(crate) const DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL_OPTION: &str = + "alter-column-null-to-not-null.disabled"; const MERGE_ENGINE_OPTION: &str = "merge-engine"; const CHANGELOG_PRODUCER_OPTION: &str = "changelog-producer"; const ROWKIND_FIELD_OPTION: &str = "rowkind.field"; diff --git a/crates/paimon/src/spec/data_type_casts.rs b/crates/paimon/src/spec/data_type_casts.rs new file mode 100644 index 00000000..7f871cd9 --- /dev/null +++ b/crates/paimon/src/spec/data_type_casts.rs @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for casting [`DataType`], mirroring Java `DataTypeCasts`. +//! +//! Like Java, the rules are defined at type-root granularity: two types with +//! the same root (e.g. two `DECIMAL`s with different precision) are always +//! implicitly castable, and parameters of the target type are not inspected. + +use crate::spec::DataType; + +/// Returns whether the source type can be cast to the target type, mirroring +/// Java `DataTypeCasts.supportsCast(sourceType, targetType, allowExplicit)`. +/// +/// With `allow_explicit` false, only implicit casts are considered: type +/// widening and generalization that never lose information. With +/// `allow_explicit` true, casts corresponding to the SQL `CAST` specification +/// are also allowed (e.g. `BIGINT` to `INT`, or most predefined types to +/// character strings). +pub(crate) fn supports_cast(source: &DataType, target: &DataType, allow_explicit: bool) -> bool { + // A NOT NULL type cannot store a NULL type, but it might be useful to + // cast explicitly with knowledge about the data. + if source.is_nullable() && !target.is_nullable() && !allow_explicit { + return false; + } + // Ignore nullability during compare. + match ( + source.copy_with_nullable(true), + target.copy_with_nullable(true), + ) { + (Ok(s), Ok(t)) if s == t => return true, + (Err(_), _) | (_, Err(_)) => return false, + _ => {} + } + if implicit_cast_supported(source, target) { + return true; + } + allow_explicit && explicit_cast_supported(source, target) +} + +fn same_root(a: &DataType, b: &DataType) -> bool { + std::mem::discriminant(a) == std::mem::discriminant(b) +} + +/// Java `DataTypeFamily.CHARACTER_STRING`. +fn is_character_string(t: &DataType) -> bool { + matches!(t, DataType::Char(_) | DataType::VarChar(_)) +} + +/// Java `DataTypeFamily.BINARY_STRING`. +fn is_binary_string(t: &DataType) -> bool { + matches!(t, DataType::Binary(_) | DataType::VarBinary(_)) +} + +/// Java `DataTypeFamily.INTEGER_NUMERIC`. +fn is_integer_numeric(t: &DataType) -> bool { + matches!( + t, + DataType::TinyInt(_) | DataType::SmallInt(_) | DataType::Int(_) | DataType::BigInt(_) + ) +} + +/// Java `DataTypeFamily.NUMERIC`. +fn is_numeric(t: &DataType) -> bool { + is_integer_numeric(t) + || matches!( + t, + DataType::Decimal(_) | DataType::Float(_) | DataType::Double(_) + ) +} + +/// Java `DataTypeFamily.TIMESTAMP`. +fn is_timestamp(t: &DataType) -> bool { + matches!(t, DataType::Timestamp(_) | DataType::LocalZonedTimestamp(_)) +} + +/// Java `DataTypeFamily.DATETIME`. +fn is_datetime(t: &DataType) -> bool { + is_timestamp(t) || matches!(t, DataType::Date(_) | DataType::Time(_)) +} + +/// Mirrors the `implicitCastingRules` table in Java `DataTypeCasts`. Every +/// root is implicitly castable from itself. +fn implicit_cast_supported(source: &DataType, target: &DataType) -> bool { + if same_root(source, target) { + return true; + } + match target { + DataType::VarChar(_) => is_character_string(source), + DataType::VarBinary(_) => is_binary_string(source), + DataType::Decimal(_) | DataType::Double(_) => is_numeric(source), + DataType::SmallInt(_) => matches!(source, DataType::TinyInt(_)), + DataType::Int(_) => matches!(source, DataType::TinyInt(_) | DataType::SmallInt(_)), + DataType::BigInt(_) => matches!( + source, + DataType::TinyInt(_) | DataType::SmallInt(_) | DataType::Int(_) + ), + DataType::Float(_) => matches!( + source, + DataType::TinyInt(_) + | DataType::SmallInt(_) + | DataType::Int(_) + | DataType::BigInt(_) + | DataType::Decimal(_) + ), + DataType::Date(_) | DataType::Time(_) => matches!(source, DataType::Timestamp(_)), + DataType::Timestamp(_) => matches!(source, DataType::LocalZonedTimestamp(_)), + DataType::LocalZonedTimestamp(_) => matches!(source, DataType::Timestamp(_)), + _ => false, + } +} + +/// Mirrors the `explicitCastingRules` table in Java `DataTypeCasts`. +fn explicit_cast_supported(source: &DataType, target: &DataType) -> bool { + match target { + // PREDEFINED and CONSTRUCTED cover every type root. + DataType::Char(_) | DataType::VarChar(_) => true, + DataType::Boolean(_) => is_character_string(source) || is_integer_numeric(source), + DataType::Binary(_) => { + is_character_string(source) || matches!(source, DataType::VarBinary(_)) + } + DataType::VarBinary(_) => { + is_character_string(source) || matches!(source, DataType::Binary(_)) + } + DataType::Decimal(_) => { + is_character_string(source) + || matches!(source, DataType::Boolean(_)) + || is_timestamp(source) + } + DataType::TinyInt(_) + | DataType::SmallInt(_) + | DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) => { + is_numeric(source) + || is_character_string(source) + || matches!(source, DataType::Boolean(_)) + || is_timestamp(source) + } + DataType::Date(_) => is_timestamp(source) || is_character_string(source), + DataType::Time(_) => { + matches!(source, DataType::Time(_)) + || is_timestamp(source) + || is_character_string(source) + } + DataType::Timestamp(_) | DataType::LocalZonedTimestamp(_) => { + is_datetime(source) || is_character_string(source) || is_numeric(source) + } + DataType::Blob(_) + | DataType::Array(_) + | DataType::Map(_) + | DataType::Multiset(_) + | DataType::Row(_) => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::{ + ArrayType, BigIntType, BooleanType, CharType, DataType, DateType, DecimalType, DoubleType, + IntType, TimestampType, VarCharType, + }; + + fn int() -> DataType { + DataType::Int(IntType::new()) + } + + fn bigint() -> DataType { + DataType::BigInt(BigIntType::new()) + } + + fn varchar() -> DataType { + DataType::VarChar(VarCharType::new(10).unwrap()) + } + + fn timestamp() -> DataType { + DataType::Timestamp(TimestampType::new(3).unwrap()) + } + + #[test] + fn test_identity_and_same_root() { + assert!(supports_cast(&int(), &int(), false)); + // Same root with different parameters is implicitly castable. + assert!(supports_cast( + &DataType::Decimal(DecimalType::new(10, 2).unwrap()), + &DataType::Decimal(DecimalType::new(20, 4).unwrap()), + false + )); + assert!(supports_cast( + &varchar(), + &DataType::VarChar(VarCharType::new(20).unwrap()), + false + )); + } + + #[test] + fn test_implicit_widening() { + assert!(supports_cast(&int(), &bigint(), false)); + assert!(supports_cast( + &int(), + &DataType::Double(DoubleType::new()), + false + )); + assert!(supports_cast( + &int(), + &DataType::Decimal(DecimalType::new(10, 2).unwrap()), + false + )); + assert!(supports_cast( + ×tamp(), + &DataType::Date(DateType::new()), + false + )); + assert!(supports_cast( + &DataType::Char(CharType::new(5).unwrap()), + &varchar(), + false + )); + } + + #[test] + fn test_narrowing_requires_explicit() { + assert!(!supports_cast(&bigint(), &int(), false)); + assert!(supports_cast(&bigint(), &int(), true)); + assert!(!supports_cast(&varchar(), &int(), false)); + assert!(supports_cast(&varchar(), &int(), true)); + assert!(!supports_cast(&int(), &varchar(), false)); + assert!(supports_cast(&int(), &varchar(), true)); + } + + #[test] + fn test_nullability_rule() { + let nullable_int = int(); + let not_null_int = int().copy_with_nullable(false).unwrap(); + // Nullable to NOT NULL is only allowed explicitly. + assert!(!supports_cast(&nullable_int, ¬_null_int, false)); + assert!(supports_cast(&nullable_int, ¬_null_int, true)); + assert!(supports_cast(¬_null_int, &nullable_int, false)); + } + + #[test] + fn test_unsupported_casts() { + let array = DataType::Array(ArrayType::new(int())); + assert!(!supports_cast(&int(), &array, true)); + assert!(!supports_cast(&array, &int(), true)); + // Any type can be cast to a character string explicitly, even arrays. + assert!(supports_cast(&array, &varchar(), true)); + assert!(!supports_cast(&array, &varchar(), false)); + assert!(!supports_cast( + ×tamp(), + &DataType::Boolean(BooleanType::new()), + true + )); + } +} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 8760d1b3..491a7418 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -38,6 +38,9 @@ pub(crate) use partial_update::PartialUpdateConfig; mod aggregation; pub(crate) use aggregation::AggregationConfig; +mod data_type_casts; +pub(crate) use data_type_casts::supports_cast; + mod schema; pub use schema::*; diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 40e33746..2a84300a 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::spec::core_options::{first_row_supports_changelog_producer, CoreOptions}; +use crate::spec::core_options::{ + first_row_supports_changelog_producer, CoreOptions, BUCKET_KEY_OPTION, SEQUENCE_FIELD_OPTION, +}; use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType}; -use crate::spec::AggregationConfig; -use crate::spec::PartialUpdateConfig; +use crate::spec::{AggregationConfig, ColumnMove, ColumnMoveType, PartialUpdateConfig}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::{HashMap, HashSet}; @@ -64,9 +65,14 @@ impl TableSchema { } } - /// Get the highest field ID from a list of fields. + /// Get the highest field ID from a list of fields, including fields nested + /// inside row types (mirrors Java `RowType.currentHighestFieldId`). pub fn current_highest_field_id(fields: &[DataField]) -> i32 { - fields.iter().map(|f| f.id()).max().unwrap_or(-1) + fields + .iter() + .map(|f| f.id().max(highest_nested_field_id(f.data_type()))) + .max() + .unwrap_or(-1) } pub fn version(&self) -> i32 { @@ -140,32 +146,268 @@ impl TableSchema { } /// Apply a list of schema changes and return a new schema with incremented ID. + /// + /// Column-level changes operate on **top-level** columns only: a + /// `field_names` path with more than one element (a nested struct field) is + /// rejected with [`crate::Error::Unsupported`]. + /// + /// Column errors ([`crate::Error::ColumnNotExist`] / + /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table + /// name; the calling catalog fills in the table's full name. pub fn apply_changes(&self, changes: Vec) -> crate::Result { + use crate::spec::SchemaChange; + + // Column errors carry no table name here; the catalog layer fills it in. + let full_name = ""; + + // Both flags are read from the pre-alter options, mirroring Java + // `SchemaManager.applySchemaChanges`. + let disable_null_to_not_null = self + .options + .get(crate::spec::DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL_OPTION) + .map(|v| v == "true") + .unwrap_or(true); + let allow_explicit_cast = self + .options + .get(crate::spec::DISABLE_EXPLICIT_TYPE_CASTING_OPTION) + .map(|v| v != "true") + .unwrap_or(true); + let mut new_schema = self.clone(); new_schema.id += 1; new_schema.time_millis = chrono::Utc::now().timestamp_millis(); + // Operate on an owned field list, then write it back. + let mut fields = std::mem::take(&mut new_schema.fields); + let mut highest_field_id = new_schema.highest_field_id; + for change in changes { match change { - crate::spec::SchemaChange::SetOption { key, value } => { + SchemaChange::SetOption { key, value } => { new_schema.options.insert(key, value); } - crate::spec::SchemaChange::RemoveOption { key } => { + SchemaChange::RemoveOption { key } => { new_schema.options.remove(&key); } - other => { - return Err(crate::Error::Unsupported { - message: format!("Schema change not yet supported: {other:?}"), - }); + SchemaChange::UpdateComment { comment } => { + new_schema.comment = comment; + } + SchemaChange::AddColumn { + field_names, + data_type, + comment, + column_move, + } => { + let name = top_level_field(&field_names)?; + if field_index(&fields, name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: name.to_string(), + }); + } + // Mirrors Java: an added column has no value for existing + // rows, so it must be nullable. + if !data_type.is_nullable() { + return Err(crate::Error::ConfigInvalid { + message: format!("Column {name} cannot specify NOT NULL."), + }); + } + highest_field_id += 1; + let id = highest_field_id; + let data_type = reassign_field_ids(data_type, &mut highest_field_id); + let field = + DataField::new(id, name.to_string(), data_type).with_description(comment); + insert_field_with_move(&mut fields, field, column_move.as_ref(), full_name)?; + } + SchemaChange::RenameColumn { + field_names, + new_name, + } => { + let name = top_level_field(&field_names)?; + // Existing partition data is laid out with the old key name + // in paths and metadata; renaming would break resolution. + if new_schema.partition_keys.iter().any(|k| k == name) { + return Err(crate::Error::Unsupported { + message: format!("Cannot rename partition column: [{name}]"), + }); + } + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_name != name && field_index(&fields, &new_name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: new_name, + }); + } + fields[idx] = fields[idx].clone().with_name(new_name.clone()); + rename_in_keys(&mut new_schema.primary_keys, name, &new_name); + rename_in_option_list( + &mut new_schema.options, + BUCKET_KEY_OPTION, + name, + &new_name, + ); + rename_in_option_list( + &mut new_schema.options, + SEQUENCE_FIELD_OPTION, + name, + &new_name, + ); + } + SchemaChange::DropColumn { field_names } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_schema.partition_keys.iter().any(|k| k == name) + || new_schema.primary_keys.iter().any(|k| k == name) + { + return Err(crate::Error::Unsupported { + message: format!( + "Cannot drop partition or primary key column '{name}' of table {full_name}" + ), + }); + } + if fields.len() == 1 { + return Err(crate::Error::Unsupported { + message: "Cannot drop all fields in table".to_string(), + }); + } + fields.remove(idx); + } + SchemaChange::UpdateColumnType { + field_names, + new_data_type, + keep_nullability, + } => { + let name = top_level_field(&field_names)?; + // Existing partitions, bucket assignment, and key encoding + // were all written with the old key type. + if new_schema.partition_keys.iter().any(|k| k == name) { + return Err(crate::Error::Unsupported { + message: format!("Cannot update partition column: [{name}]"), + }); + } + if new_schema.primary_keys.iter().any(|k| k == name) { + return Err(crate::Error::Unsupported { + message: "Cannot update primary key".to_string(), + }); + } + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + // Mirrors Java `assertNotChangingBlobColumnType`: BLOB + // columns use a dedicated storage layout that other types + // cannot be converted to or from. + if old.data_type().is_blob_type() || new_data_type.is_blob_type() { + return Err(crate::Error::Unsupported { + message: format!( + "Cannot change column type involving BLOB: [{name}] {:?} -> {new_data_type:?}", + old.data_type() + ), + }); + } + let target = if keep_nullability { + new_data_type.copy_with_nullable(old.data_type().is_nullable())? + } else { + assert_nullability_change( + old.data_type().is_nullable(), + new_data_type.is_nullable(), + name, + disable_null_to_not_null, + )?; + new_data_type + }; + // Existing data files keep the old schema; the read path + // casts old columns to the new type, so the change must be + // both a supported Paimon cast and executable by arrow. + let arrow_castable = arrow_cast::can_cast_types( + &crate::arrow::paimon_type_to_arrow(old.data_type())?, + &crate::arrow::paimon_type_to_arrow(&target)?, + ); + if !crate::spec::supports_cast(old.data_type(), &target, allow_explicit_cast) + || !arrow_castable + { + return Err(crate::Error::Unsupported { + message: format!( + "Column type {name}[{:?}] cannot be converted to {target:?} without losing information.", + old.data_type() + ), + }); + } + fields[idx] = DataField::new(old.id(), old.name().to_string(), target) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnNullability { + field_names, + new_nullability, + } => { + let name = top_level_field(&field_names)?; + // Primary keys are normalized to NOT NULL at create time; + // a nullable key column would break key/bucket semantics. + if new_nullability && new_schema.primary_keys.iter().any(|k| k == name) { + return Err(crate::Error::Unsupported { + message: "Cannot change nullability of primary key".to_string(), + }); + } + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + assert_nullability_change( + old.data_type().is_nullable(), + new_nullability, + name, + disable_null_to_not_null, + )?; + let nt = old.data_type().copy_with_nullable(new_nullability)?; + fields[idx] = DataField::new(old.id(), old.name().to_string(), nt) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnComment { + field_names, + new_comment, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + fields[idx] = fields[idx].clone().with_description(Some(new_comment)); + } + SchemaChange::UpdateColumnPosition { column_move } => { + apply_move(&mut fields, &column_move, full_name)?; } } } - Schema::validate_first_row_changelog_producer(&new_schema.options)?; + new_schema.fields = fields; + new_schema.highest_field_id = + highest_field_id.max(Self::current_highest_field_id(&new_schema.fields)); + + // Re-run create-time validations on the final schema, mirroring Java + // `SchemaValidation.validateTableSchema` after applying changes. + Schema::validate_blob_fields( + &new_schema.fields, + &new_schema.partition_keys, + &new_schema.options, + )?; PartialUpdateConfig::new(&new_schema.options) .validate_create_mode(!new_schema.primary_keys.is_empty())?; AggregationConfig::new(&new_schema.options) .validate_create_mode(&new_schema.primary_keys, &new_schema.fields)?; + Schema::validate_first_row_changelog_producer(&new_schema.options)?; Ok(new_schema) } @@ -197,6 +439,205 @@ impl TableSchema { } } +/// Extract the single top-level column name from a `field_names` path. +/// +/// Nested struct field paths (length > 1) are not yet supported. +fn top_level_field(field_names: &[String]) -> crate::Result<&str> { + match field_names { + [name] => Ok(name.as_str()), + [] => Err(crate::Error::ConfigInvalid { + message: "Schema change has empty fieldNames".to_string(), + }), + _ => Err(crate::Error::Unsupported { + message: format!("Altering nested struct fields is not supported yet: {field_names:?}"), + }), + } +} + +/// Index of the field with the given name, if any. +fn field_index(fields: &[DataField], name: &str) -> Option { + fields.iter().position(|f| f.name() == name) +} + +/// Mirrors Java `SchemaManager.assertNullabilityChange`: converting a nullable +/// column to NOT NULL is rejected unless explicitly enabled, because existing +/// rows may already contain NULLs. +fn assert_nullability_change( + old_nullable: bool, + new_nullable: bool, + field_name: &str, + disable_null_to_not_null: bool, +) -> crate::Result<()> { + if disable_null_to_not_null && old_nullable && !new_nullable { + return Err(crate::Error::Unsupported { + message: format!( + "Cannot update column type from nullable to non nullable for {field_name}. \ + You can set table configuration option 'alter-column-null-to-not-null.disabled' = 'false' \ + to allow converting null columns to not null" + ), + }); + } + Ok(()) +} + +/// Rename a key in a partition/primary key list, if present. +fn rename_in_keys(keys: &mut [String], old: &str, new: &str) { + for key in keys.iter_mut() { + if key == old { + *key = new.to_string(); + } + } +} + +/// Rename a column inside a comma-separated column-list option (`bucket-key`, +/// `sequence.field`), if the option is set and references the column. +/// +/// Mirrors Java `SchemaManager.applyRenameColumnsToOptions`. +fn rename_in_option_list( + options: &mut HashMap, + option_key: &str, + old: &str, + new: &str, +) { + let Some(value) = options.get(option_key) else { + return; + }; + let renamed = value + .split(',') + .map(|col| if col == old { new } else { col }) + .collect::>() + .join(","); + options.insert(option_key.to_string(), renamed); +} + +/// The highest field ID nested inside a data type, or -1 if it contains none. +fn highest_nested_field_id(data_type: &DataType) -> i32 { + match data_type { + DataType::Array(t) => highest_nested_field_id(t.element_type()), + DataType::Multiset(t) => highest_nested_field_id(t.element_type()), + DataType::Map(t) => { + highest_nested_field_id(t.key_type()).max(highest_nested_field_id(t.value_type())) + } + DataType::Row(t) => t + .fields() + .iter() + .map(|f| f.id().max(highest_nested_field_id(f.data_type()))) + .max() + .unwrap_or(-1), + _ => -1, + } +} + +/// Reassign the IDs of all row fields nested inside a data type from the +/// table-wide highest field ID, so they cannot collide with existing fields. +/// +/// Mirrors Java `ReassignFieldId`: IDs nested inside a field's type are +/// assigned before the field's own ID. +fn reassign_field_ids(data_type: DataType, next_id: &mut i32) -> DataType { + let nullable = data_type.is_nullable(); + match data_type { + DataType::Array(t) => DataType::Array(ArrayType::with_nullable( + nullable, + reassign_field_ids(t.element_type().clone(), next_id), + )), + DataType::Multiset(t) => DataType::Multiset(MultisetType::with_nullable( + nullable, + reassign_field_ids(t.element_type().clone(), next_id), + )), + DataType::Map(t) => DataType::Map(MapType::with_nullable( + nullable, + reassign_field_ids(t.key_type().clone(), next_id), + reassign_field_ids(t.value_type().clone(), next_id), + )), + DataType::Row(t) => { + let fields = t + .fields() + .iter() + .map(|f| { + let typ = reassign_field_ids(f.data_type().clone(), next_id); + *next_id += 1; + DataField::new(*next_id, f.name().to_string(), typ) + .with_description(f.description().map(|s| s.to_string())) + }) + .collect(); + DataType::Row(RowType::with_nullable(nullable, fields)) + } + other => other, + } +} + +/// Insert a brand-new field according to an optional move (used by `AddColumn`). +fn insert_field_with_move( + fields: &mut Vec, + field: DataField, + column_move: Option<&ColumnMove>, + full_name: &str, +) -> crate::Result<()> { + let Some(mv) = column_move else { + fields.push(field); + return Ok(()); + }; + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// Move an existing field to a new position (used by `UpdateColumnPosition`). +/// +/// Mirrors Java `SchemaManager.applyMove`: remove the field first, then resolve +/// the reference index in the reduced list so the offset is already adjusted. +fn apply_move(fields: &mut Vec, mv: &ColumnMove, full_name: &str) -> crate::Result<()> { + let idx = field_index(fields, mv.field_name()).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: mv.field_name().to_string(), + })?; + let field = fields.remove(idx); + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// The reference (anchor) field name required by `AFTER`/`BEFORE` moves. +fn move_reference(mv: &ColumnMove) -> crate::Result<&str> { + mv.reference_field_name() + .ok_or_else(|| crate::Error::ConfigInvalid { + message: format!( + "Move of type {:?} requires a reference field name", + mv.move_type() + ), + }) +} + pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; @@ -789,6 +1230,25 @@ mod tests { assert_eq!(data_field.description(), Some(description).as_deref()); } + #[test] + fn test_current_highest_field_id_includes_nested_fields() { + let fields = vec![ + DataField::new(0, "id".to_string(), DataType::Int(IntType::new())), + DataField::new( + 1, + "s".to_string(), + DataType::Row(RowType::new(vec![DataField::new( + 7, + "a".to_string(), + DataType::Array(ArrayType::new(DataType::Row(RowType::new(vec![ + DataField::new(9, "b".to_string(), DataType::Int(IntType::new())), + ])))), + )])), + ), + ]; + assert_eq!(TableSchema::current_highest_field_id(&fields), 9); + } + #[test] fn test_new_id() { let d_type = DataType::Int(IntType::new()); @@ -1285,6 +1745,228 @@ mod tests { ); } + fn cast_test_schema(options: &[(&str, &str)]) -> TableSchema { + let mut builder = Schema::builder() + .column("a", DataType::Int(IntType::new())) + .column("b", DataType::BigInt(crate::spec::BigIntType::new())) + .column( + "d", + DataType::Timestamp(crate::spec::TimestampType::new(3).unwrap()), + ); + for (key, value) in options { + builder = builder.option(*key, *value); + } + TableSchema::new(0, &builder.build().unwrap()) + } + + #[test] + fn test_apply_changes_update_column_type_cast_compatibility() { + let table_schema = cast_test_schema(&[]); + + // Implicit widening. + let new_schema = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "a".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + )]) + .unwrap(); + assert!(matches!( + new_schema.fields()[0].data_type(), + DataType::BigInt(_) + )); + + // Narrowing is an explicit cast, allowed by default. + let new_schema = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "b".to_string(), + DataType::Int(IntType::new()), + )]) + .unwrap(); + assert!(matches!( + new_schema.fields()[1].data_type(), + DataType::Int(_) + )); + + // Unsupported conversions are rejected before committing the schema. + for new_type in [ + DataType::Array(ArrayType::new(DataType::Int(IntType::new()))), + DataType::Boolean(crate::spec::BooleanType::new()), + ] { + let err = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "d".to_string(), + new_type, + )]) + .unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("cannot be converted") && message.contains('d')), + "expected cast rejection, got {err:?}" + ); + } + } + + #[test] + fn test_apply_changes_update_column_type_respects_disable_explicit_casting() { + let table_schema = cast_test_schema(&[("disable-explicit-type-casting", "true")]); + + let err = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "b".to_string(), + DataType::Int(IntType::new()), + )]) + .unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("cannot be converted")), + "narrowing should be rejected when explicit casting is disabled, got {err:?}" + ); + + // Implicit widening is still allowed. + table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "a".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + )]) + .unwrap(); + } + + #[test] + fn test_apply_changes_update_column_type_rejects_blob() { + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("payload", DataType::Blob(BlobType::new())) + .option("data-evolution.enabled", "true") + .build() + .unwrap(), + ); + + for (column, new_type) in [ + ( + "payload", + DataType::VarChar(crate::spec::VarCharType::new(10).unwrap()), + ), + ("id", DataType::Blob(BlobType::new())), + ] { + let err = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + column.to_string(), + new_type, + )]) + .unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("involving BLOB") && message.contains(column)), + "expected BLOB type-change rejection for {column}, got {err:?}" + ); + } + } + + #[test] + fn test_apply_changes_nullable_to_not_null_guard() { + let table_schema = cast_test_schema(&[]); + let not_null_int = DataType::Int(IntType::new()) + .copy_with_nullable(false) + .unwrap(); + + // Both nullability change paths are rejected by default. + let changes: Vec = vec![ + crate::spec::SchemaChange::update_column_nullability("a".to_string(), false), + crate::spec::SchemaChange::update_column_type("a".to_string(), not_null_int.clone()), + ]; + for change in changes { + let err = table_schema.apply_changes(vec![change]).unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("nullable to non nullable")), + "expected null-to-not-null rejection, got {err:?}" + ); + } + + // Allowed when explicitly enabled via table option. + let table_schema = cast_test_schema(&[("alter-column-null-to-not-null.disabled", "false")]); + let new_schema = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_nullability( + "a".to_string(), + false, + )]) + .unwrap(); + assert!(!new_schema.fields()[0].data_type().is_nullable()); + let new_schema = table_schema + .apply_changes(vec![crate::spec::SchemaChange::update_column_type( + "a".to_string(), + not_null_int, + )]) + .unwrap(); + assert!(!new_schema.fields()[0].data_type().is_nullable()); + } + + #[test] + fn test_apply_changes_revalidates_blob_fields() { + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .build() + .unwrap(), + ); + + let err = table_schema + .apply_changes(vec![crate::spec::SchemaChange::add_column( + "payload".to_string(), + DataType::Blob(BlobType::new()), + )]) + .unwrap_err(); + assert!( + matches!(err, crate::Error::ConfigInvalid { ref message } + if message.contains("Data evolution config must enabled")), + "adding a BLOB column without data-evolution.enabled should fail, got {err:?}" + ); + + // Enabling data evolution in the same alter makes the final schema valid. + let new_schema = table_schema + .apply_changes(vec![ + crate::spec::SchemaChange::set_option( + "data-evolution.enabled".to_string(), + "true".to_string(), + ), + crate::spec::SchemaChange::add_column( + "payload".to_string(), + DataType::Blob(BlobType::new()), + ), + ]) + .unwrap(); + assert_eq!(new_schema.fields().len(), 2); + } + + #[test] + fn test_apply_changes_revalidates_partial_update_options() { + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("merge-engine", "partial-update") + .build() + .unwrap(), + ); + + let err = table_schema + .apply_changes(vec![crate::spec::SchemaChange::set_option( + "fields.value.sequence-group".to_string(), + "value".to_string(), + )]) + .unwrap_err(); + assert!( + matches!(err, crate::Error::ConfigInvalid { ref message } + if message.contains("partial-update") && message.contains("sequence-group")), + "unsupported partial-update option should be rejected on alter, got {err:?}" + ); + } + #[test] fn test_aggregation_apply_changes_rejects_unknown_field() { let table_schema = TableSchema::new( diff --git a/crates/paimon/src/spec/schema_change.rs b/crates/paimon/src/spec/schema_change.rs index 6a11c0a4..9f69400b 100644 --- a/crates/paimon/src/spec/schema_change.rs +++ b/crates/paimon/src/spec/schema_change.rs @@ -20,77 +20,71 @@ use serde::{Deserialize, Serialize}; /// Schema change to table. /// -/// Reference: -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +/// The JSON wire format is kept compatible with Java Paimon's `SchemaChange`, +/// which is an internally-tagged polymorphic type (`@JsonTypeInfo` with an +/// `"action"` discriminator). Each variant therefore serializes as +/// `{"action": "", ...fields}` with `fieldNames` arrays (a column path; +/// only top-level single-element paths are currently applied — see +/// `TableSchema::apply_changes`). +/// +/// Reference: +// +// Note: `dropPrimaryKey` and `updateColumnDefaultValue` from Java are not yet +// modeled here; they are out of scope for the current alter-table support. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "camelCase")] pub enum SchemaChange { /// A SchemaChange to set a table option. - /// - /// Reference: SetOption { key: String, value: String }, /// A SchemaChange to remove a table option. - /// - /// Reference: RemoveOption { key: String }, /// A SchemaChange to update a table comment. - /// - /// Reference: UpdateComment { comment: Option }, /// A SchemaChange to add a new field. - /// - /// Reference: #[serde(rename_all = "camelCase")] AddColumn { - field_name: String, + field_names: Vec, data_type: DataType, - description: Option, + comment: Option, #[serde(rename = "move")] column_move: Option, }, /// A SchemaChange to rename a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] RenameColumn { - field_name: String, + field_names: Vec, new_name: String, }, /// A SchemaChange to drop a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] - DropColumn { field_name: String }, + DropColumn { field_names: Vec }, /// A SchemaChange to update the field's type. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnType { - field_name: String, - data_type: DataType, - }, - /// A SchemaChange to update the field's position. - /// - /// Reference: - #[serde(rename_all = "camelCase")] - UpdateColumnPosition { - #[serde(rename = "move")] - column_move: ColumnMove, + field_names: Vec, + new_data_type: DataType, + /// When true, keep the existing column's nullability instead of taking + /// it from `new_data_type`. + #[serde(default)] + keep_nullability: bool, }, /// A SchemaChange to update the field's nullability. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnNullability { - field_name: Vec, - nullable: bool, + field_names: Vec, + new_nullability: bool, }, /// A SchemaChange to update the (nested) field's comment. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnComment { field_names: Vec, - new_description: String, + new_comment: String, + }, + /// A SchemaChange to update the field's position. + #[serde(rename_all = "camelCase")] + UpdateColumnPosition { + #[serde(rename = "move")] + column_move: ColumnMove, }, } @@ -113,9 +107,9 @@ impl SchemaChange { /// impl the `add_column`. pub fn add_column(field_name: String, data_type: DataType) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: None, + comment: None, column_move: None, } } @@ -127,9 +121,9 @@ impl SchemaChange { description: String, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: None, } } @@ -142,9 +136,9 @@ impl SchemaChange { column_move: ColumnMove, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: Some(column_move), } } @@ -152,21 +146,24 @@ impl SchemaChange { /// impl the `rename_column`. pub fn rename_column(field_name: String, new_name: String) -> Self { SchemaChange::RenameColumn { - field_name, + field_names: vec![field_name], new_name, } } /// impl the `drop_column`. pub fn drop_column(field_name: String) -> Self { - SchemaChange::DropColumn { field_name } + SchemaChange::DropColumn { + field_names: vec![field_name], + } } /// impl the `update_column_type`. pub fn update_column_type(field_name: String, new_data_type: DataType) -> Self { SchemaChange::UpdateColumnType { - field_name, - data_type: new_data_type, + field_names: vec![field_name], + new_data_type, + keep_nullability: false, } } @@ -175,19 +172,19 @@ impl SchemaChange { SchemaChange::UpdateColumnPosition { column_move } } - /// impl the `update_column_position`. + /// impl the `update_column_nullability`. pub fn update_column_nullability(field_name: String, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: vec![field_name], - nullable: new_nullability, + field_names: vec![field_name], + new_nullability, } } /// impl the `update_columns_nullability`. pub fn update_columns_nullability(field_names: Vec, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: field_names, - nullable: new_nullability, + field_names, + new_nullability, } } @@ -195,7 +192,7 @@ impl SchemaChange { pub fn update_column_comment(field_name: String, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names: vec![field_name], - new_description: comment, + new_comment: comment, } } @@ -203,28 +200,32 @@ impl SchemaChange { pub fn update_columns_comment(field_names: Vec, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names, - new_description: comment, + new_comment: comment, } } } /// The type of move. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub enum ColumnMoveType { FIRST, AFTER, + BEFORE, + LAST, } /// Represents a requested column move in a struct. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ColumnMove { pub field_name: String, - pub referenced_field_name: Option, + /// The anchor column for `AFTER`/`BEFORE` moves (`None` for `FIRST`/`LAST`). + /// Named `referenceFieldName` on the wire to match Java Paimon. + pub reference_field_name: Option, #[serde(rename = "type")] pub move_type: ColumnMoveType, } @@ -235,9 +236,9 @@ impl ColumnMove { &self.field_name } - /// Get the referenced field name. - pub fn referenced_field_name(&self) -> Option<&str> { - self.referenced_field_name.as_deref() + /// Get the reference field name. + pub fn reference_field_name(&self) -> Option<&str> { + self.reference_field_name.as_deref() } /// Get the move type. @@ -249,19 +250,37 @@ impl ColumnMove { pub fn move_first(field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: None, + reference_field_name: None, move_type: ColumnMoveType::FIRST, } } + /// Create a new `Move` with `LAST` move type. + pub fn move_last(field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: None, + move_type: ColumnMoveType::LAST, + } + } + /// Create a new `Move` with `AFTER` move type. - pub fn move_after(field_name: String, referenced_field_name: String) -> Self { + pub fn move_after(field_name: String, reference_field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: Some(referenced_field_name), + reference_field_name: Some(reference_field_name), move_type: ColumnMoveType::AFTER, } } + + /// Create a new `Move` with `BEFORE` move type. + pub fn move_before(field_name: String, reference_field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: Some(reference_field_name), + move_type: ColumnMoveType::BEFORE, + } + } } #[cfg(test)] @@ -271,79 +290,66 @@ mod tests { #[test] fn test_schema_change_serialize_deserialize() { + // Java-compatible wire format: internally tagged by "action", with + // `fieldNames` arrays and `referenceFieldName` move anchors. let json_data = r#" [ { - "setOption": { - "key": "snapshot.time-retained", - "value": "2h" - } + "action": "setOption", + "key": "snapshot.time-retained", + "value": "2h" }, { - "removeOption": { - "key": "compaction.max.file-num" - } + "action": "removeOption", + "key": "compaction.max.file-num" }, { - "updateComment": { - "comment": "table.comment" - } + "action": "updateComment", + "comment": "table.comment" }, { - "addColumn": { + "action": "addColumn", + "fieldNames": ["col1"], + "dataType": "INT", + "comment": "col1_description", + "move": { "fieldName": "col1", - "dataType": "INT", - "description": "col1_description", - "move": { - "fieldName": "col1_first", - "referencedFieldName": null, - "type": "FIRST" - } + "referenceFieldName": null, + "type": "FIRST" } }, { - "renameColumn": { - "fieldName": "col3", - "newName": "col3_new_name" - } + "action": "renameColumn", + "fieldNames": ["col3"], + "newName": "col3_new_name" }, { - "dropColumn": { - "fieldName": "col1" - } + "action": "dropColumn", + "fieldNames": ["col1"] }, { - "updateColumnType": { - "fieldName": "col14", - "dataType": "DOUBLE" - } + "action": "updateColumnType", + "fieldNames": ["col14"], + "newDataType": "DOUBLE", + "keepNullability": false }, { - "updateColumnPosition": { - "move": { - "fieldName": "col4_first", - "referencedFieldName": null, - "type": "FIRST" - } + "action": "updateColumnPosition", + "move": { + "fieldName": "col4", + "referenceFieldName": "col3", + "type": "AFTER" } }, { - "updateColumnNullability": { - "fieldName": [ - "col5", - "f2" - ], - "nullable": false - } + "action": "updateColumnNullability", + "fieldNames": ["col5", "f2"], + "newNullability": false }, { - "updateColumnComment": { - "fieldNames": [ - "col5", - "f1" - ], - "newDescription": "col5 f1 field" - } + "action": "updateColumnComment", + "fieldNames": ["col5", "f1"], + "newComment": "col5 f1 field" } ]"#; @@ -364,57 +370,63 @@ mod tests { comment: Some("table.comment".to_string()), }, SchemaChange::AddColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], data_type: DataType::Int(IntType::new()), - description: Some("col1_description".to_string()), - column_move: Some(ColumnMove { - field_name: "col1_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }), + comment: Some("col1_description".to_string()), + column_move: Some(ColumnMove::move_first("col1".to_string())), }, SchemaChange::RenameColumn { - field_name: "col3".to_string(), + field_names: vec!["col3".to_string()], new_name: "col3_new_name".to_string(), }, SchemaChange::DropColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], }, SchemaChange::UpdateColumnType { - field_name: "col14".to_string(), - data_type: DataType::Double(DoubleType::new()), + field_names: vec!["col14".to_string()], + new_data_type: DataType::Double(DoubleType::new()), + keep_nullability: false, }, SchemaChange::UpdateColumnPosition { - column_move: ColumnMove { - field_name: "col4_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }, + column_move: ColumnMove::move_after("col4".to_string(), "col3".to_string()), }, SchemaChange::UpdateColumnNullability { - field_name: vec!["col5".to_string(), "f2".to_string()], - nullable: false, + field_names: vec!["col5".to_string(), "f2".to_string()], + new_nullability: false, }, SchemaChange::UpdateColumnComment { field_names: vec!["col5".to_string(), "f1".to_string()], - new_description: "col5 f1 field".to_string(), + new_comment: "col5 f1 field".to_string(), }, ] ); } + #[test] + fn test_schema_change_serialize_shape() { + // Verify the serialized JSON carries the Java "action" discriminator. + let change = SchemaChange::add_column("c".to_string(), DataType::Int(IntType::new())); + let value = serde_json::to_value(&change).unwrap(); + assert_eq!(value["action"], "addColumn"); + assert_eq!(value["fieldNames"][0], "c"); + + // Round-trip through JSON. + let round: SchemaChange = serde_json::from_value(value).unwrap(); + assert_eq!(round, change); + } + #[test] fn test_column_move_serialize_deserialize() { let json_data = r#" [ { "fieldName": "col1", - "referencedFieldName": null, + "referenceFieldName": null, "type": "FIRST" }, { "fieldName": "col2_after", - "referencedFieldName": "col2", + "referenceFieldName": "col2", "type": "AFTER" } ]"#; diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index 69abf233..6a9f38d0 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -34,8 +34,9 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use paimon::api::{ - AlterDatabaseRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, - GetTableResponse, ListDatabasesResponse, ListTablesResponse, RenameTableRequest, ResourcePaths, + AlterDatabaseRequest, AlterTableRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, + GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, ListTablesResponse, + RenameTableRequest, ResourcePaths, }; #[derive(Clone, Debug, Default)] @@ -451,6 +452,40 @@ impl RESTServer { } } + /// Handle POST /databases/:db/tables/:table - alter a table. + /// + /// The mock does not mutate the stored schema; it only validates that the + /// table exists, which is enough to exercise the client's alter-table path + /// (request serialization + 2xx handling). + pub async fn alter_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, + Json(_request): Json, + ) -> impl IntoResponse { + let s = state.inner.lock().unwrap(); + let key = format!("{db}.{table}"); + if s.no_permission_tables.contains(&key) { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("No Permission".to_string()), + Some(403), + ); + return (StatusCode::FORBIDDEN, Json(err)).into_response(); + } + if s.tables.contains_key(&key) { + (StatusCode::OK, Json(serde_json::json!(""))).into_response() + } else { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("Not Found".to_string()), + Some(404), + ); + (StatusCode::NOT_FOUND, Json(err)).into_response() + } + } + /// Handle POST /rename-table - rename a table. pub async fn rename_table( Extension(state): Extension>, @@ -722,7 +757,9 @@ pub async fn start_mock_server( ) .route( &format!("{prefix}/databases/:db/tables/:table"), - get(RESTServer::get_table).delete(RESTServer::drop_table), + get(RESTServer::get_table) + .post(RESTServer::alter_table) + .delete(RESTServer::drop_table), ) .route( &format!("{prefix}/tables/rename"), diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index 2166b44d..70a86c5c 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use paimon::api::ConfigResponse; use paimon::catalog::{Catalog, Identifier, RESTCatalog}; use paimon::common::Options; -use paimon::spec::{BigIntType, DataType, Schema, VarCharType}; +use paimon::spec::{BigIntType, DataType, Schema, SchemaChange, VarCharType}; mod mock_server; use mock_server::{start_mock_server, RESTServer}; @@ -465,17 +465,35 @@ async fn test_catalog_rename_table_ignore_if_not_exists() { // ==================== Alter Table Tests ==================== #[tokio::test] -async fn test_catalog_alter_table_unsupported() { +async fn test_catalog_alter_table() { let ctx = setup_catalog(vec!["default"]).await; let identifier = Identifier::new("default", "some_table"); - - // alter_table should return Unsupported error - let result = ctx.catalog.alter_table(&identifier, vec![], false).await; - assert!( - result.is_err(), - "alter_table should return Unsupported error" - ); + ctx.catalog + .create_table(&identifier, test_schema(), false) + .await + .unwrap(); + + // alter_table on an existing table succeeds (client builds the request and + // POSTs it; the mock accepts it). + let changes = vec![SchemaChange::update_column_comment( + "id".to_string(), + "the id".to_string(), + )]; + let result = ctx.catalog.alter_table(&identifier, changes, false).await; + assert!(result.is_ok(), "alter_table should succeed: {result:?}"); + + // alter_table on a missing table: error, unless ignore_if_not_exists. + let missing = Identifier::new("default", "ghost"); + assert!(ctx + .catalog + .alter_table(&missing, vec![], false) + .await + .is_err()); + ctx.catalog + .alter_table(&missing, vec![], true) + .await + .unwrap(); } // ==================== Multiple Databases Tests ====================