Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 118 additions & 37 deletions crates/lance-graph/src/datafusion_planner/join_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::config::{NodeMapping, RelationshipMapping};
use crate::error::Result;
use crate::source_catalog::GraphSourceCatalog;
use datafusion::logical_expr::{
col, BinaryExpr, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
col, BinaryExpr, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator, TableSource,
};
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -41,6 +41,97 @@ pub(crate) struct TargetJoinParams<'a> {
}

impl DataFusionPlanner {
/// Build a target node scan with property filters and qualified column names
fn build_target_scan(
&self,
target_source: Arc<dyn TableSource>,
target_label: &str,
target_variable: &str,
target_properties: &HashMap<String, PropertyValue>,
) -> Result<LogicalPlan> {
let target_schema = target_source.schema();
let normalized_target_label = target_label.to_lowercase();
let mut target_builder =
LogicalPlanBuilder::scan(&normalized_target_label, target_source, None).map_err(
|e| self.plan_error(&format!("Failed to scan target node '{}'", target_label), e),
)?;

// Apply target property filters (e.g., (b {age: 30}))
for (k, v) in target_properties.iter() {
let lit_expr = super::expression::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k.to_lowercase())),
op: Operator::Eq,
right: Box::new(lit_expr),
});
target_builder = target_builder.filter(filter_expr).map_err(|e| {
self.plan_error(&format!("Failed to apply target filter on '{}'", k), e)
})?;
}

let target_qualified_exprs: Vec<Expr> = target_schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(target_variable, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();

target_builder
.project(target_qualified_exprs)
.map_err(|e| self.plan_error("Failed to project target columns", e))?
.build()
.map_err(|e| self.plan_error("Failed to build target scan", e))
}

/// Handle variable reuse case by adding a filter constraint instead of joining
fn handle_variable_reuse_filter(
&self,
mut builder: LogicalPlanBuilder,
params: &TargetJoinParams,
qualified_target_key: String,
) -> Result<LogicalPlan> {
// Determine the relationship target key based on direction
let target_key = match params.direction {
RelationshipDirection::Outgoing => &params.rel_map.target_id_field,
RelationshipDirection::Incoming => &params.rel_map.source_id_field,
RelationshipDirection::Undirected => &params.rel_map.target_id_field,
};

let qualified_rel_target_key = qualify_column(params.rel_qualifier, target_key);

// Create a filter expression: rel_target_key = target_key
let join_condition = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(&qualified_rel_target_key)),
op: Operator::Eq,
right: Box::new(col(&qualified_target_key)),
});

// Apply filter instead of join
builder = builder
.filter(join_condition)
.map_err(|e| self.plan_error("Failed to apply variable reuse filter", e))?;

// TODO: Handle target_properties filters when variable is reused
// For now, we'll skip them since the variable already exists
if !params.target_properties.is_empty() {
return Err(crate::error::GraphError::PlanError {
message: format!(
"Property filters on reused variable '{}' are not yet supported",
params.target_variable
),
location: snafu::Location::new(file!(), line!(), column!()),
});
}

builder
.build()
.map_err(|e| self.plan_error("Failed to build plan with variable reuse", e))
}

/// Join source node plan with relationship scan
pub(crate) fn join_source_to_relationship(
&self,
Expand Down Expand Up @@ -94,43 +185,35 @@ impl DataFusionPlanner {
.map_err(|e| self.plan_error("Failed to build plan (no target source)", e));
};

// Create target node scan with qualified column aliases and property filters
let target_schema = target_source.schema();
let normalized_target_label = target_label.to_lowercase();
let mut target_builder =
LogicalPlanBuilder::scan(&normalized_target_label, target_source, None).map_err(
|e| self.plan_error(&format!("Failed to scan target node '{}'", target_label), e),
)?;
// Check if target variable already exists in the schema (variable reuse)
// Build a temporary plan to inspect the current schema
let current_plan = builder
.clone()
.build()
.map_err(|e| self.plan_error("Failed to build temp plan for schema check", e))?;
let current_schema = current_plan.schema();

// Apply target property filters (e.g., (b {age: 30}))
for (k, v) in params.target_properties.iter() {
let lit_expr = super::expression::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k.to_lowercase())),
op: Operator::Eq,
right: Box::new(lit_expr),
});
target_builder = target_builder.filter(filter_expr).map_err(|e| {
self.plan_error(&format!("Failed to apply target filter on '{}'", k), e)
})?;
// Check if the target variable's ID column already exists
let qualified_target_key =
qualify_column(params.target_variable, &params.node_map.id_field);
let target_exists = current_schema
.field_with_unqualified_name(&qualified_target_key)
.is_ok();

if target_exists {
// Variable reuse: target node columns already in schema
// Skip creating new scan, just add filter constraint
return self.handle_variable_reuse_filter(builder, params, qualified_target_key);
}

