diff --git a/crates/core/src/dataframe.rs b/crates/core/src/dataframe.rs index 72595ba81..017363f45 100644 --- a/crates/core/src/dataframe.rs +++ b/crates/core/src/dataframe.rs @@ -582,6 +582,14 @@ impl PyDataFrame { Ok(Self::new(df)) } + /// Apply window function expressions to the DataFrame + #[pyo3(signature = (*exprs))] + fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> { + let window_exprs = exprs.into_iter().map(|e| e.into()).collect(); + let df = self.df.as_ref().clone().window(window_exprs)?; + Ok(Self::new(df)) + } + fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> { let df = self.df.as_ref().clone().filter(predicate.into())?; Ok(Self::new(df)) } @@ -804,9 +812,25 @@ impl PyDataFrame { } /// Print the query plan - #[pyo3(signature = (verbose=false, analyze=false))] - fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyDataFusionResult<()> { - let df = self.df.as_ref().clone().explain(verbose, analyze)?; + #[pyo3(signature = (verbose=false, analyze=false, format=None))] + fn explain( + &self, + py: Python, + verbose: bool, + analyze: bool, + format: Option<&str>, + ) -> PyDataFusionResult<()> { + let explain_format = match format { + Some(f) => f + .parse::<datafusion::common::format::ExplainFormat>() + .map_err(|e| PyDataFusionError::Common(e.to_string()))?, + None => datafusion::common::format::ExplainFormat::Indent, + }; + let opts = datafusion::logical_expr::ExplainOption::default() + .with_verbose(verbose) + .with_analyze(analyze) + .with_format(explain_format); + let df = self.df.as_ref().clone().explain_with_options(opts)?; print_dataframe(py, df) } @@ -875,11 +899,14 @@ impl PyDataFrame { Ok(Self::new(new_df)) } - #[pyo3(signature = (column, preserve_nulls=true))] - fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> { - // TODO: expose RecursionUnnestOptions - // REF: https://github.com/apache/datafusion/pull/11577 - let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + #[pyo3(signature = (column, 
preserve_nulls=true, recursions=None))] + fn unnest_column( + &self, + column: &str, + preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, + ) -> PyDataFusionResult<Self> { + let unnest_options = build_unnest_options(preserve_nulls, recursions); let df = self .df .as_ref() @@ -888,15 +915,14 @@ Ok(Self::new(df)) } - #[pyo3(signature = (columns, preserve_nulls=true))] + #[pyo3(signature = (columns, preserve_nulls=true, recursions=None))] fn unnest_columns( &self, columns: Vec<String>, preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, ) -> PyDataFusionResult<Self> { - // TODO: expose RecursionUnnestOptions - // REF: https://github.com/apache/datafusion/pull/11577 - let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + let unnest_options = build_unnest_options(preserve_nulls, recursions); let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>(); let df = self .df @@ -922,6 +948,71 @@ Ok(Self::new(new_df)) } + /// Calculate the set difference with deduplication + fn except_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .except_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Calculate the intersection with deduplication + fn intersect_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .intersect_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Union two DataFrames matching columns by name + fn union_by_name(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .union_by_name(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Union two DataFrames by name with deduplication + fn union_by_name_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> { + let new_df = self + .df + .as_ref() + .clone() + .union_by_name_distinct(py_df.df.as_ref().clone())?; + Ok(Self::new(new_df)) + } + + /// Deduplicate 
rows based on specific columns, keeping the first row per group + fn distinct_on( + &self, + on_expr: Vec<PyExpr>, + select_expr: Vec<PyExpr>, + sort_expr: Option<Vec<PySortExpr>>, + ) -> PyDataFusionResult<Self> { + let on_expr = on_expr.into_iter().map(|e| e.into()).collect(); + let select_expr = select_expr.into_iter().map(|e| e.into()).collect(); + let sort_expr = sort_expr.map(to_sort_expressions); + let df = self + .df + .as_ref() + .clone() + .distinct_on(on_expr, select_expr, sort_expr)?; + Ok(Self::new(df)) + } + + /// Sort by column expressions with ascending order and nulls last + fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> { + let exprs = exprs.into_iter().map(|e| e.into()).collect(); + let df = self.df.as_ref().clone().sort_by(exprs)?; + Ok(Self::new(df)) + } + /// Write a `DataFrame` to a CSV file. fn write_csv( &self, @@ -1295,6 +1386,26 @@ impl PyDataFrameWriteOptions { } } +fn build_unnest_options( + preserve_nulls: bool, + recursions: Option<Vec<(String, String, usize)>>, +) -> UnnestOptions { + let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls); + if let Some(recs) = recursions { + opts.recursions = recs + .into_iter() + .map( + |(input, output, depth)| datafusion::common::RecursionUnnestOption { + input_column: datafusion::common::Column::from(input.as_str()), + output_column: datafusion::common::Column::from(output.as_str()), + depth, + }, + ) + .collect(); + } + opts +} + /// Print DataFrame fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> { // Get string representation of record batches diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 2e6f81166..a736c3966 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -47,6 +47,7 @@ from .dataframe import ( DataFrame, DataFrameWriteOptions, + ExplainFormat, InsertOp, ParquetColumnOptions, ParquetWriterOptions, @@ -82,6 +83,7 @@ "DataFrameWriteOptions", "Database", "ExecutionPlan", + "ExplainFormat", "Expr", "InsertOp", "LogicalPlan", diff --git 
a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 10e2a913f..5f103081c 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -65,6 +65,25 @@ from enum import Enum +class ExplainFormat(Enum): + """Output format for explain plans. + + Controls how the query plan is rendered in :py:meth:`DataFrame.explain`. + """ + + INDENT = "indent" + """Default indented text format.""" + + TREE = "tree" + """Tree-style visual format with box-drawing characters.""" + + PGJSON = "pgjson" + """PostgreSQL-compatible JSON format for use with visualization tools.""" + + GRAPHVIZ = "graphviz" + """Graphviz DOT format for graph rendering.""" + + # excerpt from deltalake # https://github.com/apache/datafusion-python/pull/981#discussion_r1905619163 class Compression(Enum): @@ -468,6 +487,21 @@ def drop(self, *columns: str) -> DataFrame: """ return DataFrame(self.df.drop(*columns)) + def window(self, *exprs: Expr) -> DataFrame: + """Add window function columns to the DataFrame. + + Applies the given window function expressions and appends the results + as new columns. + + Args: + exprs: Window function expressions to evaluate. + + Returns: + DataFrame with new window function columns appended. + """ + raw = expr_list_to_raw_expr_list(exprs) + return DataFrame(self.df.window(*raw)) + def filter(self, *predicates: Expr | str) -> DataFrame: """Return a DataFrame for which ``predicate`` evaluates to ``True``. @@ -918,7 +952,12 @@ def join_on( exprs = [ensure_expr(expr) for expr in on_exprs] return DataFrame(self.df.join_on(right.df, exprs, how)) - def explain(self, verbose: bool = False, analyze: bool = False) -> None: + def explain( + self, + verbose: bool = False, + analyze: bool = False, + format: ExplainFormat | None = None, + ) -> None: """Print an explanation of the DataFrame's plan so far. If ``analyze`` is specified, runs the plan and reports metrics. 
@@ -926,8 +965,11 @@ def explain(self, verbose: bool = False, analyze: bool = False) -> None: Args: verbose: If ``True``, more details will be included. analyze: If ``True``, the plan will run and metrics reported. + format: Output format for the plan. Defaults to + :py:attr:`ExplainFormat.INDENT`. """ - self.df.explain(verbose, analyze) + fmt = format.value if format is not None else None + self.df.explain(verbose, analyze, fmt) def logical_plan(self) -> LogicalPlan: """Return the unoptimized ``LogicalPlan``. @@ -1036,6 +1078,109 @@ def except_all(self, other: DataFrame) -> DataFrame: """ return DataFrame(self.df.except_all(other.df)) + def except_distinct(self, other: DataFrame) -> DataFrame: + """Calculate the set difference with deduplication. + + Returns rows that are in this DataFrame but not in ``other``, + removing any duplicates. This is the complement of :py:meth:`except_all` + which preserves duplicates. + + The two :py:class:`DataFrame` must have exactly the same schema. + + Args: + other: DataFrame to calculate exception with. + + Returns: + DataFrame after set difference with deduplication. + """ + return DataFrame(self.df.except_distinct(other.df)) + + def intersect_distinct(self, other: DataFrame) -> DataFrame: + """Calculate the intersection with deduplication. + + Returns distinct rows that appear in both DataFrames. This is the + complement of :py:meth:`intersect` which preserves duplicates. + + The two :py:class:`DataFrame` must have exactly the same schema. + + Args: + other: DataFrame to intersect with. + + Returns: + DataFrame after intersection with deduplication. + """ + return DataFrame(self.df.intersect_distinct(other.df)) + + def union_by_name(self, other: DataFrame) -> DataFrame: + """Union two :py:class:`DataFrame` matching columns by name. + + Unlike :py:meth:`union` which matches columns by position, this method + matches columns by their names, allowing DataFrames with different + column orders to be combined. 
+ + Args: + other: DataFrame to union with. + + Returns: + DataFrame after union by name. + """ + return DataFrame(self.df.union_by_name(other.df)) + + def union_by_name_distinct(self, other: DataFrame) -> DataFrame: + """Union two :py:class:`DataFrame` by name with deduplication. + + Combines :py:meth:`union_by_name` with deduplication of rows. + + Args: + other: DataFrame to union with. + + Returns: + DataFrame after union by name with deduplication. + """ + return DataFrame(self.df.union_by_name_distinct(other.df)) + + def distinct_on( + self, + on_expr: list[Expr], + select_expr: list[Expr], + sort_expr: list[SortKey] | None = None, + ) -> DataFrame: + """Deduplicate rows based on specific columns. + + Returns a new DataFrame with one row per unique combination of the + ``on_expr`` columns, keeping the first row per group as determined by + ``sort_expr``. + + Args: + on_expr: Expressions that determine uniqueness. + select_expr: Expressions to include in the output. + sort_expr: Optional sort expressions to determine which row to keep. + + Returns: + DataFrame after deduplication. + """ + on_raw = expr_list_to_raw_expr_list(on_expr) + select_raw = expr_list_to_raw_expr_list(select_expr) + sort_raw = sort_list_to_raw_sort_list(sort_expr) if sort_expr else None + return DataFrame(self.df.distinct_on(on_raw, select_raw, sort_raw)) + + def sort_by(self, *exprs: Expr | str) -> DataFrame: + """Sort the DataFrame by column expressions in ascending order. + + This is a convenience method that sorts all columns in ascending order + with nulls last. For more control over sort direction and null ordering, + use :py:meth:`sort` instead. + + Args: + exprs: Expressions or column names to sort by. + + Returns: + DataFrame after sorting. 
+ """ + exprs = [self.parse_sql_expr(e) if isinstance(e, str) else e for e in exprs] + raw = expr_list_to_raw_expr_list(exprs) + return DataFrame(self.df.sort_by(raw)) + def write_csv( self, path: str | pathlib.Path, @@ -1296,23 +1441,44 @@ def count(self) -> int: return self.df.count() @deprecated("Use :py:func:`unnest_columns` instead.") - def unnest_column(self, column: str, preserve_nulls: bool = True) -> DataFrame: + def unnest_column( + self, + column: str, + preserve_nulls: bool = True, + recursions: list[tuple[str, str, int]] | None = None, + ) -> DataFrame: """See :py:func:`unnest_columns`.""" - return DataFrame(self.df.unnest_column(column, preserve_nulls=preserve_nulls)) + return DataFrame( + self.df.unnest_column( + column, preserve_nulls=preserve_nulls, recursions=recursions + ) + ) - def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame: + def unnest_columns( + self, + *columns: str, + preserve_nulls: bool = True, + recursions: list[tuple[str, str, int]] | None = None, + ) -> DataFrame: """Expand columns of arrays into a single row per array element. Args: columns: Column names to perform unnest operation on. preserve_nulls: If False, rows with null entries will not be returned. + recursions: Optional list of ``(input_column, output_column, depth)`` + tuples that control how deeply nested columns are unnested. Any + column not mentioned here is unnested with depth 1. Returns: A DataFrame with the columns expanded. """ columns = list(columns) - return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls)) + return DataFrame( + self.df.unnest_columns( + columns, preserve_nulls=preserve_nulls, recursions=recursions + ) + ) def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: """Export the DataFrame as an Arrow C Stream. 
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py index 759d6278c..7d4cd7acd 100644 --- a/python/tests/test_dataframe.py +++ b/python/tests/test_dataframe.py @@ -3569,3 +3569,109 @@ def test_read_parquet_file_sort_order(tmp_path, file_sort_order): pa.parquet.write_table(table, path) df = ctx.read_parquet(path, file_sort_order=file_sort_order) assert df.collect()[0].column(0).to_pylist() == [1, 2] + + +def test_except_distinct(): + ctx = SessionContext() + df1 = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]}) + df2 = ctx.from_pydict({"a": [1, 2], "b": [10, 20]}) + result = ( + df1.except_distinct(df2).sort(column("a").sort(ascending=True)).collect()[0] + ) + assert result.column(0).to_pylist() == [3] + assert result.column(1).to_pylist() == [30] + + +def test_intersect_distinct(): + ctx = SessionContext() + df1 = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]}) + df2 = ctx.from_pydict({"a": [1, 4], "b": [10, 40]}) + result = df1.intersect_distinct(df2).collect()[0] + assert result.column(0).to_pylist() == [1] + assert result.column(1).to_pylist() == [10] + + +def test_union_by_name(): + ctx = SessionContext() + df1 = ctx.from_pydict({"a": [1], "b": [10]}) + # Different column order + df2 = ctx.from_pydict({"b": [20], "a": [2]}) + batches = df1.union_by_name(df2).sort(column("a").sort(ascending=True)).collect() + rows = pa.concat_arrays([b.column(0) for b in batches]).to_pylist() + assert rows == [1, 2] + + +def test_union_by_name_distinct(): + ctx = SessionContext() + df1 = ctx.from_pydict({"a": [1, 1], "b": [10, 10]}) + df2 = ctx.from_pydict({"b": [10], "a": [1]}) + batches = df1.union_by_name_distinct(df2).collect() + total_rows = sum(b.num_rows for b in batches) + assert total_rows == 1 + + +def test_distinct_on(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]}) + result = ( + df.distinct_on( + [column("a")], + [column("a"), column("b")], + 
[column("a").sort(ascending=True), column("b").sort(ascending=True)], + ) + .sort(column("a").sort(ascending=True)) + .collect()[0] + ) + # Keeps the first row per group (smallest b per a) + assert result.column(0).to_pylist() == [1, 2] + assert result.column(1).to_pylist() == [10, 30] + + +def test_sort_by(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [3, 1, 2]}) + result = df.sort_by(column("a")).collect()[0] + assert result.column(0).to_pylist() == [1, 2, 3] + + +def test_explain_with_format(capsys): + from datafusion import ExplainFormat + + ctx = SessionContext() + df = ctx.from_pydict({"a": [1]}) + + # Default format works + df.explain() + captured = capsys.readouterr() + assert "plan_type" in captured.out + + # Tree format produces box-drawing characters + df.explain(format=ExplainFormat.TREE) + captured = capsys.readouterr() + assert "\u250c" in captured.out or "plan_type" in captured.out + + # Verbose + analyze still works with format + df.explain(verbose=True, analyze=True, format=ExplainFormat.INDENT) + captured = capsys.readouterr() + assert "plan_type" in captured.out + + +def test_window(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "x", "y"]}) + result = df.window( + f.row_number(partition_by=[column("b")], order_by=[column("a")]).alias("rn") + ).collect()[0] + assert "rn" in result.schema.names + assert result.column(result.schema.get_field_index("rn")).to_pylist() == [1, 2, 1] + + +def test_unnest_columns_with_recursions(): + ctx = SessionContext() + df = ctx.from_pydict({"a": [[1, 2], [3]], "b": ["x", "y"]}) + # Basic unnest still works + result = df.unnest_columns("a").collect()[0] + assert result.column(0).to_pylist() == [1, 2, 3] + # With explicit recursion options + result = df.unnest_columns("a", recursions=[("a", "a", 1)]).collect()[0] + assert result.column(0).to_pylist() == [1, 2, 3]