Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion buckaroo/customizations/pl_autocleaning_conf.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import polars as pl

from buckaroo.dataflow.autocleaning import AutocleaningConfig
from buckaroo.customizations.polars_commands import (
Search)


def _pl_lazy_enter(df):
    """Return ``df.lazy()``, the lazy counterpart of *df*.

    Installed as the ``lazy_enter`` hook on ``NoCleaningConfPl``.
    """
    return df.lazy()


def _pl_lazy_exit(df):
    """Return *df* collected if it is still a ``pl.LazyFrame``, else as-is.

    Installed as the ``lazy_exit`` hook on ``NoCleaningConfPl``.

    GroupBy.transform calls .collect() mid-pipeline, so anything
    downstream of a groupby is already an eager DataFrame by the time
    we see it here. Only collect if we're still lazy.
    """
    if isinstance(df, pl.LazyFrame):
        return df.collect()
    return df


class NoCleaningConfPl(AutocleaningConfig):
Expand All @@ -11,5 +22,7 @@ class NoCleaningConfPl(AutocleaningConfig):
command_klasses = [Search]
quick_command_klasses = [Search]
name=""

lazy_enter = staticmethod(_pl_lazy_enter)
lazy_exit = staticmethod(_pl_lazy_exit)


6 changes: 5 additions & 1 deletion buckaroo/customizations/polars_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ def transform(df, col, val):
filtered = df.filter(pl.any_horizontal(pl.col(pl.String).str.contains(val)))
# `.str.contains(val)` treats val as a regex, so expose it as
# highlight_regex (not _phrase) for consistent semantics on the JS side.
string_cols = [c for c, dt in zip(df.columns, df.dtypes) if dt == pl.String]
# collect_schema avoids polars's PerformanceWarning when df is a
# LazyFrame (the interpreter threads LazyFrame between ops on the
# polars path); also works on eager DataFrames.
schema = df.collect_schema()
string_cols = [name for name, dt in schema.items() if dt == pl.String]
sd_updates = {c: {'highlight_regex': val} for c in string_cols}
return SDResult(filtered, sd_updates)