let target_qualified_exprs: Vec<Expr> = target_schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(params.target_variable, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();

let target_scan = target_builder
.project(target_qualified_exprs)
.map_err(|e| self.plan_error("Failed to project target columns", e))?
.build()
.map_err(|e| self.plan_error("Failed to build target scan", e))?;
// Normal case: target variable doesn't exist yet
// Create target node scan with qualified column aliases and property filters
let target_scan = self.build_target_scan(
target_source,
&target_label,
params.target_variable,
params.target_properties,
)?;

// Determine target join keys
let target_key = match params.direction {
Expand All @@ -140,8 +223,6 @@ impl DataFusionPlanner {
};

let qualified_rel_target_key = qualify_column(params.rel_qualifier, target_key);
let qualified_target_key =
qualify_column(params.target_variable, &params.node_map.id_field);

builder = builder
.join(
Expand Down
108 changes: 99 additions & 9 deletions crates/lance-graph/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,18 @@ impl LogicalPlanner {
.clone()
.unwrap_or_else(|| format!("_node_{}", self.variables.len()));

let label = node
.labels
.first()
// Validate label consistency if variable already exists
self.validate_variable_label(&variable, &node.labels)?;

// Reuse existing label or derive from AST (default to "Node")
let label = self
.variables
.get(&variable)
.cloned()
.or_else(|| node.labels.first().cloned())
.unwrap_or_else(|| "Node".to_string());

// Register variable
// Register variable with its label
self.variables.insert(variable.clone(), label.clone());

Ok(LogicalOperator::ScanByLabel {
Expand All @@ -354,7 +359,25 @@ impl LogicalPlanner {
})
}

// (removed) plan_path_segment is superseded by plan_path
/// Validate that a node's explicit label (if any) matches the already-registered
/// label for this variable. Returns `Ok(())` if the variable is new or if labels
/// are consistent, and an error if they conflict.
fn validate_variable_label(&self, variable: &str, ast_labels: &[String]) -> Result<()> {
if let Some(existing_label) = self.variables.get(variable) {
if let Some(ast_label) = ast_labels.first() {
if ast_label != existing_label {
return Err(GraphError::PlanError {
message: format!(
"Variable '{}' already has label '{}', cannot redefine as '{}'",
variable, existing_label, ast_label
),
location: snafu::Location::new(file!(), line!(), column!()),
});
}
}
}
Ok(())
}

/// Plan a full path pattern, respecting the starting variable if provided
fn plan_path(
Expand All @@ -375,6 +398,11 @@ impl LogicalPlanner {
None => self.extract_variable_from_plan(&plan)?,
};

// Validate start node label consistency if variable already exists
if let Some(start_var) = &path.start_node.variable {
self.validate_variable_label(start_var, &path.start_node.labels)?;
}

// For each segment, add an expand
for segment in &path.segments {
// Determine / register target variable
Expand All @@ -384,12 +412,17 @@ impl LogicalPlanner {
.clone()
.unwrap_or_else(|| format!("_node_{}", self.variables.len()));

let target_label = segment
.end_node
.labels
.first()
// Validate label consistency for already-registered variables
self.validate_variable_label(&target_variable, &segment.end_node.labels)?;

// Reuse existing label or derive from AST (default to "Node")
let target_label = self
.variables
.get(&target_variable)
.cloned()
.or_else(|| segment.end_node.labels.first().cloned())
.unwrap_or_else(|| "Node".to_string());

self.variables
.insert(target_variable.clone(), target_label.clone());

Expand Down Expand Up @@ -1171,4 +1204,61 @@ mod tests {
_ => panic!("Expected Project at top level"),
}
}

#[test]
fn test_variable_reuse_across_patterns() {
let query_text =
"MATCH (a:Person)-[:KNOWS]->(shared:Person), (shared)-[:KNOWS]->(b:Person) RETURN b.name";

let ast = parse_cypher_query(query_text).unwrap();
let mut planner = LogicalPlanner::new();
let logical_plan = planner.plan(&ast).unwrap();

// Expect: Project { Expand(shared->b) { Expand(a->shared) { Scan(a) } } }
match &logical_plan {
LogicalOperator::Project { input, .. } => match input.as_ref() {
LogicalOperator::Expand {
input: inner,
source_variable,
target_variable,
..
} => {
assert_eq!(source_variable, "shared");
assert_eq!(target_variable, "b");

match inner.as_ref() {
LogicalOperator::Expand {
source_variable: first_src,
target_variable: first_dst,
..
} => {
assert_eq!(first_src, "a");
assert_eq!(first_dst, "shared");
}
_ => panic!("Expected first Expand (a->shared)"),
}
}
_ => panic!("Expected second Expand (shared->b)"),
},
_ => panic!("Expected Project at top level"),
}
}

#[test]
fn test_variable_reuse_with_conflicting_labels() {
let query_text =
"MATCH (a:Person)-[:KNOWS]->(shared:Person), (shared:Company)-[:EMPLOYS]->(b:Person) RETURN b.name";

let ast = parse_cypher_query(query_text).unwrap();
let mut planner = LogicalPlanner::new();
let err = planner.plan(&ast).unwrap_err();
let err_msg = err.to_string();

assert!(
err_msg.contains("already has label 'Person'")
&& err_msg.contains("cannot redefine as 'Company'"),
"Expected error about label conflict, got: {}",
err_msg
);
}
}
Loading
Loading