def get_dataframe(
    self, tbl_type: Literal["polars", "pandas", "duckdb"] = "polars"
) -> Any:
    """
    Return validation results as a tabular object (Polars by default).

    This method returns a compact, row-wise summary of validation step results that is
    suitable for logging and exporting (e.g., writing to CSV). The returned object depends
    on `tbl_type`.

    Parameters
    ----------
    tbl_type :
        The output backend. One of `"polars"`, `"pandas"`, or `"duckdb"`. Default is
        `"polars"`.

    Returns
    -------
    polars.DataFrame | pandas.DataFrame | ibis.expr.types.relations.Table
        A tabular summary of validation results. When `tbl_type="duckdb"`, the return
        value is an Ibis memtable (a `Table` expression).

    Raises
    ------
    ValueError
        If `tbl_type` is not one of `"polars"`, `"pandas"`, or `"duckdb"`.
    ImportError
        If the library required for the chosen `tbl_type` is not installed.

    Supported DataFrame Types
    -------------------------
    The `tbl_type=` parameter can be set to one of the following:

    - `"polars"`: A Polars DataFrame.
    - `"pandas"`: A Pandas DataFrame.
    - `"duckdb"`: An Ibis memtable.

    Examples
    --------
    ```{python}
    import pointblank as pb

    validation = (
        pb.Validate(data=pb.load_dataset("small_table", tbl_type="polars"), label="My validation")
        .col_vals_gt(columns="d", value=100)
        .col_vals_regex(columns="b", pattern=r"[0-9]-[a-z]{3}-[0-9]{3}")
        .interrogate()
    )

    df_validation = validation.get_dataframe()
    ```
    """
    allowed_tbl_types = ("polars", "pandas", "duckdb")
    if tbl_type not in allowed_tbl_types:
        raise ValueError(
            f"The DataFrame type `{tbl_type}` is not valid. Choose one of the following:\n"
            "- `polars`\n"
            "- `pandas`\n"
            "- `duckdb`"
        )

    report_original = _validation_info_as_dict(self.validation_info)

    # Remove extracts (can be large / nested; not intended for summary logging)
    report_original.pop("extract", None)

    # Maps internal field names (keys) to the user-facing column names (values) of the
    # summary table. Also serves as the column whitelist for `final_report` below.
    names_dict = {
        "active": "active",
        "i": "step_number",
        "assertion_type": "step_description",
        "column": "columns",
        "values": "values",
        "pre": "original_pre",
        "segments": "original_segments",
        "eval_error": "step_evaluated",
        "n": "units",
        "all_passed": "all_units_passed",
        "n_passed": "pass_n",
        "f_passed": "pass_pct",
        "n_failed": "failed_n",
        "f_failed": "failed_pct",
        "warning": "warning",
        "error": "error",
        "critical": "critical",
        "brief": "input_brief",
        "autobrief": "autobrief",
    }

    final_report = {k: report_original[k] for k in names_dict if k in report_original}

    # Normalize `values`: if a dict contains a regex `pattern`, log just that pattern.
    values = final_report.get("values")
    if isinstance(values, list):
        final_report = {
            **final_report,
            "values": [
                v.get("pattern") if isinstance(v, dict) and "pattern" in v else v
                for v in values
            ],
        }

    # Columns that are nulled out for inactive steps (no meaningful results exist).
    inactive_null_cols = [
        "step_evaluated",
        "units",
        "all_units_passed",
        "pass_n",
        "pass_pct",
        "failed_n",
        "failed_pct",
        "warning",
        "error",
        "critical",
    ]

    if tbl_type == "polars":
        if not _is_lib_present(lib_name="polars"):
            raise ImportError(
                "The Polars library is not installed but is required when specifying "
                '`tbl_type="polars"`.'
            )

        import polars as pl

        # Explicit schema so empty reports still produce correctly typed columns.
        pl_schema = pl.Schema(
            {
                "active": pl.Boolean,
                "i": pl.Int64,
                "assertion_type": pl.String,
                "column": pl.String,
                "values": pl.Object,
                "pre": pl.Object,
                "segments": pl.String,
                "eval_error": pl.Boolean,
                "n": pl.Int64,
                "all_passed": pl.Boolean,
                "n_passed": pl.Int64,
                "f_passed": pl.Float64,
                "n_failed": pl.Int64,
                "f_failed": pl.Float64,
                "warning": pl.Boolean,
                "error": pl.Boolean,
                "critical": pl.Boolean,
                "brief": pl.String,
                "autobrief": pl.String,
            }
        )

        df_validation_results = (
            pl.DataFrame(data=final_report, schema=pl_schema)
            .rename(names_dict)
            .with_columns(
                # User-supplied brief wins; fall back to the generated autobrief.
                brief=pl.coalesce(pl.col("input_brief"), pl.col("autobrief")),
                preprocessed=pl.col("original_pre").is_not_null(),
                segmented=pl.col("original_segments").is_not_null(),
            )
            .with_columns(
                [
                    pl.when(~pl.col("active")).then(pl.lit(None)).otherwise(pl.col(col)).alias(col)
                    for col in inactive_null_cols
                ]
            )
            .drop(["input_brief", "autobrief", "original_pre", "original_segments"])
        )

        return df_validation_results

    elif tbl_type == "pandas":
        if not _is_lib_present(lib_name="pandas"):
            raise ImportError(
                "The Pandas library is not installed but is required when specifying "
                '`tbl_type="pandas"`.'
            )

        import pandas as pd

        def transform_validation_results(df: pd.DataFrame) -> pd.DataFrame:
            # User-supplied brief wins; fall back to the generated autobrief.
            df = df.assign(brief=df["input_brief"].fillna(df["autobrief"]))
            df = df.assign(
                preprocessed=df["original_pre"].notna(),
                segmented=df["original_segments"].notna(),
            )

            # Null out result columns for inactive steps.
            for col in inactive_null_cols:
                df[col] = df[col].where(df["active"], pd.NA)

            return df.drop(
                columns=["input_brief", "autobrief", "original_pre", "original_segments"]
            )

        df_validation_results = (
            pd.DataFrame(data=final_report)
            .rename(columns=names_dict)
            .pipe(transform_validation_results)
        )

        return df_validation_results

    else:  # tbl_type == "duckdb"
        if not _is_lib_present(lib_name="ibis"):
            raise ImportError(
                "The Ibis library is not installed but is required when specifying "
                '`tbl_type="duckdb"`.'
            )

        import ibis
        import ibis.expr.datatypes as dt

        # Explicit schema so empty reports still produce correctly typed columns.
        ibis_schema = {
            "active": dt.Boolean(),
            "i": dt.Int64(),
            "assertion_type": dt.String(),
            "column": dt.String(),
            "values": dt.json(),
            "pre": dt.json(),
            "segments": dt.String(),
            "eval_error": dt.Boolean(),
            "n": dt.Int64(),
            "all_passed": dt.Boolean(),
            "n_passed": dt.Int64(),
            "f_passed": dt.Float64(),
            "n_failed": dt.Int64(),
            "f_failed": dt.Float64(),
            "warning": dt.Boolean(),
            "error": dt.Boolean(),
            "critical": dt.Boolean(),
            "brief": dt.String(),
            "autobrief": dt.String(),
        }

        # NOTE(review): ibis `Table.rename` takes a {new_name: old_name} mapping, but
        # `names_dict` is {old: new} (the polars/pandas orientation) — confirm this call
        # succeeds against the pinned ibis version, or invert the dict here.
        report_table = ibis.memtable(final_report, schema=ibis_schema).rename(names_dict)

        df_validation_results = report_table.mutate(
            # User-supplied brief wins; fall back to the generated autobrief.
            brief=ibis.coalesce(report_table.input_brief, report_table.autobrief),
            preprocessed=report_table.original_pre.notnull(),
            segmented=report_table.original_segments.notnull(),
            # Null out result columns for inactive steps, preserving each column's type.
            **{
                col: ibis.ifelse(
                    ~report_table.active,
                    ibis.null().cast(report_table[col].type()),
                    report_table[col],
                )
                for col in inactive_null_cols
            },
        ).drop("input_brief", "autobrief", "original_pre", "original_segments")

        return df_validation_results
