Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 340 additions & 4 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ use lance_namespace::models::{
ListTableIndicesResponse, ListTableTagsRequest, ListTableTagsResponse,
ListTableVersionsRequest, ListTableVersionsResponse, ListTablesRequest, ListTablesResponse,
MergeInsertIntoTableRequest, MergeInsertIntoTableResponse, NamespaceExistsRequest,
QueryTableRequest, QueryTableRequestColumns, QueryTableRequestVector, RestoreTableRequest,
RestoreTableResponse, TableExistsRequest, TableVersion, TagContents as ModelTagContents,
UpdateTableSchemaMetadataRequest, UpdateTableSchemaMetadataResponse, UpdateTableTagRequest,
UpdateTableTagResponse,
QueryTableRequest, QueryTableRequestColumns, QueryTableRequestVector, RenameTableRequest,
RenameTableResponse, RestoreTableRequest, RestoreTableResponse, TableExistsRequest,
TableVersion, TagContents as ModelTagContents, UpdateTableSchemaMetadataRequest,
UpdateTableSchemaMetadataResponse, UpdateTableTagRequest, UpdateTableTagResponse,
};

use lance_core::{Error, Result};
Expand Down Expand Up @@ -2921,6 +2921,70 @@ impl LanceNamespace for DirectoryNamespace {
})
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<RenameTableResponse> {
self.record_op("rename_table");
if let Some(ref manifest_ns) = self.manifest_ns {
return manifest_ns.rename_table(request).await;
}

if request.new_table_name.trim().is_empty() {
return Err(NamespaceError::InvalidInput {
message: "new_table_name cannot be empty".to_string(),
}
.into());
}

// Without manifest mode, only the root namespace exists, so a table cannot
// be moved into a different namespace.
if request
.new_namespace_id
.as_ref()
.is_some_and(|namespace| !namespace.is_empty())
{
return Err(NamespaceError::Unsupported {
message: "Cross-namespace rename is only supported when manifest mode is enabled"
.to_string(),
}
.into());
}

let source_name = Self::table_name_from_id(&request.id)?;
let new_table_name = request.new_table_name.clone();

let source_status = self.check_table_status(&source_name).await;
if !source_status.exists || source_status.is_deregistered {
return Err(NamespaceError::TableNotFound {
message: Self::format_table_id_from_request(&request.id),
}
.into());
}

let destination_status = self.check_table_status(&new_table_name).await;
if destination_status.exists && !destination_status.is_deregistered {
return Err(NamespaceError::TableAlreadyExists {
message: new_table_name,
}
.into());
}

let source_path = self.table_path(&source_name);
let destination_path = self.table_path(&new_table_name);
manifest::copy_dir_all(&self.object_store, &source_path, &destination_path).await?;
self.object_store
.remove_dir_all(source_path)
.await
.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!(
"Failed to remove source table {} after rename: {:?}",
source_name, e
),
})
})?;

Ok(RenameTableResponse::new())
}