Expand Down
20 changes: 19 additions & 1 deletion buckaroo/dataflow/autocleaning.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,21 @@ def handle_ops_and_clean(self, df, cleaning_method, quick_command_args, existing
cleaned_df = df
return [cleaned_df, {}, generated_code, merged_operations]

def _identity(x):
    """Return *x* unchanged.

    Used as the default ``lazy_enter`` / ``lazy_exit`` hook on
    ``AutocleaningConfig``.
    """
    return x


class AutocleaningConfig:
    """Class-level settings bundle for one autocleaning configuration.

    Subclasses override these attributes; nothing here is instance state.
    """

    command_klasses = [DefaultCommandKlsList]
    autocleaning_analysis_klasses = []
    quick_command_klasses = []
    name = 'default'
    # Hooks bookending the lisp-interpreter call in _run_df_interpreter.
    # Default identity = current behaviour. Polars overrides with df.lazy()
    # / collect-if-LazyFrame so a multi-op pipeline materialises once
    # instead of after every command. See NoCleaningConfPl.
    # staticmethod keeps the plain functions from being bound as methods
    # when they are looked up on the class or an instance.
    lazy_enter = staticmethod(_identity)
    lazy_exit = staticmethod(_identity)


class WrongFrontendQuickArgs(Exception):
Expand Down Expand Up @@ -136,6 +146,8 @@ def _setup_from_command_kls_list(self, name):
self.df_interpreter, self.gencode_interpreter = df_interpreter, gencode_interpreter
self.command_config = dict(argspecs=c_patterns, defaultArgs=c_defaults)
self.quick_command_klasses = conf.quick_command_klasses
self.lazy_enter = conf.lazy_enter
self.lazy_exit = conf.lazy_exit


def _run_df_interpreter(self, df, operations, initial_sd):
Expand Down Expand Up @@ -168,7 +180,13 @@ def wrap_set_df(form):
# contract is precisely "no ops → caller's objects come back as-is".
return df, initial_sd

return self.df_interpreter(full_ops, df, initial_sd)
# lazy_enter/lazy_exit are conf-provided hooks. Polars flips to
# LazyFrame on entry and collects on exit (one materialisation per
# pipeline instead of N). Pandas/xorq leave the defaults — identity.
# Both hooks run *after* the no-op short-circuit above, so the
# by-reference identity contract there is preserved.
ret_df, ret_sd = self.df_interpreter(full_ops, self.lazy_enter(df), initial_sd)
return self.lazy_exit(ret_df), ret_sd

def _run_code_generator(self, operations):
if len(operations) == 0:
Expand Down
49 changes: 49 additions & 0 deletions tests/unit/dataflow/autocleaning_pl_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from buckaroo.pluggable_analysis_framework.col_analysis import (ColAnalysis)
from buckaroo.dataflow.autocleaning import (merge_ops, format_ops, AutocleaningConfig, _rekey_op_sd_to_internal)
from buckaroo.polars_buckaroo import PolarsAutocleaning
from buckaroo.customizations.pl_autocleaning_conf import NoCleaningConfPl
from buckaroo.customizations.polars_commands import (
Command, PlSafeInt, DropCol, FillNA, GroupBy, NoOp, Search, SDResult
)
Expand Down Expand Up @@ -317,6 +318,54 @@ def test_init_sd_displayer_args_and_search_highlight_coexist_on_same_column():
assert cc['ag_grid_specs']['wrapText'] is True


class _RecordTypeCommand(Command):
    """Probe command for the lazy-threading tests below.

    ``transform`` appends the runtime type name of the ``df`` it sees
    mid-pipeline to ``seen_types`` and returns ``df`` unchanged.

    ``seen_types`` is a class attribute, so the recording is shared by
    every use of this command — reset it at the start of each test that
    uses it.
    """
    seen_types: list = []

    command_default = [s('record_type'), s('df'), 'col']
    command_pattern = [None]

    @staticmethod
    def transform(df, col):
        """Record ``type(df).__name__`` and pass ``df`` through untouched."""
        _RecordTypeCommand.seen_types.append(type(df).__name__)
        return df

    @staticmethod
    def transform_to_py(df, col):
        """Return the generated-code text for this command (a bare comment)."""
        return " # record_type"


class LazyProbeConf(NoCleaningConfPl):
    """Polars conf for the lazy-threading test.

    Inherits the ``lazy_enter`` / ``lazy_exit`` hooks from
    ``NoCleaningConfPl`` and registers ``FillNA`` plus the
    ``_RecordTypeCommand`` probe as its commands.
    """

    autocleaning_analysis_klasses = []
    command_klasses = [FillNA, _RecordTypeCommand]
    quick_command_klasses = []
    name = ""


def test_polars_pipeline_threads_lazyframe_between_ops():
"""The polars conf flips df to LazyFrame at interpreter entry and
collects once at exit. Each command's transform therefore sees a
LazyFrame mid-pipeline; the final cleaned df is a DataFrame so
make_origs / PlDfStatsV2 keep working unchanged."""
_RecordTypeCommand.seen_types = []
ac = PolarsAutocleaning([LazyProbeConf])
df = pl.DataFrame({'a': [1, None, 3]})
ops = [
[{'symbol': 'fillna'}, s('df'), 'a', 0],
[{'symbol': 'record_type'}, s('df'), 'a'],
[{'symbol': 'fillna'}, s('df'), 'a', 0],
[{'symbol': 'record_type'}, s('df'), 'a']]
cleaned, _sd, _gen, _ops = ac.handle_ops_and_clean(
df, cleaning_method='', quick_command_args={}, existing_operations=ops)

assert _RecordTypeCommand.seen_types == ['LazyFrame', 'LazyFrame']
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Include the lazy-threading implementation

This assertion does not match the code path exercised by the test: PolarsAutocleaning still inherits _run_df_interpreter from PandasAutocleaning, and configure_buckaroo.buckaroo_transform clones non-pandas inputs with df.clone() rather than converting Polars DataFrame to LazyFrame or collecting once at exit. With Polars installed, _RecordTypeCommand therefore records ['DataFrame', 'DataFrame'], so this new test fails instead of validating the advertised lazy-thread interpreter pipeline.

Useful? React with 👍 / 👎.

assert isinstance(cleaned, pl.DataFrame)


def test_style_column_delete_keys_drops_tooltip():
"""init_sd's delete_keys lets a user drop top-level keys that style_column
adds by default. The motivating case: a string column where the user
Expand Down
Loading