From 5d1483b76bd2e8fbf9ff0def0a66ae94407aad2e Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 7 Feb 2026 19:51:26 -0800 Subject: [PATCH 1/4] fix: Avoid duplicate columns in shared variables --- .../src/datafusion_planner/join_ops.rs | 155 +++++++++++++----- crates/lance-graph/src/logical_plan.rs | 33 +++- .../tests/test_datafusion_pipeline.rs | 81 +++++++++ 3 files changed, 226 insertions(+), 43 deletions(-) diff --git a/crates/lance-graph/src/datafusion_planner/join_ops.rs b/crates/lance-graph/src/datafusion_planner/join_ops.rs index a1e83ba..ef815ef 100644 --- a/crates/lance-graph/src/datafusion_planner/join_ops.rs +++ b/crates/lance-graph/src/datafusion_planner/join_ops.rs @@ -16,7 +16,7 @@ use crate::config::{NodeMapping, RelationshipMapping}; use crate::error::Result; use crate::source_catalog::GraphSourceCatalog; use datafusion::logical_expr::{ - col, BinaryExpr, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, + col, BinaryExpr, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, TableSource, }; use std::collections::HashMap; use std::sync::Arc; @@ -41,6 +41,97 @@ pub(crate) struct TargetJoinParams<'a> { } impl DataFusionPlanner { + /// Build a target node scan with property filters and qualified column names + fn build_target_scan( + &self, + target_source: Arc, + target_label: &str, + target_variable: &str, + target_properties: &HashMap, + ) -> Result { + let target_schema = target_source.schema(); + let normalized_target_label = target_label.to_lowercase(); + let mut target_builder = + LogicalPlanBuilder::scan(&normalized_target_label, target_source, None).map_err( + |e| self.plan_error(&format!("Failed to scan target node '{}'", target_label), e), + )?; + + // Apply target property filters (e.g., (b {age: 30})) + for (k, v) in target_properties.iter() { + let lit_expr = super::expression::to_df_value_expr( + &crate::ast::ValueExpression::Literal(v.clone()), + ); + let filter_expr = Expr::BinaryExpr(BinaryExpr { + 
left: Box::new(col(k.to_lowercase())), + op: Operator::Eq, + right: Box::new(lit_expr), + }); + target_builder = target_builder.filter(filter_expr).map_err(|e| { + self.plan_error(&format!("Failed to apply target filter on '{}'", k), e) + })?; + } + + let target_qualified_exprs: Vec = target_schema + .fields() + .iter() + .map(|field| { + let qualified_name = qualify_column(target_variable, field.name()); + col(field.name()).alias(&qualified_name) + }) + .collect(); + + target_builder + .project(target_qualified_exprs) + .map_err(|e| self.plan_error("Failed to project target columns", e))? + .build() + .map_err(|e| self.plan_error("Failed to build target scan", e)) + } + + /// Handle variable reuse case by adding a filter constraint instead of joining + fn handle_variable_reuse_filter( + &self, + mut builder: LogicalPlanBuilder, + params: &TargetJoinParams, + qualified_target_key: String, + ) -> Result { + // Determine the relationship target key based on direction + let target_key = match params.direction { + RelationshipDirection::Outgoing => &params.rel_map.target_id_field, + RelationshipDirection::Incoming => &params.rel_map.source_id_field, + RelationshipDirection::Undirected => &params.rel_map.target_id_field, + }; + + let qualified_rel_target_key = qualify_column(params.rel_qualifier, target_key); + + // Create a filter expression: rel_target_key = target_key + let join_condition = Expr::BinaryExpr(BinaryExpr { + left: Box::new(col(&qualified_rel_target_key)), + op: Operator::Eq, + right: Box::new(col(&qualified_target_key)), + }); + + // Apply filter instead of join + builder = builder + .filter(join_condition) + .map_err(|e| self.plan_error("Failed to apply variable reuse filter", e))?; + + // TODO: Handle target_properties filters when variable is reused + // Until then, return an error rather than silently dropping the filters + if !params.target_properties.is_empty() { + return Err(crate::error::GraphError::PlanError { + message: format!( + "Property filters on reused 
variable '{}' are not yet supported", + params.target_variable + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } + + builder + .build() + .map_err(|e| self.plan_error("Failed to build plan with variable reuse", e)) + } + /// Join source node plan with relationship scan pub(crate) fn join_source_to_relationship( &self, @@ -94,43 +185,35 @@ impl DataFusionPlanner { .map_err(|e| self.plan_error("Failed to build plan (no target source)", e)); }; - // Create target node scan with qualified column aliases and property filters - let target_schema = target_source.schema(); - let normalized_target_label = target_label.to_lowercase(); - let mut target_builder = - LogicalPlanBuilder::scan(&normalized_target_label, target_source, None).map_err( - |e| self.plan_error(&format!("Failed to scan target node '{}'", target_label), e), - )?; + // Check if target variable already exists in the schema (variable reuse) + // Build a temporary plan to inspect the current schema + let current_plan = builder + .clone() + .build() + .map_err(|e| self.plan_error("Failed to build temp plan for schema check", e))?; + let current_schema = current_plan.schema(); - // Apply target property filters (e.g., (b {age: 30})) - for (k, v) in params.target_properties.iter() { - let lit_expr = super::expression::to_df_value_expr( - &crate::ast::ValueExpression::Literal(v.clone()), - ); - let filter_expr = Expr::BinaryExpr(BinaryExpr { - left: Box::new(col(k.to_lowercase())), - op: Operator::Eq, - right: Box::new(lit_expr), - }); - target_builder = target_builder.filter(filter_expr).map_err(|e| { - self.plan_error(&format!("Failed to apply target filter on '{}'", k), e) - })?; + // Check if the target variable's ID column already exists + let qualified_target_key = + qualify_column(params.target_variable, &params.node_map.id_field); + let target_exists = current_schema + .field_with_unqualified_name(&qualified_target_key) + .is_ok(); + + if target_exists { + // Variable reuse: target 
node columns already in schema + // Skip creating new scan, just add filter constraint + return self.handle_variable_reuse_filter(builder, params, qualified_target_key); } - let target_qualified_exprs: Vec = target_schema - .fields() - .iter() - .map(|field| { - let qualified_name = qualify_column(params.target_variable, field.name()); - col(field.name()).alias(&qualified_name) - }) - .collect(); - - let target_scan = target_builder - .project(target_qualified_exprs) - .map_err(|e| self.plan_error("Failed to project target columns", e))? - .build() - .map_err(|e| self.plan_error("Failed to build target scan", e))?; + // Normal case: target variable doesn't exist yet + // Create target node scan with qualified column aliases and property filters + let target_scan = self.build_target_scan( + target_source, + &target_label, + params.target_variable, + params.target_properties, + )?; // Determine target join keys let target_key = match params.direction { @@ -140,8 +223,6 @@ impl DataFusionPlanner { }; let qualified_rel_target_key = qualify_column(params.rel_qualifier, target_key); - let qualified_target_key = - qualify_column(params.target_variable, &params.node_map.id_field); builder = builder .join( diff --git a/crates/lance-graph/src/logical_plan.rs b/crates/lance-graph/src/logical_plan.rs index 9fae28c..64d9e2f 100644 --- a/crates/lance-graph/src/logical_plan.rs +++ b/crates/lance-graph/src/logical_plan.rs @@ -384,12 +384,33 @@ impl LogicalPlanner { .clone() .unwrap_or_else(|| format!("_node_{}", self.variables.len())); - let target_label = segment - .end_node - .labels - .first() - .cloned() - .unwrap_or_else(|| "Node".to_string()); + // Check if variable already exists and reuse its label + let target_label = if let Some(existing_label) = self.variables.get(&target_variable) { + // Variable already registered - reuse its label + if !segment.end_node.labels.is_empty() { + // Label provided in AST - validate it matches + let ast_label = &segment.end_node.labels[0]; + 
if ast_label != existing_label { + return Err(GraphError::PlanError { + message: format!( + "Variable '{}' already has label '{}', cannot redefine as '{}'", + target_variable, existing_label, ast_label + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } + } + existing_label.clone() + } else { + // New variable - get label from AST or default to "Node" + segment + .end_node + .labels + .first() + .cloned() + .unwrap_or_else(|| "Node".to_string()) + }; + self.variables .insert(target_variable.clone(), target_label.clone()); diff --git a/crates/lance-graph/tests/test_datafusion_pipeline.rs b/crates/lance-graph/tests/test_datafusion_pipeline.rs index 243db94..94e3cb0 100644 --- a/crates/lance-graph/tests/test_datafusion_pipeline.rs +++ b/crates/lance-graph/tests/test_datafusion_pipeline.rs @@ -4951,3 +4951,84 @@ async fn test_unwind_then_match() { assert_eq!(rows, expected); } + +// =========================================================================== +// Variable Reuse Tests +// =========================================================================== + +#[tokio::test] +async fn test_datafusion_variable_reuse_with_count_distinct() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Test variable reuse: 'shared' appears in both patterns + // Simulates finding comments where creator is same as post creator + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(shared:Person), \ + (shared)<-[:KNOWS]-(b:Person) \ + RETURN COUNT(DISTINCT shared.id) AS count", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); + + assert_eq!(result.num_rows(), 1); + + let counts = result + .column(0) + .as_any() + .downcast_ref::() 
+ .unwrap(); + + // Distinct people known by at least one person (a and b may match the same person): + // Bob (known by Alice), Charlie (known by Alice and Bob), + // David (known by Charlie), Eve (known by David) = 4 people + assert_eq!(counts.value(0), 4); +} + +#[tokio::test] +async fn test_datafusion_variable_reuse_triangle_pattern() { + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + // Test multiple variable reuses: a, b, c all reused across 3 patterns + let query = CypherQuery::new( + "MATCH (a:Person)-[:KNOWS]->(b:Person), \ + (b)-[:KNOWS]->(c:Person), \ + (a)-[:KNOWS]->(c) \ + RETURN a.name, b.name, c.name", + ) + .unwrap() + .with_config(config); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + let result = query + .execute(datasets, Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); + + // Edges: 1->2, 2->3, 3->4, 4->5, 1->3 + // Triangle: Alice->Bob->Charlie with Alice->Charlie shortcut + assert_eq!(result.num_rows(), 1); + + let a_names = result.column(0).as_any().downcast_ref::().unwrap(); + let b_names = result.column(1).as_any().downcast_ref::().unwrap(); + let c_names = result.column(2).as_any().downcast_ref::().unwrap(); + + assert_eq!(a_names.value(0), "Alice"); + assert_eq!(b_names.value(0), "Bob"); + assert_eq!(c_names.value(0), "Charlie"); +} From 280eb74391d04b692510913099e80833eb108c73 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 7 Feb 2026 20:09:36 -0800 Subject: [PATCH 2/4] format code --- .../tests/test_datafusion_pipeline.rs | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/crates/lance-graph/tests/test_datafusion_pipeline.rs b/crates/lance-graph/tests/test_datafusion_pipeline.rs index 94e3cb0..680f0a4 100644 --- a/crates/lance-graph/tests/test_datafusion_pipeline.rs +++ 
b/crates/lance-graph/tests/test_datafusion_pipeline.rs @@ -4982,13 +4982,13 @@ async fn test_datafusion_variable_reuse_with_count_distinct() { .unwrap(); assert_eq!(result.num_rows(), 1); - + let counts = result .column(0) .as_any() .downcast_ref::() .unwrap(); - + // Distinct people known by at least one person (a and b may match the same person): // Bob (known by Alice), Charlie (known by Alice and Bob), // David (known by Charlie), Eve (known by David) = 4 people @@ -5024,9 +5024,21 @@ async fn test_datafusion_variable_reuse_triangle_pattern() { // Triangle: Alice->Bob->Charlie with Alice->Charlie shortcut assert_eq!(result.num_rows(), 1); - let a_names = result.column(0).as_any().downcast_ref::().unwrap(); - let b_names = result.column(1).as_any().downcast_ref::().unwrap(); - let c_names = result.column(2).as_any().downcast_ref::().unwrap(); + let a_names = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let b_names = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let c_names = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); assert_eq!(a_names.value(0), "Alice"); assert_eq!(b_names.value(0), "Bob"); From 59df72892bc03c806cb863f9b9be8b58bd178b8d Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sat, 7 Feb 2026 20:21:26 -0800 Subject: [PATCH 3/4] add tests --- crates/lance-graph/src/logical_plan.rs | 118 +++++++++++++++++++------ 1 file changed, 91 insertions(+), 27 deletions(-) diff --git a/crates/lance-graph/src/logical_plan.rs b/crates/lance-graph/src/logical_plan.rs index 64d9e2f..6df1a5f 100644 --- a/crates/lance-graph/src/logical_plan.rs +++ b/crates/lance-graph/src/logical_plan.rs @@ -354,7 +354,25 @@ impl LogicalPlanner { }) } - // (removed) plan_path_segment is superseded by plan_path + /// Validate that a node's explicit label (if any) matches the already-registered + /// label for this variable. Returns `Ok(())` if the variable is new or if labels + /// are consistent, and an error if they conflict. 
+ fn validate_variable_label(&self, variable: &str, ast_labels: &[String]) -> Result<()> { + if let Some(existing_label) = self.variables.get(variable) { + if let Some(ast_label) = ast_labels.first() { + if ast_label != existing_label { + return Err(GraphError::PlanError { + message: format!( + "Variable '{}' already has label '{}', cannot redefine as '{}'", + variable, existing_label, ast_label + ), + location: snafu::Location::new(file!(), line!(), column!()), + }); + } + } + } + Ok(()) + } /// Plan a full path pattern, respecting the starting variable if provided fn plan_path( @@ -375,6 +393,11 @@ impl LogicalPlanner { None => self.extract_variable_from_plan(&plan)?, }; + // Validate start node label consistency if variable already exists + if let Some(start_var) = &path.start_node.variable { + self.validate_variable_label(start_var, &path.start_node.labels)?; + } + // For each segment, add an expand for segment in &path.segments { // Determine / register target variable @@ -384,32 +407,16 @@ impl LogicalPlanner { .clone() .unwrap_or_else(|| format!("_node_{}", self.variables.len())); - // Check if variable already exists and reuse its label - let target_label = if let Some(existing_label) = self.variables.get(&target_variable) { - // Variable already registered - reuse its label - if !segment.end_node.labels.is_empty() { - // Label provided in AST - validate it matches - let ast_label = &segment.end_node.labels[0]; - if ast_label != existing_label { - return Err(GraphError::PlanError { - message: format!( - "Variable '{}' already has label '{}', cannot redefine as '{}'", - target_variable, existing_label, ast_label - ), - location: snafu::Location::new(file!(), line!(), column!()), - }); - } - } - existing_label.clone() - } else { - // New variable - get label from AST or default to "Node" - segment - .end_node - .labels - .first() - .cloned() - .unwrap_or_else(|| "Node".to_string()) - }; + // Validate label consistency for already-registered variables + 
self.validate_variable_label(&target_variable, &segment.end_node.labels)?; + + // Reuse existing label or derive from AST (default to "Node") + let target_label = self + .variables + .get(&target_variable) + .cloned() + .or_else(|| segment.end_node.labels.first().cloned()) + .unwrap_or_else(|| "Node".to_string()); self.variables .insert(target_variable.clone(), target_label.clone()); @@ -1192,4 +1199,61 @@ mod tests { _ => panic!("Expected Project at top level"), } } + + #[test] + fn test_variable_reuse_across_patterns() { + let query_text = + "MATCH (a:Person)-[:KNOWS]->(shared:Person), (shared)-[:KNOWS]->(b:Person) RETURN b.name"; + + let ast = parse_cypher_query(query_text).unwrap(); + let mut planner = LogicalPlanner::new(); + let logical_plan = planner.plan(&ast).unwrap(); + + // Expect: Project { Expand(shared->b) { Expand(a->shared) { Scan(a) } } } + match &logical_plan { + LogicalOperator::Project { input, .. } => match input.as_ref() { + LogicalOperator::Expand { + input: inner, + source_variable, + target_variable, + .. + } => { + assert_eq!(source_variable, "shared"); + assert_eq!(target_variable, "b"); + + match inner.as_ref() { + LogicalOperator::Expand { + source_variable: first_src, + target_variable: first_dst, + .. 
+ } => { + assert_eq!(first_src, "a"); + assert_eq!(first_dst, "shared"); + } + _ => panic!("Expected first Expand (a->shared)"), + } + } + _ => panic!("Expected second Expand (shared->b)"), + }, + _ => panic!("Expected Project at top level"), + } + } + + #[test] + fn test_variable_reuse_with_conflicting_labels() { + let query_text = + "MATCH (a:Person)-[:KNOWS]->(shared:Person), (shared:Company)-[:EMPLOYS]->(b:Person) RETURN b.name"; + + let ast = parse_cypher_query(query_text).unwrap(); + let mut planner = LogicalPlanner::new(); + let err = planner.plan(&ast).unwrap_err(); + let err_msg = err.to_string(); + + assert!( + err_msg.contains("already has label 'Person'") + && err_msg.contains("cannot redefine as 'Company'"), + "Expected error about label conflict, got: {}", + err_msg + ); + } } From d8aa954f001d6e5e4066f51f26861aae01be39d9 Mon Sep 17 00:00:00 2001 From: JoshuaTang <1240604020@qq.com> Date: Sun, 8 Feb 2026 09:32:51 -0800 Subject: [PATCH 4/4] test multi-pattern optimization --- crates/lance-graph/src/logical_plan.rs | 13 +++-- .../tests/test_datafusion_pipeline.rs | 57 +++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/crates/lance-graph/src/logical_plan.rs b/crates/lance-graph/src/logical_plan.rs index 6df1a5f..236706a 100644 --- a/crates/lance-graph/src/logical_plan.rs +++ b/crates/lance-graph/src/logical_plan.rs @@ -338,13 +338,18 @@ impl LogicalPlanner { .clone() .unwrap_or_else(|| format!("_node_{}", self.variables.len())); - let label = node - .labels - .first() + // Validate label consistency if variable already exists + self.validate_variable_label(&variable, &node.labels)?; + + // Reuse existing label or derive from AST (default to "Node") + let label = self + .variables + .get(&variable) .cloned() + .or_else(|| node.labels.first().cloned()) .unwrap_or_else(|| "Node".to_string()); - // Register variable + // Register variable with its label self.variables.insert(variable.clone(), label.clone()); 
Ok(LogicalOperator::ScanByLabel { diff --git a/crates/lance-graph/tests/test_datafusion_pipeline.rs b/crates/lance-graph/tests/test_datafusion_pipeline.rs index 680f0a4..b1b203b 100644 --- a/crates/lance-graph/tests/test_datafusion_pipeline.rs +++ b/crates/lance-graph/tests/test_datafusion_pipeline.rs @@ -5044,3 +5044,60 @@ async fn test_datafusion_variable_reuse_triangle_pattern() { assert_eq!(b_names.value(0), "Bob"); assert_eq!(c_names.value(0), "Charlie"); } + +#[tokio::test] +async fn test_datafusion_variable_reuse_multi_pattern_optimization() { + // Pattern structure: (x)-[:R1]->(shared), (x)-[:R2]->(y)-[:R3]->(shared) + let config = create_graph_config(); + let person_batch = create_person_dataset(); + let knows_batch = create_knows_dataset(); + + let mut datasets = HashMap::new(); + datasets.insert("Person".to_string(), person_batch); + datasets.insert("KNOWS".to_string(), knows_batch); + + // Find p who knows friend directly and also knows some other person who knows the same friend 
+ let query = CypherQuery::new( + "MATCH (p:Person)-[:KNOWS]->(friend:Person), \ + (p)-[:KNOWS]->(other:Person)-[:KNOWS]->(friend) \ + RETURN p.name, friend.name", + ) + .unwrap() + .with_config(config.clone()); + + let result = query + .execute(datasets.clone(), Some(ExecutionStrategy::DataFusion)) + .await + .unwrap(); + + // Alice(1) knows Charlie(3), and Alice(1) knows Bob(2) who knows Charlie(3) + assert_eq!(result.num_rows(), 1); + + let p_names = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let friend_names = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(p_names.value(0), "Alice"); + assert_eq!(friend_names.value(0), "Charlie"); + + // Verify the query plan is optimal - 'p' and 'friend' should not be scanned twice + let explain_output = query.explain(datasets).await.unwrap(); + let plan_str = format!("{:?}", explain_output); + let person_scan_count = plan_str.matches("TableScan: person").count(); + + // Without optimization: would be 4 scans (p, friend, p again, friend again) + // With optimization: should be 3 scans (p, friend, other - reusing p and friend) + assert_eq!( + person_scan_count, 3, + "Expected optimal plan with 3 Person scans (p, friend, other), got {}. \ + Without optimization would be 4 (duplicate p and friend).", + person_scan_count + ); +}