async fn list_table_versions(
&self,
request: ListTableVersionsRequest,
Expand Down Expand Up @@ -7143,6 +7207,278 @@ mod tests {
let _ = result;
}

async fn rows_at_location(namespace: &DirectoryNamespace, id: Vec<String>) -> usize {
let mut describe = DescribeTableRequest::new();
describe.id = Some(id);
let location = namespace
.describe_table(describe)
.await
.unwrap()
.location
.unwrap();
Dataset::open(&location)
.await
.unwrap()
.count_rows(None)
.await
.unwrap()
}

#[tokio::test]
async fn test_rename_table_v1_mode_moves_data() {
use lance_namespace::models::{RenameTableRequest, TableExistsRequest};

let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["original".to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();

let mut rename_request = RenameTableRequest::new("renamed".to_string());
rename_request.id = Some(vec!["original".to_string()]);
namespace.rename_table(rename_request).await.unwrap();

let mut old_exists = TableExistsRequest::new();
old_exists.id = Some(vec!["original".to_string()]);
assert!(namespace.table_exists(old_exists).await.is_err());

let mut new_exists = TableExistsRequest::new();
new_exists.id = Some(vec!["renamed".to_string()]);
namespace.table_exists(new_exists).await.unwrap();

assert_eq!(
rows_at_location(&namespace, vec!["renamed".to_string()]).await,
2
);
}

#[tokio::test]
async fn test_rename_table_v1_mode_rejects_existing_destination() {
use lance_namespace::models::RenameTableRequest;

let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

for name in ["source", "destination"] {
let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec![name.to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();
}

let mut rename_request = RenameTableRequest::new("destination".to_string());
rename_request.id = Some(vec!["source".to_string()]);
let err = namespace.rename_table(rename_request).await.unwrap_err();
assert!(
err.to_string().contains("already exists"),
"unexpected error: {err}"
);
}

#[tokio::test]
async fn test_rename_table_v1_mode_source_not_found() {
use lance_namespace::models::RenameTableRequest;

let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

let mut rename_request = RenameTableRequest::new("renamed".to_string());
rename_request.id = Some(vec!["missing".to_string()]);
let err = namespace.rename_table(rename_request).await.unwrap_err();
assert!(
err.to_string().contains("not found"),
"unexpected error: {err}"
);
}

#[tokio::test]
async fn test_rename_table_rejects_empty_new_name() {
use lance_namespace::models::RenameTableRequest;

let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["source".to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();

let mut rename_request = RenameTableRequest::new(" ".to_string());
rename_request.id = Some(vec!["source".to_string()]);
let err = namespace.rename_table(rename_request).await.unwrap_err();
assert!(
err.to_string().contains("Invalid input"),
"unexpected error: {err}"
);
}

#[tokio::test]
async fn test_rename_table_cross_namespace_rejected_without_manifest() {
use lance_namespace::models::RenameTableRequest;

let temp_dir = TempStdDir::default();
let namespace = DirectoryNamespaceBuilder::new(temp_dir.to_str().unwrap())
.manifest_enabled(false)
.build()
.await
.unwrap();

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["source".to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();

let mut rename_request = RenameTableRequest::new("renamed".to_string());
rename_request.id = Some(vec!["source".to_string()]);
rename_request.new_namespace_id = Some(vec!["child".to_string()]);
let err = namespace.rename_table(rename_request).await.unwrap_err();
assert!(
err.to_string().contains("Cross-namespace rename"),
"unexpected error: {err}"
);
}

#[tokio::test]
async fn test_rename_table_with_manifest_moves_data() {
use lance_namespace::models::{RenameTableRequest, TableExistsRequest};

let (namespace, _temp_dir) = create_test_namespace().await;

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["original".to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();

let mut rename_request = RenameTableRequest::new("renamed".to_string());
rename_request.id = Some(vec!["original".to_string()]);
namespace.rename_table(rename_request).await.unwrap();

let mut old_exists = TableExistsRequest::new();
old_exists.id = Some(vec!["original".to_string()]);
assert!(namespace.table_exists(old_exists).await.is_err());

let mut new_exists = TableExistsRequest::new();
new_exists.id = Some(vec!["renamed".to_string()]);
namespace.table_exists(new_exists).await.unwrap();

assert_eq!(
rows_at_location(&namespace, vec!["renamed".to_string()]).await,
2
);
}

#[tokio::test]
async fn test_rename_table_with_manifest_rejects_existing_destination() {
use lance_namespace::models::RenameTableRequest;

let (namespace, _temp_dir) = create_test_namespace().await;

for name in ["source", "destination"] {
let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec![name.to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();
}

let mut rename_request = RenameTableRequest::new("destination".to_string());
rename_request.id = Some(vec!["source".to_string()]);
let err = namespace.rename_table(rename_request).await.unwrap_err();
assert!(
err.to_string().contains("already exists"),
"unexpected error: {err}"
);
}

#[tokio::test]
async fn test_rename_table_cross_namespace_with_manifest() {
use lance_namespace::models::{RenameTableRequest, TableExistsRequest};

let (namespace, _temp_dir) = create_test_namespace().await;

let mut create_namespace_request = CreateNamespaceRequest::new();
create_namespace_request.id = Some(vec!["child".to_string()]);
namespace
.create_namespace(create_namespace_request)
.await
.unwrap();

let mut create_request = CreateTableRequest::new();
create_request.id = Some(vec!["source".to_string()]);
namespace
.create_table(
create_request,
Bytes::from(create_non_empty_test_ipc_data()),
)
.await
.unwrap();

let mut rename_request = RenameTableRequest::new("moved".to_string());
rename_request.id = Some(vec!["source".to_string()]);
rename_request.new_namespace_id = Some(vec!["child".to_string()]);
namespace.rename_table(rename_request).await.unwrap();

let mut old_exists = TableExistsRequest::new();
old_exists.id = Some(vec!["source".to_string()]);
assert!(namespace.table_exists(old_exists).await.is_err());

let mut new_exists = TableExistsRequest::new();
new_exists.id = Some(vec!["child".to_string(), "moved".to_string()]);
namespace.table_exists(new_exists).await.unwrap();

assert_eq!(
rows_at_location(&namespace, vec!["child".to_string(), "moved".to_string()]).await,
2
);
}

#[tokio::test]
async fn test_root_namespace_operations() {
let (namespace, _temp_dir) = create_test_namespace().await;
Expand Down
Loading
Loading