From 034caed1aafb260bb9fa5696b8de91319fa3b4b1 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 15:46:02 +0100 Subject: [PATCH 01/10] using native XGB --- src/eventdisplay_ml/evaluate.py | 103 +++++++++++------- src/eventdisplay_ml/models.py | 4 +- .../scripts/train_xgb_stereo.py | 3 +- 3 files changed, 67 insertions(+), 43 deletions(-) diff --git a/src/eventdisplay_ml/evaluate.py b/src/eventdisplay_ml/evaluate.py index 6883be4..9c645b3 100644 --- a/src/eventdisplay_ml/evaluate.py +++ b/src/eventdisplay_ml/evaluate.py @@ -7,6 +7,8 @@ import xgboost as xgb from sklearn.metrics import mean_absolute_error, mean_squared_error +from eventdisplay_ml.features import target_features + _logger = logging.getLogger(__name__) @@ -63,20 +65,19 @@ def evaluate_regression_model(model, x_test, y_test, df, x_cols, y_data, name): score = model.score(x_test, y_test) _logger.info(f"XGBoost Multi-Target R^2 Score (Testing Set): {score:.4f}") y_pred = model.predict(x_test) - mse_x = mean_squared_error(y_test["MCxoff"], y_pred[:, 0]) - mse_y = mean_squared_error(y_test["MCyoff"], y_pred[:, 1]) - _logger.info(f"{name} MSE (X_off): {mse_x:.4f}, MSE (Y_off): {mse_y:.4f}") - mae_x = mean_absolute_error(y_test["MCxoff"], y_pred[:, 0]) - mae_y = mean_absolute_error(y_test["MCyoff"], y_pred[:, 1]) - _logger.info(f"{name} MAE (X_off): {mae_x:.4f}") - _logger.info(f"{name} MAE (Y_off): {mae_y:.4f}") + mse = mean_squared_error(y_test, y_pred) + _logger.info(f"{name} Mean Squared Error (All targets): {mse:.4f}") + mae = mean_absolute_error(y_test, y_pred) + _logger.info(f"{name} Mean Absolute Error (All targets): {mae:.4f}") + target_variance(y_test, y_pred, y_data.columns) feature_importance(model, x_cols, y_data.columns, name) if name == "xgboost": shap_feature_importance(model, x_test, y_data.columns) + df_pred = pd.DataFrame(y_pred, columns=target_features("stereo_analysis")) calculate_resolution( - y_pred, + df_pred, y_test, df, percentiles=[68, 90, 95], @@ -87,15 +88,33 @@ def evaluate_regression_model(model, x_test, y_test, df, x_cols, y_data, name): ) +def target_variance(y_test, y_pred, targets): + """Calculate and log variance explained per target.""" + y_test_np = y_test.to_numpy() if hasattr(y_test, "to_numpy") else y_test + + mse_values = np.mean((y_test_np - y_pred) ** 2, axis=0) + variance_values = np.var(y_test_np, axis=0) + + _logger.info("--- Performance Per Target ---") + for i, name in enumerate(targets): + # Fraction of variance unexplained (lower is better, 0.0 is perfect) + unexplained = mse_values[i] / variance_values[i] + + _logger.info( + f"Target: {name:12s} | MSE: {mse_values[i]:.6f} | " + f"Unexplained Variance: {unexplained:.2%}" + ) + + def calculate_resolution(y_pred, y_test, df, percentiles, log_e_min, log_e_max, n_bins, name): """Compute angular and energy resolution based on predictions.""" results_df = pd.DataFrame( { "MCxoff_true": y_test["MCxoff"].values, "MCyoff_true": y_test["MCyoff"].values, - "MCxoff_pred": y_pred[:, 0], - "MCyoff_pred": y_pred[:, 1], - "MCe0_pred": y_pred[:, 2], + "MCxoff_pred": y_pred["MCxoff"].values, + "MCyoff_pred": y_pred["MCyoff"].values, + "MCe0_pred": y_pred["MCe0"].values, "MCe0": df.loc[y_test.index, "MCe0"].values, } ) @@ -134,50 +153,56 @@ def percentile_series(col, p): _logger.info(f"\n{output_df.to_markdown(floatfmt='.4f')}") -def _iter_targets(model, target_names): - """Iterate over targets in multi-/single-output models.""" - if hasattr(model, "estimators_"): # MultiOutputRegressor +def feature_importance(model, x_cols, 
target_names, name=None): + """Feature importance handling both MultiOutputRegressor and native Multi-target.""" + _logger.info("--- XGBoost Feature Importance ---") + + # Case 1: Scikit-Learn MultiOutputRegressor (Separate model per target) + if hasattr(model, "estimators_"): for i, est in enumerate(model.estimators_): target = target_names[i] if i < len(target_names) else f"target_{i}" - yield target, est + _log_importance_table(target, est.feature_importances_, x_cols, name) + + # Case 2: Native Multi-target XGBoost (One model for all targets) else: - target = target_names[0] if target_names else "target" - yield target, model + importances = getattr(model, "feature_importances_", None) + if importances is not None: + if target_names is not None and not target_names.empty: + target_str = ", ".join(list(target_names)) + else: + target_str = "Joint Targets" -def feature_importance(model, x_cols, target_names, name=None): - """Feature importance using built-in XGBoost method.""" - _logger.info("--- XGBoost Feature Importance ---") + _logger.info("Note: Native XGBoost multi-target provides JOINT importance.") + _log_importance_table(target_str, importances, x_cols, name) - for target, est in _iter_targets(model, target_names): - importances = getattr(est, "feature_importances_", None) - if importances is None: - _logger.info("No feature_importances_ found.") - continue - df = pd.DataFrame({"Feature": x_cols, "Importance": importances}).sort_values( - "Importance", ascending=False - ) - _logger.info(f"### {name} Importance for Target: **{target}**") - _logger.info(f"\n{df.head(25).to_markdown(index=False)}") +def _log_importance_table(target_label, values, x_cols, name): + """Format and log the importance dataframe for printing.""" + df = pd.DataFrame({"Feature": x_cols, "Importance": values}).sort_values( + "Importance", ascending=False + ) + _logger.info(f"### {name} Importance for: **{target_label}**") + _logger.info(f"\n{df.head(25).to_markdown(index=False)}") def shap_feature_importance(model, x_data, target_names, max_points=20000, n_top=25): - """Feature importance using SHAP values from XGBoost.""" - x_sample = x_data.sample(n=min(len(x_data), max_points), random_state=0) + """Feature importance using SHAP values for native multi-target XGBoost.""" + x_sample = x_data.sample(n=min(len(x_data), max_points), random_state=42) n_features = len(x_data.columns) + n_targets = len(target_names) - for target, est in _iter_targets(model, target_names): - if not hasattr(est, "get_booster"): - _logger.info("Model does not support SHAP feature importance.") - continue + dmatrix = xgb.DMatrix(x_sample) + shap_vals = model.get_booster().predict(dmatrix, pred_contribs=True) + shap_vals = shap_vals.reshape(len(x_sample), n_targets, n_features + 1) - shap_vals = est.get_booster().predict(xgb.DMatrix(x_sample), pred_contribs=True)[:, :-1] + for i, target in enumerate(target_names): + target_shap = shap_vals[:, i, :-1] - imp = np.abs(shap_vals).mean(axis=0) + imp = np.abs(target_shap).mean(axis=0) idx = np.argsort(imp)[::-1] - _logger.info(f"=== Builtin XGBoost SHAP Importance for {target} ===") + _logger.info(f"=== SHAP Importance for {target} ===") for j in idx[:n_top]: if j < n_features: _logger.info(f"{x_data.columns[j]:25s} {imp[j]:.6e}") diff --git a/src/eventdisplay_ml/models.py b/src/eventdisplay_ml/models.py index 114d15a..312f3cc 100644 --- a/src/eventdisplay_ml/models.py +++ b/src/eventdisplay_ml/models.py @@ -325,7 +325,7 @@ def _output_tree(analysis_type, root_file): def 
_apply_model(analysis_type, df_chunk, models, tree): """ - Apply regression models to the data chunk. + Apply models to the data chunk. Parameters ---------- @@ -334,7 +334,7 @@ def _apply_model(analysis_type, df_chunk, models, tree): df_chunk : pandas.DataFrame Data chunk to process. models : dict - Dictionary of loaded XGBoost models for regression. + Dictionary of loaded XGBoost models. tree : uproot.writing.WritingTTree Output tree to write results to. """ diff --git a/src/eventdisplay_ml/scripts/train_xgb_stereo.py b/src/eventdisplay_ml/scripts/train_xgb_stereo.py index 4e5d1f3..03f734e 100644 --- a/src/eventdisplay_ml/scripts/train_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/train_xgb_stereo.py @@ -15,7 +15,6 @@ import xgboost as xgb from joblib import dump from sklearn.model_selection import train_test_split -from sklearn.multioutput import MultiOutputRegressor from eventdisplay_ml import hyper_parameters, utils from eventdisplay_ml.data_processing import load_training_data @@ -67,7 +66,7 @@ def train(df, n_tel, model_prefix, train_test_fraction, hyperparameter_config=No for name, para in configs.items(): _logger.info(f"Training with {name} for n_tel={n_tel}...") - model = MultiOutputRegressor(xgb.XGBRegressor(**para)) + model = xgb.XGBRegressor(**para) model.fit(x_train, y_train) evaluate_regression_model(model, x_test, y_test, df, x_cols, y_data, name) From 5e3d08b08c4420a2b09e3f9bf39db8459fb92504 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:03:27 +0100 Subject: [PATCH 02/10] remove size from training --- src/eventdisplay_ml/data_processing.py | 13 +++++++------ src/eventdisplay_ml/evaluate.py | 23 ++++++++++++++++------- src/eventdisplay_ml/features.py | 10 +++++----- src/eventdisplay_ml/models.py | 1 + 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/eventdisplay_ml/data_processing.py b/src/eventdisplay_ml/data_processing.py index f10a7f1..62d5afc 100644 --- a/src/eventdisplay_ml/data_processing.py +++ b/src/eventdisplay_ml/data_processing.py @@ -113,7 +113,7 @@ def flatten_feature_data(group_df, ntel, analysis_type, training): df_flat = flatten_telescope_data_vectorized( group_df, ntel, - features.telescope_features(analysis_type, training=training), + features.telescope_features(analysis_type), analysis_type=analysis_type, training=training, ) @@ -188,7 +188,7 @@ def load_training_data( df_flat = flatten_telescope_data_vectorized( data_tree, n_tel, - features.telescope_features(analysis_type, training=True), + features.telescope_features(analysis_type), analysis_type, training=True, ) @@ -250,7 +250,7 @@ def calculate_intersection(tel_list): df["DispNImages"] = df["DispNImages_new"] df = df.drop(columns=["DispTelList_T_new", "DispNImages_new"]) - pad_vars = features.telescope_features(analysis_type, training=training) + pad_vars = features.telescope_features(analysis_type) for var_name in pad_vars: if var_name in df.columns: @@ -306,10 +306,11 @@ def flatten_telescope_variables(n_tel, flat_features, index): new_cols[f"loss_dist_{i}"] = df_flat[f"loss_{i}"] * df_flat[f"dist_{i}"] new_cols[f"width_length_{i}"] = df_flat[f"width_{i}"] / (df_flat[f"length_{i}"] + 1e-6) - df_flat[f"size_{i}"] = np.log10(np.clip(df_flat[f"size_{i}"], 1e-6, None)) - if "E_{i}" in df_flat: + if f"size_{i}" in df_flat: + df_flat[f"size_{i}"] = np.log10(np.clip(df_flat[f"size_{i}"], 1e-6, None)) + if f"E_{i}" in df_flat: df_flat[f"E_{i}"] = np.log10(np.clip(df_flat[f"E_{i}"], 1e-6, None)) - if "ES_{i}" in df_flat: + if f"ES_{i}" in df_flat: 
df_flat[f"ES_{i}"] = np.log10(np.clip(df_flat[f"ES_{i}"], 1e-6, None))
 
     # pointing corrections
diff --git a/src/eventdisplay_ml/evaluate.py b/src/eventdisplay_ml/evaluate.py
index 9c645b3..ae31224 100644
--- a/src/eventdisplay_ml/evaluate.py
+++ b/src/eventdisplay_ml/evaluate.py
@@ -27,6 +27,11 @@ def evaluation_efficiency(name, model, x_test, y_test):
         pred = y_pred_proba >= t
         eff_signal.append(((pred) & (y_test == 1)).sum() / n_signal if n_signal else 0)
         eff_background.append(((pred) & (y_test == 0)).sum() / n_background if n_background else 0)
+        _logger.info(
+            f"{name} Threshold: {t:.2f} | "
+            f"Signal Efficiency: {eff_signal[-1]:.4f} | "
+            f"Background Efficiency: {eff_background[-1]:.4f}"
+        )
 
     return pd.DataFrame(
         {
@@ -157,23 +162,27 @@ def feature_importance(model, x_cols, target_names, name=None):
     """Feature importance handling both MultiOutputRegressor and native Multi-target."""
     _logger.info("--- XGBoost Feature Importance ---")
 
-    # Case 1: Scikit-Learn MultiOutputRegressor (Separate model per target)
+    # Case 1: Scikit-Learn MultiOutputRegressor
     if hasattr(model, "estimators_"):
         for i, est in enumerate(model.estimators_):
-            target = target_names[i] if i < len(target_names) else f"target_{i}"
+            target = target_names[i] if (target_names is not None and i < len(target_names)) else f"target_{i}"
             _log_importance_table(target, est.feature_importances_, x_cols, name)
 
-    # Case 2: Native Multi-target XGBoost (One model for all targets)
+    # Case 2: Native Multi-target OR Single-target Classifier
     else:
         importances = getattr(model, "feature_importances_", None)
         if importances is not None:
-            if target_names is not None and not target_names.empty:
-                target_str = ", ".join(list(target_names))
+            if target_names is not None and len(target_names) > 0:
+                # Cast each name to str so .join works regardless of input type
+                target_str = ", ".join(map(str, target_names))
             else:
-                target_str = "Joint Targets"
+                target_str = "Target"
+
+            # Check if it's actually multi-target to set the log message
+            if target_names is not None and len(target_names) > 1:
+                _logger.info("Note: Native XGBoost multi-target provides JOINT importance.")
 
-            _logger.info("Note: Native XGBoost multi-target provides JOINT importance.")
             _log_importance_table(target_str, importances, x_cols, name)
 
diff --git a/src/eventdisplay_ml/features.py b/src/eventdisplay_ml/features.py
index b2fb52f..cb17409 100644
--- a/src/eventdisplay_ml/features.py
+++ b/src/eventdisplay_ml/features.py
@@ -46,6 +46,7 @@ def excluded_features(analysis_type, ntel):
     if "classification" in analysis_type:
         return {
             "Erec",
+            *[f"size_{i}" for i in range(ntel)],
             *[f"E_{i}" for i in range(ntel)],
             *[f"ES_{i}" for i in range(ntel)],
             *[f"fpointing_dx_{i}" for i in range(ntel)],
@@ -54,7 +55,7 @@
     raise ValueError(f"Unknown analysis type: {analysis_type}")
 
 
-def telescope_features(analysis_type, training):
+def telescope_features(analysis_type):
     """
     Telescope-type features.
@@ -66,7 +67,6 @@ def telescope_features(analysis_type, training): "cosphi", "sinphi", "loss", - "size", "dist", "width", "length", @@ -79,13 +79,13 @@ def telescope_features(analysis_type, training): if analysis_type == "classification": return var - return [*var, "E", "ES", "Disp_T", "DispXoff_T", "DispYoff_T", "DispWoff_T"] + return [*var, "size", "E", "ES", "Disp_T", "DispXoff_T", "DispYoff_T", "DispWoff_T"] def _regression_features(training): """Regression features.""" var = [ - *telescope_features("stereo_analysis", training), + *telescope_features("stereo_analysis"), "DispNImages", "DispTelList_T", "Xoff", @@ -103,7 +103,7 @@ def _regression_features(training): def _classification_features(training): """Classification features.""" - var_tel = telescope_features("classification", training) + var_tel = telescope_features("classification") var_array = [ "DispNImages", "DispTelList_T", diff --git a/src/eventdisplay_ml/models.py b/src/eventdisplay_ml/models.py index 312f3cc..8041826 100644 --- a/src/eventdisplay_ml/models.py +++ b/src/eventdisplay_ml/models.py @@ -252,6 +252,7 @@ def process_file_chunked( Number of events to read and process per chunk. """ branch_list = features.features(analysis_type, training=False) + _logger.info(f"Using branches: {branch_list}") selected_indices = parse_image_selection(image_selection) _logger.info(f"Chunk size: {chunk_size}") From b06b2a499eadfb352629b01daa484563f5c676b1 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:09:55 +0100 Subject: [PATCH 03/10] config --- src/eventdisplay_ml/scripts/train_xgb_stereo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eventdisplay_ml/scripts/train_xgb_stereo.py b/src/eventdisplay_ml/scripts/train_xgb_stereo.py index 03f734e..a589025 100644 --- a/src/eventdisplay_ml/scripts/train_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/train_xgb_stereo.py @@ -95,7 +95,7 @@ def main(): help=("Path to directory for writing XGBoost regression models (without n_tel suffix)."), ) parser.add_argument( - "--hyperparameter-config", + "--hyperparameter_config", help="Path to JSON file with hyperparameter configuration.", default=None, type=str, From 1980c4183a027041529e19ecdd3a0a50b0412ccf Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:14:56 +0100 Subject: [PATCH 04/10] consistent naming --- src/eventdisplay_ml/scripts/apply_xgb_classify.py | 12 ++++++------ src/eventdisplay_ml/scripts/apply_xgb_stereo.py | 12 ++++++------ src/eventdisplay_ml/scripts/train_xgb_classify.py | 6 +++--- src/eventdisplay_ml/scripts/train_xgb_stereo.py | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/eventdisplay_ml/scripts/apply_xgb_classify.py b/src/eventdisplay_ml/scripts/apply_xgb_classify.py index e42f7d1..a088cd6 100644 --- a/src/eventdisplay_ml/scripts/apply_xgb_classify.py +++ b/src/eventdisplay_ml/scripts/apply_xgb_classify.py @@ -21,13 +21,13 @@ def main(): """Apply XGBoost classification.""" parser = argparse.ArgumentParser(description=("Apply XGBoost Classification")) parser.add_argument( - "--input-file", + "--input_file", required=True, metavar="INPUT.root", help="Path to input mscw file", ) parser.add_argument( - "--model-prefix", + "--model_prefix", required=True, metavar="MODEL_PREFIX", help=( @@ -36,13 +36,13 @@ def main(): ), ) parser.add_argument( - "--output-file", + "--output_file", required=True, metavar="OUTPUT.root", help="Output file path for predictions", ) parser.add_argument( - "--image-selection", + "--image_selection", type=str, 
default="15", help=( @@ -53,13 +53,13 @@ def main(): ), ) parser.add_argument( - "--max-events", + "--max_events", type=int, default=None, help="Maximum number of events to process (default: all events)", ) parser.add_argument( - "--chunk-size", + "--chunk_size", type=int, default=500000, help="Number of events to process per chunk (default: 500000)", diff --git a/src/eventdisplay_ml/scripts/apply_xgb_stereo.py b/src/eventdisplay_ml/scripts/apply_xgb_stereo.py index a3ee50d..6d09869 100644 --- a/src/eventdisplay_ml/scripts/apply_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/apply_xgb_stereo.py @@ -19,25 +19,25 @@ def main(): """Apply XGBoost stereo models.""" parser = argparse.ArgumentParser(description=("Apply XGBoost Stereo Reconstruction")) parser.add_argument( - "--input-file", + "--input_file", required=True, metavar="INPUT.root", help="Path to input mscw file", ) parser.add_argument( - "--model-prefix", + "--model_prefix", required=True, metavar="MODEL_PREFIX", help=("Path to directory containing XGBoost regression models (without n_tel suffix)."), ) parser.add_argument( - "--output-file", + "--output_file", required=True, metavar="OUTPUT.root", help="Output file path for predictions", ) parser.add_argument( - "--image-selection", + "--image_selection", type=str, default="15", help=( @@ -48,13 +48,13 @@ def main(): ), ) parser.add_argument( - "--max-events", + "--max_events", type=int, default=None, help="Maximum number of events to process (default: all events)", ) parser.add_argument( - "--chunk-size", + "--chunk_size", type=int, default=500000, help="Number of events to process per chunk (default: 500000)", diff --git a/src/eventdisplay_ml/scripts/train_xgb_classify.py b/src/eventdisplay_ml/scripts/train_xgb_classify.py index 57eb307..feb95a4 100644 --- a/src/eventdisplay_ml/scripts/train_xgb_classify.py +++ b/src/eventdisplay_ml/scripts/train_xgb_classify.py @@ -104,7 +104,7 @@ def main(): "--input_background_file_list", help="List of input background mscw ROOT files." 
) parser.add_argument( - "--model-prefix", + "--model_prefix", required=True, help=( "Path to directory for writing XGBoost classification models " @@ -112,7 +112,7 @@ def main(): ), ) parser.add_argument( - "--hyperparameter-config", + "--hyperparameter_config", help="Path to JSON file with hyperparameter configuration.", default=None, type=str, @@ -130,7 +130,7 @@ def main(): help="Maximum number of events to process across all files.", ) parser.add_argument( - "--model-parameters", + "--model_parameters", type=str, help=("Path to model parameter file (JSON) defining energy and zenith bins."), ) diff --git a/src/eventdisplay_ml/scripts/train_xgb_stereo.py b/src/eventdisplay_ml/scripts/train_xgb_stereo.py index a589025..4075362 100644 --- a/src/eventdisplay_ml/scripts/train_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/train_xgb_stereo.py @@ -90,7 +90,7 @@ def main(): ) parser.add_argument("--input_file_list", help="List of input mscw files.") parser.add_argument( - "--model-prefix", + "--model_prefix", required=True, help=("Path to directory for writing XGBoost regression models (without n_tel suffix)."), ) From bf1025765f5e078a01238fb9fa7cf4a650904440 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:40:31 +0100 Subject: [PATCH 05/10] ignore docstrings in tests --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 98c9e37..ba39193 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,6 +120,9 @@ lint.ignore = [ lint.pydocstyle.convention = "numpy" +[tool.ruff.lint.per-file-ignores] +"tests/**.py" = ["D103"] + [tool.codespell] ignore-words-list = "chec,arrang,livetime" From 1b9d66ca2c9e6f0b6626e249a06174005607c9d9 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:40:44 +0100 Subject: [PATCH 06/10] tests --- tests/unit_tests/test_features.py | 504 ++++++++++++++++++++ tests/unit_tests/test_training_variables.py | 45 -- 2 files changed, 504 insertions(+), 45 deletions(-) create mode 100644 tests/unit_tests/test_features.py delete mode 100644 tests/unit_tests/test_training_variables.py diff --git a/tests/unit_tests/test_features.py b/tests/unit_tests/test_features.py new file mode 100644 index 0000000..135785b --- /dev/null +++ b/tests/unit_tests/test_features.py @@ -0,0 +1,504 @@ +"""Unit tests for training variables selection utilities.""" + +import pytest + +import eventdisplay_ml.features + + +def test_target_features_stereo_analysis(): + result = eventdisplay_ml.features.target_features("stereo_analysis") + assert result == ["MCxoff", "MCyoff", "MCe0"] + + +def test_target_features_classification_exact(): + result = eventdisplay_ml.features.target_features("classification") + assert result == [] + + +def test_target_features_classification_in_name(): + result = eventdisplay_ml.features.target_features("my_classification_run") + assert result == [] + + +def test_target_features_invalid_type(): + with pytest.raises(ValueError, match="Unknown analysis type"): + eventdisplay_ml.features.target_features("unknown_type") + + +def test_excluded_features_stereo_analysis(): + ntel = 3 + result = eventdisplay_ml.features.excluded_features("stereo_analysis", ntel) + expected = { + "fpointing_dx_0", + "fpointing_dx_1", + "fpointing_dx_2", + "fpointing_dy_0", + "fpointing_dy_1", + "fpointing_dy_2", + } + assert result == expected + + +def test_excluded_features_classification_exact(): + ntel = 2 + result = eventdisplay_ml.features.excluded_features("classification", ntel) + expected = { + "Erec", + 
"size_0", + "size_1", + "E_0", + "E_1", + "ES_0", + "ES_1", + "fpointing_dx_0", + "fpointing_dx_1", + "fpointing_dy_0", + "fpointing_dy_1", + } + assert result == expected + + +def test_excluded_features_classification_in_name(): + ntel = 1 + result = eventdisplay_ml.features.excluded_features("my_classification_run", ntel) + expected = { + "Erec", + "size_0", + "E_0", + "ES_0", + "fpointing_dx_0", + "fpointing_dy_0", + } + assert result == expected + + +def test_excluded_features_invalid_type(): + with pytest.raises(ValueError, match="Unknown analysis type"): + eventdisplay_ml.features.excluded_features("unknown_type", 2) + + +def test_telescope_features_classification(): + result = eventdisplay_ml.features.telescope_features("classification") + expected = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + assert result == expected + + +def test_telescope_features_stereo_analysis(): + result = eventdisplay_ml.features.telescope_features("stereo_analysis") + expected = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + ] + assert result == expected + + +def test_telescope_features_other_analysis_type(): + result = eventdisplay_ml.features.telescope_features("regression") + expected = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + ] + assert result == expected + + +def test__regression_features_training_true(): + result = eventdisplay_ml.features._regression_features(training=True) + # Should start with target features + assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] + # Should contain all regression features + expected_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", + ] + # All expected features should be present after the target features + for feat in expected_features: + assert feat in result + + +def test__regression_features_training_false(): + result = eventdisplay_ml.features._regression_features(training=False) + # Should NOT start with target features + assert "MCxoff" not in result + assert "MCyoff" not in result + assert "MCe0" not in result + # Should contain all regression features + expected_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", + ] + for feat in expected_features: + assert feat in result + # Should have the same length as training + 3 (for the targets) + result_training = eventdisplay_ml.features._regression_features(training=True) + assert len(result_training) == len(result) 
+ 3 + + +def test__classification_features_training_true(): + result = eventdisplay_ml.features._classification_features(training=True) + expected_tel_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + expected_array_features = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + # Should contain all telescope and array features, but not "Erec" + for feat in expected_tel_features + expected_array_features: + assert feat in result + assert "Erec" not in result + # Should start with telescope features + assert result[: len(expected_tel_features)] == expected_tel_features + # Should have correct length + assert len(result) == len(expected_tel_features) + len(expected_array_features) + + +def test__classification_features_training_false(): + result = eventdisplay_ml.features._classification_features(training=False) + expected_tel_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + + expected_array_features = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + # Should contain all telescope and array features, and "Erec" + for feat in expected_tel_features + expected_array_features: + assert feat in result + assert "Erec" in result + # "Erec" should be the last feature + assert result[-1] == "Erec" + # Should have correct length + assert len(result) == len(expected_tel_features) + len(expected_array_features) + 1 + + +def test_features_stereo_analysis_training_true(): + result = eventdisplay_ml.features.features("stereo_analysis", training=True) + # Should start with target features + assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] + # Should contain all regression features + expected_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", + ] + for feat in expected_features: + assert feat in result + # Should have correct length + assert len(result) == len(expected_features) + 3 + + +def test_features_stereo_analysis_training_false(): + result = eventdisplay_ml.features.features("stereo_analysis", training=False) + # Should NOT start with target features + assert "MCxoff" not in result + assert "MCyoff" not in result + assert "MCe0" not in result + expected_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + "size", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", + ] + for feat in expected_features: + assert feat in result + # Should have correct length + assert len(result) == len(expected_features) + + +def test_features_classification_training_true(): + result = eventdisplay_ml.features.features("classification", training=True) + 
expected_tel_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + expected_array_features = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + for feat in expected_tel_features + expected_array_features: + assert feat in result + assert "Erec" not in result + assert result[: len(expected_tel_features)] == expected_tel_features + assert len(result) == len(expected_tel_features) + len(expected_array_features) + + +def test_features_classification_training_false(): + result = eventdisplay_ml.features.features("classification", training=False) + expected_tel_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + expected_array_features = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + for feat in expected_tel_features + expected_array_features: + assert feat in result + assert "Erec" in result + assert result[-1] == "Erec" + assert len(result) == len(expected_tel_features) + len(expected_array_features) + 1 + + +def test_features_classification_in_name_training_true(): + result = eventdisplay_ml.features.features("my_classification_run", training=True) + expected_tel_features = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + expected_array_features = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + for feat in expected_tel_features + expected_array_features: + assert feat in result + assert "Erec" not in result + assert result[: len(expected_tel_features)] == expected_tel_features + assert len(result) == len(expected_tel_features) + len(expected_array_features) + + +def test_features_wrong_analysis_type(): + with pytest.raises(ValueError, match="Unknown analysis type"): + eventdisplay_ml.features.features("unknown_type", training=True) diff --git a/tests/unit_tests/test_training_variables.py b/tests/unit_tests/test_training_variables.py deleted file mode 100644 index 56604eb..0000000 --- a/tests/unit_tests/test_training_variables.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Unit tests for training variables selection utilities.""" - -import eventdisplay_ml.features - - -def test_telescope_features(): - """Ensure per-telescope training variables are provided as a list and include expected keys.""" - variables = eventdisplay_ml.features.telescope_features() - assert isinstance(variables, list) - assert "Disp_T" in variables - assert "R_core" in variables - - -def test__regression_features(): - """Ensure array-level training variables include array metadata fields.""" - variables = eventdisplay_ml.features._regression_features() - assert isinstance(variables, list) - assert "DispNImages" in variables - assert "EmissionHeight" in variables - - -def test__regression_features(): - """Ensure combined training variables include per-telescope and array-level fields.""" - variables = eventdisplay_ml.features._regression_features() - assert isinstance(variables, list) - assert "Disp_T" in variables - assert "R_core" in variables - assert "DispNImages" in 
variables - assert "EmissionHeight" in variables - - -def test__classification_features(): - """Ensure combined classification variables exclude energy fields and include expected keys.""" - variables = eventdisplay_ml.features._classification_features() - assert isinstance(variables, list) - # Energy fields should be excluded - assert "E" not in variables - assert "ES" not in variables - # Per-telescope variables - assert "Disp_T" in variables - assert "R_core" in variables - # Classification variables - assert "MSCW" in variables - assert "MSCL" in variables - assert "EmissionHeight" in variables From 7a42e37cf061d705fec9b1c9701a1b60c2fc1b2d Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Tue, 30 Dec 2025 16:56:11 +0100 Subject: [PATCH 07/10] unit tests --- tests/unit_tests/test_features.py | 311 ++++++++++++++++-------------- tests/unit_tests/test_utils.py | 150 +++++++++++++- 2 files changed, 315 insertions(+), 146 deletions(-) diff --git a/tests/unit_tests/test_features.py b/tests/unit_tests/test_features.py index 135785b..9a5cb5a 100644 --- a/tests/unit_tests/test_features.py +++ b/tests/unit_tests/test_features.py @@ -1,23 +1,151 @@ -"""Unit tests for training variables selection utilities.""" +"""Unit tests for eventdisplay_ml.features.""" import pytest import eventdisplay_ml.features - -def test_target_features_stereo_analysis(): - result = eventdisplay_ml.features.target_features("stereo_analysis") - assert result == ["MCxoff", "MCyoff", "MCe0"] +# Constants for expected features +TARGETS = ["MCxoff", "MCyoff", "MCe0"] +TEL_CLASS = [ + "cen_x", + "cen_y", + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", +] +TEL_STEREO = [*TEL_CLASS, "size", "E", "ES", "Disp_T", "DispXoff_T", "DispYoff_T", "DispWoff_T"] +ARRAY_CLASS = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", +] +REGRESSION = [ + *TEL_STEREO, + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", +] + + +@pytest.mark.parametrize( + ("analysis", "training", "targets", "features", "erec_last"), + [ + ("stereo_analysis", True, TARGETS, REGRESSION, False), + ("stereo_analysis", False, [], REGRESSION, False), + ("classification", True, [], [*TEL_CLASS, *ARRAY_CLASS], False), + ("classification", False, [], [*TEL_CLASS, *ARRAY_CLASS], True), + ("my_classification_run", True, [], [*TEL_CLASS, *ARRAY_CLASS], False), + ], +) +def test_features(analysis, training, targets, features, erec_last): + result = eventdisplay_ml.features.features(analysis, training=training) + for feat in features: + assert feat in result + if targets: + assert result[: len(targets)] == targets + assert len(result) == len(features) + len(targets) + else: + assert result[: len(TEL_CLASS)] == TEL_CLASS + # For stereo_analysis, False, Erec is present in REGRESSION + if analysis == "stereo_analysis" and not training: + assert "Erec" in result + assert len(result) == len(REGRESSION) + elif erec_last: + assert "Erec" in result + assert result[-1] == "Erec" + assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) + 1 + else: + assert "Erec" not in result + assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) -def test_target_features_classification_exact(): - result = eventdisplay_ml.features.target_features("classification") - assert result == [] +def test_features_wrong_analysis_type(): + with 
pytest.raises(ValueError, match="Unknown analysis type"): + eventdisplay_ml.features.features("unknown_type", training=True) -def test_target_features_classification_in_name(): - result = eventdisplay_ml.features.target_features("my_classification_run") - assert result == [] +@pytest.mark.parametrize( + ("training", "expected_targets", "expected_features"), + [ + (True, TARGETS, REGRESSION), + (False, [], REGRESSION), + ], +) +def test_regression_features(training, expected_targets, expected_features): + result = eventdisplay_ml.features._regression_features(training=training) + for feat in expected_features: + assert feat in result + if training: + assert result[:3] == expected_targets + assert len(result) == len(expected_features) + 3 + else: + for t in expected_targets: + assert t not in result + assert len(result) == len(expected_features) + + +@pytest.mark.parametrize( + ("training", "erec_last"), + [ + (True, False), + (False, True), + ], +) +def test_classification_features(training, erec_last): + result = eventdisplay_ml.features._classification_features(training=training) + for feat in TEL_CLASS + ARRAY_CLASS: + assert feat in result + if erec_last: + assert "Erec" in result + assert result[-1] == "Erec" + assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) + 1 + else: + assert "Erec" not in result + assert result[: len(TEL_CLASS)] == TEL_CLASS + assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) + + +@pytest.mark.parametrize( + ("analysis", "expected"), + [ + ("classification", TEL_CLASS), + ("stereo_analysis", TEL_STEREO), + ("regression", TEL_STEREO), + ], +) +def test_telescope_features(analysis, expected): + assert eventdisplay_ml.features.telescope_features(analysis) == expected + + +@pytest.mark.parametrize( + ("analysis", "expected"), + [ + ("stereo_analysis", TARGETS), + ("classification", []), + ("my_classification_run", []), + ], +) +def test_target_features(analysis, expected): + assert eventdisplay_ml.features.target_features(analysis) == expected def test_target_features_invalid_type(): @@ -25,18 +153,33 @@ def test_target_features_invalid_type(): eventdisplay_ml.features.target_features("unknown_type") -def test_excluded_features_stereo_analysis(): - ntel = 3 - result = eventdisplay_ml.features.excluded_features("stereo_analysis", ntel) - expected = { - "fpointing_dx_0", - "fpointing_dx_1", - "fpointing_dx_2", - "fpointing_dy_0", - "fpointing_dy_1", - "fpointing_dy_2", - } - assert result == expected +@pytest.mark.parametrize( + ("analysis", "ntel", "expected"), + [ + ( + "stereo_analysis", + 3, + {f"fpointing_dx_{i}" for i in range(3)} | {f"fpointing_dy_{i}" for i in range(3)}, + ), + ( + "classification", + 2, + {"Erec"} + | {f"size_{i}" for i in range(2)} + | {f"E_{i}" for i in range(2)} + | {f"ES_{i}" for i in range(2)} + | {f"fpointing_dx_{i}" for i in range(2)} + | {f"fpointing_dy_{i}" for i in range(2)}, + ), + ( + "my_classification_run", + 1, + {"Erec", "size_0", "E_0", "ES_0", "fpointing_dx_0", "fpointing_dy_0"}, + ), + ], +) +def test_excluded_features(analysis, ntel, expected): + assert eventdisplay_ml.features.excluded_features(analysis, ntel) == expected def test_excluded_features_classification_exact(): @@ -155,123 +298,6 @@ def test__regression_features_training_true(): result = eventdisplay_ml.features._regression_features(training=True) # Should start with target features assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] - # Should contain all regression features - expected_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - 
"loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", - ] - # All expected features should be present after the target features - for feat in expected_features: - assert feat in result - - -def test__regression_features_training_false(): - result = eventdisplay_ml.features._regression_features(training=False) - # Should NOT start with target features - assert "MCxoff" not in result - assert "MCyoff" not in result - assert "MCe0" not in result - # Should contain all regression features - expected_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", - ] - for feat in expected_features: - assert feat in result - # Should have the same length as training + 3 (for the targets) - result_training = eventdisplay_ml.features._regression_features(training=True) - assert len(result_training) == len(result) + 3 - - -def test__classification_features_training_true(): - result = eventdisplay_ml.features._classification_features(training=True) - expected_tel_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - expected_array_features = [ - "DispNImages", - "DispTelList_T", - "EChi2S", - "EmissionHeight", - "EmissionHeightChi2", - "MSCW", - "MSCL", - "ArrayPointing_Elevation", - ] - # Should contain all telescope and array features, but not "Erec" - for feat in expected_tel_features + expected_array_features: - assert feat in result - assert "Erec" not in result - # Should start with telescope features - assert result[: len(expected_tel_features)] == expected_tel_features - # Should have correct length - assert len(result) == len(expected_tel_features) + len(expected_array_features) def test__classification_features_training_false(): @@ -497,8 +523,3 @@ def test_features_classification_in_name_training_true(): assert "Erec" not in result assert result[: len(expected_tel_features)] == expected_tel_features assert len(result) == len(expected_tel_features) + len(expected_array_features) - - -def test_features_wrong_analysis_type(): - with pytest.raises(ValueError, match="Unknown analysis type"): - eventdisplay_ml.features.features("unknown_type", training=True) diff --git a/tests/unit_tests/test_utils.py b/tests/unit_tests/test_utils.py index 7221b9e..6c98b76 100644 --- a/tests/unit_tests/test_utils.py +++ b/tests/unit_tests/test_utils.py @@ -1,8 +1,17 @@ """Unit tests for utility helpers such as input file list reader.""" +import json +import shutil + import pytest -from eventdisplay_ml.utils import parse_image_selection, read_input_file_list +from eventdisplay_ml.utils import ( + load_energy_range, + load_model_parameters, + output_file_name, + parse_image_selection, + read_input_file_list, +) def test_read_input_file_list_success(tmp_path): @@ -78,3 +87,142 @@ def test_parse_image_selection_invalid_bit_coded(): """Test ValueError is raised for invalid bit-coded input.""" 
with pytest.raises(ValueError, match="Invalid image_selection format"): parse_image_selection("invalid") + + +def test_load_model_parameters_success(tmp_path): + """Test loading model parameters from a valid JSON file.""" + params = { + "energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}, {"E_min": 2.0, "E_max": 3.0}], + "other_param": 42, + } + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + result = load_model_parameters(str(param_file)) + assert result["energy_bins_log10_tev"] == params["energy_bins_log10_tev"] + assert result["other_param"] == 42 + + +def test_load_model_parameters_with_energy_bin_number(tmp_path): + """Test loading model parameters with a specific energy bin number.""" + params = { + "energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}, {"E_min": 2.0, "E_max": 3.0}], + "other_param": 42, + } + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + result = load_model_parameters(str(param_file), energy_bin_number=1) + assert result["energy_bins_log10_tev"] == {"E_min": 2.0, "E_max": 3.0} + assert result["other_param"] == 42 + + +def test_load_model_parameters_file_not_found(tmp_path): + """Test FileNotFoundError is raised when model parameters file does not exist.""" + non_existent_file = tmp_path / "does_not_exist.json" + with pytest.raises(FileNotFoundError, match="Model parameters file not found"): + load_model_parameters(str(non_existent_file)) + + +def test_load_model_parameters_invalid_energy_bin_number(tmp_path): + """Test ValueError is raised for invalid energy bin number.""" + params = {"energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}]} + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + with pytest.raises(ValueError, match="Invalid energy bin number 5"): + load_model_parameters(str(param_file), energy_bin_number=5) + + +def test_load_model_parameters_missing_energy_bins_key(tmp_path): + """Test ValueError is raised if energy_bins_log10_tev key is missing when energy_bin_number is given.""" + params = {"other_param": 42} + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + with pytest.raises(ValueError, match="Invalid energy bin number 0"): + load_model_parameters(str(param_file), energy_bin_number=0) + + +def test_load_energy_range_success(tmp_path): + """Test loading energy range for a valid energy bin.""" + params = { + "energy_bins_log10_tev": [ + {"E_min": 0.0, "E_max": 1.0}, # 10^0=1, 10^1=10 + {"E_min": 1.0, "E_max": 2.0}, # 10^1=10, 10^2=100 + ] + } + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + result = load_energy_range(str(param_file), energy_bin_number=0) + assert result == (1.0, 10.0) + + result = load_energy_range(str(param_file), energy_bin_number=1) + assert result == (10.0, 100.0) + + +def test_load_energy_range_invalid_bin_number(tmp_path): + """Test ValueError is raised for invalid energy bin number.""" + params = {"energy_bins_log10_tev": [{"E_min": 0.0, "E_max": 1.0}]} + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + with pytest.raises(ValueError, match="Invalid energy bin number 5"): + load_energy_range(str(param_file), energy_bin_number=5) + + +def test_load_energy_range_missing_energy_bins_key(tmp_path): + """Test ValueError is raised if energy_bins_log10_tev key is missing.""" + params = {"other_param": 42} + param_file = tmp_path / "params.json" + param_file.write_text(json.dumps(params)) + + with 
pytest.raises(ValueError, match="Invalid energy bin number 0"): + load_energy_range(str(param_file), energy_bin_number=0) + + +def test_output_file_name_basic(tmp_path): + """Test output_file_name basic usage without energy_bin_number.""" + model_prefix = tmp_path / "model" + name = "classifier" + n_tel = 4 + expected = f"{model_prefix}_classifier_ntel4.joblib" + result = output_file_name(str(model_prefix), name, n_tel) + assert result == expected + + +def test_output_file_name_with_energy_bin(tmp_path): + """Test output_file_name with energy_bin_number.""" + model_prefix = tmp_path / "model" + name = "regressor" + n_tel = 2 + energy_bin_number = 1 + expected = f"{model_prefix}_regressor_ntel2_ebin1.joblib" + result = output_file_name(str(model_prefix), name, n_tel, energy_bin_number) + assert result == expected + + +def test_output_file_name_creates_directory(tmp_path): + """Test output_file_name creates parent directory if it does not exist.""" + model_prefix = tmp_path / "subdir" / "model" + name = "classifier" + n_tel = 3 + # Remove the directory if it exists to ensure creation + if model_prefix.parent.exists(): + shutil.rmtree(model_prefix.parent) + assert not model_prefix.parent.exists() + result = output_file_name(str(model_prefix), name, n_tel) + assert model_prefix.parent.exists() + expected = f"{model_prefix}_classifier_ntel3.joblib" + assert result == expected + + +def test_output_file_name_str_and_path_equivalence(tmp_path): + """Test output_file_name works the same with str and Path for model_prefix.""" + model_prefix = tmp_path / "model" + name = "test" + n_tel = 1 + result_str = output_file_name(str(model_prefix), name, n_tel) + result_path = output_file_name(model_prefix, name, n_tel) + assert result_str == result_path From 4695f2ffd9201ed26d3e45f76512b2458be96407 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Wed, 31 Dec 2025 11:35:06 +0100 Subject: [PATCH 08/10] apply cuts --- src/eventdisplay_ml/data_processing.py | 33 ++- src/eventdisplay_ml/features.py | 17 +- src/eventdisplay_ml/utils.py | 3 +- tests/unit_tests/test_features.py | 303 ++++--------------------- tests/unit_tests/test_utils.py | 48 ---- 5 files changed, 90 insertions(+), 314 deletions(-) diff --git a/src/eventdisplay_ml/data_processing.py b/src/eventdisplay_ml/data_processing.py index 62d5afc..ab9b449 100644 --- a/src/eventdisplay_ml/data_processing.py +++ b/src/eventdisplay_ml/data_processing.py @@ -205,6 +205,8 @@ def load_training_data( df_flat.dropna(axis=1, how="all", inplace=True) _logger.info(f"Final events for n_tel={n_tel} after cleanup: {len(df_flat)}") + print_variable_statistics(df_flat) + return df_flat @@ -274,7 +276,7 @@ def event_cuts(analysis_type, n_tel, model_parameters=None): """Event cut string for the given analysis type and telescope multiplicity.""" event_cut = f"(DispNImages == {n_tel})" - if analysis_type in ("signal_classification", "background_classification"): + if analysis_type == "classification": cuts = [ "Erec > 0", "MSCW > -2", @@ -314,8 +316,10 @@ def flatten_telescope_variables(n_tel, flat_features, index): df_flat[f"ES_{i}"] = np.log10(np.clip(df_flat[f"ES_{i}"], 1e-6, None)) # pointing corrections - df_flat[f"cen_x_{i}"] = df_flat[f"cen_x_{i}"] + df_flat[f"fpointing_dx_{i}"] - df_flat[f"cen_y_{i}"] = df_flat[f"cen_y_{i}"] + df_flat[f"fpointing_dy_{i}"] + if f"cen_x_{i}" in df_flat and f"fpointing_dx_{i}" in df_flat: + df_flat[f"cen_x_{i}"] = df_flat[f"cen_x_{i}"] + df_flat[f"fpointing_dx_{i}"] + if f"cen_y_{i}" in df_flat and f"fpointing_dy_{i}" in df_flat: + 
df_flat[f"cen_y_{i}"] = df_flat[f"cen_y_{i}"] + df_flat[f"fpointing_dy_{i}"]
         df_flat = df_flat.drop(columns=[f"fpointing_dx_{i}", f"fpointing_dy_{i}"])
 
     return pd.concat([df_flat, pd.DataFrame(new_cols, index=index)], axis=1)
@@ -377,3 +381,26 @@ def energy_in_bins(df_chunk, bins):
         e_bin[valid] = np.argmin(distances, axis=1)
     df_chunk["e_bin"] = e_bin
     return df_chunk["e_bin"]
+
+
+def print_variable_statistics(df):
+    """
+    Log min, max, mean, and RMS for each variable in the DataFrame.
+
+    Parameters
+    ----------
+    df : pandas.DataFrame
+        DataFrame containing the loaded training variables.
+    """
+    for col in df.columns:
+        data = df[col].dropna().to_numpy()
+        if data.size == 0:
+            _logger.info(f"{col}: No data")
+            continue
+        min_val = np.min(data)
+        max_val = np.max(data)
+        mean_val = np.mean(data)
+        rms_val = np.sqrt(np.mean(np.square(data)))
+        _logger.info(
+            f"{col:25s} min: {min_val:10.4g} max: {max_val:10.4g} mean: {mean_val:10.4g} rms: {rms_val:10.4g}"
+        )
diff --git a/src/eventdisplay_ml/features.py b/src/eventdisplay_ml/features.py
index cb17409..94089c2 100644
--- a/src/eventdisplay_ml/features.py
+++ b/src/eventdisplay_ml/features.py
@@ -46,6 +46,8 @@ def excluded_features(analysis_type, ntel):
     if "classification" in analysis_type:
         return {
             "Erec",
+            *[f"cen_x_{i}" for i in range(ntel)],
+            *[f"cen_y_{i}" for i in range(ntel)],
             *[f"size_{i}" for i in range(ntel)],
             *[f"E_{i}" for i in range(ntel)],
             *[f"ES_{i}" for i in range(ntel)],
@@ -62,8 +64,6 @@ def telescope_features(analysis_type):
     Disp variables with different indexing logic in data preparation.
     """
     var = [
-        "cen_x",
-        "cen_y",
         "cosphi",
         "sinphi",
         "loss",
@@ -79,7 +79,18 @@ def telescope_features(analysis_type):
     if analysis_type == "classification":
         return var
 
-    return [*var, "size", "E", "ES", "Disp_T", "DispXoff_T", "DispYoff_T", "DispWoff_T"]
+    return [
+        *var,
+        "size",
+        "cen_x",
+        "cen_y",
+        "E",
+        "ES",
+        "Disp_T",
+        "DispXoff_T",
+        "DispYoff_T",
+        "DispWoff_T",
+    ]
diff --git a/src/eventdisplay_ml/utils.py b/src/eventdisplay_ml/utils.py
index 67a871e..4418221 100644
--- a/src/eventdisplay_ml/utils.py
+++ b/src/eventdisplay_ml/utils.py
@@ -99,9 +99,8 @@ def load_model_parameters(model_parameters, energy_bin_number=None):
 
 def load_energy_range(model_parameters, energy_bin_number=0):
     """Load the log10(Erec/TeV) range for a given energy bin from model parameters."""
-    par = load_model_parameters(model_parameters)
     try:
-        e = par["energy_bins_log10_tev"][energy_bin_number]
+        e = model_parameters["energy_bins_log10_tev"]
         return 10 ** e["E_min"], 10 ** e["E_max"]
     except (KeyError, IndexError) as exc:
         raise ValueError(
diff --git a/tests/unit_tests/test_features.py b/tests/unit_tests/test_features.py
index 9a5cb5a..e3376f2 100644
--- a/tests/unit_tests/test_features.py
+++ b/tests/unit_tests/test_features.py
@@ -1,151 +1,23 @@
-"""Unit tests for eventdisplay_ml.features."""
+"""Unit tests for training variables selection utilities."""
 
 import pytest
 
 import eventdisplay_ml.features
 
-# Constants for expected features
-TARGETS = ["MCxoff", "MCyoff", "MCe0"]
-TEL_CLASS = [
-    "cen_x",
-    "cen_y",
-    "cosphi",
-    "sinphi",
-    "loss",
-    "dist",
-    "width",
-    "length",
-    "asym",
-    "tgrad_x",
-    "R_core",
-    "fpointing_dx",
-    "fpointing_dy",
-]
-TEL_STEREO = [*TEL_CLASS, "size", "E", "ES", "Disp_T", "DispXoff_T", "DispYoff_T", "DispWoff_T"]
-ARRAY_CLASS = [
-    "DispNImages",
-    "DispTelList_T",
-    "EChi2S",
-    "EmissionHeight",
-    "EmissionHeightChi2",
-    "MSCW",
-    "MSCL",
-    "ArrayPointing_Elevation",
-]
-REGRESSION = [ - *TEL_STEREO, - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", -] - - -@pytest.mark.parametrize( - ("analysis", "training", "targets", "features", "erec_last"), - [ - ("stereo_analysis", True, TARGETS, REGRESSION, False), - ("stereo_analysis", False, [], REGRESSION, False), - ("classification", True, [], [*TEL_CLASS, *ARRAY_CLASS], False), - ("classification", False, [], [*TEL_CLASS, *ARRAY_CLASS], True), - ("my_classification_run", True, [], [*TEL_CLASS, *ARRAY_CLASS], False), - ], -) -def test_features(analysis, training, targets, features, erec_last): - result = eventdisplay_ml.features.features(analysis, training=training) - for feat in features: - assert feat in result - if targets: - assert result[: len(targets)] == targets - assert len(result) == len(features) + len(targets) - else: - assert result[: len(TEL_CLASS)] == TEL_CLASS - # For stereo_analysis, False, Erec is present in REGRESSION - if analysis == "stereo_analysis" and not training: - assert "Erec" in result - assert len(result) == len(REGRESSION) - elif erec_last: - assert "Erec" in result - assert result[-1] == "Erec" - assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) + 1 - else: - assert "Erec" not in result - assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) - - -def test_features_wrong_analysis_type(): - with pytest.raises(ValueError, match="Unknown analysis type"): - eventdisplay_ml.features.features("unknown_type", training=True) - - -@pytest.mark.parametrize( - ("training", "expected_targets", "expected_features"), - [ - (True, TARGETS, REGRESSION), - (False, [], REGRESSION), - ], -) -def test_regression_features(training, expected_targets, expected_features): - result = eventdisplay_ml.features._regression_features(training=training) - for feat in expected_features: - assert feat in result - if training: - assert result[:3] == expected_targets - assert len(result) == len(expected_features) + 3 - else: - for t in expected_targets: - assert t not in result - assert len(result) == len(expected_features) - - -@pytest.mark.parametrize( - ("training", "erec_last"), - [ - (True, False), - (False, True), - ], -) -def test_classification_features(training, erec_last): - result = eventdisplay_ml.features._classification_features(training=training) - for feat in TEL_CLASS + ARRAY_CLASS: - assert feat in result - if erec_last: - assert "Erec" in result - assert result[-1] == "Erec" - assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) + 1 - else: - assert "Erec" not in result - assert result[: len(TEL_CLASS)] == TEL_CLASS - assert len(result) == len(TEL_CLASS) + len(ARRAY_CLASS) - - -@pytest.mark.parametrize( - ("analysis", "expected"), - [ - ("classification", TEL_CLASS), - ("stereo_analysis", TEL_STEREO), - ("regression", TEL_STEREO), - ], -) -def test_telescope_features(analysis, expected): - assert eventdisplay_ml.features.telescope_features(analysis) == expected - - -@pytest.mark.parametrize( - ("analysis", "expected"), - [ - ("stereo_analysis", TARGETS), - ("classification", []), - ("my_classification_run", []), - ], -) -def test_target_features(analysis, expected): - assert eventdisplay_ml.features.target_features(analysis) == expected + +def test_target_features_stereo_analysis(): + result = eventdisplay_ml.features.target_features("stereo_analysis") + assert result == ["MCxoff", "MCyoff", "MCe0"] + + +def test_target_features_classification_exact(): + result = 
eventdisplay_ml.features.target_features("classification") + assert result == [] + + +def test_target_features_classification_in_name(): + result = eventdisplay_ml.features.target_features("my_classification_run") + assert result == [] def test_target_features_invalid_type(): @@ -153,33 +25,18 @@ def test_target_features_invalid_type(): eventdisplay_ml.features.target_features("unknown_type") -@pytest.mark.parametrize( - ("analysis", "ntel", "expected"), - [ - ( - "stereo_analysis", - 3, - {f"fpointing_dx_{i}" for i in range(3)} | {f"fpointing_dy_{i}" for i in range(3)}, - ), - ( - "classification", - 2, - {"Erec"} - | {f"size_{i}" for i in range(2)} - | {f"E_{i}" for i in range(2)} - | {f"ES_{i}" for i in range(2)} - | {f"fpointing_dx_{i}" for i in range(2)} - | {f"fpointing_dy_{i}" for i in range(2)}, - ), - ( - "my_classification_run", - 1, - {"Erec", "size_0", "E_0", "ES_0", "fpointing_dx_0", "fpointing_dy_0"}, - ), - ], -) -def test_excluded_features(analysis, ntel, expected): - assert eventdisplay_ml.features.excluded_features(analysis, ntel) == expected +def test_excluded_features_stereo_analysis(): + ntel = 3 + result = eventdisplay_ml.features.excluded_features("stereo_analysis", ntel) + expected = { + "fpointing_dx_0", + "fpointing_dx_1", + "fpointing_dx_2", + "fpointing_dy_0", + "fpointing_dy_1", + "fpointing_dy_2", + } + assert result == expected def test_excluded_features_classification_exact(): @@ -298,50 +155,6 @@ def test__regression_features_training_true(): result = eventdisplay_ml.features._regression_features(training=True) # Should start with target features assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] - - -def test__classification_features_training_false(): - result = eventdisplay_ml.features._classification_features(training=False) - expected_tel_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - - expected_array_features = [ - "DispNImages", - "DispTelList_T", - "EChi2S", - "EmissionHeight", - "EmissionHeightChi2", - "MSCW", - "MSCL", - "ArrayPointing_Elevation", - ] - # Should contain all telescope and array features, and "Erec" - for feat in expected_tel_features + expected_array_features: - assert feat in result - assert "Erec" in result - # "Erec" should be the last feature - assert result[-1] == "Erec" - # Should have correct length - assert len(result) == len(expected_tel_features) + len(expected_array_features) + 1 - - -def test_features_stereo_analysis_training_true(): - result = eventdisplay_ml.features.features("stereo_analysis", training=True) - # Should start with target features - assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] # Should contain all regression features expected_features = [ "cen_x", @@ -374,18 +187,18 @@ def test_features_stereo_analysis_training_true(): "ErecS", "EmissionHeight", ] + # All expected features should be present after the target features for feat in expected_features: assert feat in result - # Should have correct length - assert len(result) == len(expected_features) + 3 -def test_features_stereo_analysis_training_false(): - result = eventdisplay_ml.features.features("stereo_analysis", training=False) +def test__regression_features_training_false(): + result = eventdisplay_ml.features._regression_features(training=False) # Should NOT start with target features assert "MCxoff" not in result assert "MCyoff" not in result assert "MCe0" not in result + # Should contain all regression features 
expected_features = [ "cen_x", "cen_y", @@ -419,12 +232,13 @@ def test_features_stereo_analysis_training_false(): ] for feat in expected_features: assert feat in result - # Should have correct length - assert len(result) == len(expected_features) + # Should have the same length as training + 3 (for the targets) + result_training = eventdisplay_ml.features._regression_features(training=True) + assert len(result_training) == len(result) + 3 -def test_features_classification_training_true(): - result = eventdisplay_ml.features.features("classification", training=True) +def test__classification_features_training_true(): + result = eventdisplay_ml.features._classification_features(training=True) expected_tel_features = [ "cen_x", "cen_y", @@ -450,15 +264,18 @@ def test_features_classification_training_true(): "MSCL", "ArrayPointing_Elevation", ] + # Should contain all telescope and array features, but not "Erec" for feat in expected_tel_features + expected_array_features: assert feat in result assert "Erec" not in result + # Should start with telescope features assert result[: len(expected_tel_features)] == expected_tel_features + # Should have correct length assert len(result) == len(expected_tel_features) + len(expected_array_features) -def test_features_classification_training_false(): - result = eventdisplay_ml.features.features("classification", training=False) +def test__classification_features_training_false(): + result = eventdisplay_ml.features._classification_features(training=False) expected_tel_features = [ "cen_x", "cen_y", @@ -474,6 +291,7 @@ def test_features_classification_training_false(): "fpointing_dx", "fpointing_dy", ] + expected_array_features = [ "DispNImages", "DispTelList_T", @@ -484,42 +302,11 @@ def test_features_classification_training_false(): "MSCL", "ArrayPointing_Elevation", ] + # Should contain all telescope and array features, and "Erec" for feat in expected_tel_features + expected_array_features: assert feat in result assert "Erec" in result + # "Erec" should be the last feature assert result[-1] == "Erec" + # Should have correct length assert len(result) == len(expected_tel_features) + len(expected_array_features) + 1 - - -def test_features_classification_in_name_training_true(): - result = eventdisplay_ml.features.features("my_classification_run", training=True) - expected_tel_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - expected_array_features = [ - "DispNImages", - "DispTelList_T", - "EChi2S", - "EmissionHeight", - "EmissionHeightChi2", - "MSCW", - "MSCL", - "ArrayPointing_Elevation", - ] - for feat in expected_tel_features + expected_array_features: - assert feat in result - assert "Erec" not in result - assert result[: len(expected_tel_features)] == expected_tel_features - assert len(result) == len(expected_tel_features) + len(expected_array_features) diff --git a/tests/unit_tests/test_utils.py b/tests/unit_tests/test_utils.py index 6c98b76..eaf395c 100644 --- a/tests/unit_tests/test_utils.py +++ b/tests/unit_tests/test_utils.py @@ -1,14 +1,12 @@ """Unit tests for utility helpers such as input file list reader.""" import json -import shutil import pytest from eventdisplay_ml.utils import ( load_energy_range, load_model_parameters, - output_file_name, parse_image_selection, read_input_file_list, ) @@ -180,49 +178,3 @@ def test_load_energy_range_missing_energy_bins_key(tmp_path): with pytest.raises(ValueError, match="Invalid 
energy bin number 0"): load_energy_range(str(param_file), energy_bin_number=0) - - -def test_output_file_name_basic(tmp_path): - """Test output_file_name basic usage without energy_bin_number.""" - model_prefix = tmp_path / "model" - name = "classifier" - n_tel = 4 - expected = f"{model_prefix}_classifier_ntel4.joblib" - result = output_file_name(str(model_prefix), name, n_tel) - assert result == expected - - -def test_output_file_name_with_energy_bin(tmp_path): - """Test output_file_name with energy_bin_number.""" - model_prefix = tmp_path / "model" - name = "regressor" - n_tel = 2 - energy_bin_number = 1 - expected = f"{model_prefix}_regressor_ntel2_ebin1.joblib" - result = output_file_name(str(model_prefix), name, n_tel, energy_bin_number) - assert result == expected - - -def test_output_file_name_creates_directory(tmp_path): - """Test output_file_name creates parent directory if it does not exist.""" - model_prefix = tmp_path / "subdir" / "model" - name = "classifier" - n_tel = 3 - # Remove the directory if it exists to ensure creation - if model_prefix.parent.exists(): - shutil.rmtree(model_prefix.parent) - assert not model_prefix.parent.exists() - result = output_file_name(str(model_prefix), name, n_tel) - assert model_prefix.parent.exists() - expected = f"{model_prefix}_classifier_ntel3.joblib" - assert result == expected - - -def test_output_file_name_str_and_path_equivalence(tmp_path): - """Test output_file_name works the same with str and Path for model_prefix.""" - model_prefix = tmp_path / "model" - name = "test" - n_tel = 1 - result_str = output_file_name(str(model_prefix), name, n_tel) - result_path = output_file_name(model_prefix, name, n_tel) - assert result_str == result_path From 45871e4faf4f380097a96e6729eab45db0469fe6 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Wed, 31 Dec 2025 11:38:45 +0100 Subject: [PATCH 09/10] remove tests --- .../scripts/test_apply_xgb_stereo.py | 87 ---- .../scripts/test_train_xgb_stereo.py | 112 ----- tests/unit_tests/test_data_processing.py | 422 ------------------ tests/unit_tests/test_evaluate.py | 285 ------------ tests/unit_tests/test_features.py | 312 ------------- tests/unit_tests/test_models.py | 77 ---- tests/unit_tests/test_utils.py | 180 -------- 7 files changed, 1475 deletions(-) delete mode 100644 tests/unit_tests/scripts/test_apply_xgb_stereo.py delete mode 100644 tests/unit_tests/scripts/test_train_xgb_stereo.py delete mode 100644 tests/unit_tests/test_data_processing.py delete mode 100644 tests/unit_tests/test_evaluate.py delete mode 100644 tests/unit_tests/test_features.py delete mode 100644 tests/unit_tests/test_models.py delete mode 100644 tests/unit_tests/test_utils.py diff --git a/tests/unit_tests/scripts/test_apply_xgb_stereo.py b/tests/unit_tests/scripts/test_apply_xgb_stereo.py deleted file mode 100644 index 715559a..0000000 --- a/tests/unit_tests/scripts/test_apply_xgb_stereo.py +++ /dev/null @@ -1,87 +0,0 @@ -"""Unit tests for apply_xgb_stereo script.""" - -from unittest.mock import Mock, patch - -import joblib -import numpy as np -import pytest - -from eventdisplay_ml.models import load_regression_models -from eventdisplay_ml.scripts.apply_xgb_stereo import ( - process_file_chunked, -) - - -class SimpleModel: - """A simple picklable model for testing.""" - - def __init__(self, predictions): - self.predictions = predictions - - def predict(self, x): - """Predict using the simple model.""" - n = len(x) - return self.predictions[:n] - - -def test_process_file_chunked_creates_output(sample_df, tmp_path): - """Test 
process_file_chunked creates output file.""" - model_dir = tmp_path / "models" - model_dir.mkdir() - model_file = model_dir / "dispdir_bdt_ntel4_xgboost.joblib" - joblib.dump(SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4)), model_file) - - output_file = tmp_path / "output.root" - - models = load_regression_models(str(model_dir)) - - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.iterate") as mock_iterate: - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.recreate") as mock_recreate: - mock_iterate.return_value = [sample_df.iloc[:1].copy()] - mock_tree = Mock() - mock_recreate.return_value.__enter__.return_value.mktree.return_value = mock_tree - - process_file_chunked( - input_file="input.root", - models=models, - output_file=str(output_file), - image_selection="15", - ) - - assert mock_tree.extend.called - - -@pytest.mark.parametrize( - ("max_events", "expected_chunks"), - [ - (None, 2), - (2, 1), - ], -) -def test_process_file_chunked_respects_limits(sample_df, tmp_path, max_events, expected_chunks): - """Test process_file_chunked respects event limits.""" - model_dir = tmp_path / "models" - model_dir.mkdir() - joblib.dump( - SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4)), model_dir / "dispdir_bdt_ntel4_xgboost.joblib" - ) - - models = load_regression_models(str(model_dir)) - - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.iterate") as mock_iterate: - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.recreate") as mock_recreate: - mock_iterate.return_value = [sample_df.iloc[:2].copy(), sample_df.iloc[2:].copy()] - mock_tree = Mock() - mock_recreate.return_value.__enter__.return_value.mktree.return_value = mock_tree - - kwargs = { - "input_file": "input.root", - "models": models, - "output_file": str(tmp_path / "output.root"), - "image_selection": "15", - } - if max_events: - kwargs["max_events"] = max_events - - process_file_chunked(**kwargs) - assert mock_tree.extend.call_count == expected_chunks diff --git a/tests/unit_tests/scripts/test_train_xgb_stereo.py b/tests/unit_tests/scripts/test_train_xgb_stereo.py deleted file mode 100644 index d0d0d40..0000000 --- a/tests/unit_tests/scripts/test_train_xgb_stereo.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Unit tests for the train_xgb_stereo script.""" - -from unittest.mock import MagicMock, patch - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.scripts.train_xgb_stereo import train - - -@pytest.fixture -def sample_df(): - """Create a sample DataFrame with training data.""" - rng = np.random.Generator(np.random.PCG64(42)) - data = { - "feature1": rng.standard_normal(100), - "feature2": rng.standard_normal(100), - "feature3": rng.standard_normal(100), - "MCxoff": rng.standard_normal(100), - "MCyoff": rng.standard_normal(100), - "MCe0": rng.standard_normal(100), - } - return pd.DataFrame(data) - - -@pytest.fixture -def empty_df(): - """Create an empty DataFrame.""" - return pd.DataFrame() - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_regression_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_with_valid_data(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test train function with valid data.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=3, output_dir=tmp_path, train_test_fraction=0.8) - - assert mock_multi_output.called - assert mock_model.fit.called - assert 
mock_dump.called - assert mock_evaluate.called - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_regression_model") -def test_train_with_empty_data(mock_evaluate, mock_dump, empty_df, caplog): - """Test train function with empty DataFrame.""" - train(empty_df, n_tel=2, output_dir="/tmp", train_test_fraction=0.7) - - assert mock_dump.call_count == 0 - assert mock_evaluate.call_count == 0 - assert "Skipping training" in caplog.text - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_regression_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_output_filename(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test that output filename is correctly formatted.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=4, output_dir=tmp_path, train_test_fraction=0.8) - - # Verify dump was called with correct filename - call_args = mock_dump.call_args - output_path = call_args[0][1] - assert "dispdir_bdt_ntel4_xgboost.joblib" in str(output_path) - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_regression_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_feature_selection(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test that features are correctly separated from targets.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=2, output_dir=tmp_path, train_test_fraction=0.8) - - # Verify fit was called with correct shapes - fit_call = mock_model.fit.call_args - x_train, y_train = fit_call[0] - - # Should have 3 features (feature1, feature2, feature3) - assert x_train.shape[1] == 3 - # Should have 3 targets (MCxoff, MCyoff, MCe0) - assert y_train.shape[1] == 3 - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_regression_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_test_split_fraction( - mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path -): - """Test that train/test split respects the fraction parameter.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=2, output_dir=tmp_path, train_test_fraction=0.6) - - fit_call = mock_model.fit.call_args - x_train, _ = fit_call[0] - - # With 0.6 train fraction and 100 samples, expect ~60 training samples - assert 50 <= len(x_train) <= 70 diff --git a/tests/unit_tests/test_data_processing.py b/tests/unit_tests/test_data_processing.py deleted file mode 100644 index f459c64..0000000 --- a/tests/unit_tests/test_data_processing.py +++ /dev/null @@ -1,422 +0,0 @@ -"""Unit tests for data processing utilities.""" - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.data_processing import ( - _pad_to_four, - _to_dense_array, - _to_padded_array, - apply_image_selection, - flatten_telescope_data_vectorized, - load_training_data, -) - -# ============================================================================ -# Parametrized Array Conversion Tests (consolidated from 10 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("input_data", 
"expected_shape"), - [ - ([[1, 2, 3], [4, 5, 6]], (2, 3)), - ([[1, 2], [3, 4, 5], [6]], (3, 3)), - ([1, 2, 3], (3, 1)), - ([[1, 2], 3, [4, 5, 6]], (3, 3)), - ], -) -def test_to_dense_array(input_data, expected_shape): - """Test _to_dense_array with various input types.""" - col = pd.Series(input_data) - result = _to_dense_array(col) - assert result.shape == expected_shape - - -@pytest.mark.parametrize( - ("input_data", "expected_shape"), - [ - ([[1, 2, 3], [4, 5, 6]], (2, 3)), - ([[1, 2], [3, 4, 5], [6]], (3, 3)), - ([1, 2, 3], (3, 1)), - ([[1, 2], 3, [4, 5, 6]], (3, 3)), - ], -) -def test_to_padded_array(input_data, expected_shape): - """Test _to_padded_array with various input types.""" - result = _to_padded_array(input_data) - assert result.shape == expected_shape - - -def test_to_dense_array_with_numpy_arrays(arrays_numpy): - """Test _to_dense_array with numpy arrays.""" - col = pd.Series(arrays_numpy) - result = _to_dense_array(col) - assert result.shape == (2, 3) - - -def test_to_padded_array_with_numpy_arrays(arrays_numpy): - """Test _to_padded_array with numpy arrays.""" - result = _to_padded_array(arrays_numpy) - assert result.shape == (2, 3) - - -# ============================================================================ -# Data Flattening Tests -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_tel", "with_pointing"), - [ - (2, False), - (2, True), - (1, False), - ], -) -def test_flatten_telescope_data_vectorized( - n_tel, with_pointing, df_two_tel_base, df_two_tel_pointing, df_one_tel_base -): - """Test flatten_telescope_data_vectorized with various telescope counts and pointing options.""" - if with_pointing and n_tel == 2: - df = df_two_tel_pointing - elif n_tel == 1: - df = df_one_tel_base - else: - df = df_two_tel_base - - training_vars = [ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ] - if with_pointing: - training_vars.extend(["cen_x", "cen_y", "fpointing_dx", "fpointing_dy"]) - - result = flatten_telescope_data_vectorized( - df, - n_tel=n_tel, - features=training_vars, - apply_pointing_corrections=with_pointing, - analysis_type="stereo_analysis", - ) - - assert "Disp_T_0" in result.columns - assert "disp_x_0" in result.columns - assert len(result) == len(df) - - -def test_flatten_telescope_data_vectorized_derived_features(df_one_tel_base): - """Test that derived features are correctly computed.""" - result = flatten_telescope_data_vectorized( - df_one_tel_base, - n_tel=1, - features=[ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ], - analysis_type="stereo_analysis", - ) - - assert "disp_x_0" in result.columns - assert "disp_y_0" in result.columns - assert "loss_loss_0" in result.columns - assert "loss_dist_0" in result.columns - assert "width_length_0" in result.columns - # For df_one_tel_base: Disp_T[0]=1.0, cosphi[0]=0.8, sinphi[0]=0.6 - assert result["disp_x_0"].iloc[0] == pytest.approx(1.0 * 0.8) - assert result["disp_y_0"].iloc[0] == pytest.approx(1.0 * 0.6) - - -def test_flatten_telescope_data_vectorized_missing_data(df_three_tel_missing): - """Test that missing disp columns are filled with NaN.""" - result = flatten_telescope_data_vectorized( - df_three_tel_missing, - n_tel=3, - features=[ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ], - analysis_type="stereo_analysis", - ) - assert result["Disp_T_2"].isna().all() 
- - -# ============================================================================ -# Data Loading Tests -# ============================================================================ -def test_load_training_data_filters_by_n_tel(mocker): - """Test load_training_data filters events by DispNImages.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2, 3, 2, 4], - "MCxoff": [0.1, 0.2, 0.3, 0.4], - "MCyoff": [0.5, 0.6, 0.7, 0.8], - "MCe0": [100.0, 200.0, 150.0, 250.0], - "DispTelList_T": [np.array([0, 1])] * 4, - "Disp_T": [np.array([1.0, 2.0])] * 4, - "cosphi": [np.array([0.8, 0.6])] * 4, - "sinphi": [np.array([0.6, 0.8])] * 4, - "loss": [np.array([0.1, 0.2])] * 4, - "dist": [np.array([1.0, 2.0])] * 4, - "width": [np.array([0.5, 0.6])] * 4, - "length": [np.array([2.0, 3.0])] * 4, - "size": [np.array([100.0, 200.0])] * 4, - "E": [np.array([10.0, 20.0])] * 4, - "ES": [np.array([5.0, 10.0])] * 4, - "Xoff": [1.0] * 4, - "Yoff": [3.0] * 4, - "Xoff_intersect": [0.9] * 4, - "Yoff_intersect": [2.9] * 4, - "Erec": [10.0] * 4, - "ErecS": [5.0] * 4, - "EmissionHeight": [100.0] * 4, - } - ) - - mock_tree = mocker.MagicMock() - - def arrays_side_effect(*args, **kwargs): - # Simulate uproot's cut by filtering DispNImages == n_tel - n_tel_local = 2 # match the test call below - return df_raw[df_raw["DispNImages"] == n_tel_local] - - mock_tree.arrays.side_effect = arrays_side_effect - - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=-1) - assert len(result) == 2 - assert all(col in result.columns for col in ["MCxoff", "MCyoff", "MCe0"]) - - -@pytest.mark.parametrize( - ("max_events", "expected_max_rows"), - [ - (5, 5), - (3, 3), - (-1, 10), - ], -) -def test_load_training_data_max_events(mocker, max_events, expected_max_rows): - """Test load_training_data respects max_events limit.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2] * 10, - "MCxoff": np.arange(10, dtype=float) * 0.1, - "MCyoff": np.arange(10, dtype=float) * 0.1, - "MCe0": np.ones(10) * 100.0, - "DispTelList_T": [np.array([0, 1])] * 10, - "Disp_T": [np.array([1.0, 2.0])] * 10, - "cosphi": [np.array([0.8, 0.6])] * 10, - "sinphi": [np.array([0.6, 0.8])] * 10, - "loss": [np.array([0.1, 0.2])] * 10, - "dist": [np.array([1.0, 2.0])] * 10, - "width": [np.array([0.5, 0.6])] * 10, - "length": [np.array([2.0, 3.0])] * 10, - "size": [np.array([100.0, 200.0])] * 10, - "E": [np.array([10.0, 20.0])] * 10, - "ES": [np.array([5.0, 10.0])] * 10, - "Xoff": np.ones(10), - "Yoff": np.ones(10) * 3.0, - "Xoff_intersect": np.ones(10) * 0.9, - "Yoff_intersect": np.ones(10) * 2.9, - "Erec": np.ones(10) * 10.0, - "ErecS": np.ones(10) * 5.0, - "EmissionHeight": np.ones(10) * 100.0, - } - ) - - mock_tree = mocker.MagicMock() - mock_tree.arrays.return_value = df_raw - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=max_events) - assert len(result) <= expected_max_rows - - -def test_load_training_data_multiple_files(mocker): - """Test load_training_data concatenates multiple files.""" - df1 = pd.DataFrame( - { - "DispNImages": [2] * 2, - "MCxoff": [0.1, 0.2], - "MCyoff": [0.5, 0.6], - "MCe0": [100.0, 150.0], - "DispTelList_T": 
[np.array([0, 1])] * 2, - "Disp_T": [np.array([1.0, 2.0])] * 2, - "cosphi": [np.array([0.8, 0.6])] * 2, - "sinphi": [np.array([0.6, 0.8])] * 2, - "loss": [np.array([0.1, 0.2])] * 2, - "dist": [np.array([1.0, 2.0])] * 2, - "width": [np.array([0.5, 0.6])] * 2, - "length": [np.array([2.0, 3.0])] * 2, - "size": [np.array([100.0, 200.0])] * 2, - "E": [np.array([10.0, 20.0])] * 2, - "ES": [np.array([5.0, 10.0])] * 2, - "Xoff": [1.0] * 2, - "Yoff": [3.0] * 2, - "Xoff_intersect": [0.9] * 2, - "Yoff_intersect": [2.9] * 2, - "Erec": [10.0] * 2, - "ErecS": [5.0] * 2, - "EmissionHeight": [100.0] * 2, - } - ) - df2 = df1.iloc[:1].copy() - df2.loc[0, "MCe0"] = 200.0 - - call_count = [0] - - def mock_arrays(*args, **kwargs): - call_count[0] += 1 - return df1 if call_count[0] == 1 else df2 - - mock_tree = mocker.MagicMock() - mock_tree.arrays.side_effect = mock_arrays - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy1.root", "dummy2.root"], n_tel=2, max_events=-1) - assert len(result) == 3 - - -def test_load_training_data_computes_log_mce0(mocker): - """Test load_training_data correctly computes log10 of MCe0.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2], - "MCxoff": [0.1], - "MCyoff": [0.5], - "MCe0": [100.0], - "DispTelList_T": [np.array([0, 1])], - "Disp_T": [np.array([1.0, 2.0])], - "cosphi": [np.array([0.8, 0.6])], - "sinphi": [np.array([0.6, 0.8])], - "loss": [np.array([0.1, 0.2])], - "dist": [np.array([1.0, 2.0])], - "width": [np.array([0.5, 0.6])], - "length": [np.array([2.0, 3.0])], - "size": [np.array([100.0, 200.0])], - "E": [np.array([10.0, 20.0])], - "ES": [np.array([5.0, 10.0])], - "Xoff": [1.0], - "Yoff": [3.0], - "Xoff_intersect": [0.9], - "Yoff_intersect": [2.9], - "Erec": [10.0], - "ErecS": [5.0], - "EmissionHeight": [100.0], - } - ) - - mock_tree = mocker.MagicMock() - mock_tree.arrays.return_value = df_raw - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=-1) - assert "MCe0" in result.columns - assert result["MCe0"].iloc[0] == pytest.approx(np.log10(100.0)) - - -@pytest.mark.parametrize( - ("input_data", "expected_first_values", "check_nans"), - [ - (np.array([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0], [3]), - ([1.0, 2.0], [1.0, 2.0], [2, 3]), - (np.array([5.0]), [5.0], [1, 2, 3]), - (np.array([]), None, [0, 1, 2, 3]), - (np.array([1.0, 2.0, 3.0, 4.0]), [1.0, 2.0, 3.0, 4.0], []), - (np.array([1.0, np.nan, 3.0]), [1.0], [1, 3]), - ([1, 2.5, 3], [1.0, 2.5, 3.0], [3]), - (np.array([-1.0, -2.5, 3.0]), [-1.0, -2.5, 3.0], [3]), - (np.array([0.0, 1.0, 0.0]), [0.0, 1.0, 0.0], [3]), - ], -) -def test_pad_to_four(input_data, expected_first_values, check_nans): - """Test _pad_to_four with various input types and edge cases.""" - result = _pad_to_four(input_data) - - assert len(result) == 4 - assert result.dtype == np.float32 - - if expected_first_values: - for i, val in enumerate(expected_first_values): - assert np.isclose(result[i], val) or (np.isnan(val) and np.isnan(result[i])) - - for nan_idx in check_nans: - assert np.isnan(result[nan_idx]) - - -def test_pad_to_four_with_scalar(): - """Test _pad_to_four returns scalars unchanged.""" - scalar = 3.14 - result = _pad_to_four(scalar) - 
assert result == 3.14 - - -# ============================================================================ -# Image Selection Tests -# ============================================================================ - - -@pytest.mark.parametrize( - ("selection", "expected_tel_0", "expected_n_images_0"), - [ - (None, [0, 1, 2, 3], 4), - ([0, 1, 2, 3], [0, 1, 2, 3], 4), - ([0, 1], [0, 1], 2), - ([2], [2], 1), - ], -) -def test_apply_image_selection(sample_df, selection, expected_tel_0, expected_n_images_0): - """Test apply_image_selection with various telescope selections.""" - result = apply_image_selection(sample_df, selection, "stereo_analysis") - - if selection is None or selection == [0, 1, 2, 3]: - pd.testing.assert_frame_equal(result, sample_df) - else: - assert result["DispTelList_T"].iloc[0] == expected_tel_0 - assert result["DispNImages"].iloc[0] == expected_n_images_0 - - -def test_apply_image_selection_preserves_original(sample_df): - """Test that apply_image_selection doesn't modify the original DataFrame.""" - original_copy = sample_df.copy(deep=True) - apply_image_selection(sample_df, [0, 1], "stereo_analysis") - pd.testing.assert_frame_equal(sample_df, original_copy) diff --git a/tests/unit_tests/test_evaluate.py b/tests/unit_tests/test_evaluate.py deleted file mode 100644 index 55e3303..0000000 --- a/tests/unit_tests/test_evaluate.py +++ /dev/null @@ -1,285 +0,0 @@ -"""Unit tests for model evaluation.""" - -import logging -from unittest.mock import MagicMock - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.evaluate import ( - calculate_resolution, - evaluate_regression_model, - feature_importance, - shap_feature_importance, -) - -rng = np.random.default_rng(0) - - -# ============================================================================ -# SHAP Feature Importance Tests (consolidated from 3 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_targets", "n_samples"), - [ - (1, 100), - (2, 50), - ], -) -def test_shap_feature_importance(caplog, n_targets, n_samples): - """Test shap_feature_importance with various target counts.""" - caplog.set_level(logging.INFO) - - # Create mock model with appropriate estimators - mock_model = MagicMock() - estimators = [] - for _ in range(n_targets): - mock_est = MagicMock() - mock_booster = MagicMock() - mock_booster.predict.return_value = np.hstack( - [rng.random((n_samples, 4)), rng.random((n_samples, 1))] - ) - mock_est.get_booster.return_value = mock_booster - estimators.append(mock_est) - mock_model.estimators_ = estimators - - x_sample_data = pd.DataFrame({f"feature_{i}": rng.random(n_samples) for i in range(3)}) - target_names = [f"target_{i}" for i in range(n_targets)] - - shap_feature_importance(mock_model, x_sample_data, target_names, max_points=100, n_top=2) - - for target in target_names: - assert f"Builtin XGBoost SHAP Importance for {target}" in caplog.text - - -# ============================================================================ -# Feature Importance Tests (consolidated from 3 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_targets", "n_features"), - [ - (1, 3), - (2, 4), - ], -) -def test_feature_importance(caplog, n_targets, n_features): - """Test feature_importance with various feature/target counts.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - estimators = [] - rng = np.random.default_rng(42) - for _ in 
range(n_targets): - mock_est = MagicMock() - mock_est.feature_importances_ = rng.random(n_features) - estimators.append(mock_est) - mock_model.estimators_ = estimators - - x_cols = [f"feature_{i}" for i in range(n_features)] - target_names = [f"target_{i}" for i in range(n_targets)] - - feature_importance(mock_model, x_cols, target_names, name="test_model") - - assert "XGBoost Feature Importance" in caplog.text - for target in target_names: - assert f"Importance for Target: **{target}**" in caplog.text - - -def test_feature_importance_sorted(caplog): - """Test feature_importance sorts features by importance.""" - caplog.set_level(logging.INFO) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.1, 0.5, 0.3, 0.1]) - mock_model = MagicMock() - mock_model.estimators_ = [mock_est] - - x_cols = ["low_1", "high", "medium", "low_2"] - target_names = ["target"] - - feature_importance(mock_model, x_cols, target_names) - - log_text = caplog.text - high_pos = log_text.find("high") - medium_pos = log_text.find("medium") - assert high_pos < medium_pos - - -# ============================================================================ -# Resolution Calculation Tests (consolidated from 4 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_bins", "percentiles"), - [ - (2, [50, 68]), - (3, [50, 68, 90]), - (1, [68, 90, 95]), - ], -) -def test_calculate_resolution(caplog, n_bins, percentiles): - """Test calculate_resolution with various binning and percentile configurations.""" - caplog.set_level(logging.INFO) - - y_pred = np.array( - [ - [0.1, 0.2, 1.0], - [0.15, 0.25, 1.1], - [0.2, 0.3, 0.9], - [0.05, 0.1, 1.2], - ] - ) - y_test = pd.DataFrame( - { - "MCxoff": [0.0, 0.0, 0.0, 0.0], - "MCyoff": [0.0, 0.0, 0.0, 0.0], - }, - index=[0, 1, 2, 3], - ) - df = pd.DataFrame({"MCe0": [0.5, 0.8, 1.0, 1.5]}, index=[0, 1, 2, 3]) - - calculate_resolution( - y_pred, - y_test, - df, - percentiles=percentiles, - log_e_min=0, - log_e_max=2, - n_bins=n_bins, - name="test", - ) - - assert "DeltaTheta Resolution" in caplog.text - assert "DeltaMCe0 Resolution" in caplog.text - for perc in percentiles: - assert f"Theta_{perc}%" in caplog.text - - -def test_calculate_resolution_deltas_computed_correctly(caplog): - """Test delta computations in calculate_resolution.""" - caplog.set_level(logging.INFO) - - # Known case: differences = sqrt(0.1^2 + 0.2^2) = sqrt(0.05) - y_pred = np.array([[0.1, 0.2, 1.0], [0.0, 0.0, 1.0]]) - y_test = pd.DataFrame({"MCxoff": [0.0, 0.0], "MCyoff": [0.0, 0.0]}, index=[0, 1]) - df = pd.DataFrame({"MCe0": [1.0, 1.0]}, index=[0, 1]) - - calculate_resolution( - y_pred, y_test, df, percentiles=[50], log_e_min=0.5, log_e_max=1.5, n_bins=1, name="test" - ) - - assert "Theta_50%" in caplog.text - assert "DeltaE" in caplog.text - - -# ============================================================================ -# Model Evaluation Tests (consolidated from 5 functions) -# ============================================================================ - - -def test_evaluate_regression_model_basic(caplog): - """Test evaluate_regression_model logs R^2 score and metrics.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.85 - mock_model.predict.return_value = np.array([[0.1, 0.2, 1.0], [0.15, 0.25, 1.1]]) - - mock_est1 = MagicMock() - mock_est1.feature_importances_ = np.array([0.5, 0.3, 0.2]) - mock_est2 = MagicMock() - mock_est2.feature_importances_ = np.array([0.4, 0.4, 
0.2]) - mock_model.estimators_ = [mock_est1, mock_est2] - - x_test = pd.DataFrame( - { - "feat_1": [1.0, 2.0], - "feat_2": [3.0, 4.0], - "feat_3": [5.0, 6.0], - }, - index=[0, 1], - ) - y_test = pd.DataFrame( - { - "MCxoff": [0.0, 0.0], - "MCyoff": [0.0, 0.0], - }, - index=[0, 1], - ) - df = pd.DataFrame({"MCe0": [1.0, 1.1]}, index=[0, 1]) - y_data = pd.DataFrame({"target_1": [1, 2], "target_2": [3, 4]}) - - evaluate_regression_model( - mock_model, x_test, y_test, df, ["feat_1", "feat_2", "feat_3"], y_data, "test_model" - ) - - assert "XGBoost Multi-Target R^2 Score (Testing Set): 0.8500" in caplog.text - assert "test_model MSE (X_off):" in caplog.text - assert "test_model MAE (X_off):" in caplog.text - assert "test_model MAE (Y_off):" in caplog.text - - -@pytest.mark.parametrize( - ("model_name", "has_xgb"), - [ - ("xgboost", True), - ("random_forest", False), - ], -) -def test_evaluate_regression_model_shap_conditional(caplog, model_name, has_xgb): - """Test evaluate_regression_model calls SHAP only for XGBoost models.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.8 - mock_model.predict.return_value = np.array([[0.1, 0.2, 1.0]]) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.5, 0.3, 0.2]) - if has_xgb: - mock_booster = MagicMock() - mock_booster.predict.return_value = rng.random((1, 4)) - mock_est.get_booster.return_value = mock_booster - mock_model.estimators_ = [mock_est] - - x_test = pd.DataFrame({"x": [1.0], "y": [2.0], "z": [3.0]}, index=[0]) - y_test = pd.DataFrame({"MCxoff": [0.0], "MCyoff": [0.0]}, index=[0]) - df = pd.DataFrame({"MCe0": [1.0]}, index=[0]) - y_data = pd.DataFrame({"target": [1]}) - - evaluate_regression_model(mock_model, x_test, y_test, df, ["x", "y", "z"], y_data, model_name) - - if has_xgb: - assert "Builtin XGBoost SHAP Importance" in caplog.text - else: - assert "Builtin XGBoost SHAP Importance" not in caplog.text - - -def test_evaluate_regression_model_calls_resolution(caplog): - """Test evaluate_regression_model calls calculate_resolution.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.82 - mock_model.predict.return_value = np.array([[0.05, 0.1, 1.0], [0.08, 0.12, 1.1]]) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.5, 0.3, 0.2]) - mock_model.estimators_ = [mock_est] - - x_test = pd.DataFrame({"m": [1.0, 2.0], "n": [3.0, 4.0], "o": [5.0, 6.0]}, index=[0, 1]) - y_test = pd.DataFrame({"MCxoff": [0.0, 0.0], "MCyoff": [0.0, 0.0]}, index=[0, 1]) - df = pd.DataFrame({"MCe0": [0.5, 1.0]}, index=[0, 1]) - y_data = pd.DataFrame({"target": [1, 2]}) - - evaluate_regression_model(mock_model, x_test, y_test, df, ["m", "n", "o"], y_data, "test_model") - - assert "DeltaTheta Resolution vs. Log10(MCe0)" in caplog.text - assert "DeltaMCe0 Resolution vs. 
Log10(MCe0)" in caplog.text - assert "Calculated over 6 bins between Log10(E) = -1 and 2" in caplog.text diff --git a/tests/unit_tests/test_features.py b/tests/unit_tests/test_features.py deleted file mode 100644 index e3376f2..0000000 --- a/tests/unit_tests/test_features.py +++ /dev/null @@ -1,312 +0,0 @@ -"""Unit tests for training variables selection utilities.""" - -import pytest - -import eventdisplay_ml.features - - -def test_target_features_stereo_analysis(): - result = eventdisplay_ml.features.target_features("stereo_analysis") - assert result == ["MCxoff", "MCyoff", "MCe0"] - - -def test_target_features_classification_exact(): - result = eventdisplay_ml.features.target_features("classification") - assert result == [] - - -def test_target_features_classification_in_name(): - result = eventdisplay_ml.features.target_features("my_classification_run") - assert result == [] - - -def test_target_features_invalid_type(): - with pytest.raises(ValueError, match="Unknown analysis type"): - eventdisplay_ml.features.target_features("unknown_type") - - -def test_excluded_features_stereo_analysis(): - ntel = 3 - result = eventdisplay_ml.features.excluded_features("stereo_analysis", ntel) - expected = { - "fpointing_dx_0", - "fpointing_dx_1", - "fpointing_dx_2", - "fpointing_dy_0", - "fpointing_dy_1", - "fpointing_dy_2", - } - assert result == expected - - -def test_excluded_features_classification_exact(): - ntel = 2 - result = eventdisplay_ml.features.excluded_features("classification", ntel) - expected = { - "Erec", - "size_0", - "size_1", - "E_0", - "E_1", - "ES_0", - "ES_1", - "fpointing_dx_0", - "fpointing_dx_1", - "fpointing_dy_0", - "fpointing_dy_1", - } - assert result == expected - - -def test_excluded_features_classification_in_name(): - ntel = 1 - result = eventdisplay_ml.features.excluded_features("my_classification_run", ntel) - expected = { - "Erec", - "size_0", - "E_0", - "ES_0", - "fpointing_dx_0", - "fpointing_dy_0", - } - assert result == expected - - -def test_excluded_features_invalid_type(): - with pytest.raises(ValueError, match="Unknown analysis type"): - eventdisplay_ml.features.excluded_features("unknown_type", 2) - - -def test_telescope_features_classification(): - result = eventdisplay_ml.features.telescope_features("classification") - expected = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - assert result == expected - - -def test_telescope_features_stereo_analysis(): - result = eventdisplay_ml.features.telescope_features("stereo_analysis") - expected = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - ] - assert result == expected - - -def test_telescope_features_other_analysis_type(): - result = eventdisplay_ml.features.telescope_features("regression") - expected = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - ] - assert result == expected - - -def test__regression_features_training_true(): - result = eventdisplay_ml.features._regression_features(training=True) - # Should start with target features - assert result[:3] == ["MCxoff", "MCyoff", "MCe0"] - 
# Should contain all regression features - expected_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", - ] - # All expected features should be present after the target features - for feat in expected_features: - assert feat in result - - -def test__regression_features_training_false(): - result = eventdisplay_ml.features._regression_features(training=False) - # Should NOT start with target features - assert "MCxoff" not in result - assert "MCyoff" not in result - assert "MCe0" not in result - # Should contain all regression features - expected_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - "size", - "E", - "ES", - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", - ] - for feat in expected_features: - assert feat in result - # Should have the same length as training + 3 (for the targets) - result_training = eventdisplay_ml.features._regression_features(training=True) - assert len(result_training) == len(result) + 3 - - -def test__classification_features_training_true(): - result = eventdisplay_ml.features._classification_features(training=True) - expected_tel_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - expected_array_features = [ - "DispNImages", - "DispTelList_T", - "EChi2S", - "EmissionHeight", - "EmissionHeightChi2", - "MSCW", - "MSCL", - "ArrayPointing_Elevation", - ] - # Should contain all telescope and array features, but not "Erec" - for feat in expected_tel_features + expected_array_features: - assert feat in result - assert "Erec" not in result - # Should start with telescope features - assert result[: len(expected_tel_features)] == expected_tel_features - # Should have correct length - assert len(result) == len(expected_tel_features) + len(expected_array_features) - - -def test__classification_features_training_false(): - result = eventdisplay_ml.features._classification_features(training=False) - expected_tel_features = [ - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - "fpointing_dx", - "fpointing_dy", - ] - - expected_array_features = [ - "DispNImages", - "DispTelList_T", - "EChi2S", - "EmissionHeight", - "EmissionHeightChi2", - "MSCW", - "MSCL", - "ArrayPointing_Elevation", - ] - # Should contain all telescope and array features, and "Erec" - for feat in expected_tel_features + expected_array_features: - assert feat in result - assert "Erec" in result - # "Erec" should be the last feature - assert result[-1] == "Erec" - # Should have correct length - assert len(result) == len(expected_tel_features) + len(expected_array_features) + 1 diff --git a/tests/unit_tests/test_models.py b/tests/unit_tests/test_models.py deleted file mode 100644 index c8bf873..0000000 --- a/tests/unit_tests/test_models.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Unit tests models.""" - -import 
joblib -import numpy as np -import pytest - -from eventdisplay_ml.scripts.apply_xgb_stereo import ( - apply_regression_models, - load_regression_models, -) - - -class SimpleModel: - """A simple picklable model for testing.""" - - def __init__(self, predictions): - self.predictions = predictions - - def predict(self, x): - """Predict using the simple model.""" - n = len(x) - return self.predictions[:n] - - -@pytest.mark.parametrize( - ("models_to_create", "expected_in_dict"), - [ - ([2], [2]), - ([2, 3, 4], [2, 3, 4]), - ([], []), - ], -) -def test_load_models(tmp_path, models_to_create, expected_in_dict): - """Test load_models loads available models from directory.""" - for n_tel in models_to_create: - model_file = tmp_path / f"dispdir_bdt_ntel{n_tel}_xgboost.joblib" - joblib.dump({"multiplicity": n_tel}, model_file) - - models = load_regression_models(str(tmp_path)) - - for n_tel in expected_in_dict: - assert n_tel in models - assert models[n_tel]["multiplicity"] == n_tel - assert len(models) == len(expected_in_dict) - - -@pytest.mark.parametrize( - "n_tel_multiplicities", - [ - ([4]), - ([2, 3, 4]), - ], -) -def test_apply_models(sample_df, n_tel_multiplicities): - """Test apply_models with different telescope multiplicities.""" - models = {} - for n_tel in n_tel_multiplicities: - # Create enough predictions for all rows (max 4 rows in sample_df) - models[n_tel] = SimpleModel(np.array([[0.1 * n_tel, 0.2 * n_tel, 1.5]] * 4)) - - sample_df = sample_df.reset_index(drop=True) - - pred_xoff, pred_yoff, pred_erec = apply_regression_models(sample_df, models) - - assert all(len(p) == len(sample_df) for p in [pred_xoff, pred_yoff, pred_erec]) - assert all(p.dtype == np.float32 for p in [pred_xoff, pred_yoff, pred_erec]) - - -def test_apply_models_with_missing_multiplicity(sample_df): - """Test apply_models handles missing models gracefully.""" - models = {4: SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4))} - pred_xoff, _, _ = apply_regression_models(sample_df, models) - - assert not np.isnan(pred_xoff[0]) # Row 0 has 4 telescopes - assert np.isnan(pred_xoff[1]) # Row 1 has 2 telescopes - assert np.isnan(pred_xoff[2]) # Row 2 has 3 telescopes - assert not np.isnan(pred_xoff[3]) # Row 3 has 4 telescopes diff --git a/tests/unit_tests/test_utils.py b/tests/unit_tests/test_utils.py deleted file mode 100644 index eaf395c..0000000 --- a/tests/unit_tests/test_utils.py +++ /dev/null @@ -1,180 +0,0 @@ -"""Unit tests for utility helpers such as input file list reader.""" - -import json - -import pytest - -from eventdisplay_ml.utils import ( - load_energy_range, - load_model_parameters, - parse_image_selection, - read_input_file_list, -) - - -def test_read_input_file_list_success(tmp_path): - """Test successful reading of input file list.""" - test_file = tmp_path / "input_files.txt" - test_files = ["file1.txt", "file2.txt", "file3.txt"] - test_file.write_text("\n".join(test_files)) - - result = read_input_file_list(str(test_file)) - assert result == test_files - - -def test_read_input_file_list_with_empty_lines(tmp_path): - """Test reading file list with empty lines.""" - test_file = tmp_path / "input_files.txt" - content = "file1.txt\n\nfile2.txt\n \nfile3.txt\n" - test_file.write_text(content) - - result = read_input_file_list(str(test_file)) - assert result == ["file1.txt", "file2.txt", "file3.txt"] - - -def test_read_input_file_list_with_whitespace(tmp_path): - """Test reading file list with leading/trailing whitespace.""" - test_file = tmp_path / "input_files.txt" - content = " file1.txt \nfile2.txt\t\n 
file3.txt" - test_file.write_text(content) - - result = read_input_file_list(str(test_file)) - assert result == ["file1.txt", "file2.txt", "file3.txt"] - - -def test_read_input_file_list_empty_file(tmp_path): - """Test reading empty file.""" - test_file = tmp_path / "input_files.txt" - test_file.write_text("") - - with pytest.raises(ValueError, match="Error: No input files found in the list"): - read_input_file_list(str(test_file)) - - -def test_read_input_file_list_file_not_found(): - """Test FileNotFoundError is raised when file does not exist.""" - with pytest.raises(FileNotFoundError, match="Error: Input file list not found"): - read_input_file_list("/nonexistent/path/file.txt") - - -def test_parse_image_selection_comma_separated(): - """Test parsing comma-separated indices.""" - result = parse_image_selection("1, 2, 3") - assert result == [1, 2, 3] - - -def test_parse_image_selection_bit_coded(): - """Test parsing bit-coded value.""" - result = parse_image_selection("14") # 0b1110 -> indices 1, 2, 3 - assert result == [1, 2, 3] - - -def test_parse_image_selection_empty_string(): - """Test parsing empty string returns None.""" - result = parse_image_selection("") - assert result is None - - -def test_parse_image_selection_invalid_comma_separated(): - """Test ValueError is raised for invalid comma-separated input.""" - with pytest.raises(ValueError, match="Invalid image_selection format"): - parse_image_selection("1, two, 3") - - -def test_parse_image_selection_invalid_bit_coded(): - """Test ValueError is raised for invalid bit-coded input.""" - with pytest.raises(ValueError, match="Invalid image_selection format"): - parse_image_selection("invalid") - - -def test_load_model_parameters_success(tmp_path): - """Test loading model parameters from a valid JSON file.""" - params = { - "energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}, {"E_min": 2.0, "E_max": 3.0}], - "other_param": 42, - } - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - result = load_model_parameters(str(param_file)) - assert result["energy_bins_log10_tev"] == params["energy_bins_log10_tev"] - assert result["other_param"] == 42 - - -def test_load_model_parameters_with_energy_bin_number(tmp_path): - """Test loading model parameters with a specific energy bin number.""" - params = { - "energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}, {"E_min": 2.0, "E_max": 3.0}], - "other_param": 42, - } - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - result = load_model_parameters(str(param_file), energy_bin_number=1) - assert result["energy_bins_log10_tev"] == {"E_min": 2.0, "E_max": 3.0} - assert result["other_param"] == 42 - - -def test_load_model_parameters_file_not_found(tmp_path): - """Test FileNotFoundError is raised when model parameters file does not exist.""" - non_existent_file = tmp_path / "does_not_exist.json" - with pytest.raises(FileNotFoundError, match="Model parameters file not found"): - load_model_parameters(str(non_existent_file)) - - -def test_load_model_parameters_invalid_energy_bin_number(tmp_path): - """Test ValueError is raised for invalid energy bin number.""" - params = {"energy_bins_log10_tev": [{"E_min": 1.0, "E_max": 2.0}]} - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - with pytest.raises(ValueError, match="Invalid energy bin number 5"): - load_model_parameters(str(param_file), energy_bin_number=5) - - -def test_load_model_parameters_missing_energy_bins_key(tmp_path): - """Test 
ValueError is raised if energy_bins_log10_tev key is missing when energy_bin_number is given.""" - params = {"other_param": 42} - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - with pytest.raises(ValueError, match="Invalid energy bin number 0"): - load_model_parameters(str(param_file), energy_bin_number=0) - - -def test_load_energy_range_success(tmp_path): - """Test loading energy range for a valid energy bin.""" - params = { - "energy_bins_log10_tev": [ - {"E_min": 0.0, "E_max": 1.0}, # 10^0=1, 10^1=10 - {"E_min": 1.0, "E_max": 2.0}, # 10^1=10, 10^2=100 - ] - } - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - result = load_energy_range(str(param_file), energy_bin_number=0) - assert result == (1.0, 10.0) - - result = load_energy_range(str(param_file), energy_bin_number=1) - assert result == (10.0, 100.0) - - -def test_load_energy_range_invalid_bin_number(tmp_path): - """Test ValueError is raised for invalid energy bin number.""" - params = {"energy_bins_log10_tev": [{"E_min": 0.0, "E_max": 1.0}]} - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - with pytest.raises(ValueError, match="Invalid energy bin number 5"): - load_energy_range(str(param_file), energy_bin_number=5) - - -def test_load_energy_range_missing_energy_bins_key(tmp_path): - """Test ValueError is raised if energy_bins_log10_tev key is missing.""" - params = {"other_param": 42} - param_file = tmp_path / "params.json" - param_file.write_text(json.dumps(params)) - - with pytest.raises(ValueError, match="Invalid energy bin number 0"): - load_energy_range(str(param_file), energy_bin_number=0) From 4e8a760e42f8bea99bec031a1476802601f06036 Mon Sep 17 00:00:00 2001 From: Gernot Maier Date: Wed, 31 Dec 2025 11:40:51 +0100 Subject: [PATCH 10/10] log message --- src/eventdisplay_ml/data_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eventdisplay_ml/data_processing.py b/src/eventdisplay_ml/data_processing.py index ab9b449..0a11d1b 100644 --- a/src/eventdisplay_ml/data_processing.py +++ b/src/eventdisplay_ml/data_processing.py @@ -171,7 +171,7 @@ def load_training_data( _logger.info(f"Processing file: {f}") tree = root_file["data"] df = tree.arrays(branch_list, cut=event_cut, library="pd") - _logger.info(f"Number of events after filter {event_cut}: {len(df)}") + _logger.info(f"Number of events after event cut {event_cut}: {len(df)}") if max_events_per_file and len(df) > max_events_per_file: df = df.sample(n=max_events_per_file, random_state=42) if not df.empty:
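
Usage note (illustrative sketch, not part of the patches above): after this
series, load_energy_range expects the already-loaded model-parameters
dictionary, with the energy bin pre-selected, rather than a file path. A
minimal sketch, assuming the dictionary comes from load_model_parameters
called with an energy_bin_number:

    from eventdisplay_ml.utils import load_energy_range

    # Hypothetical parameter dictionary; in practice this is the return value
    # of load_model_parameters("params.json", energy_bin_number=0), which
    # replaces "energy_bins_log10_tev" with the selected bin.
    model_parameters = {"energy_bins_log10_tev": {"E_min": 0.0, "E_max": 1.0}}

    e_min, e_max = load_energy_range(model_parameters)
    # Returns (10**E_min, 10**E_max) in TeV, here (1.0, 10.0).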
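
Similarly, the new print_variable_statistics helper logs one summary line per
column. A small sketch of its use on a flat training DataFrame, assuming the
helper is exposed from eventdisplay_ml.data_processing (the module containing
energy_in_bins in the hunk above):

    import logging

    import numpy as np
    import pandas as pd

    from eventdisplay_ml.data_processing import print_variable_statistics

    # The helper writes via the module logger, so enable INFO output.
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    df = pd.DataFrame({"size_0": rng.random(100), "loss_0": rng.random(100)})

    # Logs one line per column: name, min, max, mean, and RMS.
    print_variable_statistics(df)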