Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/lance-graph/src/datafusion_planner/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ pub struct RelationshipInstance {
pub struct PlanningContext<'a> {
pub analysis: &'a QueryAnalysis,
pub(crate) relationship_instance_idx: HashMap<String, usize>,
pub parameters: &'a HashMap<String, serde_json::Value>,
}

impl<'a> PlanningContext<'a> {
pub fn new(analysis: &'a QueryAnalysis) -> Self {
pub fn new(
analysis: &'a QueryAnalysis,
parameters: &'a HashMap<String, serde_json::Value>,
) -> Self {
Self {
analysis,
relationship_instance_idx: HashMap::new(),
parameters,
}
}

Expand Down Expand Up @@ -383,7 +388,8 @@ mod tests {
required_datasets: HashSet::new(),
};

let mut ctx = PlanningContext::new(&analysis);
let empty_params = HashMap::new();
let mut ctx = PlanningContext::new(&analysis, &empty_params);

// First call should return first instance
let inst1 = ctx.next_relationship_instance("KNOWS").unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Aggregation operations: Projection with aggregates and grouping

use crate::datafusion_planner::analysis::PlanningContext;
use crate::datafusion_planner::DataFusionPlanner;
use crate::error::Result;
use crate::logical_plan::*;
Expand All @@ -11,6 +12,7 @@ use datafusion::logical_expr::{col, LogicalPlan, LogicalPlanBuilder};
impl DataFusionPlanner {
pub(crate) fn build_project_with_aggregates(
&self,
ctx: &mut PlanningContext,
input_plan: LogicalPlan,
projections: &[ProjectionItem],
) -> Result<LogicalPlan> {
Expand All @@ -21,7 +23,7 @@ impl DataFusionPlanner {
let mut agg_aliases = Vec::new();

for p in projections {
let expr = super::super::expression::to_df_value_expr(&p.expression);
let expr = super::super::expression::to_df_value_expr(&p.expression, ctx.parameters);

if super::super::expression::contains_aggregate(&p.expression) {
// Aggregate expressions get aliased
Expand All @@ -44,7 +46,8 @@ impl DataFusionPlanner {
for p in projections {
if !super::super::expression::contains_aggregate(&p.expression) {
// Re-create the expression and apply alias
let expr = super::super::expression::to_df_value_expr(&p.expression);
let expr =
super::super::expression::to_df_value_expr(&p.expression, ctx.parameters);
let aliased = if let Some(alias) = &p.alias {
expr.alias(alias)
} else {
Expand Down
15 changes: 9 additions & 6 deletions crates/lance-graph/src/datafusion_planner/builder/basic_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl DataFusionPlanner {
predicate: &crate::ast::BooleanExpression,
) -> Result<LogicalPlan> {
let input_plan = self.build_operator(ctx, input)?;
let expr = super::super::expression::to_df_boolean_expr(predicate);
let expr = super::super::expression::to_df_boolean_expr(predicate, ctx.parameters);
LogicalPlanBuilder::from(input_plan)
.filter(expr)
.map_err(|e| self.plan_error("Failed to build filter", e))?
Expand All @@ -39,21 +39,23 @@ impl DataFusionPlanner {
.any(|p| super::super::expression::contains_aggregate(&p.expression));

if has_aggregates {
self.build_project_with_aggregates(input_plan, projections)
self.build_project_with_aggregates(ctx, input_plan, projections)
} else {
self.build_simple_project(input_plan, projections)
self.build_simple_project(ctx, input_plan, projections)
}
}

pub(crate) fn build_simple_project(
&self,
ctx: &mut PlanningContext,
input_plan: LogicalPlan,
projections: &[ProjectionItem],
) -> Result<LogicalPlan> {
let exprs: Vec<datafusion::logical_expr::Expr> = projections
.iter()
.map(|p| {
let expr = super::super::expression::to_df_value_expr(&p.expression);
let expr =
super::super::expression::to_df_value_expr(&p.expression, ctx.parameters);
// Apply alias if provided, otherwise use Cypher dot notation
// Normalize alias to lowercase for case-insensitive behavior
if let Some(alias) = &p.alias {
Expand Down Expand Up @@ -98,7 +100,8 @@ impl DataFusionPlanner {
let sort_exprs: Vec<SortExpr> = sort_items
.iter()
.map(|item| {
let expr = super::super::expression::to_df_value_expr(&item.expression);
let expr =
super::super::expression::to_df_value_expr(&item.expression, ctx.parameters);
let asc = matches!(item.direction, crate::ast::SortDirection::Ascending);
SortExpr {
expr,
Expand Down Expand Up @@ -160,7 +163,7 @@ impl DataFusionPlanner {
};

// Convert expression to DataFusion Expr
let df_expr = super::super::expression::to_df_value_expr(expression);
let df_expr = super::super::expression::to_df_value_expr(expression, ctx.parameters);

// We project the list expression first (aliased as the target alias temporarily)
// DataFusion unnest takes a column name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl DataFusionPlanner {

// Build relationship scan with qualified columns and property filters
let rel_scan =
self.build_relationship_scan(&rel_instance, rel_source, relationship_properties)?;
self.build_relationship_scan(ctx, &rel_instance, rel_source, relationship_properties)?;

// Join source node with relationship
let source_params = SourceJoinParams {
Expand Down Expand Up @@ -297,6 +297,7 @@ impl DataFusionPlanner {

// Build target node scan and join
let target_scan = self.build_qualified_target_scan(
ctx,
catalog,
&target_label,
target_variable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,8 @@ mod tests {

// Analyze both patterns to build the context
let left_analysis = analysis::analyze(&expand_left).unwrap();
let left_ctx = analysis::PlanningContext::new(&left_analysis);
let empty_params = std::collections::HashMap::new();
let left_ctx = analysis::PlanningContext::new(&left_analysis, &empty_params);

// Test the key inference logic directly
let (left_keys, right_keys) =
Expand Down
15 changes: 10 additions & 5 deletions crates/lance-graph/src/datafusion_planner/config_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ mod tests {
analysis
.var_to_label
.insert("b".to_string(), "Person".to_string());
let ctx = PlanningContext::new(&analysis);
let empty_params = std::collections::HashMap::new();
let ctx = PlanningContext::new(&analysis, &empty_params);

let (label, node_map) = planner
.get_target_node_mapping(&ctx, "b")
Expand All @@ -156,7 +157,8 @@ mod tests {
analysis
.var_to_label
.insert("a".to_string(), "Person".to_string());
let ctx = PlanningContext::new(&analysis);
let empty_params = std::collections::HashMap::new();
let ctx = PlanningContext::new(&analysis, &empty_params);

let (label, node_map) = planner
.get_target_node_mapping(&ctx, "_temp_a_1")
Expand All @@ -169,7 +171,8 @@ mod tests {
fn test_get_target_node_mapping_invalid_temp_variable() {
let planner = planner_with_basic_config();
let analysis = QueryAnalysis::default();
let ctx = PlanningContext::new(&analysis);
let empty_params = std::collections::HashMap::new();
let ctx = PlanningContext::new(&analysis, &empty_params);

let err = planner
.get_target_node_mapping(&ctx, "_temp_invalid")
Expand All @@ -185,7 +188,8 @@ mod tests {
analysis
.var_to_label
.insert("a".to_string(), "Person".to_string());
let ctx = PlanningContext::new(&analysis);
let empty_params = std::collections::HashMap::new();
let ctx = PlanningContext::new(&analysis, &empty_params);

let err = planner.get_target_node_mapping(&ctx, "c").unwrap_err();
let msg = format!("{}", err);
Expand All @@ -203,7 +207,8 @@ mod tests {
analysis
.var_to_label
.insert("b".to_string(), "Organization".to_string());
let ctx = PlanningContext::new(&analysis);
let empty_params = std::collections::HashMap::new();
let ctx = PlanningContext::new(&analysis, &empty_params);

let err = planner.get_target_node_mapping(&ctx, "b").unwrap_err();
let msg = format!("{}", err);
Expand Down
Loading
Loading