@pytest.mark.parametrize(
    "library, tbl_type", [("Polars", "polars"), ("Pandas", "pandas"), ("Ibis", "duckdb")]
)
def test_get_dataframe_missing_libraries(library, tbl_type):
    # An un-interrogated validation is enough here: the library-presence check runs
    # before any results are assembled.
    validation = Validate(data="small_table")

    with patch("pointblank.validate._is_lib_present") as mock_is_lib:
        # Simulate the backend library being absent.
        mock_is_lib.return_value = False

        with pytest.raises(ImportError, match=f"The {library} library is not installed"):
            validation.get_dataframe(tbl_type)


def test_get_dataframe_returns_polars_df():
    validation = Validate(data="small_table")
    df_polars = validation.get_dataframe("polars")
    assert isinstance(df_polars, pl.DataFrame)


def test_get_dataframe_returns_pandas_df():
    validation = Validate(data="small_table")
    df_pandas = validation.get_dataframe("pandas")
    assert isinstance(df_pandas, pd.DataFrame)


def test_get_dataframe_returns_ibis_memtable():
    validation = Validate(data="small_table")
    df_ibis = validation.get_dataframe("duckdb")
    assert isinstance(df_ibis, ibis.expr.types.relations.Table)


# TODO: MEGHAN - test col names, brief coalescing, values to dict check,
# test inactive steps output, empty validation check