diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 8859e4bc237..f44b3fbf208 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -65,10 +65,10 @@ use lance_namespace::models::{ ListTableIndicesResponse, ListTableTagsRequest, ListTableTagsResponse, ListTableVersionsRequest, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, MergeInsertIntoTableRequest, MergeInsertIntoTableResponse, NamespaceExistsRequest, - QueryTableRequest, QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest, - RestoreTableResponse, TableExistsRequest, TableVersion, TagContents as ModelTagContents, - UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse, UpdateTableTagRequest, - UpdateTableTagResponse, + QueryTableRequest, QueryTableRequestColumns, QueryTableRequestVector, RenameTableRequest, + RenameTableResponse, RestoreTableRequest, RestoreTableResponse, TableExistsRequest, + TableVersion, TagContents as ModelTagContents, UpdateTableSchemaMetadataRequest, + UpdateTableSchemaMetadataResponse, UpdateTableTagRequest, UpdateTableTagResponse, }; use lance_core::{Error, Result}; @@ -2921,6 +2921,70 @@ impl LanceNamespace for DirectoryNamespace { }) } + async fn rename_table(&self, request: RenameTableRequest) -> Result { + self.record_op("rename_table"); + if let Some(ref manifest_ns) = self.manifest_ns { + return manifest_ns.rename_table(request).await; + } + + if request.new_table_name.trim().is_empty() { + return Err(NamespaceError::InvalidInput { + message: "new_table_name cannot be empty".to_string(), + } + .into()); + } + + // Without manifest mode, only the root namespace exists, so a table cannot + // be moved into a different namespace. + if request + .new_namespace_id + .as_ref() + .is_some_and(|namespace| !namespace.is_empty()) + { + return Err(NamespaceError::Unsupported { + message: "Cross-namespace rename is only supported when manifest mode is enabled" + .to_string(), + } + .into()); + } + + let source_name = Self::table_name_from_id(&request.id)?; + let new_table_name = request.new_table_name.clone(); + + let source_status = self.check_table_status(&source_name).await; + if !source_status.exists || source_status.is_deregistered { + return Err(NamespaceError::TableNotFound { + message: Self::format_table_id_from_request(&request.id), + } + .into()); + } + + let destination_status = self.check_table_status(&new_table_name).await; + if destination_status.exists && !destination_status.is_deregistered { + return Err(NamespaceError::TableAlreadyExists { + message: new_table_name, + } + .into()); + } + + let source_path = self.table_path(&source_name); + let destination_path = self.table_path(&new_table_name); + manifest::copy_dir_all(&self.object_store, &source_path, &destination_path).await?; + self.object_store + .remove_dir_all(source_path) + .await + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!( + "Failed to remove source table {} after rename: {:?}", + source_name, e + ), + }) + })?; + + Ok(RenameTableResponse::new()) + } + async fn list_table_versions( &self, request: ListTableVersionsRequest, @@ -7143,6 +7207,278 @@ mod tests { let _ = result; } + async fn rows_at_location(namespace: &DirectoryNamespace, id: Vec) -> usize { + let mut describe = DescribeTableRequest::new(); + describe.id = Some(id); + let location = namespace + .describe_table(describe) + .await + .unwrap() + .location + .unwrap(); + Dataset::open(&location) + .await + .unwrap() + .count_rows(None) + .await + .unwrap() + } + + #[tokio::test] + async fn test_rename_table_v1_mode_moves_data() { + use lance_namespace::models::{RenameTableRequest, TableExistsRequest}; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .build() + .await + .unwrap(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["original".to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new("renamed".to_string()); + rename_request.id = Some(vec!["original".to_string()]); + namespace.rename_table(rename_request).await.unwrap(); + + let mut old_exists = TableExistsRequest::new(); + old_exists.id = Some(vec!["original".to_string()]); + assert!(namespace.table_exists(old_exists).await.is_err()); + + let mut new_exists = TableExistsRequest::new(); + new_exists.id = Some(vec!["renamed".to_string()]); + namespace.table_exists(new_exists).await.unwrap(); + + assert_eq!( + rows_at_location(&namespace, vec!["renamed".to_string()]).await, + 2 + ); + } + + #[tokio::test] + async fn test_rename_table_v1_mode_rejects_existing_destination() { + use lance_namespace::models::RenameTableRequest; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .build() + .await + .unwrap(); + + for name in ["source", "destination"] { + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec![name.to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + } + + let mut rename_request = RenameTableRequest::new("destination".to_string()); + rename_request.id = Some(vec!["source".to_string()]); + let err = namespace.rename_table(rename_request).await.unwrap_err(); + assert!( + err.to_string().contains("already exists"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_rename_table_v1_mode_source_not_found() { + use lance_namespace::models::RenameTableRequest; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .build() + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new("renamed".to_string()); + rename_request.id = Some(vec!["missing".to_string()]); + let err = namespace.rename_table(rename_request).await.unwrap_err(); + assert!( + err.to_string().contains("not found"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_rename_table_rejects_empty_new_name() { + use lance_namespace::models::RenameTableRequest; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .build() + .await + .unwrap(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["source".to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new(" ".to_string()); + rename_request.id = Some(vec!["source".to_string()]); + let err = namespace.rename_table(rename_request).await.unwrap_err(); + assert!( + err.to_string().contains("Invalid input"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_rename_table_cross_namespace_rejected_without_manifest() { + use lance_namespace::models::RenameTableRequest; + + let temp_dir = TempStdDir::default(); + let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap()) + .manifest_enabled(false) + .build() + .await + .unwrap(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["source".to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new("renamed".to_string()); + rename_request.id = Some(vec!["source".to_string()]); + rename_request.new_namespace_id = Some(vec!["child".to_string()]); + let err = namespace.rename_table(rename_request).await.unwrap_err(); + assert!( + err.to_string().contains("Cross-namespace rename"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_rename_table_with_manifest_moves_data() { + use lance_namespace::models::{RenameTableRequest, TableExistsRequest}; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["original".to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new("renamed".to_string()); + rename_request.id = Some(vec!["original".to_string()]); + namespace.rename_table(rename_request).await.unwrap(); + + let mut old_exists = TableExistsRequest::new(); + old_exists.id = Some(vec!["original".to_string()]); + assert!(namespace.table_exists(old_exists).await.is_err()); + + let mut new_exists = TableExistsRequest::new(); + new_exists.id = Some(vec!["renamed".to_string()]); + namespace.table_exists(new_exists).await.unwrap(); + + assert_eq!( + rows_at_location(&namespace, vec!["renamed".to_string()]).await, + 2 + ); + } + + #[tokio::test] + async fn test_rename_table_with_manifest_rejects_existing_destination() { + use lance_namespace::models::RenameTableRequest; + + let (namespace, _temp_dir) = create_test_namespace().await; + + for name in ["source", "destination"] { + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec![name.to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + } + + let mut rename_request = RenameTableRequest::new("destination".to_string()); + rename_request.id = Some(vec!["source".to_string()]); + let err = namespace.rename_table(rename_request).await.unwrap_err(); + assert!( + err.to_string().contains("already exists"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_rename_table_cross_namespace_with_manifest() { + use lance_namespace::models::{RenameTableRequest, TableExistsRequest}; + + let (namespace, _temp_dir) = create_test_namespace().await; + + let mut create_namespace_request = CreateNamespaceRequest::new(); + create_namespace_request.id = Some(vec!["child".to_string()]); + namespace + .create_namespace(create_namespace_request) + .await + .unwrap(); + + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["source".to_string()]); + namespace + .create_table( + create_request, + Bytes::from(create_non_empty_test_ipc_data()), + ) + .await + .unwrap(); + + let mut rename_request = RenameTableRequest::new("moved".to_string()); + rename_request.id = Some(vec!["source".to_string()]); + rename_request.new_namespace_id = Some(vec!["child".to_string()]); + namespace.rename_table(rename_request).await.unwrap(); + + let mut old_exists = TableExistsRequest::new(); + old_exists.id = Some(vec!["source".to_string()]); + assert!(namespace.table_exists(old_exists).await.is_err()); + + let mut new_exists = TableExistsRequest::new(); + new_exists.id = Some(vec!["child".to_string(), "moved".to_string()]); + namespace.table_exists(new_exists).await.unwrap(); + + assert_eq!( + rows_at_location(&namespace, vec!["child".to_string(), "moved".to_string()]).await, + 2 + ); + } + #[tokio::test] async fn test_root_namespace_operations() { let (namespace, _temp_dir) = create_test_namespace().await; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 0e22f1e8b69..67dfaea59b1 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -37,8 +37,8 @@ use lance_namespace::models::{ DescribeTableResponse, DescribeTableVersionResponse, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, DropTableResponse, ListNamespacesRequest, ListNamespacesResponse, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse, - NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, TableExistsRequest, - TableVersion, + NamespaceExistsRequest, RegisterTableRequest, RegisterTableResponse, RenameTableRequest, + RenameTableResponse, TableExistsRequest, TableVersion, }; use lance_namespace::schema::arrow_schema_to_json; use object_store::{Error as ObjectStoreError, path::Path}; @@ -389,6 +389,38 @@ fn convert_lance_commit_error(e: &LanceError, operation: &str, object_id: Option } } +/// Recursively copy every object under `source_dir` to the corresponding path under +/// `destination_dir`, preserving the relative layout of each file. +pub(crate) async fn copy_dir_all( + object_store: &ObjectStore, + source_dir: &Path, + destination_dir: &Path, +) -> Result<()> { + let mut entries = object_store.list(Some(source_dir.clone())); + while let Some(meta) = entries.try_next().await? { + let source_file = meta.location; + let Some(relative_parts) = source_file.prefix_match(source_dir) else { + continue; + }; + let mut destination_file = destination_dir.clone(); + for part in relative_parts { + destination_file = destination_file.join(part); + } + object_store + .copy(&source_file, &destination_file) + .await + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!( + "Failed to copy {} to {}: {:?}", + source_file, destination_file, e + ), + }) + })?; + } + Ok(()) +} + impl ManifestNamespace { /// Create a new ManifestNamespace from an existing DirectoryNamespace #[allow(clippy::too_many_arguments)] @@ -2400,6 +2432,115 @@ impl LanceNamespace for ManifestNamespace { } } + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let table_id = request.id.as_ref().ok_or_else(|| { + lance_core::Error::from(NamespaceError::InvalidInput { + message: "Table ID is required".to_string(), + }) + })?; + + if table_id.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "Table ID cannot be empty".to_string(), + } + .into()); + } + + if request.new_table_name.trim().is_empty() { + return Err(NamespaceError::InvalidInput { + message: "new_table_name cannot be empty".to_string(), + } + .into()); + } + + let (source_namespace, _) = Self::split_object_id(table_id); + let source_object_id = Self::str_object_id(table_id); + + let source_info = self + .query_manifest_for_table(&source_object_id) + .boxed() + .await? + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::TableNotFound { + message: Self::format_table_id(table_id), + }) + })?; + + // Default to the source namespace when no destination namespace is provided. + let destination_namespace = match request.new_namespace_id.as_ref() { + Some(namespace) => namespace.clone(), + None => source_namespace, + }; + if !destination_namespace.is_empty() { + self.validate_namespace_levels_exist(&destination_namespace) + .boxed() + .await?; + } + + let destination_object_id = + Self::build_object_id(&destination_namespace, &request.new_table_name); + if self + .manifest_contains_object(&destination_object_id) + .boxed() + .await? + { + return Err(NamespaceError::TableAlreadyExists { + message: request.new_table_name.clone(), + } + .into()); + } + + // Choose the destination directory the same way create_table does so the + // physical layout stays consistent with how tables are otherwise created. + let destination_dir_name = if destination_namespace.is_empty() && self.dir_listing_enabled { + format!("{}.lance", request.new_table_name) + } else { + Self::generate_dir_name(&destination_object_id) + }; + + let source_dir = self.base_path.clone().join(source_info.location.as_str()); + let destination_dir = self.base_path.clone().join(destination_dir_name.as_str()); + copy_dir_all(&self.object_store, &source_dir, &destination_dir) + .boxed() + .await?; + + // Register the renamed table before dropping the old entry so the table stays + // reachable under at least one name if a later step fails. + let metadata = Self::serialize_metadata( + source_info.metadata.as_ref(), + "table", + &destination_object_id, + )?; + self.insert_into_manifest_with_metadata( + vec![ManifestEntry { + object_id: destination_object_id, + object_type: ObjectType::Table, + location: Some(destination_dir_name), + metadata, + }], + None, + ) + .boxed() + .await?; + self.delete_from_manifest(&source_object_id).boxed().await?; + + // Remove the original directory now that the rename has been committed. + self.object_store + .remove_dir_all(source_dir) + .boxed() + .await + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!( + "Failed to remove source table directory after rename: {:?}", + e + ), + }) + })?; + + Ok(RenameTableResponse::new()) + } + async fn list_namespaces( &self, request: ListNamespacesRequest,