From 9218f24545ae8e668953b3f20ae1370f5fcdfb9b Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Wed, 10 Jun 2026 14:44:01 -0700 Subject: [PATCH 1/3] fix: branch support on object stores, python branch ops, s3/ddb integration tests --- python/pyproject.toml | 2 +- python/python/lance/namespace.py | 48 ++++++++ python/python/tests/test_namespace_dir.py | 106 ++++++++++++++++ .../tests/test_namespace_integration.py | 96 +++++++++++++++ python/python/tests/test_s3_ddb.py | 52 ++++++++ python/src/namespace.rs | 102 ++++++++++++++- rust/lance/src/dataset.rs | 5 +- rust/lance/src/dataset/branch_location.rs | 59 +++++++-- .../lance/src/io/commit/namespace_manifest.rs | 116 +++++++++++++++++- 9 files changed, 574 insertions(+), 12 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index a1e69855a0f..ff84e2fb6cd 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "pylance" dynamic = ["version"] -dependencies = ["pyarrow>=14", "numpy>=1.22", "lance-namespace>=0.8.0,<0.9"] +dependencies = ["pyarrow>=14", "numpy>=1.22", "lance-namespace>=0.8.2,<0.9"] description = "python wrapper for Lance columnar format" authors = [{ name = "Lance Devs", email = "dev@lance.org" }] license = { file = "LICENSE" } diff --git a/python/python/lance/namespace.py b/python/python/lance/namespace.py index f448e5c3368..fec3a1cfb1e 100644 --- a/python/python/lance/namespace.py +++ b/python/python/lance/namespace.py @@ -32,6 +32,8 @@ CreateMaterializedViewResponse, CreateNamespaceRequest, CreateNamespaceResponse, + CreateTableBranchRequest, + CreateTableBranchResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, @@ -42,6 +44,8 @@ DeclareTableResponse, DeleteFromTableRequest, DeleteFromTableResponse, + DeleteTableBranchRequest, + DeleteTableBranchResponse, DeleteTableTagRequest, DeleteTableTagResponse, DeregisterTableRequest, @@ -70,6 +74,8 @@ LanceNamespace, ListNamespacesRequest, ListNamespacesResponse, + ListTableBranchesRequest, + ListTableBranchesResponse, ListTableIndicesRequest, ListTableIndicesResponse, ListTablesRequest, @@ -850,6 +856,27 @@ def update_table_tag( response_dict = self._inner.update_table_tag(request.model_dump()) return UpdateTableTagResponse.from_dict(response_dict) + def create_table_branch( + self, request: CreateTableBranchRequest + ) -> CreateTableBranchResponse: + """Create a new branch forked from a table version.""" + response_dict = self._inner.create_table_branch(request.model_dump()) + return CreateTableBranchResponse.from_dict(response_dict) + + def list_table_branches( + self, request: ListTableBranchesRequest + ) -> ListTableBranchesResponse: + """List all branches of a table.""" + response_dict = self._inner.list_table_branches(request.model_dump()) + return ListTableBranchesResponse.from_dict(response_dict) + + def delete_table_branch( + self, request: DeleteTableBranchRequest + ) -> DeleteTableBranchResponse: + """Delete a branch from a table.""" + response_dict = self._inner.delete_table_branch(request.model_dump()) + return DeleteTableBranchResponse.from_dict(response_dict) + # Operation metrics methods def retrieve_ops_metrics(self) -> Dict[str, int]: @@ -1420,6 +1447,27 @@ def update_table_tag( response_dict = self._inner.update_table_tag(request.model_dump()) return UpdateTableTagResponse.from_dict(response_dict) + def create_table_branch( + self, request: CreateTableBranchRequest + ) -> CreateTableBranchResponse: + """Create a new branch forked from a table version.""" + response_dict = self._inner.create_table_branch(request.model_dump()) + return CreateTableBranchResponse.from_dict(response_dict) + + def list_table_branches( + self, request: ListTableBranchesRequest + ) -> ListTableBranchesResponse: + """List all branches of a table.""" + response_dict = self._inner.list_table_branches(request.model_dump()) + return ListTableBranchesResponse.from_dict(response_dict) + + def delete_table_branch( + self, request: DeleteTableBranchRequest + ) -> DeleteTableBranchResponse: + """Delete a branch from a table.""" + response_dict = self._inner.delete_table_branch(request.model_dump()) + return DeleteTableBranchResponse.from_dict(response_dict) + # Operation metrics methods def retrieve_ops_metrics(self) -> Dict[str, int]: diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index 1991b82946e..f76eda42070 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -29,6 +29,8 @@ CountTableRowsRequest, CreateNamespaceRequest, CreateNamespaceResponse, + CreateTableBranchRequest, + CreateTableBranchResponse, CreateTableIndexRequest, CreateTableIndexResponse, CreateTableRequest, @@ -37,6 +39,8 @@ CreateTableVersionResponse, DeclareTableRequest, DeclareTableResponse, + DeleteTableBranchRequest, + DeleteTableBranchResponse, DeregisterTableRequest, DeregisterTableResponse, DescribeNamespaceRequest, @@ -54,6 +58,8 @@ InsertIntoTableResponse, ListNamespacesRequest, ListNamespacesResponse, + ListTableBranchesRequest, + ListTableBranchesResponse, ListTableIndicesRequest, ListTableIndicesResponse, ListTablesRequest, @@ -151,6 +157,21 @@ def create_table_version( ) -> CreateTableVersionResponse: return self._inner.create_table_version(request) + def create_table_branch( + self, request: CreateTableBranchRequest + ) -> CreateTableBranchResponse: + return self._inner.create_table_branch(request) + + def list_table_branches( + self, request: ListTableBranchesRequest + ) -> ListTableBranchesResponse: + return self._inner.list_table_branches(request) + + def delete_table_branch( + self, request: DeleteTableBranchRequest + ) -> DeleteTableBranchResponse: + return self._inner.delete_table_branch(request) + def create_table_index( self, request: CreateTableIndexRequest ) -> CreateTableIndexResponse: @@ -564,6 +585,91 @@ def test_register_table_rejects_path_traversal(self, temp_ns_client): assert "Path traversal is not allowed" in str(exc_info.value) +class TestTableBranchOperations: + """Branch CRUD through the python bindings - mirrors the Rust branch + CRUD tests.""" + + def test_branch_crud_round_trip(self, temp_ns_client): + create_ns_req = CreateNamespaceRequest(id=["workspace"]) + temp_ns_client.create_namespace(create_ns_req) + ipc_data = table_to_ipc_bytes(create_test_data()) + table_id = ["workspace", "branched_table"] + temp_ns_client.create_table(CreateTableRequest(id=table_id), ipc_data) + + temp_ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="dev") + ) + listed = temp_ns_client.list_table_branches( + ListTableBranchesRequest(id=table_id) + ) + assert "dev" in listed.branches + assert listed.branches["dev"].parent_version == 1 + + temp_ns_client.delete_table_branch( + DeleteTableBranchRequest(id=table_id, name="dev") + ) + listed = temp_ns_client.list_table_branches( + ListTableBranchesRequest(id=table_id) + ) + assert "dev" not in listed.branches + + def test_create_branch_from_other_branch(self, temp_ns_client): + """Forking from a non-main source branch records the right parent.""" + create_ns_req = CreateNamespaceRequest(id=["workspace"]) + temp_ns_client.create_namespace(create_ns_req) + ipc_data = table_to_ipc_bytes(create_test_data()) + table_id = ["workspace", "fork_table"] + temp_ns_client.create_table(CreateTableRequest(id=table_id), ipc_data) + + temp_ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="dev") + ) + temp_ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="child", from_branch="dev") + ) + listed = temp_ns_client.list_table_branches( + ListTableBranchesRequest(id=table_id) + ) + assert listed.branches["child"].parent_branch == "dev" + + +class _ForeignCodeError(Exception): + """Not a LanceNamespaceError, but carries the same integer code as + TABLE_NOT_FOUND.""" + + code = 4 + + +class _RaisingNamespace(LanceNamespace): + """A namespace whose describe_table raises the configured exception.""" + + def __init__(self, exc: Exception): + self._exc = exc + + def namespace_id(self) -> str: + return "raising" + + def describe_table(self, request: DescribeTableRequest) -> DescribeTableResponse: + raise self._exc + + +class TestPythonNamespaceErrorMapping: + """The Rust adapter must trust the `code` attribute only on the + lance_namespace exception hierarchy.""" + + def test_namespace_error_identity_preserved(self): + ns = _RaisingNamespace(TableNotFoundError("no such table")) + with pytest.raises(TableNotFoundError, match="no such table"): + lance.dataset(namespace_client=ns, table_id=["t"]) + + def test_foreign_code_attribute_not_trusted(self): + # The foreign exception must surface as itself, not be reinterpreted + # as a namespace error via its `code` attribute. + ns = _RaisingNamespace(_ForeignCodeError("boom")) + with pytest.raises(_ForeignCodeError, match="boom"): + lance.dataset(namespace_client=ns, table_id=["t"]) + + class TestChildNamespaceOperations: """Tests for operations in child namespaces - mirrors Rust tests.""" diff --git a/python/python/tests/test_namespace_integration.py b/python/python/tests/test_namespace_integration.py index 4605b755816..fc08370d247 100644 --- a/python/python/tests/test_namespace_integration.py +++ b/python/python/tests/test_namespace_integration.py @@ -31,6 +31,8 @@ from lance_namespace import ( CreateNamespaceRequest, CreateNamespaceResponse, + CreateTableBranchRequest, + CreateTableBranchResponse, CreateTableRequest, CreateTableResponse, CreateTableVersionRequest, @@ -136,6 +138,11 @@ def create_table_version( ) -> CreateTableVersionResponse: return self._inner.create_table_version(request) + def create_table_branch( + self, request: CreateTableBranchRequest + ) -> CreateTableBranchResponse: + return self._inner.create_table_branch(request) + def retrieve_ops_metrics(self) -> Optional[Dict[str, int]]: return self._inner.retrieve_ops_metrics() @@ -199,6 +206,7 @@ def create_tracking_namespace( storage_options: dict, credential_expires_in_seconds: int = 60, use_custom: bool = False, + managed_versioning: bool = False, ): """Create a DirectoryNamespace with ops metrics and credential vending enabled. @@ -212,6 +220,9 @@ def create_tracking_namespace( storage_options: Storage options to pass through (credentials, endpoint, etc.) credential_expires_in_seconds: Interval in seconds for credential expiration use_custom: If True, wrap in CustomNamespace for testing custom implementations + managed_versioning: If True, enable the manifest catalog so table versions + are tracked by the namespace and commits route through + create_table_version Returns: Tuple of (namespace_client, inner_namespace_client) where inner is always @@ -238,6 +249,10 @@ def create_tracking_namespace( dir_props["vend_input_storage_options_refresh_interval_millis"] = str( credential_expires_in_seconds * 1000 ) + if managed_versioning: + dir_props["manifest_enabled"] = "true" + dir_props["table_version_tracking_enabled"] = "true" + dir_props["table_version_storage_enabled"] = "true" inner_ns_client = DirectoryNamespace(**dir_props) ns_client = _wrap_if_custom(inner_ns_client, use_custom) @@ -558,6 +573,87 @@ def test_namespace_write_overwrite_mode(s3_bucket: str, use_custom: bool): assert get_describe_call_count(inner_ns_client) == call_count_before_reads +@pytest.mark.integration +@pytest.mark.parametrize("use_custom", [False, True], ids=["DirectoryNS", "CustomNS"]) +def test_namespace_managed_branches(s3_bucket: str, use_custom: bool): + """Branches on a managed-versioning table over S3. + + Branch commits must route through the catalog (create_table_version) and + leave main's chain untouched. A cross-branch checkout at an overlapping + version number must resolve the requested chain: branch version numbers + continue from the fork point, so the same number exists on both chains + with different data. + """ + storage_options = copy.deepcopy(CONFIG) + + ns_client, inner_ns_client = create_tracking_namespace( + bucket_name=s3_bucket, + storage_options=storage_options, + credential_expires_in_seconds=3600, + use_custom=use_custom, + managed_versioning=True, + ) + + table_name = uuid.uuid4().hex + table_id = ["test_ns", table_name] + + def commit_count() -> int: + return inner_ns_client.retrieve_ops_metrics().get("create_table_version", 0) + + lance.write_dataset( + pa.Table.from_pylist([{"a": 1}]), + namespace_client=ns_client, + table_id=table_id, + mode="create", + storage_options=storage_options, + ) + ds = lance.write_dataset( + pa.Table.from_pylist([{"a": 2}]), + namespace_client=ns_client, + table_id=table_id, + mode="append", + storage_options=storage_options, + ) + assert commit_count() >= 2 + + ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="dev", from_version=2) + ) + + dev = ds.checkout_version(("dev", None)) + commits_before_branch_append = commit_count() + dev = lance.write_dataset( + pa.Table.from_pylist([{"a": 3}]), + dev, + mode="append", + storage_options=storage_options, + ) + assert commit_count() == commits_before_branch_append + 1 + assert sorted(dev.to_table()["a"].to_pylist()) == [1, 2, 3] + + # Diverge main to the same version number as dev's tip. + ds = lance.write_dataset( + pa.Table.from_pylist([{"a": 100}]), + namespace_client=ns_client, + table_id=table_id, + mode="append", + storage_options=storage_options, + ) + assert sorted(ds.to_table()["a"].to_pylist()) == [1, 2, 100] + + on_dev = ds.checkout_version(("dev", 3)) + assert sorted(on_dev.to_table()["a"].to_pylist()) == [1, 2, 3] + back_on_main = dev.checkout_version(("main", None)) + assert sorted(back_on_main.to_table()["a"].to_pylist()) == [1, 2, 100] + + fresh = lance.dataset( + namespace_client=ns_client, + table_id=table_id, + storage_options=storage_options, + ) + assert sorted(fresh.to_table()["a"].to_pylist()) == [1, 2, 100] + + @pytest.mark.integration @pytest.mark.parametrize("use_custom", [False, True], ids=["DirectoryNS", "CustomNS"]) def test_namespace_distributed_write(s3_bucket: str, use_custom: bool): diff --git a/python/python/tests/test_s3_ddb.py b/python/python/tests/test_s3_ddb.py index b9c9e4be6c0..dc9744115e2 100644 --- a/python/python/tests/test_s3_ddb.py +++ b/python/python/tests/test_s3_ddb.py @@ -212,6 +212,58 @@ def writh_dataset_with_start_barrier(): assert lance.dataset(table_dir).count_rows() == expected_version * 2 +@pytest.mark.integration +def test_s3_ddb_branches(s3_bucket: str, ddb_table: str): + """Branches on a table committed through the DynamoDB external manifest + store. + + The DDB store keys version chains by base uri, so each branch chain must + get its own entries via its branch-qualified path. Both chains are given + the same version number with diverged data so a wrong-chain resolution + cannot pass silently. + """ + storage_options = copy.deepcopy(CONFIG) + table_name = uuid.uuid4().hex + table_dir = f"s3+ddb://{s3_bucket}/{table_name}?ddbTableName={ddb_table}" + + # main: v1 (a=1), v2 (a=2) + lance.write_dataset( + pa.Table.from_pylist([{"a": 1}]), table_dir, storage_options=storage_options + ) + ds = lance.write_dataset( + pa.Table.from_pylist([{"a": 2}]), + table_dir, + mode="append", + storage_options=storage_options, + ) + + # Fork "dev" at v2 and commit on it, then diverge main to the same + # version number. + dev = ds.create_branch("dev", 2) + dev = lance.write_dataset( + pa.Table.from_pylist([{"a": 3}]), + dev, + mode="append", + storage_options=storage_options, + ) + ds = lance.write_dataset( + pa.Table.from_pylist([{"a": 100}]), + table_dir, + mode="append", + storage_options=storage_options, + ) + + assert sorted(dev.to_table()["a"].to_pylist()) == [1, 2, 3] + assert sorted(ds.to_table()["a"].to_pylist()) == [1, 2, 100] + + # Cross-branch checkout at the overlapping version number resolves each + # chain's own data. + on_dev = ds.checkout_version(("dev", 3)) + assert sorted(on_dev.to_table()["a"].to_pylist()) == [1, 2, 3] + back_on_main = dev.checkout_version(("main", None)) + assert sorted(back_on_main.to_table()["a"].to_pylist()) == [1, 2, 100] + + @pytest.mark.integration def test_s3_unsafe(s3_bucket: str): storage_options = copy.deepcopy(CONFIG) diff --git a/python/src/namespace.rs b/python/src/namespace.rs index cf5f7c41b0f..e88ff40de2c 100644 --- a/python/src/namespace.rs +++ b/python/src/namespace.rs @@ -392,6 +392,44 @@ impl PyDirectoryNamespace { pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) } + // Table branch operations + + fn create_table_branch<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.create_table_branch(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + + fn list_table_branches<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.list_table_branches(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + + fn delete_table_branch<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.delete_table_branch(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + // Data manipulation operations fn count_table_rows(&self, py: Python, request: &Bound<'_, PyAny>) -> PyResult { @@ -1054,6 +1092,44 @@ impl PyRestNamespace { pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) } + // Table branch operations + + fn create_table_branch<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.create_table_branch(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + + fn list_table_branches<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.list_table_branches(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + + fn delete_table_branch<'py>( + &self, + py: Python<'py>, + request: &Bound<'_, PyAny>, + ) -> PyResult> { + let request = depythonize(request)?; + let response = crate::rt() + .block_on(Some(py), self.inner.delete_table_branch(request))? + .infer_error()?; + pythonize(py, &response).map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string())) + } + // Data manipulation operations fn count_table_rows(&self, py: Python, request: &Bound<'_, PyAny>) -> PyResult { @@ -1472,6 +1548,30 @@ fn get_dict_with_model_dump_class(py: Python<'_>) -> PyResult> Ok(class) } +/// Convert a Python namespace exception into a lance error, preserving the +/// namespace error identity when the exception is a `lance_namespace` +/// `LanceNamespaceError` carrying an error `code`, so callers can react to +/// e.g. TableNotFound the same way they do for native clients. Foreign +/// exceptions that happen to carry an integer `code` (e.g. SystemExit) must +/// not be reinterpreted, so the extraction is gated on the exception type. +fn namespace_error_from_py(method_name: &'static str, e: PyErr) -> lance_core::Error { + Python::attach(|py| { + let value = e.value(py); + let is_namespace_error = py + .import("lance_namespace.errors") + .and_then(|module| module.getattr("LanceNamespaceError")) + .and_then(|class| value.is_instance(&class)) + .unwrap_or(false); + if is_namespace_error + && let Ok(code) = value.getattr("code").and_then(|code| code.extract::()) + { + return lance_namespace::error::NamespaceError::from_code(code, value.to_string()) + .into(); + } + lance_core::Error::io(format!("Python error in {}: {}", method_name, e)) + }) +} + /// Helper to call a Python namespace method with JSON serialization. /// For methods that take a request and return a response. /// Uses DictWithModelDump to pass a dict that also has model_dump() method, @@ -1519,7 +1619,7 @@ where }) .await .map_err(|e| lance_core::Error::io(format!("Task join error for {}: {}", method_name, e)))? - .map_err(|e: PyErr| lance_core::Error::io(format!("Python error in {}: {}", method_name, e)))?; + .map_err(|e: PyErr| namespace_error_from_py(method_name, e))?; serde_json::from_str(&response_json).map_err(|e| { lance_core::Error::io(format!( diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c9cc356aaa6..f6fd1ef6a20 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -514,7 +514,10 @@ impl Dataset { let transaction = Transaction::new(version_number, clone_op, None); let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str())) - .with_store_params(store_params.unwrap_or_default()) + // Fall back to the dataset's own store params + .with_store_params( + store_params.unwrap_or(self.store_params.as_deref().cloned().unwrap_or_default()), + ) .with_object_store(Arc::new(self.object_store.as_ref().clone())) .with_commit_handler(self.commit_handler.clone()) .with_storage_format(self.manifest.data_storage_format.lance_file_version()?); diff --git a/rust/lance/src/dataset/branch_location.rs b/rust/lance/src/dataset/branch_location.rs index 3a1185c8cf8..7ebce36ec86 100644 --- a/rust/lance/src/dataset/branch_location.rs +++ b/rust/lance/src/dataset/branch_location.rs @@ -31,14 +31,20 @@ impl BranchLocation { } fn get_root_path(path_str: &str, branch_name: &str) -> Result { + // A uri may carry a query string (e.g. `s3+ddb://...?ddbTableName=t`); + // the branch suffix sits on the path part, before the query. + let (path_part, query) = match path_str.split_once('?') { + Some((path, query)) => (path, Some(query)), + None => (path_str, None), + }; let branch_suffix = format!("{}/{}", BRANCH_DIR, branch_name); let branch_suffix = branch_suffix.as_str(); - let root_path_str = path_str + let root_path_str = path_part .strip_suffix(branch_suffix) .or_else(|| { if cfg!(windows) { let windows_suffix = branch_suffix.replace('/', "\\"); - path_str.strip_suffix(&windows_suffix) + path_part.strip_suffix(&windows_suffix) } else { None } @@ -59,7 +65,10 @@ impl BranchLocation { root_path_str, path_str, ))); }; - Ok(root_path_str) + Ok(match query { + Some(query) => format!("{}?{}", root_path_str, query), + None => root_path_str, + }) } /// The branch a location under `root` targets: the inverse of @@ -132,13 +141,23 @@ impl BranchLocation { } fn join_str(base: &str, segment: &str) -> Result { + // A uri may carry a query string (e.g. `s3+ddb://...?ddbTableName=t`); + // path segments must be appended before it. + let (path_part, query) = match base.split_once('?') { + Some((path, query)) => (path, Some(query)), + None => (base, None), + }; let normalized_segment = segment.trim_start_matches('/'); - let is_base_dir = base.ends_with("/"); - if is_base_dir { - Ok(format!("{}{}", base, normalized_segment)) + let is_base_dir = path_part.ends_with("/"); + let joined = if is_base_dir { + format!("{}{}", path_part, normalized_segment) } else { - Ok(format!("{}/{}", base, normalized_segment)) - } + format!("{}/{}", path_part, normalized_segment) + }; + Ok(match query { + Some(query) => format!("{}?{}", joined, query), + None => joined, + }) } } @@ -255,6 +274,30 @@ mod tests { assert!(fs::create_dir_all(std::path::Path::new(new_location.uri.as_str())).is_ok()); } + #[test] + fn test_branch_location_with_query_uri() { + // Uris like `s3+ddb://...?ddbTableName=t` carry the commit handler + // config in the query string; branch path segments must be inserted + // before it and the query must survive the round trip. + let location = BranchLocation { + path: Path::parse("bucket/table.lance").unwrap(), + uri: "s3+ddb://bucket/table.lance?ddbTableName=t".to_string(), + branch: None, + }; + let dev = location.find_branch(Some("dev")).unwrap(); + assert_eq!( + dev.uri, + "s3+ddb://bucket/table.lance/tree/dev?ddbTableName=t" + ); + assert_eq!(dev.path.as_ref(), "bucket/table.lance/tree/dev"); + assert_eq!(dev.branch.as_deref(), Some("dev")); + + let main = dev.find_main().unwrap(); + assert_eq!(main.uri, "s3+ddb://bucket/table.lance?ddbTableName=t"); + assert_eq!(main.path.as_ref(), "bucket/table.lance"); + assert_eq!(main.branch, None); + } + #[test] fn test_branch_of() { let derive = |root: &str, location: &str| BranchLocation::branch_of(root, location); diff --git a/rust/lance/src/io/commit/namespace_manifest.rs b/rust/lance/src/io/commit/namespace_manifest.rs index 92d5e7bc789..f4f012adcca 100644 --- a/rust/lance/src/io/commit/namespace_manifest.rs +++ b/rust/lance/src/io/commit/namespace_manifest.rs @@ -14,8 +14,24 @@ use lance_table::io::commit::{ManifestLocation, ManifestNamingScheme}; use object_store::ObjectStore as OSObjectStore; use object_store::path::Path; +use lance_namespace::error::NamespaceError; + use crate::dataset::branch_location::BranchLocation; +/// Whether `e` says the requested chain (table or branch) does not exist, as +/// opposed to a failure talking to the namespace. +fn is_chain_not_found(e: &lance_core::Error) -> bool { + if let lance_core::Error::Namespace { source, .. } = e + && let Some(ns_err) = source.downcast_ref::() + { + return matches!( + ns_err, + NamespaceError::TableNotFound { .. } | NamespaceError::TableBranchNotFound { .. } + ); + } + false +} + #[derive(Debug)] pub struct LanceNamespaceExternalManifestStore { namespace_client: Arc, @@ -90,7 +106,15 @@ impl ExternalManifestStore for LanceNamespaceExternalManifestStore { ..Default::default() }; - let response = self.namespace_client.list_table_versions(request).await?; + let response = match self.namespace_client.list_table_versions(request).await { + Ok(response) => response, + // A chain that does not exist yet (e.g. probing a branch location + // before the branch is created) has no latest version; the + // ExternalManifestStore contract reports that as None, not an + // error, so existence checks can treat it as a missing dataset. + Err(e) if is_chain_not_found(&e) => return Ok(None), + Err(e) => return Err(e), + }; if response.versions.is_empty() { return Ok(None); @@ -182,3 +206,93 @@ impl ExternalManifestStore for LanceNamespaceExternalManifestStore { )) } } + +#[cfg(test)] +mod tests { + use super::*; + use lance_namespace::models::ListTableVersionsResponse; + + /// A namespace whose list_table_versions always fails with the configured + /// error, to pin how get_latest_version classifies failures. + #[derive(Debug)] + struct FailingNamespace { + error: fn() -> lance_core::Error, + } + + #[async_trait] + impl LanceNamespace for FailingNamespace { + fn namespace_id(&self) -> String { + "failing".to_string() + } + + async fn list_table_versions( + &self, + _request: ListTableVersionsRequest, + ) -> Result { + Err((self.error)()) + } + } + + fn store_with(error: fn() -> lance_core::Error) -> LanceNamespaceExternalManifestStore { + LanceNamespaceExternalManifestStore::new( + Arc::new(FailingNamespace { error }), + vec!["t".to_string()], + Path::parse("data/t.lance").unwrap(), + ) + } + + /// A chain that does not exist (missing table or branch) has no latest + /// version; everything else is a real failure and must propagate so an + /// outage is never mistaken for an absent dataset. + #[tokio::test] + async fn test_get_latest_version_error_classification() { + use lance_namespace::error::NamespaceError; + + let absent = [ + store_with(|| { + NamespaceError::TableNotFound { + message: "missing table".to_string(), + } + .into() + }), + store_with(|| { + NamespaceError::TableBranchNotFound { + message: "missing branch".to_string(), + } + .into() + }), + ]; + for store in absent { + let latest = store.get_latest_version("data/t.lance/tree/dev").await; + assert!( + matches!(latest, Ok(None)), + "a missing chain must read as no latest version, got: {:?}", + latest + ); + } + + let failures = [ + store_with(|| { + NamespaceError::Internal { + message: "server error".to_string(), + } + .into() + }), + store_with(|| { + NamespaceError::Throttling { + message: "slow down".to_string(), + } + .into() + }), + store_with(|| lance_core::Error::io("connection reset".to_string())), + ]; + for store in failures { + let latest = store.get_latest_version("data/t.lance/tree/dev").await; + assert!( + latest.is_err(), + "a real failure must propagate, got: {:?}", + latest + ); + } + } +} From 1ce6a101ac07f5eca551b01efdf747b9635ad211 Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Thu, 11 Jun 2026 09:38:36 -0700 Subject: [PATCH 2/3] fix(python): carry namespace managed versioning flag through dataset constructors --- python/python/lance/dataset.py | 4 ++++ python/python/tests/test_dataset.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index e96d9305ce5..dae72b88b1c 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -950,6 +950,9 @@ def create_branch( ds._base_store_params = self._base_store_params ds._namespace_client = self._namespace_client ds._table_id = self._table_id + ds._namespace_client_managed_versioning = ( + self._namespace_client_managed_versioning + ) ds._default_scan_options = self._default_scan_options ds._read_params = self._read_params return ds @@ -4579,6 +4582,7 @@ def commit_batch( ds._base_store_params = base_store_params ds._namespace_client = None ds._table_id = None + ds._namespace_client_managed_versioning = False ds._default_scan_options = None ds._read_params = None return BulkCommitResult( diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 4af363868e1..89bd78b82c8 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1742,6 +1742,7 @@ def test_commit_batch_append(): result = lance.LanceDataset.commit_batch(dataset, [txn2, txn3]) dataset = result["dataset"] assert dataset.version == 2 + assert dataset.checkout_version(1).version == 1 assert len(dataset.get_fragments()) == 3 assert dataset.to_table() == pa.concat_tables([data1, data2, data3]) merged_txn = result["merged"] @@ -5538,6 +5539,8 @@ def test_branches(tmp_path: Path): branch1 = ds_main.create_branch("branch1") ds_main.branches.replace_metadata("branch1", {"description": "branch one"}) assert branch1.version == 1 + # The dataset returned by create_branch must be fully constructed + assert branch1.checkout_version(("main", None)).version == 1 branch1_append = pa.Table.from_pydict({"a": [7, 8], "b": [9, 10]}) branch1 = lance.write_dataset(branch1_append, branch1, mode="append") assert branch1.version == 2 From ee3526128f1e20c8d85ed18717c0932358a95be5 Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Thu, 11 Jun 2026 09:38:36 -0700 Subject: [PATCH 3/3] fix(python): require lance-namespace 0.8.5 for typed table branch errors --- python/pyproject.toml | 2 +- python/python/tests/test_namespace_dir.py | 21 +++++++++++++++++++++ python/uv.lock | 14 +++++++------- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index ff84e2fb6cd..d2efab23579 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "pylance" dynamic = ["version"] -dependencies = ["pyarrow>=14", "numpy>=1.22", "lance-namespace>=0.8.2,<0.9"] +dependencies = ["pyarrow>=14", "numpy>=1.22", "lance-namespace>=0.8.5,<0.9"] description = "python wrapper for Lance columnar format" authors = [{ name = "Lance Devs", email = "dev@lance.org" }] license = { file = "LICENSE" } diff --git a/python/python/tests/test_namespace_dir.py b/python/python/tests/test_namespace_dir.py index f76eda42070..a57879d3368 100644 --- a/python/python/tests/test_namespace_dir.py +++ b/python/python/tests/test_namespace_dir.py @@ -77,6 +77,8 @@ InvalidInputError, NamespaceNotEmptyError, NamespaceNotFoundError, + TableBranchAlreadyExistsError, + TableBranchNotFoundError, TableNotFoundError, ) @@ -605,6 +607,16 @@ def test_branch_crud_round_trip(self, temp_ns_client): assert "dev" in listed.branches assert listed.branches["dev"].parent_version == 1 + # Duplicate creation and deleting a missing branch surface the typed + # branch errors (codes 23 and 22), not InternalError. + temp_ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="dev2") + ) + with pytest.raises(TableBranchAlreadyExistsError): + temp_ns_client.create_table_branch( + CreateTableBranchRequest(id=table_id, name="dev2") + ) + temp_ns_client.delete_table_branch( DeleteTableBranchRequest(id=table_id, name="dev") ) @@ -612,6 +624,10 @@ def test_branch_crud_round_trip(self, temp_ns_client): ListTableBranchesRequest(id=table_id) ) assert "dev" not in listed.branches + with pytest.raises(TableBranchNotFoundError): + temp_ns_client.delete_table_branch( + DeleteTableBranchRequest(id=table_id, name="dev") + ) def test_create_branch_from_other_branch(self, temp_ns_client): """Forking from a non-main source branch records the right parent.""" @@ -662,6 +678,11 @@ def test_namespace_error_identity_preserved(self): with pytest.raises(TableNotFoundError, match="no such table"): lance.dataset(namespace_client=ns, table_id=["t"]) + # Branch error codes (22/23) survive the round trip too. + ns = _RaisingNamespace(TableBranchNotFoundError("no such branch")) + with pytest.raises(TableBranchNotFoundError, match="no such branch"): + lance.dataset(namespace_client=ns, table_id=["t"]) + def test_foreign_code_attribute_not_trusted(self): # The foreign exception must surface as itself, not be reinterpreted # as a namespace error via its `code` attribute. diff --git a/python/uv.lock b/python/uv.lock index 428578ab26f..289ecdf3549 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1083,19 +1083,19 @@ wheels = [ [[package]] name = "lance-namespace" -version = "0.8.4" +version = "0.8.5" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "lance-namespace-urllib3-client" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/48/8f/8a03395587a78cfaf92f7307ad931f61eb515af67705c704bd6c7af2f745/lance_namespace-0.8.4.tar.gz", hash = "sha256:1a54ad49e7ace25a629c5f2c99d393629742eceeeb16ba2f51a771ccb350e284", size = 11282, upload-time = "2026-06-10T19:07:21.919Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d0/22/3d8eb4e913edf36cda416f1dca287147af508abe3ca89bf0e619b9fa9f54/lance_namespace-0.8.5.tar.gz", hash = "sha256:b4a5967afcbf9924300a0b9d2fb74c44a23f76907e8734ebed6e0e3a561b0df0", size = 11531, upload-time = "2026-06-11T16:20:26.77Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fd/4b/218c67cafb707024069925ce86534588861a464aaa327f7a457b94eed3c2/lance_namespace-0.8.4-py3-none-any.whl", hash = "sha256:8b347eef4b7c7187a1b52f388b5dcc345fed0bf4ea87728188dcb11a52619d0b", size = 13111, upload-time = "2026-06-10T19:07:22.6Z" }, + { url = "https://files.pythonhosted.org/packages/c0/da/afc3cdc42fc2dcf885a9d3524bf2c3bd2a9df89b1668b1806dec5e436263/lance_namespace-0.8.5-py3-none-any.whl", hash = "sha256:6d3e2b8da586d06409494b56955a63c3152eeae2883cd2e8ba4e80d20dc0de0f", size = 13383, upload-time = "2026-06-11T16:20:26.004Z" }, ] [[package]] name = "lance-namespace-urllib3-client" -version = "0.8.4" +version = "0.8.5" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, @@ -1104,9 +1104,9 @@ dependencies = [ { name = "urllib3", version = "1.26.20", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "urllib3", version = "2.5.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/0a/55/4a7cc7e5d19bda170c896a6adff2ec925c533df812b91bce2bc8f7aea30b/lance_namespace_urllib3_client-0.8.4.tar.gz", hash = "sha256:1a292a83509ab79475da967b78839e9ead4ab973064d37d1ba1575b23ffdacef", size = 228485, upload-time = "2026-06-10T19:07:19.863Z" } +sdist = { url = "https://files.pythonhosted.org/packages/44/6f/1291523488523656342d1b424b76b4d91f3af6413b3b4ada43b888a87043/lance_namespace_urllib3_client-0.8.5.tar.gz", hash = "sha256:29922ffb5b0621e24a83183454ec3e5a5828f46d91a95d58efc35db05dec4e62", size = 228595, upload-time = "2026-06-11T16:20:23.985Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b4/f7/70dd2fc1f9ef462d3802b4cffcd64f2b9233a9907d6071e8694338492608/lance_namespace_urllib3_client-0.8.4-py3-none-any.whl", hash = "sha256:37ee1d74614fae6358f50e3589ac26c29379ffb1346f09c4f5ec8953f823cefd", size = 369807, upload-time = "2026-06-10T19:07:21.001Z" }, + { url = "https://files.pythonhosted.org/packages/10/e2/62883d1f43a283ac08f00af993c6a2b92e4ca206fa1ccba032420d8dc578/lance_namespace_urllib3_client-0.8.5-py3-none-any.whl", hash = "sha256:8af211ddc6e73df713ffb59368c94780508e732b19dacb4239d937aaff2f8e3c", size = 369857, upload-time = "2026-06-11T16:20:25.006Z" }, ] [[package]] @@ -2676,7 +2676,7 @@ requires-dist = [ { name = "duckdb", marker = "extra == 'tests'" }, { name = "geoarrow-rust-core", marker = "extra == 'geo'" }, { name = "geoarrow-rust-io", marker = "extra == 'geo'" }, - { name = "lance-namespace", specifier = ">=0.8.0,<0.9" }, + { name = "lance-namespace", specifier = ">=0.8.5,<0.9" }, { name = "ml-dtypes", marker = "extra == 'tests'" }, { name = "numpy", specifier = ">=1.22" }, { name = "pandas", marker = "extra == 'tests'" },