From d632b90dc6f51503bbbfa45b9d8e8d4fadf80681 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Mon, 2 Feb 2026 22:55:46 -0800 Subject: [PATCH 1/2] feat: expose CypherEngine in Python for multi-query execution --- python/Cargo.lock | 1 + python/Cargo.toml | 1 + python/README.md | 38 ++++- python/python/tests/test_cypher_engine.py | 168 +++++++++++++++++++ python/src/graph.rs | 194 ++++++++++++++++++++++ python/uv.lock | 2 +- 6 files changed, 401 insertions(+), 3 deletions(-) create mode 100644 python/python/tests/test_cypher_engine.py diff --git a/python/Cargo.lock b/python/Cargo.lock index 4985c27..433f4fc 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3459,6 +3459,7 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", + "datafusion", "futures", "lance-graph", "pyo3", diff --git a/python/Cargo.toml b/python/Cargo.toml index aed49bb..a38a1f5 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -14,6 +14,7 @@ arrow = { version = "56.2", features = ["pyarrow"] } arrow-array = "56.2" arrow-schema = "56.2" arrow-ipc = "56.2" +datafusion = { version = "50.3", default-features = false } futures = "0.3" lance-graph = { path = "../crates/lance-graph" } serde = { version = "1", features = ["derive"] } diff --git a/python/README.md b/python/README.md index 69f90be..0034718 100644 --- a/python/README.md +++ b/python/README.md @@ -48,7 +48,41 @@ print(result.to_pylist()) [{'p.name': 'Alice', 'c.name': 'London'}, {'p.name': 'Bob', 'c.name': 'Sydney'}] ``` -### 2. Build a Knowledge Graph from Text +### 2. Multi-Query Execution with CypherEngine + +For executing multiple queries against the same datasets, use `CypherEngine` to cache the catalog and achieve better performance: + +```python +import pyarrow as pa +from lance_graph import CypherEngine, GraphConfig + +cfg = ( + GraphConfig.builder() + .with_node_label("Person", "id") + .with_node_label("City", "id") + .with_relationship("lives_in", "src", "dst") + .build() +) + +datasets = { + "Person": pa.table({"id": [1, 2], "name": ["Alice", "Bob"], "age": [30, 25]}), + "City": pa.table({"id": [10, 20], "name": ["London", "Sydney"]}), + "lives_in": pa.table({"src": [1, 2], "dst": [10, 20]}), +} + +# Create engine once - builds catalog +engine = CypherEngine(cfg, datasets) + +# Execute multiple queries efficiently - catalog is reused +result1 = engine.execute("MATCH (p:Person) WHERE p.age > 25 RETURN p.name") +result2 = engine.execute("MATCH (p:Person)-[:lives_in]->(c:City) RETURN p.name, c.name") +result3 = engine.execute("MATCH (p:Person) RETURN count(*) as total") + +print(result1.to_pylist()) +# [{'p.name': 'Alice'}] +``` + +### 3. Build a Knowledge Graph from Text ```python from pathlib import Path @@ -107,7 +141,7 @@ result = kg.query(""" print(result.to_pylist()) ``` -### 3. Natural Language Q&A +### 4. Natural Language Q&A ```python from knowledge_graph.llm.qa import ask_question diff --git a/python/python/tests/test_cypher_engine.py b/python/python/tests/test_cypher_engine.py new file mode 100644 index 0000000..84372f6 --- /dev/null +++ b/python/python/tests/test_cypher_engine.py @@ -0,0 +1,168 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +import pyarrow as pa +import pytest +from lance_graph import CypherEngine, CypherQuery, GraphConfig + + +@pytest.fixture +def graph_env(): + """Create sample graph data for testing.""" + people_table = pa.table( + { + "person_id": [1, 2, 3, 4], + "name": ["Alice", "Bob", "Carol", "David"], + "age": [28, 34, 29, 42], + "city": ["New York", "San Francisco", "New York", "Chicago"], + } + ) + + companies_table = pa.table( + { + "company_id": [101, 102, 103], + "company_name": ["TechCorp", "DataInc", "CloudSoft"], + "industry": ["Technology", "Analytics", "Cloud"], + } + ) + + employment_table = pa.table( + { + "person_id": [1, 2, 3, 4], + "company_id": [101, 101, 102, 103], + "position": ["Engineer", "Designer", "Manager", "Director"], + "salary": [120000, 95000, 130000, 180000], + } + ) + + friendship_table = pa.table( + { + "person1_id": [1, 1, 2, 3], + "person2_id": [2, 3, 4, 4], + "friendship_type": ["close", "casual", "close", "casual"], + "years_known": [5, 2, 3, 1], + } + ) + + config = ( + GraphConfig.builder() + .with_node_label("Person", "person_id") + .with_node_label("Company", "company_id") + .with_relationship("WORKS_FOR", "person_id", "company_id") + .with_relationship("FRIEND_OF", "person1_id", "person2_id") + .build() + ) + + datasets = { + "Person": people_table, + "Company": companies_table, + "WORKS_FOR": employment_table, + "FRIEND_OF": friendship_table, + } + + return config, datasets + + +def test_cypher_engine_basic_query(graph_env): + """Test basic query execution with CypherEngine.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + result = engine.execute("MATCH (p:Person) RETURN p.name, p.age") + data = result.to_pydict() + + assert set(data.keys()) == {"p.name", "p.age"} + assert len(data["p.name"]) == 4 + assert "Alice" in set(data["p.name"]) + + +def test_cypher_engine_filtered_query(graph_env): + """Test filtered query with WHERE clause.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + result = engine.execute("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age") + data = result.to_pydict() + + assert len(data["p.name"]) == 2 + assert set(data["p.name"]) == {"Bob", "David"} + assert all(age > 30 for age in data["p.age"]) + + +def test_cypher_engine_relationship_query(graph_env): + """Test query with relationship traversal.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + result = engine.execute( + "MATCH (p:Person)-[:WORKS_FOR]->(c:Company) " + "RETURN p.name AS person_name, c.company_name AS company_name" + ) + data = result.to_pydict() + + assert len(data["person_name"]) == 4 + assert "Alice" in data["person_name"] + assert "TechCorp" in data["company_name"] + + +def test_cypher_engine_multiple_queries(graph_env): + """Test that catalog is reused across multiple queries.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + # Execute multiple different queries + result1 = engine.execute("MATCH (p:Person) WHERE p.age > 30 RETURN p.name") + result2 = engine.execute("MATCH (p:Person) WHERE p.city = 'New York' RETURN p.name") + result3 = engine.execute("MATCH (p:Person) RETURN count(*) as total") + + data1 = result1.to_pydict() + data2 = result2.to_pydict() + data3 = result3.to_pydict() + + assert len(data1["p.name"]) == 2 + assert len(data2["p.name"]) == 2 + assert data3["total"][0] == 4 + + +def test_cypher_engine_aggregation(graph_env): + """Test aggregation queries.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + result = engine.execute( + "MATCH (p:Person) RETURN count(*) as total, avg(p.age) as avg_age" + ) + data = result.to_pydict() + + assert data["total"][0] == 4 + # Average of [28, 34, 29, 42] = 33.25 + assert abs(data["avg_age"][0] - 33.25) < 0.01 + + +def test_cypher_engine_vs_cypher_query_equivalence(graph_env): + """Test that CypherEngine produces same results as CypherQuery.""" + config, datasets = graph_env + + query_text = "MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age ORDER BY p.age" + + # Execute with CypherQuery + query = CypherQuery(query_text).with_config(config) + result_query = query.execute(datasets) + + # Execute with CypherEngine + engine = CypherEngine(config, datasets) + result_engine = engine.execute(query_text) + + # Results should be identical + assert result_query.to_pydict() == result_engine.to_pydict() + + +def test_cypher_engine_config_access(graph_env): + """Test that we can access the engine's config.""" + config, datasets = graph_env + engine = CypherEngine(config, datasets) + + engine_config = engine.config() + + assert "person" in engine_config.node_labels() # case-insensitive + assert "company" in engine_config.node_labels() diff --git a/python/src/graph.rs b/python/src/graph.rs index c1051d1..584553d 100644 --- a/python/src/graph.rs +++ b/python/src/graph.rs @@ -21,11 +21,14 @@ use arrow::compute::concat_batches; use arrow::ffi_stream::ArrowArrayStreamReader; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::Schema; +use datafusion::datasource::{DefaultTableSource, MemTable}; +use datafusion::execution::context::SessionContext; use lance_graph::{ ast::DistanceMetric as RustDistanceMetric, CypherQuery as RustCypherQuery, ExecutionStrategy as RustExecutionStrategy, GraphConfig as RustGraphConfig, GraphError as RustGraphError, VectorSearch as RustVectorSearch, }; +use lance_graph::source_catalog::InMemoryCatalog; use pyo3::{ exceptions::{PyNotImplementedError, PyRuntimeError, PyValueError}, prelude::*, @@ -873,6 +876,196 @@ fn record_batch_to_python_table( Ok(table.unbind()) } +/// Cypher query engine with cached catalog for efficient multi-query execution +/// +/// This class provides a high-performance query execution interface by building +/// the catalog and DataFusion context once, then reusing them across multiple queries. +/// This avoids the cold-start penalty of rebuilding metadata structures on every query. +/// +/// Use this class when you need to execute multiple queries against the same datasets. +/// For single queries, the simpler `CypherQuery.execute()` API may be more convenient. +/// +/// Implementation Details +/// ---------------------- +/// - Each dataset is registered as both a node and relationship source in the catalog. +/// The GraphConfig and query planner determine at runtime which interpretation to use +/// based on the Cypher query pattern (e.g., (p:Person) vs -[:KNOWS]->). +/// - SessionContext is internally Arc-wrapped, so cloning for each query is cheap +/// (just incrementing refcounts, not copying state). +/// +/// Examples +/// -------- +/// >>> from lance_graph import CypherEngine, GraphConfig +/// >>> import pyarrow as pa +/// >>> +/// >>> # Setup +/// >>> config = GraphConfig.builder() \\ +/// ... .with_node_label("Person", "id") \\ +/// ... .with_relationship("KNOWS", "src_id", "dst_id") \\ +/// ... .build() +/// >>> +/// >>> datasets = { +/// ... "Person": person_table, +/// ... "KNOWS": knows_table +/// ... } +/// >>> +/// >>> # Create engine once +/// >>> engine = CypherEngine(config, datasets) +/// >>> +/// >>> # Execute multiple queries efficiently +/// >>> result1 = engine.execute("MATCH (p:Person) WHERE p.age > 30 RETURN p.name") +/// >>> result2 = engine.execute("MATCH (p:Person)-[:KNOWS]->(f) RETURN p.name, f.name") +/// >>> result3 = engine.execute("MATCH (p:Person) RETURN count(*)") +#[pyclass(name = "CypherEngine", module = "lance.graph")] +pub struct CypherEngine { + config: RustGraphConfig, + catalog: Arc, + context: Arc, +} + +#[pymethods] +impl CypherEngine { + /// Create a new CypherEngine with cached catalog + /// + /// This builds the catalog and DataFusion context once during initialization. + /// Subsequent queries will reuse these structures for better performance. + /// + /// Parameters + /// ---------- + /// config : GraphConfig + /// The graph configuration defining node labels and relationships + /// datasets : dict + /// Dictionary mapping table names to Lance datasets or PyArrow tables + /// + /// Returns + /// ------- + /// CypherEngine + /// A new engine instance ready to execute queries + /// + /// Raises + /// ------ + /// ValueError + /// If the configuration or datasets are invalid + /// RuntimeError + /// If catalog building fails + #[new] + fn new(config: &GraphConfig, datasets: &Bound<'_, PyDict>) -> PyResult { + // Convert datasets to Arrow batches + let arrow_datasets = python_datasets_to_batches(datasets)?; + + if arrow_datasets.is_empty() { + return Err(PyValueError::new_err("No input datasets provided")); + } + + // Create session context and catalog + let ctx = SessionContext::new(); + let mut catalog = InMemoryCatalog::new(); + + // Register all datasets as tables + for (name, batch) in &arrow_datasets { + let mem_table = Arc::new( + MemTable::try_new(batch.schema(), vec![vec![batch.clone()]]) + .map_err(|e| PyRuntimeError::new_err(format!("Failed to create MemTable for {}: {}", name, e)))?, + ); + + // Register in session context for execution + let normalized_name = name.to_lowercase(); + ctx.register_table(&normalized_name, mem_table.clone()) + .map_err(|e| PyRuntimeError::new_err(format!("Failed to register table {}: {}", name, e)))?; + + let table_source = Arc::new(DefaultTableSource::new(mem_table)); + + // Register as both node and relationship source with original name. + // + // This is intentional: lance-graph uses GraphConfig to determine at query-planning + // time whether a dataset should be treated as a node table or relationship table + // based on the Cypher query pattern (e.g., MATCH (p:Person) vs -[:KNOWS]->). + // + // By registering all datasets in both catalogs, we allow the planner to look up + // the correct source based on query context. This pattern matches the Rust + // implementation in query.rs:build_catalog_and_context_from_datasets. + catalog = catalog + .with_node_source(name, table_source.clone()) + .with_relationship_source(name, table_source); + } + + Ok(Self { + config: config.inner.clone(), + catalog: Arc::new(catalog), + context: Arc::new(ctx), + }) + } + + /// Execute a Cypher query using the cached catalog + /// + /// This method reuses the catalog and context built during initialization, + /// avoiding the overhead of rebuilding metadata structures. + /// + /// Parameters + /// ---------- + /// query : str + /// The Cypher query string to execute + /// + /// Returns + /// ------- + /// pyarrow.Table + /// Query results as Arrow table + /// + /// Raises + /// ------ + /// ValueError + /// If the query is invalid + /// RuntimeError + /// If query execution fails + /// + /// Examples + /// -------- + /// >>> result = engine.execute("MATCH (p:Person) WHERE p.age > 30 RETURN p.name") + /// >>> print(result.to_pandas()) + fn execute( + &self, + py: Python, + query: &str, + ) -> PyResult { + // Parse the query + let cypher_query = RustCypherQuery::new(query) + .map_err(graph_error_to_pyerr)? + .with_config(self.config.clone()); + + // Execute using the cached catalog and context + // Note: SessionContext is internally Arc-wrapped, so clone() is cheap. + // We clone the SessionContext (not the Arc) because execute_with_catalog_and_context + // takes ownership. DataFusion's SessionContext::clone() just increments Arc refcounts + // for internal state (RuntimeEnv, SessionState), so this is a shallow clone. + let catalog = self.catalog.clone(); + let context = self.context.as_ref().clone(); + + let result_batch = RT + .block_on( + Some(py), + cypher_query.execute_with_catalog_and_context(catalog, context), + )? + .map_err(graph_error_to_pyerr)?; + + record_batch_to_python_table(py, &result_batch) + } + + /// Get the graph configuration + fn config(&self) -> GraphConfig { + GraphConfig { + inner: self.config.clone(), + } + } + + fn __repr__(&self) -> String { + format!( + "CypherEngine(nodes={}, relationships={})", + self.config.node_mappings.len(), + self.config.relationship_mappings.len() + ) + } +} + /// Register graph functionality with the Python module pub fn register_graph_module(py: Python, parent_module: &Bound<'_, PyModule>) -> PyResult<()> { let graph_module = PyModule::new(py, "graph")?; @@ -882,6 +1075,7 @@ pub fn register_graph_module(py: Python, parent_module: &Bound<'_, PyModule>) -> graph_module.add_class::()?; graph_module.add_class::()?; graph_module.add_class::()?; + graph_module.add_class::()?; graph_module.add_class::()?; graph_module.add_class::()?; diff --git a/python/uv.lock b/python/uv.lock index 114b02a..aea6c35 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1241,7 +1241,7 @@ wheels = [ [[package]] name = "lance-graph" -version = "0.4.0" +version = "0.5.0" source = { editable = "." } dependencies = [ { name = "fastapi" }, From 3f7f8f2bc00b2416101fddfda38f602f213e759e Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Mon, 2 Feb 2026 23:25:44 -0800 Subject: [PATCH 2/2] fix test errors --- python/python/lance_graph/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/python/lance_graph/__init__.py b/python/python/lance_graph/__init__.py index 3e2d0b6..8ea65ee 100644 --- a/python/python/lance_graph/__init__.py +++ b/python/python/lance_graph/__init__.py @@ -71,14 +71,19 @@ def _load_dev_build() -> ModuleType: GraphConfig = _bindings.graph.GraphConfig GraphConfigBuilder = _bindings.graph.GraphConfigBuilder CypherQuery = _bindings.graph.CypherQuery +CypherEngine = _bindings.graph.CypherEngine +ExecutionStrategy = _bindings.graph.ExecutionStrategy VectorSearch = _bindings.graph.VectorSearch DistanceMetric = _bindings.graph.DistanceMetric + DirNamespace = _bindings.graph.DirNamespace __all__ = [ "GraphConfig", "GraphConfigBuilder", "CypherQuery", + "CypherEngine", + "ExecutionStrategy", "VectorSearch", "DistanceMetric", "DirNamespace",