diff --git a/pyproject.toml b/pyproject.toml index 6edce79..ba39193 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,7 +60,9 @@ optional-dependencies."tests" = [ urls."bug tracker" = "https://github.com/Eventdisplay/Eventdisplay-ML/issues" urls."documentation" = "https://github.com/Eventdisplay/Eventdisplay-ML" urls."repository" = "https://github.com/Eventdisplay/Eventdisplay-ML" +scripts.eventdisplay-ml-apply-xgb-classify = "eventdisplay_ml.scripts.apply_xgb_classify:main" scripts.eventdisplay-ml-apply-xgb-stereo = "eventdisplay_ml.scripts.apply_xgb_stereo:main" +scripts.eventdisplay-ml-train-xgb-classify = "eventdisplay_ml.scripts.train_xgb_classify:main" scripts.eventdisplay-ml-train-xgb-stereo = "eventdisplay_ml.scripts.train_xgb_stereo:main" [tool.setuptools] @@ -118,6 +120,9 @@ lint.ignore = [ lint.pydocstyle.convention = "numpy" +[tool.ruff.lint.per-file-ignores] +"tests/**.py" = ["D103"] + [tool.codespell] ignore-words-list = "chec,arrang,livetime" diff --git a/src/eventdisplay_ml/data_processing.py b/src/eventdisplay_ml/data_processing.py index 830718e..0a11d1b 100644 --- a/src/eventdisplay_ml/data_processing.py +++ b/src/eventdisplay_ml/data_processing.py @@ -1,5 +1,5 @@ """ -Shared data processing utilities for XGBoost stereo analysis. +Data processing for XGBoost analysis. Provides common functions for flattening and preprocessing telescope array data. """ @@ -10,21 +10,13 @@ import pandas as pd import uproot -from eventdisplay_ml.training_variables import ( - xgb_all_training_variables, - xgb_per_telescope_training_variables, -) +from eventdisplay_ml import features +from eventdisplay_ml.utils import load_energy_range _logger = logging.getLogger(__name__) -def flatten_data_vectorized( - df, - n_tel, - training_variables, - apply_pointing_corrections=False, - dtype=None, -): +def flatten_telescope_data_vectorized(df, n_tel, features, analysis_type, training=True): """ Vectorized flattening of telescope array columns. @@ -34,51 +26,35 @@ def flatten_data_vectorized( Parameters ---------- df : pandas.DataFrame - Input DataFrame containing telescope data. If apply_pointing_corrections - is True, must also contain "fpointing_dx" and "fpointing_dy". + Input DataFrame containing telescope data. n_tel : int Number of telescopes to flatten for. - training_variables : list[str] + features : list[str] List of training variable names to flatten. - apply_pointing_corrections : bool, optional - If True, apply pointing offset corrections to cen_x and cen_y. - Set to True for inference, False for training. Default is False. - dtype : numpy.dtype, optional - Data type to cast flattened features to. If None, the main flattened - features are not explicitly cast, but extra derived columns are created - with dtype ``np.float32``. Use np.float32 for memory efficiency in inference. + analysis_type : str + Type of analysis (e.g., "stereo_analysis"). + training : bool, optional + If True, indicates training mode. Default is True. Returns ------- pandas.DataFrame Flattened DataFrame with per-telescope columns suffixed by ``_{i}`` - for telescope index ``i``, plus derived features (disp_x, disp_y, - loss_loss, loss_dist, width_length, size, E, ES, and optionally - pointing-corrected cen_x/cen_y), and extra columns (Xoff, - Xoff_intersect, Erec, EmissionHeight, etc.). + for telescope index ``i``, plus derived features, and array features. """ flat_features = {} - tel_list_col = "DispTelList_T" - - tel_list_matrix = _to_dense_array(df[tel_list_col]) - - for var_name in training_variables: - if var_name in df: - data_matrix = _to_dense_array(df[var_name]) - else: - _logger.debug( - "Training variable %s missing in input data; filling with NaN values.", - var_name, - ) - data_matrix = np.full((len(df), n_tel), np.nan) + tel_list_matrix = _to_dense_array(df["DispTelList_T"]) + n_evt = len(df) + for var in features: + data = _to_dense_array(df[var]) if var in df else np.full((n_evt, n_tel), np.nan) for i in range(n_tel): - col_name = f"{var_name}_{i}" + col_name = f"{var}_{i}" - if var_name.startswith("Disp"): + if var.startswith("Disp"): # Case 1: Simple index i - if i < data_matrix.shape[1]: - flat_features[col_name] = data_matrix[:, i] + if i < data.shape[1]: + flat_features[col_name] = data[:, i] else: flat_features[col_name] = np.full(len(df), np.nan) else: @@ -86,54 +62,14 @@ def flatten_data_vectorized( target_tel_indices = tel_list_matrix[:, i].astype(int) row_indices = np.arange(len(df)) - valid_mask = (target_tel_indices >= 0) & (target_tel_indices < data_matrix.shape[1]) + valid_mask = (target_tel_indices >= 0) & (target_tel_indices < data.shape[1]) result = np.full(len(df), np.nan) - result[valid_mask] = data_matrix[ - row_indices[valid_mask], target_tel_indices[valid_mask] - ] + result[valid_mask] = data[row_indices[valid_mask], target_tel_indices[valid_mask]] flat_features[col_name] = result - df_flat = pd.DataFrame(flat_features, index=df.index) - - if dtype is not None: - df_flat = df_flat.astype(dtype) - - new_cols = {} - for i in range(n_tel): - new_cols[f"disp_x_{i}"] = df_flat[f"Disp_T_{i}"] * df_flat[f"cosphi_{i}"] - new_cols[f"disp_y_{i}"] = df_flat[f"Disp_T_{i}"] * df_flat[f"sinphi_{i}"] - new_cols[f"loss_loss_{i}"] = df_flat[f"loss_{i}"] ** 2 - new_cols[f"loss_dist_{i}"] = df_flat[f"loss_{i}"] * df_flat[f"dist_{i}"] - new_cols[f"width_length_{i}"] = df_flat[f"width_{i}"] / (df_flat[f"length_{i}"] + 1e-6) - - df_flat[f"size_{i}"] = np.log10(np.clip(df_flat[f"size_{i}"], 1e-6, None)) - df_flat[f"E_{i}"] = np.log10(np.clip(df_flat[f"E_{i}"], 1e-6, None)) - df_flat[f"ES_{i}"] = np.log10(np.clip(df_flat[f"ES_{i}"], 1e-6, None)) - - if apply_pointing_corrections: - df_flat[f"cen_x_{i}"] = df_flat[f"cen_x_{i}"] + df_flat[f"fpointing_dx_{i}"] - df_flat[f"cen_y_{i}"] = df_flat[f"cen_y_{i}"] + df_flat[f"fpointing_dy_{i}"] - - df_flat = pd.concat([df_flat, pd.DataFrame(new_cols, index=df.index)], axis=1) - - cast_type = dtype if dtype is not None else np.float32 - extra_cols = pd.DataFrame( - { - "Xoff_weighted_bdt": df["Xoff"].astype(cast_type), - "Yoff_weighted_bdt": df["Yoff"].astype(cast_type), - "Xoff_intersect": df["Xoff_intersect"].astype(cast_type), - "Yoff_intersect": df["Yoff_intersect"].astype(cast_type), - "Diff_Xoff": (df["Xoff"] - df["Xoff_intersect"]).astype(cast_type), - "Diff_Yoff": (df["Yoff"] - df["Yoff_intersect"]).astype(cast_type), - "Erec": np.log10(np.clip(df["Erec"], 1e-6, None)).astype(cast_type), - "ErecS": np.log10(np.clip(df["ErecS"], 1e-6, None)).astype(cast_type), - "EmissionHeight": df["EmissionHeight"].astype(cast_type), - }, - index=df.index, - ) - - return pd.concat([df_flat, extra_cols], axis=1) + df_flat = flatten_telescope_variables(n_tel, flat_features, df.index) + return pd.concat([df_flat, extra_columns(df, analysis_type, training)], axis=1) def _to_padded_array(arrays): @@ -172,73 +108,299 @@ def _to_dense_array(col): return _to_padded_array(arrays) -def load_training_data(input_files, n_tel, max_events): +def flatten_feature_data(group_df, ntel, analysis_type, training): + """Get flattened features for a group of events with given telescope multiplicity.""" + df_flat = flatten_telescope_data_vectorized( + group_df, + ntel, + features.telescope_features(analysis_type), + analysis_type=analysis_type, + training=training, + ) + excluded_columns = set(features.target_features(analysis_type)) | set( + features.excluded_features(analysis_type, ntel) + ) + return df_flat.drop(columns=excluded_columns, errors="ignore") + + +def load_training_data( + input_files, + n_tel, + max_events, + analysis_type="stereo_analysis", + model_parameters=None, +): """ Load and flatten training data from the mscw file for the requested telescope multiplicity. Parameters ---------- input_files : list[str] - List of input mscw ROOT files. + List of input mscw files. n_tel : int Telescope multiplicity to filter on. max_events : int Maximum number of events to load. If <= 0, load all available events. + analysis_type : str, optional + Type of analysis: "stereo_analysis", "classification". + model_parameters : dict + Dictionary of model parameters. """ - _logger.info(f"\n--- Loading and Flattening Data for n_tel = {n_tel} ---") + _logger.info(f"--- Loading and Flattening Data for {analysis_type} for n_tel = {n_tel} ---") _logger.info( "Max events to process: " f"{max_events if max_events is not None and max_events > 0 else 'All available'}" ) - branch_list = ["MCxoff", "MCyoff", "MCe0", *xgb_all_training_variables()] - - dfs = [] - - if not input_files: - _logger.error("No input files provided.") - return pd.DataFrame() + branch_list = features.features(analysis_type, training=True) + _logger.info(f"Branch list: {branch_list}") + event_cut = event_cuts(analysis_type, n_tel, model_parameters) if max_events is not None and max_events > 0: max_events_per_file = max_events // len(input_files) else: max_events_per_file = None + dfs = [] for f in input_files: try: with uproot.open(f) as root_file: - if "data" in root_file: - _logger.info(f"Processing file: {f}") - tree = root_file["data"] - df = tree.arrays(branch_list, library="pd") - df = df[df["DispNImages"] == n_tel] - _logger.info(f"Number of events after n_tel filter: {len(df)}") - if max_events_per_file and len(df) > max_events_per_file: - df = df.sample(n=max_events_per_file, random_state=42) - if not df.empty: - dfs.append(df) - else: + if "data" not in root_file: _logger.warning(f"File: {f} does not contain a 'data' tree.") + continue + + _logger.info(f"Processing file: {f}") + tree = root_file["data"] + df = tree.arrays(branch_list, cut=event_cut, library="pd") + _logger.info(f"Number of events after event cut {event_cut}: {len(df)}") + if max_events_per_file and len(df) > max_events_per_file: + df = df.sample(n=max_events_per_file, random_state=42) + if not df.empty: + dfs.append(df) except Exception as e: - _logger.error(f"Error opening or reading file {f}: {e}") + raise FileNotFoundError(f"Error opening or reading file {f}: {e}") from e if len(dfs) == 0: - _logger.error("No data loaded from input files.") - return pd.DataFrame() + raise ValueError("No data loaded from input files.") data_tree = pd.concat(dfs, ignore_index=True) _logger.info(f"Total events for n_tel={n_tel}: {len(data_tree)}") - df_flat = flatten_data_vectorized( - data_tree, n_tel, xgb_per_telescope_training_variables(), apply_pointing_corrections=False + df_flat = flatten_telescope_data_vectorized( + data_tree, + n_tel, + features.telescope_features(analysis_type), + analysis_type, + training=True, ) - df_flat["MCxoff"] = data_tree["MCxoff"] - df_flat["MCyoff"] = data_tree["MCyoff"] - df_flat["MCe0"] = np.log10(data_tree["MCe0"]) + if analysis_type == "stereo_analysis": + df_flat["MCxoff"] = data_tree["MCxoff"] + df_flat["MCyoff"] = data_tree["MCyoff"] + df_flat["MCe0"] = np.log10(data_tree["MCe0"]) + elif analysis_type == "classification": + df_flat["ze_bin"] = zenith_in_bins( + 90.0 - data_tree["ArrayPointing_Elevation"], model_parameters.get("zenith_bins_deg", []) + ) - # Keep events even if some optional training variables are missing; only drop - # columns that are entirely NaN (e.g., missing branches like DispXoff_T). df_flat.dropna(axis=1, how="all", inplace=True) _logger.info(f"Final events for n_tel={n_tel} after cleanup: {len(df_flat)}") + print_variable_statistics(df_flat) + return df_flat + + +def apply_image_selection(df, selected_indices, analysis_type, training=False): + """ + Filter and pad telescope lists for selected indices. + + Parameters + ---------- + df : pandas.DataFrame + Input DataFrame containing telescope data. + selected_indices : list[int] or None + List of selected telescope indices. If None or all 4 telescopes + are selected, the DataFrame is returned unchanged. + analysis_type : str, optional + Type of analysis (e.g., "stereo_analysis") + training : bool, optional + If True, indicates training mode. Default is False. + + Returns + ------- + pandas.DataFrame + DataFrame with updated "DispTelList_T" and "DispNImages" columns, + and per-telescope variables padded to length 4 with NaN. + """ + if selected_indices is None or len(selected_indices) == 4: + return df + + selected_set = set(selected_indices) + + def calculate_intersection(tel_list): + return [tel_idx for tel_idx in tel_list if tel_idx in selected_set] + + df = df.copy() + df["DispTelList_T_new"] = df["DispTelList_T"].apply(calculate_intersection) + df["DispNImages_new"] = df["DispTelList_T_new"].apply(len) + + _logger.info( + f"\n{df[['DispNImages', 'DispTelList_T', 'DispNImages_new', 'DispTelList_T_new']].head(20).to_string()}" + ) + + df["DispTelList_T"] = df["DispTelList_T_new"] + df["DispNImages"] = df["DispNImages_new"] + df = df.drop(columns=["DispTelList_T_new", "DispNImages_new"]) + + pad_vars = features.telescope_features(analysis_type) + + for var_name in pad_vars: + if var_name in df.columns: + df[var_name] = df[var_name].apply(_pad_to_four) + + return df + + +def _pad_to_four(arr_like): + """Pad a per-telescope array-like to length 4 with NaN values.""" + if isinstance(arr_like, (list, np.ndarray)): + arr = np.asarray(arr_like, dtype=np.float32) + pad = max(0, 4 - arr.shape[0]) + if pad: + arr = np.pad(arr, (0, pad), mode="constant", constant_values=np.nan) + return arr + return arr_like + + +def event_cuts(analysis_type, n_tel, model_parameters=None): + """Event cut string for the given analysis type and telescope multiplicity.""" + event_cut = f"(DispNImages == {n_tel})" + + if analysis_type == "classification": + cuts = [ + "Erec > 0", + "MSCW > -2", + "MSCW < 2", + "MSCL > -2", + "MSCL < 5", + "EmissionHeight > 0", + "EmissionHeight < 50", + ] + if model_parameters is not None: + e_min, e_max = load_energy_range(model_parameters) + cuts += [f"Erec >= {e_min}", f"Erec <= {e_max}"] + event_cut += " & " + " & ".join(f"({c})" for c in cuts) + + return event_cut + + +def flatten_telescope_variables(n_tel, flat_features, index): + """Generate dataframe for telescope variables flattened for n_tel telescopes.""" + df_flat = pd.DataFrame(flat_features, index=index) + df_flat = df_flat.astype(np.float32) + + new_cols = {} + for i in range(n_tel): + if f"Disp_T_{i}" in df_flat: + new_cols[f"disp_x_{i}"] = df_flat[f"Disp_T_{i}"] * df_flat[f"cosphi_{i}"] + new_cols[f"disp_y_{i}"] = df_flat[f"Disp_T_{i}"] * df_flat[f"sinphi_{i}"] + new_cols[f"loss_loss_{i}"] = df_flat[f"loss_{i}"] ** 2 + new_cols[f"loss_dist_{i}"] = df_flat[f"loss_{i}"] * df_flat[f"dist_{i}"] + new_cols[f"width_length_{i}"] = df_flat[f"width_{i}"] / (df_flat[f"length_{i}"] + 1e-6) + + if f"size_{i}" in df_flat: + df_flat[f"size_{i}"] = np.log10(np.clip(df_flat[f"size_{i}"], 1e-6, None)) + if f"E_{i}" in df_flat: + df_flat[f"E_{i}"] = np.log10(np.clip(df_flat[f"E_{i}"], 1e-6, None)) + if f"ES_{i}" in df_flat: + df_flat[f"ES_{i}"] = np.log10(np.clip(df_flat[f"ES_{i}"], 1e-6, None)) + + # pointing corrections + if f"cen_x_{i}" in df_flat and f"fpointing_dx_{i}" in df_flat: + df_flat[f"cen_x_{i}"] = df_flat[f"cen_x_{i}"] + df_flat[f"fpointing_dx_{i}"] + if f"cen_y_{i}" in df_flat and f"fpointing_dy_{i}" in df_flat: + df_flat[f"cen_y_{i}"] = df_flat[f"cen_y_{i}"] + df_flat[f"fpointing_dy_{i}"] + df_flat = df_flat.drop(columns=[f"fpointing_dx_{i}", f"fpointing_dy_{i}"]) + + return pd.concat([df_flat, pd.DataFrame(new_cols, index=index)], axis=1) + + +def extra_columns(df, analysis_type, training): + """Add extra columns required for analysis type.""" + if analysis_type == "stereo_analysis": + return pd.DataFrame( + { + "Xoff_weighted_bdt": df["Xoff"].astype(np.float32), + "Yoff_weighted_bdt": df["Yoff"].astype(np.float32), + "Xoff_intersect": df["Xoff_intersect"].astype(np.float32), + "Yoff_intersect": df["Yoff_intersect"].astype(np.float32), + "Diff_Xoff": (df["Xoff"] - df["Xoff_intersect"]).astype(np.float32), + "Diff_Yoff": (df["Yoff"] - df["Yoff_intersect"]).astype(np.float32), + "Erec": np.log10(np.clip(df["Erec"], 1e-6, None)).astype(np.float32), + "ErecS": np.log10(np.clip(df["ErecS"], 1e-6, None)).astype(np.float32), + "EmissionHeight": df["EmissionHeight"].astype(np.float32), + }, + index=df.index, + ) + + if "classification" in analysis_type: + data = { + "MSCW": df["MSCW"].astype(np.float32), + "MSCL": df["MSCL"].astype(np.float32), + "EChi2S": np.log10(np.clip(df["EChi2S"], 1e-6, None)).astype(np.float32), + "EmissionHeight": df["EmissionHeight"].astype(np.float32), + "EmissionHeightChi2": np.log10(np.clip(df["EmissionHeightChi2"], 1e-6, None)).astype( + np.float32 + ), + } + if not training: + data["ze_bin"] = df["ze_bin"].astype(np.float32) + return pd.DataFrame(data, index=df.index) + + raise ValueError(f"Unknown analysis_type: {analysis_type}") + + +def zenith_in_bins(zenith_angles, bins): + """Apply zenith binning based on zenith angles and given bin edges.""" + if isinstance(bins[0], dict): + bins = [b["Ze_min"] for b in bins] + [bins[-1]["Ze_max"]] + bins = np.asarray(bins, dtype=float) + idx = np.clip(np.digitize(zenith_angles, bins) - 1, 0, len(bins) - 2) + return idx.astype(np.int32) + + +def energy_in_bins(df_chunk, bins): + """Apply energy binning based on reconstructed energy and given limits.""" + centers = np.array([(b["E_min"] + b["E_max"]) / 2 if b is not None else np.nan for b in bins]) + valid = (df_chunk["Erec"].to_numpy() > 0) & ~np.isnan(centers).all() + e_bin = np.full(len(df_chunk), -1, dtype=np.int32) + log_e = np.log10(df_chunk.loc[valid, "Erec"].to_numpy()) + distances = np.abs(log_e[:, None] - centers) + distances[:, np.isnan(centers)] = np.inf + + e_bin[valid] = np.argmin(distances, axis=1) + df_chunk["e_bin"] = e_bin + return df_chunk["e_bin"] + + +def print_variable_statistics(df): + """ + Print min, max, mean, and RMS for each variable in the DataFrame. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame containing variables loaded using branch_list. + """ + for col in df.columns: + data = df[col].dropna().to_numpy() + if data.size == 0: + print(f"{col}: No data") + continue + min_val = np.min(data) + max_val = np.max(data) + mean_val = np.mean(data) + rms_val = np.sqrt(np.mean(np.square(data))) + _logger.info( + f"{col:25s} min: {min_val:10.4g} max: {max_val:10.4g} mean: {mean_val:10.4g} rms: {rms_val:10.4g}" + ) diff --git a/src/eventdisplay_ml/evaluate.py b/src/eventdisplay_ml/evaluate.py index 1854f92..ae31224 100644 --- a/src/eventdisplay_ml/evaluate.py +++ b/src/eventdisplay_ml/evaluate.py @@ -7,28 +7,82 @@ import xgboost as xgb from sklearn.metrics import mean_absolute_error, mean_squared_error +from eventdisplay_ml.features import target_features + _logger = logging.getLogger(__name__) -def evaluate_model(model, x_test, y_test, df, x_cols, y_data, name): +def evaluation_efficiency(name, model, x_test, y_test): + """Calculate signal and background efficiency as a function of threshold.""" + y_pred_proba = model.predict_proba(x_test)[:, 1] + thresholds = np.linspace(0, 1, 101) + + n_signal = (y_test == 1).sum() + n_background = (y_test == 0).sum() + + eff_signal = [] + eff_background = [] + + for t in thresholds: + pred = y_pred_proba >= t + eff_signal.append(((pred) & (y_test == 1)).sum() / n_signal if n_signal else 0) + eff_background.append(((pred) & (y_test == 0)).sum() / n_background if n_background else 0) + _logger.info( + f"{name} Threshold: {t:.2f} | " + f"Signal Efficiency: {eff_signal[-1]:.4f} | " + f"Background Efficiency: {eff_background[-1]:.4f}" + ) + + return pd.DataFrame( + { + "threshold": thresholds, + "signal_efficiency": eff_signal, + "background_efficiency": eff_background, + } + ) + + +def evaluate_classification_model(model, x_test, y_test, df, x_cols, name): + """Evaluate the trained model on the test set and log performance metrics.""" + y_pred_proba = model.predict_proba(x_test)[:, 1] + y_pred = (y_pred_proba >= 0.5).astype(int) + + accuracy = (y_pred == y_test).mean() + _logger.info(f"XGBoost Classification Accuracy (Testing Set): {accuracy:.4f}") + + from sklearn.metrics import classification_report, confusion_matrix + + _logger.info(f"--- Confusion Matrix for {name} ---") + cm = confusion_matrix(y_test, y_pred) + _logger.info(f"\n{cm}") + + _logger.info(f"--- Classification Report for {name} ---") + report = classification_report(y_test, y_pred, digits=4) + _logger.info(f"\n{report}") + + feature_importance(model, x_cols, ["label"], name) + if name == "xgboost": + shap_feature_importance(model, x_test, ["label"]) + + +def evaluate_regression_model(model, x_test, y_test, df, x_cols, y_data, name): """Evaluate the trained model on the test set and log performance metrics.""" score = model.score(x_test, y_test) _logger.info(f"XGBoost Multi-Target R^2 Score (Testing Set): {score:.4f}") y_pred = model.predict(x_test) - mse_x = mean_squared_error(y_test["MCxoff"], y_pred[:, 0]) - mse_y = mean_squared_error(y_test["MCyoff"], y_pred[:, 1]) - _logger.info(f"{name} MSE (X_off): {mse_x:.4f}, MSE (Y_off): {mse_y:.4f}") - mae_x = mean_absolute_error(y_test["MCxoff"], y_pred[:, 0]) - mae_y = mean_absolute_error(y_test["MCyoff"], y_pred[:, 1]) - _logger.info(f"{name} MAE (X_off): {mae_x:.4f}") - _logger.info(f"{name} MAE (Y_off): {mae_y:.4f}") + mse = mean_squared_error(y_test, y_pred) + _logger.info(f"{name} Mean Squared Error (All targets): {mse:.4f}") + mae = mean_absolute_error(y_test, y_pred) + _logger.info(f"{name} Mean Absolute Error (All targets): {mae:.4f}") + target_variance(y_test, y_pred, y_data.columns) feature_importance(model, x_cols, y_data.columns, name) if name == "xgboost": shap_feature_importance(model, x_test, y_data.columns) + df_pred = pd.DataFrame(y_pred, columns=target_features("stereo_analysis")) calculate_resolution( - y_pred, + df_pred, y_test, df, percentiles=[68, 90, 95], @@ -39,15 +93,33 @@ def evaluate_model(model, x_test, y_test, df, x_cols, y_data, name): ) +def target_variance(y_test, y_pred, targets): + """Calculate and log variance explained per target.""" + y_test_np = y_test.to_numpy() if hasattr(y_test, "to_numpy") else y_test + + mse_values = np.mean((y_test_np - y_pred) ** 2, axis=0) + variance_values = np.var(y_test_np, axis=0) + + _logger.info("--- Performance Per Target ---") + for i, name in enumerate(targets): + # Fraction of variance unexplained (lower is better, 0.0 is perfect) + unexplained = mse_values[i] / variance_values[i] + + _logger.info( + f"Target: {name:12s} | MSE: {mse_values[i]:.6f} | " + f"Unexplained Variance: {unexplained:.2%}" + ) + + def calculate_resolution(y_pred, y_test, df, percentiles, log_e_min, log_e_max, n_bins, name): """Compute angular and energy resolution based on predictions.""" results_df = pd.DataFrame( { "MCxoff_true": y_test["MCxoff"].values, "MCyoff_true": y_test["MCyoff"].values, - "MCxoff_pred": y_pred[:, 0], - "MCyoff_pred": y_pred[:, 1], - "MCe0_pred": y_pred[:, 2], + "MCxoff_pred": y_pred["MCxoff"].values, + "MCyoff_pred": y_pred["MCyoff"].values, + "MCe0_pred": y_pred["MCe0"].values, "MCe0": df.loc[y_test.index, "MCe0"].values, } ) @@ -87,38 +159,59 @@ def percentile_series(col, p): def feature_importance(model, x_cols, target_names, name=None): - """Log feature importance from the trained XGBoost model.""" - _logger.info("--- XGBoost Multi-Regression Feature Importance ---") - for i, estimator in enumerate(model.estimators_): - target = target_names[i] - _logger.info(f"\n### {name} Importance for Target: **{target}**") + """Feature importance handling both MultiOutputRegressor and native Multi-target.""" + _logger.info("--- XGBoost Feature Importance ---") - importances = estimator.feature_importances_ - importance_df = pd.DataFrame({"Feature": x_cols, "Importance": importances}) + # Case 1: Scikit-Learn MultiOutputRegressor + if hasattr(model, "estimators_"): + for i, est in enumerate(model.estimators_): + target = target_names[i] if (target_names and i < len(target_names)) else f"target_{i}" + _log_importance_table(target, est.feature_importances_, x_cols, name) - importance_df = importance_df.sort_values(by="Importance", ascending=False) - _logger.info(f"\n{importance_df.head(15).to_markdown(index=False)}") + # Case 2: Native Multi-target OR Single-target Classifier + else: + importances = getattr(model, "feature_importances_", None) + + if importances is not None: + if target_names is not None and len(target_names) > 0: + # Convert to list to ensure .join works regardless of input type + target_str = ", ".join(map(str, target_names)) + else: + target_str = "Target" + + # Check if it's actually multi-target to set the log message + if target_names is not None and len(target_names) > 1: + _logger.info("Note: Native XGBoost multi-target provides JOINT importance.") + + _log_importance_table(target_str, importances, x_cols, name) + + +def _log_importance_table(target_label, values, x_cols, name): + """Format and log the importance dataframe for printing.""" + df = pd.DataFrame({"Feature": x_cols, "Importance": values}).sort_values( + "Importance", ascending=False + ) + _logger.info(f"### {name} Importance for: **{target_label}**") + _logger.info(f"\n{df.head(25).to_markdown(index=False)}") def shap_feature_importance(model, x_data, target_names, max_points=20000, n_top=25): - """Use XGBoost's builtin SHAP.""" - x_sample = x_data.sample(n=min(len(x_data), max_points), random_state=0) - for i, est in enumerate(model.estimators_): - target = target_names[i] - - # Builtin XGBoost SHAP values (n_samples, n_features+1) - # Last column is the bias term: drop it - shap_vals = est.get_booster().predict(xgb.DMatrix(x_sample), pred_contribs=True) - shap_vals = shap_vals[:, :-1] # drop bias column - - # Global importance: mean(|SHAP|) - imp = np.abs(shap_vals).mean(axis=0) + """Feature importance using SHAP values for native multi-target XGBoost.""" + x_sample = x_data.sample(n=min(len(x_data), max_points), random_state=42) + n_features = len(x_data.columns) + n_targets = len(target_names) + + dmatrix = xgb.DMatrix(x_sample) + shap_vals = model.get_booster().predict(dmatrix, pred_contribs=True) + shap_vals = shap_vals.reshape(len(x_sample), n_targets, n_features + 1) + + for i, target in enumerate(target_names): + target_shap = shap_vals[:, i, :-1] + + imp = np.abs(target_shap).mean(axis=0) idx = np.argsort(imp)[::-1] - n_features = len(x_data.columns) - _logger.info(f"\n=== Builtin XGBoost SHAP Importance for {target} ===") + _logger.info(f"=== SHAP Importance for {target} ===") for j in idx[:n_top]: - # Guard against mismatches between SHAP array length and feature columns - if j >= n_features: - continue - _logger.info(f"{x_data.columns[j]:25s} {imp[j]:.6e}") + if j < n_features: + _logger.info(f"{x_data.columns[j]:25s} {imp[j]:.6e}") diff --git a/src/eventdisplay_ml/features.py b/src/eventdisplay_ml/features.py new file mode 100644 index 0000000..94089c2 --- /dev/null +++ b/src/eventdisplay_ml/features.py @@ -0,0 +1,155 @@ +"""Features used for XGB training and prediction.""" + + +def target_features(analysis_type): + """ + Get target features based on analysis type. + + Parameters + ---------- + analysis_type : str + Type of analysis. + + Returns + ------- + list + List of target feature names. + """ + if analysis_type == "stereo_analysis": + return ["MCxoff", "MCyoff", "MCe0"] # sequence matters + if "classification" in analysis_type: + return [] + raise ValueError(f"Unknown analysis type: {analysis_type}") + + +def excluded_features(analysis_type, ntel): + """ + Features not to be used for training/prediction. + + Parameters + ---------- + analysis_type : str + Type of analysis. + ntel : int + Number of telescopes. + + Returns + ------- + set + Set of excluded feature names. + """ + if analysis_type == "stereo_analysis": + return { + *[f"fpointing_dx_{i}" for i in range(ntel)], + *[f"fpointing_dy_{i}" for i in range(ntel)], + } + if "classification" in analysis_type: + return { + "Erec", + *[f"cen_x_{i}" for i in range(ntel)], + *[f"cen_y_{i}" for i in range(ntel)], + *[f"size_{i}" for i in range(ntel)], + *[f"E_{i}" for i in range(ntel)], + *[f"ES_{i}" for i in range(ntel)], + *[f"fpointing_dx_{i}" for i in range(ntel)], + *[f"fpointing_dy_{i}" for i in range(ntel)], + } + raise ValueError(f"Unknown analysis type: {analysis_type}") + + +def telescope_features(analysis_type): + """ + Telescope-type features. + + Disp variables with different indexing logic in data preparation. + """ + var = [ + "cosphi", + "sinphi", + "loss", + "dist", + "width", + "length", + "asym", + "tgrad_x", + "R_core", + "fpointing_dx", + "fpointing_dy", + ] + if analysis_type == "classification": + return var + + return [ + *var, + "size", + "cen_x", + "cen_y", + "E", + "ES", + "Disp_T", + "DispXoff_T", + "DispYoff_T", + "DispWoff_T", + ] + + +def _regression_features(training): + """Regression features.""" + var = [ + *telescope_features("stereo_analysis"), + "DispNImages", + "DispTelList_T", + "Xoff", + "Yoff", + "Xoff_intersect", + "Yoff_intersect", + "Erec", + "ErecS", + "EmissionHeight", + ] + if training: + return [*target_features("stereo_analysis"), *var] + return var + + +def _classification_features(training): + """Classification features.""" + var_tel = telescope_features("classification") + var_array = [ + "DispNImages", + "DispTelList_T", + "EChi2S", + "EmissionHeight", + "EmissionHeightChi2", + "MSCW", + "MSCL", + "ArrayPointing_Elevation", + ] + if training: + return var_tel + var_array + # energy used to bin the models, but not as feature + return var_tel + var_array + ["Erec"] + + +def features(analysis_type, training=True): + """ + Get features based on analysis type. + + Parameters + ---------- + analysis_type : str + Type of analysis. + training : bool, optional + If True (default), return training features. If False, return + all features including target features. + + Returns + ------- + list + List of feature names. + """ + if analysis_type == "stereo_analysis": + return _regression_features(training) + if "classification" in analysis_type: + return _classification_features(training) + raise ValueError(f"Unknown analysis type: {analysis_type}") diff --git a/src/eventdisplay_ml/hyper_parameters.py b/src/eventdisplay_ml/hyper_parameters.py new file mode 100644 index 0000000..265df39 --- /dev/null +++ b/src/eventdisplay_ml/hyper_parameters.py @@ -0,0 +1,59 @@ +"""Hyperparameter for classification and regression models.""" + +import json +import logging + +_logger = logging.getLogger(__name__) + + +XGB_REGRESSION_HYPERPARAMETERS = { + "xgboost": { + "n_estimators": 1000, + "learning_rate": 0.1, # Shrinkage + "max_depth": 5, + "min_child_weight": 1.0, # Equivalent to MinNodeSize=1.0% for XGBoost + "objective": "reg:squarederror", + "n_jobs": 4, + "random_state": None, + "tree_method": "hist", + "subsample": 0.7, # Default sensible value + "colsample_bytree": 0.7, # Default sensible value + } +} + +XGB_CLASSIFICATION_HYPERPARAMETERS = { + "xgboost": { + "objective": "binary:logistic", + "eval_metric": "logloss", # TODO AUC ? + "n_estimators": 100, # TODO probably too low + "max_depth": 6, + "learning_rate": 0.1, + "subsample": 0.8, + "colsample_bytree": 0.8, + "random_state": None, + } +} + + +def regression_hyperparameters(config_file=None): + """Get hyperparameters for XGBoost regression model.""" + if config_file: + return _load_hyper_parameters_from_file(config_file) + _logger.info(f"Default hyperparameters: {XGB_REGRESSION_HYPERPARAMETERS}") + return XGB_REGRESSION_HYPERPARAMETERS + + +def classification_hyperparameters(config_file=None): + """Get hyperparameters for XGBoost classification model.""" + if config_file: + return _load_hyper_parameters_from_file(config_file) + _logger.info(f"Default hyperparameters: {XGB_CLASSIFICATION_HYPERPARAMETERS}") + return XGB_CLASSIFICATION_HYPERPARAMETERS + + +def _load_hyper_parameters_from_file(config_file): + """Load hyperparameters from a JSON file.""" + with open(config_file) as f: + hyperparameters = json.load(f) + _logger.info(f"Loaded hyperparameters from {config_file}: {hyperparameters}") + return hyperparameters diff --git a/src/eventdisplay_ml/models.py b/src/eventdisplay_ml/models.py new file mode 100644 index 0000000..8041826 --- /dev/null +++ b/src/eventdisplay_ml/models.py @@ -0,0 +1,359 @@ +"""Apply models for regression and classification tasks.""" + +import logging +import re +from pathlib import Path + +import joblib +import numpy as np +import uproot + +from eventdisplay_ml import features +from eventdisplay_ml.data_processing import ( + apply_image_selection, + energy_in_bins, + flatten_feature_data, + zenith_in_bins, +) +from eventdisplay_ml.utils import parse_image_selection + +_logger = logging.getLogger(__name__) + + +def load_models(analysis_type, model_prefix): + """ + Load XGBoost models based on analysis type. + + Parameters + ---------- + analysis_type : str + Type of analysis ("stereo_analysis" or "classification"). + model_prefix : str + Prefix path to the trained model files. + + Returns + ------- + dict + A dictionary of loaded models. + dict, optional + A dictionary of model parameters (only for classification). + """ + if analysis_type == "stereo_analysis": + return load_regression_models(model_prefix) + if analysis_type == "classification": + return load_classification_models(model_prefix) + raise ValueError(f"Unknown analysis_type: {analysis_type}") + + +def load_classification_models(model_prefix): + """ + Load XGBoost classification models for different telescope multiplicities from a directory. + + Parameters + ---------- + model_prefix : str + Prefix path to the trained model files. Models are expected to be named + ``{model_prefix}_ntel{n_tel}_bin{e_bin}.joblib``. + + Returns + ------- + dict, dict + A dictionary mapping the number of telescopes (n_tel) and energy bin + to the corresponding loaded model objects. Also returns a dictionary + of model parameters. + """ + model_prefix = Path(model_prefix) + model_dir_path = Path(model_prefix.parent) + + models = {} + par = {} + for n_tel in range(2, 5): + pattern = f"{model_prefix.name}_ntel{n_tel}_bin*.joblib" + for file in sorted(model_dir_path.glob(pattern)): + match = re.search(r"_bin(\d+)\.joblib$", file.name) + if not match: + _logger.warning(f"Could not extract energy bin from filename: {file.name}") + continue + e_bin = int(match.group(1)) + _logger.info(f"Loading model: {file}") + model_data = joblib.load(file) + models.setdefault(n_tel, {})[e_bin] = model_data["model"] + par = _update_parameters(par, model_data.get("parameters", {}), e_bin) + + _logger.info(f"Loaded classification model parameters: {par}") + return models, par + + +def _update_parameters(full_params, single_bin_params, e_bin_number): + """Merge a single-bin model parameters into the full parameters dict.""" + energy_bin = single_bin_params["energy_bins_log10_tev"] + zenith_bins = single_bin_params["zenith_bins_deg"] + + if "energy_bins_log10_tev" not in full_params: + full_params["energy_bins_log10_tev"] = [] + full_params["zenith_bins_deg"] = zenith_bins + + while len(full_params["energy_bins_log10_tev"]) <= e_bin_number: + full_params["energy_bins_log10_tev"].append(None) + + full_params["energy_bins_log10_tev"][e_bin_number] = energy_bin + if full_params.get("zenith_bins_deg") != zenith_bins: + raise ValueError(f"Inconsistent zenith_bins_deg for energy bin {e_bin_number}") + + return full_params + + +def load_regression_models(model_prefix): + """ + Load XGBoost models for different telescope multiplicities from a directory. + + Parameters + ---------- + model_prefix : str + Prefix path to the trained model files. Models are expected to be named + ``{model_prefix}_ntel{n_tel}_xgboost.joblib``. + + Returns + ------- + dict[int, Any] + A dictionary mapping the number of telescopes (n_tel) to the + corresponding loaded model objects. Only models whose files + exist in ``model_dir`` are included. + """ + model_prefix = Path(model_prefix) + model_dir_path = Path(model_prefix.parent) + + models = {} + for n_tel in range(2, 5): + model_filename = model_dir_path / f"{model_prefix.name}_ntel{n_tel}.joblib" + if model_filename.exists(): + _logger.info(f"Loading model: {model_filename}") + model_data = joblib.load(model_filename) + models[n_tel] = model_data["model"] + else: + _logger.warning(f"Model not found: {model_filename}") + return models + + +def apply_regression_models(df, models): + """ + Apply trained XGBoost models for stereo analysis to a DataFrame chunk. + + Parameters + ---------- + df : pandas.DataFrame + Chunk of events to process. + models : dict + Preloaded models dictionary. + + Returns + ------- + pred_xoff : numpy.ndarray + Array of predicted Xoff values for each event in the chunk. + pred_yoff : numpy.ndarray + Array of predicted Yoff values for each event in the chunk. + pred_erec : numpy.ndarray + Array of predicted Erec values for each event in the chunk. + """ + preds = np.full((len(df), 3), np.nan, dtype=np.float32) + + grouped = df.groupby("DispNImages") + + for n_tel, group_df in grouped: + n_tel = int(n_tel) + if n_tel < 2 or n_tel not in models: + _logger.warning(f"No model for n_tel={n_tel}") + continue + + _logger.info(f"Processing {len(group_df)} events with n_tel={n_tel}") + + x_features = flatten_feature_data( + group_df, n_tel, analysis_type="stereo_analysis", training=False + ) + preds[group_df.index] = models[n_tel].predict(x_features) + + return preds[:, 0], preds[:, 1], preds[:, 2] + + +def apply_classification_models(df, models): + """ + Apply trained XGBoost classification models to a DataFrame chunk. + + Parameters + ---------- + df : pandas.DataFrame + Chunk of events to process. + models: dict + Preloaded models dictionary + + Returns + ------- + class_probability : numpy.ndarray + Array of predicted class probabilities for each event in the chunk, aligned + with the index of ``df``. + """ + class_probability = np.full(len(df), np.nan, dtype=np.float32) + + # 1. Group by Number of Images (n_tel) + for n_tel, group_ntel_df in df.groupby("DispNImages"): + n_tel = int(n_tel) + if n_tel < 2 or n_tel not in models: + _logger.warning(f"No model for n_tel={n_tel}") + continue + + # 2. Group by Energy Bin (e_bin) + for e_bin, group_df in group_ntel_df.groupby("e_bin"): + e_bin = int(e_bin) + if e_bin == -1: + _logger.warning("Skipping events with e_bin = -1") + continue + if e_bin not in models[n_tel]: + _logger.warning(f"No model for n_tel={n_tel}, e_bin={e_bin}") + continue + + _logger.info(f"Processing {len(group_df)} events: n_tel={n_tel}, bin={e_bin}") + + x_features = flatten_feature_data( + group_df, n_tel, analysis_type="classification", training=False + ) + class_probability[group_df.index] = models[n_tel][e_bin].predict_proba(x_features)[:, 1] + + return class_probability + + +def process_file_chunked( + analysis_type, + input_file, + output_file, + models, + image_selection, + model_parameters=None, + max_events=None, + chunk_size=500000, +): + """ + Stream events from an input file in chunks, apply XGBoost models, write events. + + Parameters + ---------- + input_file : str + Path to the input file containing a "data" TTree. + output_file : str + Path to the output file to create. + models : dict + Dictionary of loaded XGBoost models for regression. + image_selection : str + String specifying which telescope indices to select. + model_parameters : dict, optional + Dictionary of model parameters. + max_events : int, optional + Maximum number of events to process. + chunk_size : int, optional + Number of events to read and process per chunk. + """ + branch_list = features.features(analysis_type, training=False) + _logger.info(f"Using branches: {branch_list}") + selected_indices = parse_image_selection(image_selection) + + _logger.info(f"Chunk size: {chunk_size}") + if max_events: + _logger.info(f"Maximum events to process: {max_events}") + + with uproot.recreate(output_file) as root_file: + tree = _output_tree(analysis_type, root_file) + total_processed = 0 + + for df_chunk in uproot.iterate( + f"{input_file}:data", + branch_list, + library="pd", + step_size=chunk_size, + ): + if df_chunk.empty: + continue + + df_chunk = apply_image_selection(df_chunk, selected_indices, analysis_type) + if df_chunk.empty: + continue + if max_events is not None and total_processed >= max_events: + break + + # Reset index to local chunk indices (0, 1, 2, ...) to avoid + # index out-of-bounds when indexing chunk-sized output arrays + df_chunk = df_chunk.reset_index(drop=True) + if analysis_type == "classification": + df_chunk["e_bin"] = energy_in_bins( + df_chunk, model_parameters["energy_bins_log10_tev"] + ) + df_chunk["ze_bin"] = zenith_in_bins( + 90.0 - df_chunk["ArrayPointing_Elevation"].values, + model_parameters["zenith_bins_deg"], + ) + + _apply_model(analysis_type, df_chunk, models, tree) + + total_processed += len(df_chunk) + _logger.info(f"Processed {total_processed} events so far") + + _logger.info(f"Total processed events written: {total_processed}") + + +def _output_tree(analysis_type, root_file): + """ + Generate output tree structure for the given analysis type. + + Parameters + ---------- + analysis_type : str + Type of analysis (e.g., "stereo_analysis") + root_file : uproot.writing.WritingFile + Uproot file object to create the tree in. + + Returns + ------- + uproot.writing.WritingTTree + Output tree. + """ + if analysis_type == "stereo_analysis": + return root_file.mktree( + "StereoAnalysis", + {"Dir_Xoff": np.float32, "Dir_Yoff": np.float32, "Dir_Erec": np.float32}, + ) + if analysis_type == "classification": + return root_file.mktree("Classification", {"IsGamma": np.float32}) + raise ValueError(f"Unknown analysis_type: {analysis_type}") + + +def _apply_model(analysis_type, df_chunk, models, tree): + """ + Apply models to the data chunk. + + Parameters + ---------- + analysis_type : str + Type of analysis (e.g., "stereo_analysis") + df_chunk : pandas.DataFrame + Data chunk to process. + models : dict + Dictionary of loaded XGBoost models. + tree : uproot.writing.WritingTTree + Output tree to write results to. + """ + if analysis_type == "stereo_analysis": + pred_xoff, pred_yoff, pred_erec = apply_regression_models(df_chunk, models) + tree.extend( + { + "Dir_Xoff": np.asarray(pred_xoff, dtype=np.float32), + "Dir_Yoff": np.asarray(pred_yoff, dtype=np.float32), + "Dir_Erec": np.power(10.0, pred_erec, dtype=np.float32), + } + ) + elif analysis_type == "classification": + pred_proba = apply_classification_models(df_chunk, models) + tree.extend( + { + "IsGamma": np.asarray(pred_proba, dtype=np.float32), + } + ) + else: + raise ValueError(f"Unknown analysis_type: {analysis_type}") diff --git a/src/eventdisplay_ml/scripts/apply_xgb_classify.py b/src/eventdisplay_ml/scripts/apply_xgb_classify.py new file mode 100644 index 0000000..a088cd6 --- /dev/null +++ b/src/eventdisplay_ml/scripts/apply_xgb_classify.py @@ -0,0 +1,90 @@ +""" +Apply XGBoost classification model. + +Applies trained XGBoost classification models to input data and outputs +for each event the predicted signal probability. + +Takes into account telescope multiplicity and training in energy bins. + +""" + +import argparse +import logging + +from eventdisplay_ml.models import load_models, process_file_chunked + +logging.basicConfig(level=logging.INFO) +_logger = logging.getLogger(__name__) + + +def main(): + """Apply XGBoost classification.""" + parser = argparse.ArgumentParser(description=("Apply XGBoost Classification")) + parser.add_argument( + "--input_file", + required=True, + metavar="INPUT.root", + help="Path to input mscw file", + ) + parser.add_argument( + "--model_prefix", + required=True, + metavar="MODEL_PREFIX", + help=( + "Path to directory containing XGBoost classification models " + "(without n_tel and energy bin suffix)." + ), + ) + parser.add_argument( + "--output_file", + required=True, + metavar="OUTPUT.root", + help="Output file path for predictions", + ) + parser.add_argument( + "--image_selection", + type=str, + default="15", + help=( + "Optional telescope selection. Can be bit-coded (e.g., 14 for telescopes 1,2,3) " + "or comma-separated indices (e.g., '1,2,3'). " + "Keeps events with all selected telescopes or 4-telescope events. " + "Default is 15, which selects all 4 telescopes." + ), + ) + parser.add_argument( + "--max_events", + type=int, + default=None, + help="Maximum number of events to process (default: all events)", + ) + parser.add_argument( + "--chunk_size", + type=int, + default=500000, + help="Number of events to process per chunk (default: 500000)", + ) + args = parser.parse_args() + + _logger.info("--- XGBoost Classification Evaluation ---") + _logger.info(f"Input file: {args.input_file}") + _logger.info(f"Model prefix: {args.model_prefix}") + _logger.info(f"Output file: {args.output_file}") + _logger.info(f"Image selection: {args.image_selection}") + + models, model_par = load_models("classification", args.model_prefix) + + process_file_chunked( + analysis_type="classification", + input_file=args.input_file, + output_file=args.output_file, + models=models, + model_parameters=model_par, + image_selection=args.image_selection, + max_events=args.max_events, + chunk_size=args.chunk_size, + ) + + +if __name__ == "__main__": + main() diff --git a/src/eventdisplay_ml/scripts/apply_xgb_stereo.py b/src/eventdisplay_ml/scripts/apply_xgb_stereo.py index 238c9b9..6d09869 100644 --- a/src/eventdisplay_ml/scripts/apply_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/apply_xgb_stereo.py @@ -1,5 +1,5 @@ """ -Evaluate XGBoost BDTs for stereo reconstruction (direction, energy). +Apply XGBoost BDTs stereo reconstruction (direction, energy). Applies trained XGBoost models to predict Xoff, Yoff, and energy for each event from an input mscw file. The output ROOT file contains @@ -8,315 +8,36 @@ import argparse import logging -from pathlib import Path -import joblib -import numpy as np -import uproot - -from eventdisplay_ml.data_processing import flatten_data_vectorized -from eventdisplay_ml.training_variables import ( - xgb_all_training_variables, - xgb_per_telescope_training_variables, -) -from eventdisplay_ml.utils import parse_image_selection +from eventdisplay_ml.models import load_models, process_file_chunked logging.basicConfig(level=logging.INFO) _logger = logging.getLogger(__name__) -def apply_image_selection(df, selected_indices): - """ - Filter and pad telescope lists for selected indices. - - Parameters - ---------- - df : pandas.DataFrame - Input DataFrame containing telescope data. - selected_indices : list[int] or None - List of selected telescope indices. If None or all 4 telescopes - are selected, the DataFrame is returned unchanged. - - Returns - ------- - pandas.DataFrame - DataFrame with updated "DispTelList_T" and "DispNImages" columns, - and per-telescope variables padded to length 4 with NaN. - """ - if selected_indices is None or len(selected_indices) == 4: - return df - - selected_set = set(selected_indices) - - def calculate_intersection(tel_list): - return [tel_idx for tel_idx in tel_list if tel_idx in selected_set] - - df = df.copy() - df["DispTelList_T_new"] = df["DispTelList_T"].apply(calculate_intersection) - df["DispNImages_new"] = df["DispTelList_T_new"].apply(len) - - _logger.info( - f"\n{df[['DispNImages', 'DispTelList_T', 'DispNImages_new', 'DispTelList_T_new']].head(20).to_string()}" - ) - - df["DispTelList_T"] = df["DispTelList_T_new"] - df["DispNImages"] = df["DispNImages_new"] - df = df.drop(columns=["DispTelList_T_new", "DispNImages_new"]) - - pad_vars = [ - *xgb_per_telescope_training_variables(), - "fpointing_dx", - "fpointing_dy", - ] - for var_name in pad_vars: - if var_name in df.columns: - df[var_name] = df[var_name].apply(_pad_to_four) - - return df - - -def _pad_to_four(arr_like): - """Pad a per-telescope array-like to length 4 with NaN values.""" - if isinstance(arr_like, (list, np.ndarray)): - arr = np.asarray(arr_like, dtype=np.float32) - pad = max(0, 4 - arr.shape[0]) - if pad: - arr = np.pad(arr, (0, pad), mode="constant", constant_values=np.nan) - return arr - return arr_like - - -def load_models(model_dir): - """ - Load XGBoost models for different telescope multiplicities from a directory. - - Parameters - ---------- - model_dir : str - Path to the directory containing the trained model files - named ``dispdir_bdt_ntel{n_tel}_xgboost.joblib``. - - Returns - ------- - dict[int, Any] - A dictionary mapping the number of telescopes (n_tel) to the - corresponding loaded model objects. Only models whose files - exist in ``model_dir`` are included. - """ - models = {} - model_dir_path = Path(model_dir) - for n_tel in range(2, 5): - model_filename = model_dir_path / f"dispdir_bdt_ntel{n_tel}_xgboost.joblib" - if model_filename.exists(): - _logger.info(f"Loading model: {model_filename}") - models[n_tel] = joblib.load(model_filename) - else: - _logger.warning(f"Model not found: {model_filename}") - return models - - -def apply_models(df, models_or_dir, selection_mask=None): - """ - Apply trained XGBoost models to a DataFrame chunk. - - Parameters - ---------- - df : pandas.DataFrame - Chunk of events to process. - models_or_dir : dict[int, Any] or str - Either a preloaded models dictionary (as returned by :func:`load_models`) - or a path to a model directory. If a string is provided, models are - loaded on the fly to satisfy test expectations. - selection_mask : pandas.Series or None - Optional mask; False entries are marked with -999 in outputs. - - Returns - ------- - pred_xoff : numpy.ndarray - Array of predicted Xoff values for each event in the chunk, aligned - with the index of ``df``. - pred_yoff : numpy.ndarray - Array of predicted Yoff values for each event in the chunk, aligned - with the index of ``df``. - pred_erec : numpy.ndarray - Array of predicted Erec values for each event in the chunk, aligned - with the index of ``df``. - """ - n_events = len(df) - pred_xoff = np.full(n_events, np.nan, dtype=np.float32) - pred_yoff = np.full(n_events, np.nan, dtype=np.float32) - pred_erec = np.full(n_events, np.nan, dtype=np.float32) - if isinstance(models_or_dir, str): - models = load_models(models_or_dir) - else: - models = models_or_dir - - grouped = df.groupby("DispNImages") - - for n_tel, group_df in grouped: - n_tel = int(n_tel) - if int(n_tel) < 2: - continue - if n_tel not in models: - _logger.warning( - f"No model available for n_tel={n_tel}, skipping {len(group_df)} events" - ) - continue - - _logger.info(f"Processing {len(group_df)} events with n_tel={n_tel}") - - training_vars_with_pointing = [ - *xgb_per_telescope_training_variables(), - "fpointing_dx", - "fpointing_dy", - ] - df_flat = flatten_data_vectorized( - group_df, - n_tel, - training_vars_with_pointing, - apply_pointing_corrections=True, - dtype=np.float32, - ) - - excluded_columns = ["MCxoff", "MCyoff", "MCe0"] - for n in range(n_tel): - excluded_columns.append(f"fpointing_dx_{n}") - excluded_columns.append(f"fpointing_dy_{n}") - - feature_cols = [col for col in df_flat.columns if col not in excluded_columns] - x_features = df_flat[feature_cols] - - model = models[n_tel] - predictions = model.predict(x_features) - - for i, idx in enumerate(group_df.index): - pred_xoff[idx] = predictions[i, 0] - pred_yoff[idx] = predictions[i, 1] - pred_erec[idx] = predictions[i, 2] - - if selection_mask is not None: - pred_xoff = np.where(selection_mask, pred_xoff, -999.0) - pred_yoff = np.where(selection_mask, pred_yoff, -999.0) - pred_erec = np.where(selection_mask, pred_erec, -999.0) - - return pred_xoff, pred_yoff, pred_erec - - -def process_file_chunked( - input_file, - model_dir, - output_file, - image_selection, - max_events=None, - chunk_size=500000, -): - """ - Stream events from an input ROOT file in chunks, apply XGBoost models, write events. - - Parameters - ---------- - input_file : str - Path to the input ROOT file containing a "data" TTree. - model_dir : str - Directory containing the trained XGBoost model files named - ``dispdir_bdt_ntel{n_tel}_xgboost.joblib`` for different telescope - multiplicities. - output_file : str - Path to the output ROOT file to create. - image_selection : str - String specifying which telescope indices to select, passed to - :func:`parse_image_selection` to obtain the corresponding indices - used by :func:`apply_image_selection`. - max_events : int, optional - Maximum number of events to process. If None (default), all - available events in the input file are processed. - chunk_size : int, optional - Number of events to read and process per chunk. Larger values reduce - I/O overhead but increase memory usage. Default is 500000. - - Returns - ------- - None - This function writes results directly to ``output_file`` and does not - return a value. - """ - models = load_models(model_dir) - branch_list = [*xgb_all_training_variables(), "fpointing_dx", "fpointing_dy"] - selected_indices = parse_image_selection(image_selection) - - _logger.info(f"Chunk size: {chunk_size}") - if max_events: - _logger.info(f"Maximum events to process: {max_events}") - - with uproot.recreate(output_file) as root_file: - tree = root_file.mktree( - "StereoAnalysis", - {"Dir_Xoff": np.float32, "Dir_Yoff": np.float32, "Dir_Erec": np.float32}, - ) - - total_processed = 0 - - for df_chunk in uproot.iterate( - f"{input_file}:data", - branch_list, - library="pd", - step_size=chunk_size, - ): - if df_chunk.empty: - continue - - df_chunk = apply_image_selection(df_chunk, selected_indices) - if df_chunk.empty: - continue - - if max_events is not None and total_processed >= max_events: - break - - # Reset index to local chunk indices (0, 1, 2, ...) to avoid - # index out-of-bounds when indexing chunk-sized output arrays - df_chunk = df_chunk.reset_index(drop=True) - - pred_xoff, pred_yoff, pred_erec = apply_models(df_chunk, models) - - tree.extend( - { - "Dir_Xoff": np.asarray(pred_xoff, dtype=np.float32), - "Dir_Yoff": np.asarray(pred_yoff, dtype=np.float32), - "Dir_Erec": np.power(10.0, pred_erec, dtype=np.float32), - } - ) - - total_processed += len(df_chunk) - _logger.info(f"Processed {total_processed} events so far") - - _logger.info(f"Streaming complete. Total processed events written: {total_processed}") - - def main(): - """Parse CLI arguments and run inference on an input ROOT file.""" - parser = argparse.ArgumentParser( - description=("Apply XGBoost Multi-Target BDTs for Stereo Reconstruction") - ) + """Apply XGBoost stereo models.""" + parser = argparse.ArgumentParser(description=("Apply XGBoost Stereo Reconstruction")) parser.add_argument( - "--input-file", + "--input_file", required=True, metavar="INPUT.root", - help="Path to input mscw ROOT file", + help="Path to input mscw file", ) parser.add_argument( - "--model-dir", + "--model_prefix", required=True, - metavar="MODEL_DIR", - help="Directory containing XGBoost models", + metavar="MODEL_PREFIX", + help=("Path to directory containing XGBoost regression models (without n_tel suffix)."), ) parser.add_argument( - "--output-file", + "--output_file", required=True, metavar="OUTPUT.root", - help="Output ROOT file path for predictions", + help="Output file path for predictions", ) parser.add_argument( - "--image-selection", + "--image_selection", type=str, default="15", help=( @@ -327,29 +48,30 @@ def main(): ), ) parser.add_argument( - "--max-events", + "--max_events", type=int, default=None, help="Maximum number of events to process (default: all events)", ) parser.add_argument( - "--chunk-size", + "--chunk_size", type=int, default=500000, help="Number of events to process per chunk (default: 500000)", ) args = parser.parse_args() - _logger.info("--- XGBoost Multi-Target Stereo Analysis Evaluation ---") + _logger.info("--- XGBoost Stereo Analysis Evaluation ---") _logger.info(f"Input file: {args.input_file}") - _logger.info(f"Model directory: {args.model_dir}") + _logger.info(f"Model prefix: {args.model_prefix}") _logger.info(f"Output file: {args.output_file}") _logger.info(f"Image selection: {args.image_selection}") process_file_chunked( + analysis_type="stereo_analysis", input_file=args.input_file, - model_dir=args.model_dir, output_file=args.output_file, + models=load_models("stereo_analysis", args.model_prefix), image_selection=args.image_selection, max_events=args.max_events, chunk_size=args.chunk_size, diff --git a/src/eventdisplay_ml/scripts/train_xgb_classify.py b/src/eventdisplay_ml/scripts/train_xgb_classify.py new file mode 100644 index 0000000..feb95a4 --- /dev/null +++ b/src/eventdisplay_ml/scripts/train_xgb_classify.py @@ -0,0 +1,179 @@ +""" +Train XGBBoost models for gamma/hadron classification. + +Uses image and stereo parameters to train classification BDTs to separate +gamma-ray events from hadronic background events. + +Separate BDTs are trained for 2, 3, and 4 telescope multiplicity events. +""" + +import argparse +import logging + +import pandas as pd +import xgboost as xgb +from joblib import dump +from sklearn.model_selection import train_test_split + +from eventdisplay_ml import hyper_parameters, utils +from eventdisplay_ml.data_processing import load_training_data +from eventdisplay_ml.evaluate import ( + evaluate_classification_model, + evaluation_efficiency, +) + +logging.basicConfig(level=logging.INFO) +_logger = logging.getLogger(__name__) + + +def train( + df, + n_tel, + model_prefix, + train_test_fraction, + model_parameters, + energy_bin_number, + hyperparameter_config, +): + """ + Train a single XGBoost model for gamma/hadron classification. + + Parameters + ---------- + df : list of pd.DataFrame + List containing signal and background DataFrames. + n_tel : int + Telescope multiplicity. + model_prefix : str + Directory to save the trained model. + train_test_fraction : float + Fraction of data to use for training. + model_parameters : dict, + Dictionary of model parameters. + energy_bin_number : int + Energy bin number (for naming the output model). + hyperparameter_config : str, optional + Path to JSON file with hyperparameter configuration, by default None. + """ + if df[0].empty or df[1].empty: + _logger.warning(f"Skip training for n_tel={n_tel} due to empty signal / background data.") + return + + df[0]["label"] = 1 + df[1]["label"] = 0 + full_df = pd.concat([df[0], df[1]], ignore_index=True) + x_data = full_df.drop(columns=["label"]) + _logger.info(f"Training features ({len(x_data.columns)}): {', '.join(x_data.columns)}") + y_data = full_df["label"] + x_train, x_test, y_train, y_test = train_test_split( + x_data, y_data, train_size=train_test_fraction, random_state=None, stratify=y_data + ) + + _logger.info(f"n_tel={n_tel}: Training events: {len(x_train)}, Testing events: {len(x_test)}") + + configs = hyper_parameters.classification_hyperparameters(hyperparameter_config) + + for name, para in configs.items(): + _logger.info(f"Training with {name} for n_tel={n_tel}...") + model = xgb.XGBClassifier(**para) + model.fit(x_train, y_train) + + evaluate_classification_model(model, x_test, y_test, full_df, x_data.columns.tolist(), name) + + dump( + { + "model": model, + "features": x_data.columns.tolist(), + "hyperparameters": para, + "efficiency": evaluation_efficiency(name, model, x_test, y_test), + "parameters": model_parameters, + "n_tel": n_tel, + "energy_bin_number": energy_bin_number, + }, + utils.output_file_name(model_prefix, name, n_tel, energy_bin_number), + ) + + +def main(): + """Parse CLI arguments and run the training pipeline.""" + parser = argparse.ArgumentParser( + description=("Train XGBoost models for gamma/hadron classification.") + ) + parser.add_argument("--input_signal_file_list", help="List of input signal mscw ROOT files.") + parser.add_argument( + "--input_background_file_list", help="List of input background mscw ROOT files." + ) + parser.add_argument( + "--model_prefix", + required=True, + help=( + "Path to directory for writing XGBoost classification models " + "(without n_tel and energy bin suffix)." + ), + ) + parser.add_argument( + "--hyperparameter_config", + help="Path to JSON file with hyperparameter configuration.", + default=None, + type=str, + ) + parser.add_argument("--ntel", type=int, help="Telescope multiplicity (2, 3, or 4).") + parser.add_argument( + "--train_test_fraction", + type=float, + help="Fraction of data for training (e.g., 0.5).", + default=0.5, + ) + parser.add_argument( + "--max_events", + type=int, + help="Maximum number of events to process across all files.", + ) + parser.add_argument( + "--model_parameters", + type=str, + help=("Path to model parameter file (JSON) defining energy and zenith bins."), + ) + parser.add_argument( + "--energy_bin_number", + type=int, + help="Energy bin number for selection (optional).", + default=0, + ) + + args = parser.parse_args() + + _logger.info("--- XGBoost Classification Training ---") + _logger.info(f"Telescope multiplicity: {args.ntel}") + _logger.info(f"Model output prefix: {args.model_prefix}") + _logger.info(f"Train vs test fraction: {args.train_test_fraction}") + _logger.info(f"Max events: {args.max_events}") + _logger.info(f"Energy bin {args.energy_bin_number}") + + model_parameters = utils.load_model_parameters(args.model_parameters, args.energy_bin_number) + + event_lists = [ + load_training_data( + utils.read_input_file_list(file_list), + args.ntel, + args.max_events, + analysis_type="classification", + model_parameters=model_parameters, + ) + for file_list in (args.input_signal_file_list, args.input_background_file_list) + ] + + train( + event_lists, + args.ntel, + args.model_prefix, + args.train_test_fraction, + model_parameters, + args.energy_bin_number, + args.hyperparameter_config, + ) + _logger.info("XGBoost classification model trained successfully.") + + +if __name__ == "__main__": + main() diff --git a/src/eventdisplay_ml/scripts/train_xgb_stereo.py b/src/eventdisplay_ml/scripts/train_xgb_stereo.py index 850ebf4..4075362 100644 --- a/src/eventdisplay_ml/scripts/train_xgb_stereo.py +++ b/src/eventdisplay_ml/scripts/train_xgb_stereo.py @@ -1,5 +1,5 @@ """ -Train XGBoost Multi-Target BDTs for direction and energy reconstruction. +Train XGBoost BDTs stereo reconstruction (direction, energy). Uses x,y offsets calculated from intersection and dispBDT methods plus image parameters to train multi-target regression BDTs to predict x,y offsets. @@ -11,79 +11,76 @@ import argparse import logging -from pathlib import Path import xgboost as xgb from joblib import dump from sklearn.model_selection import train_test_split -from sklearn.multioutput import MultiOutputRegressor -from eventdisplay_ml import utils +from eventdisplay_ml import hyper_parameters, utils from eventdisplay_ml.data_processing import load_training_data -from eventdisplay_ml.evaluate import evaluate_model +from eventdisplay_ml.evaluate import evaluate_regression_model +from eventdisplay_ml.features import target_features logging.basicConfig(level=logging.INFO) _logger = logging.getLogger(__name__) -def train(df, n_tel, output_dir, train_test_fraction): +def train(df, n_tel, model_prefix, train_test_fraction, hyperparameter_config=None): """ Train a single XGBoost model for multi-target regression (Xoff, Yoff, MCe0). Parameters ---------- - - df: Pandas DataFrame with training data. - - n_tel: Telescope multiplicity. - - output_dir: Directory to save the trained model. - - train_test_fraction: Fraction of data to use for training. + df : pd.DataFrame + Pandas DataFrame with training data. + n_tel : int + Telescope multiplicity. + model_prefix : str + Directory to save the trained model. + train_test_fraction : float + Fraction of data to use for training. + hyperparameter_config : str, optional + Path to JSON file with hyperparameter configuration, by default None. """ if df.empty: _logger.warning(f"Skipping training for n_tel={n_tel} due to empty data.") return - # Separate feature and target columns - x_cols = [col for col in df.columns if col not in ["MCxoff", "MCyoff", "MCe0"]] + targets = target_features("stereo_analysis") + x_cols = [col for col in df.columns if col not in targets] x_data = df[x_cols] - y_data = df[["MCxoff", "MCyoff", "MCe0"]] + y_data = df[targets] _logger.info(f"Training variables ({len(x_cols)}): {x_cols}") x_train, x_test, y_train, y_test = train_test_split( x_data, y_data, - test_size=1.0 - train_test_fraction, + train_size=train_test_fraction, random_state=None, ) _logger.info(f"n_tel={n_tel}: Training events: {len(x_train)}, Testing events: {len(x_test)}") - xgb_params = { - "n_estimators": 1000, - "learning_rate": 0.1, # Shrinkage - "max_depth": 5, - "min_child_weight": 1.0, # Equivalent to MinNodeSize=1.0% for XGBoost - "objective": "reg:squarederror", - "n_jobs": 4, - "random_state": None, - "tree_method": "hist", - "subsample": 0.7, # Default sensible value - "colsample_bytree": 0.7, # Default sensible value - } - configs = { - "xgboost": xgb.XGBRegressor(**xgb_params), - } - - for name, estimator in configs.items(): + configs = hyper_parameters.regression_hyperparameters(hyperparameter_config) + + for name, para in configs.items(): _logger.info(f"Training with {name} for n_tel={n_tel}...") - _logger.info(f"parameters: {xgb_params}") - model = MultiOutputRegressor(estimator) + model = xgb.XGBRegressor(**para) model.fit(x_train, y_train) - output_filename = Path(output_dir) / f"dispdir_bdt_ntel{n_tel}_{name}.joblib" - dump(model, output_filename) - _logger.info(f"{name} model saved to: {output_filename}") + evaluate_regression_model(model, x_test, y_test, df, x_cols, y_data, name) - evaluate_model(model, x_test, y_test, df, x_cols, y_data, name) + dump( + { + "model": model, + "features": x_cols, + "target": targets, + "hyperparameters": para, + "n_tel": n_tel, + }, + utils.output_file_name(model_prefix, name, n_tel), + ) def main(): @@ -91,9 +88,19 @@ def main(): parser = argparse.ArgumentParser( description=("Train XGBoost Multi-Target BDTs for Stereo Analysis (Direction, Energy).") ) - parser.add_argument("--input_file_list", help="List of input mscw ROOT files.") + parser.add_argument("--input_file_list", help="List of input mscw files.") + parser.add_argument( + "--model_prefix", + required=True, + help=("Path to directory for writing XGBoost regression models (without n_tel suffix)."), + ) + parser.add_argument( + "--hyperparameter_config", + help="Path to JSON file with hyperparameter configuration.", + default=None, + type=str, + ) parser.add_argument("--ntel", type=int, help="Telescope multiplicity (2, 3, or 4).") - parser.add_argument("--output_dir", help="Output directory for XGBoost models and weights.") parser.add_argument( "--train_test_fraction", type=float, @@ -105,26 +112,24 @@ def main(): type=int, help="Maximum number of events to process across all files.", ) - args = parser.parse_args() - input_files = utils.read_input_file_list(args.input_file_list) - - output_dir = Path(args.output_dir) - if not output_dir.exists(): - output_dir.mkdir(parents=True) - - _logger.info("--- XGBoost Multi-Target Training ---") - _logger.info(f"Input files: {len(input_files)}") + _logger.info("--- XGBoost Regression Training ---") _logger.info(f"Telescope multiplicity: {args.ntel}") - _logger.info(f"Output directory: {output_dir}") - _logger.info( - f"Train vs test fraction: {args.train_test_fraction}, Max events: {args.max_events}" + _logger.info(f"Model output prefix: {args.model_prefix}") + _logger.info(f"Train vs test fraction: {args.train_test_fraction}") + _logger.info(f"Max events: {args.max_events}") + + df_flat = load_training_data( + utils.read_input_file_list(args.input_file_list), + args.ntel, + args.max_events, + analysis_type="stereo_analysis", ) - - df_flat = load_training_data(input_files, args.ntel, args.max_events) - train(df_flat, args.ntel, output_dir, args.train_test_fraction) - _logger.info("\nXGBoost model trained successfully.") + train( + df_flat, args.ntel, args.model_prefix, args.train_test_fraction, args.hyperparameter_config + ) + _logger.info("XGBoost regression model trained successfully.") if __name__ == "__main__": diff --git a/src/eventdisplay_ml/training_variables.py b/src/eventdisplay_ml/training_variables.py deleted file mode 100644 index c844d56..0000000 --- a/src/eventdisplay_ml/training_variables.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Training variables for XGBoost direction reconstruction.""" - - -def xgb_per_telescope_training_variables(): - """ - Telescope-type training variables for XGB. - - Disp variables with different indexing logic in data preparation. - """ - return [ - "Disp_T", - "DispXoff_T", - "DispYoff_T", - "DispWoff_T", - "E", - "ES", - "cen_x", - "cen_y", - "cosphi", - "sinphi", - "loss", - "size", - "dist", - "width", - "length", - "asym", - "tgrad_x", - "R_core", - ] - - -def xgb_array_training_variables(): - """Array-level training variables for XGB.""" - return [ - "DispNImages", - "DispTelList_T", - "Xoff", - "Yoff", - "Xoff_intersect", - "Yoff_intersect", - "Erec", - "ErecS", - "EmissionHeight", - ] - - -def xgb_all_training_variables(): - """All training variables for XGB.""" - return xgb_per_telescope_training_variables() + xgb_array_training_variables() diff --git a/src/eventdisplay_ml/utils.py b/src/eventdisplay_ml/utils.py index 059dfb1..4418221 100644 --- a/src/eventdisplay_ml/utils.py +++ b/src/eventdisplay_ml/utils.py @@ -1,6 +1,8 @@ """Utility functions for Eventdisplay-ML.""" +import json import logging +from pathlib import Path _logger = logging.getLogger(__name__) @@ -25,6 +27,10 @@ def read_input_file_list(input_file_list): except FileNotFoundError as exc: raise FileNotFoundError(f"Error: Input file list not found: {input_file_list}") from exc + if not input_files: + raise ValueError(f"Error: No input files found in the list: {input_file_list}") + + _logger.info(f"Read {len(input_files)} input files from {input_file_list}") return input_files @@ -67,3 +73,52 @@ def parse_image_selection(image_selection_str): f"Invalid image_selection format: {image_selection_str}. " "Use bit-coded value (e.g., 14) or comma-separated indices (e.g., '1,2,3')" ) + + +def load_model_parameters(model_parameters, energy_bin_number=None): + """ + Load model parameters from a JSON file. + + Reduce the energy bins to only the specified energy bin number if provided. + """ + try: + with open(model_parameters) as f: + para = json.load(f) + except (FileNotFoundError, TypeError) as exc: + raise FileNotFoundError(f"Model parameters file not found: {model_parameters}") from exc + + if energy_bin_number is not None: + try: + para["energy_bins_log10_tev"] = para["energy_bins_log10_tev"][energy_bin_number] + except (KeyError, IndexError) as exc: + raise ValueError( + f"Invalid energy bin number {energy_bin_number} for model parameters." + ) from exc + return para + + +def load_energy_range(model_parameters, energy_bin_number=0): + """Load the log10(Erec/TeV) range for a given energy bin from model parameters.""" + try: + e = model_parameters["energy_bins_log10_tev"] + return 10 ** e["E_min"], 10 ** e["E_max"] + except (KeyError, IndexError) as exc: + raise ValueError( + f"Invalid energy bin number {energy_bin_number} for model parameters." + ) from exc + + +def output_file_name(model_prefix, name, n_tel, energy_bin_number=None): + """Generate output filename for the trained model.""" + model_prefix = Path(model_prefix) + + output_dir = model_prefix.parent + if not output_dir.exists(): + output_dir.mkdir(parents=True) + + filename = f"{model_prefix}_{name}_ntel{n_tel}" + if energy_bin_number is not None: + filename += f"_ebin{energy_bin_number}" + filename += ".joblib" + _logger.info(f"Output filename: {filename}") + return filename diff --git a/tests/conftest.py b/tests/conftest.py index cb97cab..74dfc75 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,8 @@ import pandas as pd import pytest +from eventdisplay_ml.features import telescope_features + # ============================================================================ # DataFrame Factory Functions # ============================================================================ @@ -110,6 +112,53 @@ def df_raw_two_files(): return df1, df2 +@pytest.fixture +def sample_df(): + """Create a sample DataFrame with telescope data.""" + df = pd.DataFrame( + { + "DispTelList_T": [[0, 1, 2, 3], [0, 1], [1, 2, 3], [0, 1, 2, 3]], + "DispNImages": [4, 2, 3, 4], + "mscw": [1.0, 2.0, 3.0, 4.0], + "mscl": [5.0, 6.0, 7.0, 8.0], + "MSCW_T": [ + np.array([1.0, 2.0, 3.0, 4.0]), + np.array([1.0, 2.0, np.nan, np.nan]), + np.array([1.0, 2.0, 3.0, np.nan]), + np.array([1.0, 2.0, 3.0, 4.0]), + ], + "fpointing_dx": [ + np.array([0.1, 0.2, 0.3, 0.4]), + np.array([0.1, 0.2, np.nan, np.nan]), + np.array([0.1, 0.2, 0.3, np.nan]), + np.array([0.1, 0.2, 0.3, 0.4]), + ], + "fpointing_dy": [ + np.array([0.1, 0.2, 0.3, 0.4]), + np.array([0.1, 0.2, np.nan, np.nan]), + np.array([0.1, 0.2, 0.3, np.nan]), + np.array([0.1, 0.2, 0.3, 0.4]), + ], + "Xoff": [0.5, 0.6, 0.7, 0.8], + "Yoff": [0.3, 0.4, 0.5, 0.6], + "Xoff_intersect": [0.51, 0.61, 0.71, 0.81], + "Yoff_intersect": [0.31, 0.41, 0.51, 0.61], + "Erec": [100.0, 200.0, 300.0, 400.0], + "ErecS": [90.0, 180.0, 270.0, 360.0], + "EmissionHeight": [10.0, 11.0, 12.0, 13.0], + } + ) + + for var in telescope_features(): + df[var] = [ + np.array([1.0, 2.0, 3.0, 4.0]), + np.array([1.0, 2.0, np.nan, np.nan]), + np.array([1.0, 2.0, 3.0, np.nan]), + np.array([1.0, 2.0, 3.0, 4.0]), + ] + return df + + # ============================================================================ # Mock Helper Functions # ============================================================================ diff --git a/tests/resources/classify-parameter.json b/tests/resources/classify-parameter.json new file mode 100644 index 0000000..2ebbfac --- /dev/null +++ b/tests/resources/classify-parameter.json @@ -0,0 +1,19 @@ +{ + "model_file_name": "classify_bdt_xgboost", + "energy_bins_log10_tev": [ + {"E_min": -1.5, "E_max": -0.5}, + {"E_min": -1.25, "E_max": -0.25}, + {"E_min": -1.0, "E_max": 0.0}, + {"E_min": -0.75, "E_max": 0.25}, + {"E_min": -0.5, "E_max": 0.5}, + {"E_min": -0.25, "E_max": 0.75}, + {"E_min": 0.0, "E_max": 1.0}, + {"E_min": 0.25, "E_max": 1.25}, + {"E_min": 0.5, "E_max": 2.0} + ], + "zenith_bins_deg": [ + {"Ze_min": 0.0, "Ze_max": 32.5}, + {"Ze_min": 32.5, "Ze_max": 47.5}, + {"Ze_min": 47.5, "Ze_max": 75.0} + ] +} diff --git a/tests/unit_tests/scripts/test_apply_xgb_stereo.py b/tests/unit_tests/scripts/test_apply_xgb_stereo.py deleted file mode 100644 index 50a2950..0000000 --- a/tests/unit_tests/scripts/test_apply_xgb_stereo.py +++ /dev/null @@ -1,301 +0,0 @@ -"""Unit tests for apply_xgb_stereo script.""" - -from unittest.mock import Mock - -import joblib -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.scripts.apply_xgb_stereo import ( - _pad_to_four, - apply_image_selection, - apply_models, - load_models, - process_file_chunked, -) -from eventdisplay_ml.training_variables import xgb_per_telescope_training_variables - - -class SimpleModel: - """A simple picklable model for testing.""" - - def __init__(self, predictions): - self.predictions = predictions - - def predict(self, x): - """Predict using the simple model.""" - return self.predictions - - -# ============================================================================ -# Consolidated pad_to_four tests (11 -> 1 parametrized + 1 special case) -# ============================================================================ - - -@pytest.mark.parametrize( - ("input_data", "expected_first_values", "check_nans"), - [ - (np.array([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0], [3]), - ([1.0, 2.0], [1.0, 2.0], [2, 3]), - (np.array([5.0]), [5.0], [1, 2, 3]), - (np.array([]), None, [0, 1, 2, 3]), - (np.array([1.0, 2.0, 3.0, 4.0]), [1.0, 2.0, 3.0, 4.0], []), - (np.array([1.0, np.nan, 3.0]), [1.0], [1, 3]), - ([1, 2.5, 3], [1.0, 2.5, 3.0], [3]), - (np.array([-1.0, -2.5, 3.0]), [-1.0, -2.5, 3.0], [3]), - (np.array([0.0, 1.0, 0.0]), [0.0, 1.0, 0.0], [3]), - ], -) -def test_pad_to_four(input_data, expected_first_values, check_nans): - """Test _pad_to_four with various input types and edge cases.""" - result = _pad_to_four(input_data) - - assert len(result) == 4 - assert result.dtype == np.float32 - - if expected_first_values: - for i, val in enumerate(expected_first_values): - assert np.isclose(result[i], val) or (np.isnan(val) and np.isnan(result[i])) - - for nan_idx in check_nans: - assert np.isnan(result[nan_idx]) - - -def test_pad_to_four_with_scalar(): - """Test _pad_to_four returns scalars unchanged.""" - scalar = 3.14 - result = _pad_to_four(scalar) - assert result == 3.14 - - -# ============================================================================ -# Image Selection Tests -# ============================================================================ - - -@pytest.fixture -def sample_df(): - """Create a sample DataFrame with telescope data.""" - df = pd.DataFrame( - { - "DispTelList_T": [[0, 1, 2, 3], [0, 1], [1, 2, 3], [0, 1, 2, 3]], - "DispNImages": [4, 2, 3, 4], - "mscw": [1.0, 2.0, 3.0, 4.0], - "mscl": [5.0, 6.0, 7.0, 8.0], - "MSCW_T": [ - np.array([1.0, 2.0, 3.0, 4.0]), - np.array([1.0, 2.0, np.nan, np.nan]), - np.array([1.0, 2.0, 3.0, np.nan]), - np.array([1.0, 2.0, 3.0, 4.0]), - ], - "fpointing_dx": [ - np.array([0.1, 0.2, 0.3, 0.4]), - np.array([0.1, 0.2, np.nan, np.nan]), - np.array([0.1, 0.2, 0.3, np.nan]), - np.array([0.1, 0.2, 0.3, 0.4]), - ], - "fpointing_dy": [ - np.array([0.1, 0.2, 0.3, 0.4]), - np.array([0.1, 0.2, np.nan, np.nan]), - np.array([0.1, 0.2, 0.3, np.nan]), - np.array([0.1, 0.2, 0.3, 0.4]), - ], - "Xoff": [0.5, 0.6, 0.7, 0.8], - "Yoff": [0.3, 0.4, 0.5, 0.6], - "Xoff_intersect": [0.51, 0.61, 0.71, 0.81], - "Yoff_intersect": [0.31, 0.41, 0.51, 0.61], - "Erec": [100.0, 200.0, 300.0, 400.0], - "ErecS": [90.0, 180.0, 270.0, 360.0], - "EmissionHeight": [10.0, 11.0, 12.0, 13.0], - } - ) - - for var in xgb_per_telescope_training_variables(): - df[var] = [ - np.array([1.0, 2.0, 3.0, 4.0]), - np.array([1.0, 2.0, np.nan, np.nan]), - np.array([1.0, 2.0, 3.0, np.nan]), - np.array([1.0, 2.0, 3.0, 4.0]), - ] - return df - - -@pytest.mark.parametrize( - ("selection", "expected_tel_0", "expected_n_images_0"), - [ - (None, [0, 1, 2, 3], 4), - ([0, 1, 2, 3], [0, 1, 2, 3], 4), - ([0, 1], [0, 1], 2), - ([2], [2], 1), - ], -) -def test_apply_image_selection(sample_df, selection, expected_tel_0, expected_n_images_0): - """Test apply_image_selection with various telescope selections.""" - result = apply_image_selection(sample_df, selection) - - if selection is None or selection == [0, 1, 2, 3]: - pd.testing.assert_frame_equal(result, sample_df) - else: - assert result["DispTelList_T"].iloc[0] == expected_tel_0 - assert result["DispNImages"].iloc[0] == expected_n_images_0 - - -def test_apply_image_selection_preserves_original(sample_df): - """Test that apply_image_selection doesn't modify the original DataFrame.""" - original_copy = sample_df.copy(deep=True) - apply_image_selection(sample_df, [0, 1]) - pd.testing.assert_frame_equal(sample_df, original_copy) - - -# ============================================================================ -# Model Loading Tests -# ============================================================================ - - -@pytest.mark.parametrize( - ("models_to_create", "expected_in_dict"), - [ - ([2], [2]), - ([2, 3, 4], [2, 3, 4]), - ([], []), - ], -) -def test_load_models(tmp_path, models_to_create, expected_in_dict): - """Test load_models loads available models from directory.""" - for n_tel in models_to_create: - model_file = tmp_path / f"dispdir_bdt_ntel{n_tel}_xgboost.joblib" - joblib.dump({"multiplicity": n_tel}, model_file) - - models = load_models(str(tmp_path)) - - for n_tel in expected_in_dict: - assert n_tel in models - assert models[n_tel]["multiplicity"] == n_tel - assert len(models) == len(expected_in_dict) - - -# ============================================================================ -# Model Application Tests -# ============================================================================ - - -@pytest.mark.parametrize( - "n_tel_multiplicities", - [ - ([4]), - ([2, 3, 4]), - ], -) -def test_apply_models(sample_df, n_tel_multiplicities): - """Test apply_models with different telescope multiplicities.""" - models = {} - for n_tel in n_tel_multiplicities: - # Create enough predictions for all rows (max 4 rows in sample_df) - models[n_tel] = SimpleModel(np.array([[0.1 * n_tel, 0.2 * n_tel, 1.5]] * 4)) - - pred_xoff, pred_yoff, pred_erec = apply_models(sample_df, models) - - assert all(len(p) == len(sample_df) for p in [pred_xoff, pred_yoff, pred_erec]) - assert all(p.dtype == np.float32 for p in [pred_xoff, pred_yoff, pred_erec]) - - -def test_apply_models_with_missing_multiplicity(sample_df): - """Test apply_models handles missing models gracefully.""" - models = {4: SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4))} - pred_xoff, _, _ = apply_models(sample_df, models) - - assert not np.isnan(pred_xoff[0]) # Row 0 has 4 telescopes - assert np.isnan(pred_xoff[1]) # Row 1 has 2 telescopes - assert np.isnan(pred_xoff[2]) # Row 2 has 3 telescopes - assert not np.isnan(pred_xoff[3]) # Row 3 has 4 telescopes - - -def test_apply_models_with_selection_mask(sample_df): - """Test apply_models respects selection mask.""" - models = {4: SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4))} - selection_mask = np.array([True, False, True, False]) - - pred_xoff, _, _ = apply_models(sample_df, models, selection_mask) - - assert pred_xoff[0] == 0.1 # 4 tels, mask=True - assert pred_xoff[1] == -999.0 # 2 tels, mask=False - assert np.isnan(pred_xoff[2]) # 3 tels (no model) - assert pred_xoff[3] == -999.0 # 4 tels, mask=False - - -def test_apply_models_from_directory(sample_df, tmp_path): - """Test apply_models loads from directory string.""" - model_file = tmp_path / "dispdir_bdt_ntel4_xgboost.joblib" - joblib.dump(SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4)), model_file) - - pred_xoff, _, _ = apply_models(sample_df, str(tmp_path)) - assert len(pred_xoff) == len(sample_df) - - -# ============================================================================ -# File Processing Tests -# ============================================================================ - - -def test_process_file_chunked_creates_output(sample_df, tmp_path): - """Test process_file_chunked creates output file.""" - from unittest.mock import patch - - model_dir = tmp_path / "models" - model_dir.mkdir() - model_file = model_dir / "dispdir_bdt_ntel4_xgboost.joblib" - joblib.dump(SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4)), model_file) - - output_file = tmp_path / "output.root" - - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.iterate") as mock_iterate: - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.recreate") as mock_recreate: - mock_iterate.return_value = [sample_df.iloc[:1].copy()] - mock_tree = Mock() - mock_recreate.return_value.__enter__.return_value.mktree.return_value = mock_tree - - process_file_chunked( - input_file="input.root", - model_dir=str(model_dir), - output_file=str(output_file), - image_selection="15", - ) - - assert mock_tree.extend.called - - -@pytest.mark.parametrize( - ("max_events", "expected_chunks"), - [ - (None, 2), - (2, 1), - ], -) -def test_process_file_chunked_respects_limits(sample_df, tmp_path, max_events, expected_chunks): - """Test process_file_chunked respects event limits.""" - from unittest.mock import patch - - model_dir = tmp_path / "models" - model_dir.mkdir() - joblib.dump( - SimpleModel(np.array([[0.1, 0.2, 1.5]] * 4)), model_dir / "dispdir_bdt_ntel4_xgboost.joblib" - ) - - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.iterate") as mock_iterate: - with patch("eventdisplay_ml.scripts.apply_xgb_stereo.uproot.recreate") as mock_recreate: - mock_iterate.return_value = [sample_df.iloc[:2].copy(), sample_df.iloc[2:].copy()] - mock_tree = Mock() - mock_recreate.return_value.__enter__.return_value.mktree.return_value = mock_tree - - kwargs = { - "input_file": "input.root", - "model_dir": str(model_dir), - "output_file": str(tmp_path / "output.root"), - "image_selection": "15", - } - if max_events: - kwargs["max_events"] = max_events - - process_file_chunked(**kwargs) - assert mock_tree.extend.call_count == expected_chunks diff --git a/tests/unit_tests/scripts/test_train_xgb_stereo.py b/tests/unit_tests/scripts/test_train_xgb_stereo.py deleted file mode 100644 index 66f677d..0000000 --- a/tests/unit_tests/scripts/test_train_xgb_stereo.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Unit tests for the train_xgb_stereo script.""" - -from unittest.mock import MagicMock, patch - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.scripts.train_xgb_stereo import train - - -@pytest.fixture -def sample_df(): - """Create a sample DataFrame with training data.""" - rng = np.random.Generator(np.random.PCG64(42)) - data = { - "feature1": rng.standard_normal(100), - "feature2": rng.standard_normal(100), - "feature3": rng.standard_normal(100), - "MCxoff": rng.standard_normal(100), - "MCyoff": rng.standard_normal(100), - "MCe0": rng.standard_normal(100), - } - return pd.DataFrame(data) - - -@pytest.fixture -def empty_df(): - """Create an empty DataFrame.""" - return pd.DataFrame() - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_with_valid_data(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test train function with valid data.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=3, output_dir=tmp_path, train_test_fraction=0.8) - - assert mock_multi_output.called - assert mock_model.fit.called - assert mock_dump.called - assert mock_evaluate.called - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_model") -def test_train_with_empty_data(mock_evaluate, mock_dump, empty_df, caplog): - """Test train function with empty DataFrame.""" - train(empty_df, n_tel=2, output_dir="/tmp", train_test_fraction=0.7) - - assert mock_dump.call_count == 0 - assert mock_evaluate.call_count == 0 - assert "Skipping training" in caplog.text - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_output_filename(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test that output filename is correctly formatted.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=4, output_dir=tmp_path, train_test_fraction=0.8) - - # Verify dump was called with correct filename - call_args = mock_dump.call_args - output_path = call_args[0][1] - assert "dispdir_bdt_ntel4_xgboost.joblib" in str(output_path) - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_feature_selection(mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path): - """Test that features are correctly separated from targets.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=2, output_dir=tmp_path, train_test_fraction=0.8) - - # Verify fit was called with correct shapes - fit_call = mock_model.fit.call_args - x_train, y_train = fit_call[0] - - # Should have 3 features (feature1, feature2, feature3) - assert x_train.shape[1] == 3 - # Should have 3 targets (MCxoff, MCyoff, MCe0) - assert y_train.shape[1] == 3 - - -@patch("eventdisplay_ml.scripts.train_xgb_stereo.dump") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.evaluate_model") -@patch("eventdisplay_ml.scripts.train_xgb_stereo.MultiOutputRegressor") -def test_train_test_split_fraction( - mock_multi_output, mock_evaluate, mock_dump, sample_df, tmp_path -): - """Test that train/test split respects the fraction parameter.""" - mock_model = MagicMock() - mock_multi_output.return_value = mock_model - - train(sample_df, n_tel=2, output_dir=tmp_path, train_test_fraction=0.6) - - fit_call = mock_model.fit.call_args - x_train, _ = fit_call[0] - - # With 0.6 train fraction and 100 samples, expect ~60 training samples - assert 50 <= len(x_train) <= 70 diff --git a/tests/unit_tests/test_data_processing.py b/tests/unit_tests/test_data_processing.py deleted file mode 100644 index 61964f2..0000000 --- a/tests/unit_tests/test_data_processing.py +++ /dev/null @@ -1,385 +0,0 @@ -"""Unit tests for data processing utilities.""" - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.data_processing import ( - _to_dense_array, - _to_padded_array, - flatten_data_vectorized, - load_training_data, -) - -# ============================================================================ -# Parametrized Array Conversion Tests (consolidated from 10 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("input_data", "expected_shape"), - [ - ([[1, 2, 3], [4, 5, 6]], (2, 3)), - ([[1, 2], [3, 4, 5], [6]], (3, 3)), - ([1, 2, 3], (3, 1)), - ([[1, 2], 3, [4, 5, 6]], (3, 3)), - ], -) -def test_to_dense_array(input_data, expected_shape): - """Test _to_dense_array with various input types.""" - col = pd.Series(input_data) - result = _to_dense_array(col) - assert result.shape == expected_shape - - -@pytest.mark.parametrize( - ("input_data", "expected_shape"), - [ - ([[1, 2, 3], [4, 5, 6]], (2, 3)), - ([[1, 2], [3, 4, 5], [6]], (3, 3)), - ([1, 2, 3], (3, 1)), - ([[1, 2], 3, [4, 5, 6]], (3, 3)), - ], -) -def test_to_padded_array(input_data, expected_shape): - """Test _to_padded_array with various input types.""" - result = _to_padded_array(input_data) - assert result.shape == expected_shape - - -def test_to_dense_array_with_numpy_arrays(arrays_numpy): - """Test _to_dense_array with numpy arrays.""" - col = pd.Series(arrays_numpy) - result = _to_dense_array(col) - assert result.shape == (2, 3) - - -def test_to_padded_array_with_numpy_arrays(arrays_numpy): - """Test _to_padded_array with numpy arrays.""" - result = _to_padded_array(arrays_numpy) - assert result.shape == (2, 3) - - -# ============================================================================ -# Data Flattening Tests -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_tel", "with_pointing"), - [ - (2, False), - (2, True), - (1, False), - ], -) -def test_flatten_data_vectorized( - n_tel, with_pointing, df_two_tel_base, df_two_tel_pointing, df_one_tel_base -): - """Test flatten_data_vectorized with various telescope counts and pointing options.""" - if with_pointing and n_tel == 2: - df = df_two_tel_pointing - elif n_tel == 1: - df = df_one_tel_base - else: - df = df_two_tel_base - - training_vars = [ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ] - if with_pointing: - training_vars.extend(["cen_x", "cen_y", "fpointing_dx", "fpointing_dy"]) - - result = flatten_data_vectorized( - df, n_tel=n_tel, training_variables=training_vars, apply_pointing_corrections=with_pointing - ) - - assert "Disp_T_0" in result.columns - assert "disp_x_0" in result.columns - assert len(result) == len(df) - - -def test_flatten_data_vectorized_derived_features(df_one_tel_base): - """Test that derived features are correctly computed.""" - result = flatten_data_vectorized( - df_one_tel_base, - n_tel=1, - training_variables=[ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ], - ) - - assert "disp_x_0" in result.columns - assert "disp_y_0" in result.columns - assert "loss_loss_0" in result.columns - assert "loss_dist_0" in result.columns - assert "width_length_0" in result.columns - # For df_one_tel_base: Disp_T[0]=1.0, cosphi[0]=0.8, sinphi[0]=0.6 - assert result["disp_x_0"].iloc[0] == pytest.approx(1.0 * 0.8) - assert result["disp_y_0"].iloc[0] == pytest.approx(1.0 * 0.6) - - -def test_flatten_data_vectorized_missing_data(df_three_tel_missing): - """Test that missing disp columns are filled with NaN.""" - result = flatten_data_vectorized( - df_three_tel_missing, - n_tel=3, - training_variables=[ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ], - ) - assert result["Disp_T_2"].isna().all() - - -@pytest.mark.parametrize("dtype", [np.float32, np.float64]) -def test_flatten_data_vectorized_dtype(dtype, df_two_tel_base): - """Test flatten_data_vectorized dtype casting.""" - result = flatten_data_vectorized( - df_two_tel_base, - n_tel=2, - training_variables=[ - "Disp_T", - "cosphi", - "sinphi", - "loss", - "dist", - "width", - "length", - "size", - "E", - "ES", - ], - dtype=dtype, - ) - assert result["Disp_T_0"].dtype == dtype - - -# ============================================================================ -# Data Loading Tests -# ============================================================================ - - -def test_load_training_data_empty_files(tmp_path, mocker): - """Test load_training_data with no matching data.""" - mock_file = tmp_path / "test.root" - mock_file.touch() - - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": None} - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data([str(mock_file)], n_tel=2, max_events=100) - assert result.empty - - -def test_load_training_data_filters_by_n_tel(mocker): - """Test load_training_data filters events by DispNImages.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2, 3, 2, 4], - "MCxoff": [0.1, 0.2, 0.3, 0.4], - "MCyoff": [0.5, 0.6, 0.7, 0.8], - "MCe0": [100.0, 200.0, 150.0, 250.0], - "DispTelList_T": [np.array([0, 1])] * 4, - "Disp_T": [np.array([1.0, 2.0])] * 4, - "cosphi": [np.array([0.8, 0.6])] * 4, - "sinphi": [np.array([0.6, 0.8])] * 4, - "loss": [np.array([0.1, 0.2])] * 4, - "dist": [np.array([1.0, 2.0])] * 4, - "width": [np.array([0.5, 0.6])] * 4, - "length": [np.array([2.0, 3.0])] * 4, - "size": [np.array([100.0, 200.0])] * 4, - "E": [np.array([10.0, 20.0])] * 4, - "ES": [np.array([5.0, 10.0])] * 4, - "Xoff": [1.0] * 4, - "Yoff": [3.0] * 4, - "Xoff_intersect": [0.9] * 4, - "Yoff_intersect": [2.9] * 4, - "Erec": [10.0] * 4, - "ErecS": [5.0] * 4, - "EmissionHeight": [100.0] * 4, - } - ) - - mock_tree = mocker.MagicMock() - mock_tree.arrays.return_value = df_raw - - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=-1) - assert len(result) == 2 - assert all(col in result.columns for col in ["MCxoff", "MCyoff", "MCe0"]) - - -@pytest.mark.parametrize( - ("max_events", "expected_max_rows"), - [ - (5, 5), - (3, 3), - (-1, 10), - ], -) -def test_load_training_data_max_events(mocker, max_events, expected_max_rows): - """Test load_training_data respects max_events limit.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2] * 10, - "MCxoff": np.arange(10, dtype=float) * 0.1, - "MCyoff": np.arange(10, dtype=float) * 0.1, - "MCe0": np.ones(10) * 100.0, - "DispTelList_T": [np.array([0, 1])] * 10, - "Disp_T": [np.array([1.0, 2.0])] * 10, - "cosphi": [np.array([0.8, 0.6])] * 10, - "sinphi": [np.array([0.6, 0.8])] * 10, - "loss": [np.array([0.1, 0.2])] * 10, - "dist": [np.array([1.0, 2.0])] * 10, - "width": [np.array([0.5, 0.6])] * 10, - "length": [np.array([2.0, 3.0])] * 10, - "size": [np.array([100.0, 200.0])] * 10, - "E": [np.array([10.0, 20.0])] * 10, - "ES": [np.array([5.0, 10.0])] * 10, - "Xoff": np.ones(10), - "Yoff": np.ones(10) * 3.0, - "Xoff_intersect": np.ones(10) * 0.9, - "Yoff_intersect": np.ones(10) * 2.9, - "Erec": np.ones(10) * 10.0, - "ErecS": np.ones(10) * 5.0, - "EmissionHeight": np.ones(10) * 100.0, - } - ) - - mock_tree = mocker.MagicMock() - mock_tree.arrays.return_value = df_raw - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=max_events) - assert len(result) <= expected_max_rows - - -def test_load_training_data_handles_errors(mocker): - """Test load_training_data handles file read exceptions.""" - mocker.patch("uproot.open", side_effect=Exception("File read error")) - result = load_training_data(["dummy.root"], n_tel=2, max_events=100) - assert result.empty - - -def test_load_training_data_multiple_files(mocker): - """Test load_training_data concatenates multiple files.""" - df1 = pd.DataFrame( - { - "DispNImages": [2] * 2, - "MCxoff": [0.1, 0.2], - "MCyoff": [0.5, 0.6], - "MCe0": [100.0, 150.0], - "DispTelList_T": [np.array([0, 1])] * 2, - "Disp_T": [np.array([1.0, 2.0])] * 2, - "cosphi": [np.array([0.8, 0.6])] * 2, - "sinphi": [np.array([0.6, 0.8])] * 2, - "loss": [np.array([0.1, 0.2])] * 2, - "dist": [np.array([1.0, 2.0])] * 2, - "width": [np.array([0.5, 0.6])] * 2, - "length": [np.array([2.0, 3.0])] * 2, - "size": [np.array([100.0, 200.0])] * 2, - "E": [np.array([10.0, 20.0])] * 2, - "ES": [np.array([5.0, 10.0])] * 2, - "Xoff": [1.0] * 2, - "Yoff": [3.0] * 2, - "Xoff_intersect": [0.9] * 2, - "Yoff_intersect": [2.9] * 2, - "Erec": [10.0] * 2, - "ErecS": [5.0] * 2, - "EmissionHeight": [100.0] * 2, - } - ) - df2 = df1.iloc[:1].copy() - df2.loc[0, "MCe0"] = 200.0 - - call_count = [0] - - def mock_arrays(*args, **kwargs): - call_count[0] += 1 - return df1 if call_count[0] == 1 else df2 - - mock_tree = mocker.MagicMock() - mock_tree.arrays.side_effect = mock_arrays - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy1.root", "dummy2.root"], n_tel=2, max_events=-1) - assert len(result) == 3 - - -def test_load_training_data_computes_log_mce0(mocker): - """Test load_training_data correctly computes log10 of MCe0.""" - df_raw = pd.DataFrame( - { - "DispNImages": [2], - "MCxoff": [0.1], - "MCyoff": [0.5], - "MCe0": [100.0], - "DispTelList_T": [np.array([0, 1])], - "Disp_T": [np.array([1.0, 2.0])], - "cosphi": [np.array([0.8, 0.6])], - "sinphi": [np.array([0.6, 0.8])], - "loss": [np.array([0.1, 0.2])], - "dist": [np.array([1.0, 2.0])], - "width": [np.array([0.5, 0.6])], - "length": [np.array([2.0, 3.0])], - "size": [np.array([100.0, 200.0])], - "E": [np.array([10.0, 20.0])], - "ES": [np.array([5.0, 10.0])], - "Xoff": [1.0], - "Yoff": [3.0], - "Xoff_intersect": [0.9], - "Yoff_intersect": [2.9], - "Erec": [10.0], - "ErecS": [5.0], - "EmissionHeight": [100.0], - } - ) - - mock_tree = mocker.MagicMock() - mock_tree.arrays.return_value = df_raw - mock_root_file = mocker.MagicMock() - mock_root_file.__enter__.return_value = {"data": mock_tree} - mock_root_file.__exit__.return_value = None - mocker.patch("uproot.open", return_value=mock_root_file) - - result = load_training_data(["dummy.root"], n_tel=2, max_events=-1) - assert "MCe0" in result.columns - assert result["MCe0"].iloc[0] == pytest.approx(np.log10(100.0)) diff --git a/tests/unit_tests/test_evaluate.py b/tests/unit_tests/test_evaluate.py deleted file mode 100644 index 9b0e3c2..0000000 --- a/tests/unit_tests/test_evaluate.py +++ /dev/null @@ -1,285 +0,0 @@ -"""Unit tests for model evaluation.""" - -import logging -from unittest.mock import MagicMock - -import numpy as np -import pandas as pd -import pytest - -from eventdisplay_ml.evaluate import ( - calculate_resolution, - evaluate_model, - feature_importance, - shap_feature_importance, -) - -rng = np.random.default_rng(0) - - -# ============================================================================ -# SHAP Feature Importance Tests (consolidated from 3 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_targets", "n_samples"), - [ - (1, 100), - (2, 50), - ], -) -def test_shap_feature_importance(caplog, n_targets, n_samples): - """Test shap_feature_importance with various target counts.""" - caplog.set_level(logging.INFO) - - # Create mock model with appropriate estimators - mock_model = MagicMock() - estimators = [] - for _ in range(n_targets): - mock_est = MagicMock() - mock_booster = MagicMock() - mock_booster.predict.return_value = np.hstack( - [rng.random((n_samples, 4)), rng.random((n_samples, 1))] - ) - mock_est.get_booster.return_value = mock_booster - estimators.append(mock_est) - mock_model.estimators_ = estimators - - x_sample_data = pd.DataFrame({f"feature_{i}": rng.random(n_samples) for i in range(3)}) - target_names = [f"target_{i}" for i in range(n_targets)] - - shap_feature_importance(mock_model, x_sample_data, target_names, max_points=100, n_top=2) - - for target in target_names: - assert f"Builtin XGBoost SHAP Importance for {target}" in caplog.text - - -# ============================================================================ -# Feature Importance Tests (consolidated from 3 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_targets", "n_features"), - [ - (1, 3), - (2, 4), - ], -) -def test_feature_importance(caplog, n_targets, n_features): - """Test feature_importance with various feature/target counts.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - estimators = [] - rng = np.random.default_rng(42) - for _ in range(n_targets): - mock_est = MagicMock() - mock_est.feature_importances_ = rng.random(n_features) - estimators.append(mock_est) - mock_model.estimators_ = estimators - - x_cols = [f"feature_{i}" for i in range(n_features)] - target_names = [f"target_{i}" for i in range(n_targets)] - - feature_importance(mock_model, x_cols, target_names, name="test_model") - - assert "XGBoost Multi-Regression Feature Importance" in caplog.text - for target in target_names: - assert f"Importance for Target: **{target}**" in caplog.text - - -def test_feature_importance_sorted(caplog): - """Test feature_importance sorts features by importance.""" - caplog.set_level(logging.INFO) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.1, 0.5, 0.3, 0.1]) - mock_model = MagicMock() - mock_model.estimators_ = [mock_est] - - x_cols = ["low_1", "high", "medium", "low_2"] - target_names = ["target"] - - feature_importance(mock_model, x_cols, target_names) - - log_text = caplog.text - high_pos = log_text.find("high") - medium_pos = log_text.find("medium") - assert high_pos < medium_pos - - -# ============================================================================ -# Resolution Calculation Tests (consolidated from 4 functions) -# ============================================================================ - - -@pytest.mark.parametrize( - ("n_bins", "percentiles"), - [ - (2, [50, 68]), - (3, [50, 68, 90]), - (1, [68, 90, 95]), - ], -) -def test_calculate_resolution(caplog, n_bins, percentiles): - """Test calculate_resolution with various binning and percentile configurations.""" - caplog.set_level(logging.INFO) - - y_pred = np.array( - [ - [0.1, 0.2, 1.0], - [0.15, 0.25, 1.1], - [0.2, 0.3, 0.9], - [0.05, 0.1, 1.2], - ] - ) - y_test = pd.DataFrame( - { - "MCxoff": [0.0, 0.0, 0.0, 0.0], - "MCyoff": [0.0, 0.0, 0.0, 0.0], - }, - index=[0, 1, 2, 3], - ) - df = pd.DataFrame({"MCe0": [0.5, 0.8, 1.0, 1.5]}, index=[0, 1, 2, 3]) - - calculate_resolution( - y_pred, - y_test, - df, - percentiles=percentiles, - log_e_min=0, - log_e_max=2, - n_bins=n_bins, - name="test", - ) - - assert "DeltaTheta Resolution" in caplog.text - assert "DeltaMCe0 Resolution" in caplog.text - for perc in percentiles: - assert f"Theta_{perc}%" in caplog.text - - -def test_calculate_resolution_deltas_computed_correctly(caplog): - """Test delta computations in calculate_resolution.""" - caplog.set_level(logging.INFO) - - # Known case: differences = sqrt(0.1^2 + 0.2^2) = sqrt(0.05) - y_pred = np.array([[0.1, 0.2, 1.0], [0.0, 0.0, 1.0]]) - y_test = pd.DataFrame({"MCxoff": [0.0, 0.0], "MCyoff": [0.0, 0.0]}, index=[0, 1]) - df = pd.DataFrame({"MCe0": [1.0, 1.0]}, index=[0, 1]) - - calculate_resolution( - y_pred, y_test, df, percentiles=[50], log_e_min=0.5, log_e_max=1.5, n_bins=1, name="test" - ) - - assert "Theta_50%" in caplog.text - assert "DeltaE" in caplog.text - - -# ============================================================================ -# Model Evaluation Tests (consolidated from 5 functions) -# ============================================================================ - - -def test_evaluate_model_basic(caplog): - """Test evaluate_model logs R^2 score and metrics.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.85 - mock_model.predict.return_value = np.array([[0.1, 0.2, 1.0], [0.15, 0.25, 1.1]]) - - mock_est1 = MagicMock() - mock_est1.feature_importances_ = np.array([0.5, 0.3, 0.2]) - mock_est2 = MagicMock() - mock_est2.feature_importances_ = np.array([0.4, 0.4, 0.2]) - mock_model.estimators_ = [mock_est1, mock_est2] - - x_test = pd.DataFrame( - { - "feat_1": [1.0, 2.0], - "feat_2": [3.0, 4.0], - "feat_3": [5.0, 6.0], - }, - index=[0, 1], - ) - y_test = pd.DataFrame( - { - "MCxoff": [0.0, 0.0], - "MCyoff": [0.0, 0.0], - }, - index=[0, 1], - ) - df = pd.DataFrame({"MCe0": [1.0, 1.1]}, index=[0, 1]) - y_data = pd.DataFrame({"target_1": [1, 2], "target_2": [3, 4]}) - - evaluate_model( - mock_model, x_test, y_test, df, ["feat_1", "feat_2", "feat_3"], y_data, "test_model" - ) - - assert "XGBoost Multi-Target R^2 Score (Testing Set): 0.8500" in caplog.text - assert "test_model MSE (X_off):" in caplog.text - assert "test_model MAE (X_off):" in caplog.text - assert "test_model MAE (Y_off):" in caplog.text - - -@pytest.mark.parametrize( - ("model_name", "has_xgb"), - [ - ("xgboost", True), - ("random_forest", False), - ], -) -def test_evaluate_model_shap_conditional(caplog, model_name, has_xgb): - """Test evaluate_model calls SHAP only for XGBoost models.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.8 - mock_model.predict.return_value = np.array([[0.1, 0.2, 1.0]]) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.5, 0.3, 0.2]) - if has_xgb: - mock_booster = MagicMock() - mock_booster.predict.return_value = rng.random((1, 4)) - mock_est.get_booster.return_value = mock_booster - mock_model.estimators_ = [mock_est] - - x_test = pd.DataFrame({"x": [1.0], "y": [2.0], "z": [3.0]}, index=[0]) - y_test = pd.DataFrame({"MCxoff": [0.0], "MCyoff": [0.0]}, index=[0]) - df = pd.DataFrame({"MCe0": [1.0]}, index=[0]) - y_data = pd.DataFrame({"target": [1]}) - - evaluate_model(mock_model, x_test, y_test, df, ["x", "y", "z"], y_data, model_name) - - if has_xgb: - assert "Builtin XGBoost SHAP Importance" in caplog.text - else: - assert "Builtin XGBoost SHAP Importance" not in caplog.text - - -def test_evaluate_model_calls_resolution(caplog): - """Test evaluate_model calls calculate_resolution.""" - caplog.set_level(logging.INFO) - - mock_model = MagicMock() - mock_model.score.return_value = 0.82 - mock_model.predict.return_value = np.array([[0.05, 0.1, 1.0], [0.08, 0.12, 1.1]]) - - mock_est = MagicMock() - mock_est.feature_importances_ = np.array([0.5, 0.3, 0.2]) - mock_model.estimators_ = [mock_est] - - x_test = pd.DataFrame({"m": [1.0, 2.0], "n": [3.0, 4.0], "o": [5.0, 6.0]}, index=[0, 1]) - y_test = pd.DataFrame({"MCxoff": [0.0, 0.0], "MCyoff": [0.0, 0.0]}, index=[0, 1]) - df = pd.DataFrame({"MCe0": [0.5, 1.0]}, index=[0, 1]) - y_data = pd.DataFrame({"target": [1, 2]}) - - evaluate_model(mock_model, x_test, y_test, df, ["m", "n", "o"], y_data, "test_model") - - assert "DeltaTheta Resolution vs. Log10(MCe0)" in caplog.text - assert "DeltaMCe0 Resolution vs. Log10(MCe0)" in caplog.text - assert "Calculated over 6 bins between Log10(E) = -1 and 2" in caplog.text diff --git a/tests/unit_tests/test_training_variables.py b/tests/unit_tests/test_training_variables.py deleted file mode 100644 index 83dbe5e..0000000 --- a/tests/unit_tests/test_training_variables.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Unit tests for training variables selection utilities.""" - -import eventdisplay_ml.training_variables - - -def test_xgb_per_telescope_training_variables(): - """Ensure per-telescope training variables are provided as a list and include expected keys.""" - variables = eventdisplay_ml.training_variables.xgb_per_telescope_training_variables() - assert isinstance(variables, list) - assert "Disp_T" in variables - assert "R_core" in variables - - -def test_xgb_array_training_variables(): - """Ensure array-level training variables include array metadata fields.""" - variables = eventdisplay_ml.training_variables.xgb_array_training_variables() - assert isinstance(variables, list) - assert "DispNImages" in variables - assert "EmissionHeight" in variables - - -def test_xgb_all_training_variables(): - """Ensure combined training variables include per-telescope and array-level fields.""" - variables = eventdisplay_ml.training_variables.xgb_all_training_variables() - assert isinstance(variables, list) - assert "Disp_T" in variables - assert "R_core" in variables - assert "DispNImages" in variables - assert "EmissionHeight" in variables diff --git a/tests/unit_tests/test_utils.py b/tests/unit_tests/test_utils.py deleted file mode 100644 index d640fc1..0000000 --- a/tests/unit_tests/test_utils.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Unit tests for utility helpers such as input file list reader.""" - -import pytest - -from eventdisplay_ml.utils import parse_image_selection, read_input_file_list - - -def test_read_input_file_list_success(tmp_path): - """Test successful reading of input file list.""" - test_file = tmp_path / "input_files.txt" - test_files = ["file1.txt", "file2.txt", "file3.txt"] - test_file.write_text("\n".join(test_files)) - - result = read_input_file_list(str(test_file)) - assert result == test_files - - -def test_read_input_file_list_with_empty_lines(tmp_path): - """Test reading file list with empty lines.""" - test_file = tmp_path / "input_files.txt" - content = "file1.txt\n\nfile2.txt\n \nfile3.txt\n" - test_file.write_text(content) - - result = read_input_file_list(str(test_file)) - assert result == ["file1.txt", "file2.txt", "file3.txt"] - - -def test_read_input_file_list_with_whitespace(tmp_path): - """Test reading file list with leading/trailing whitespace.""" - test_file = tmp_path / "input_files.txt" - content = " file1.txt \nfile2.txt\t\n file3.txt" - test_file.write_text(content) - - result = read_input_file_list(str(test_file)) - assert result == ["file1.txt", "file2.txt", "file3.txt"] - - -def test_read_input_file_list_empty_file(tmp_path): - """Test reading empty file.""" - test_file = tmp_path / "input_files.txt" - test_file.write_text("") - - result = read_input_file_list(str(test_file)) - assert result == [] - - -def test_read_input_file_list_file_not_found(): - """Test FileNotFoundError is raised when file does not exist.""" - with pytest.raises(FileNotFoundError, match="Error: Input file list not found"): - read_input_file_list("/nonexistent/path/file.txt") - - -def test_parse_image_selection_comma_separated(): - """Test parsing comma-separated indices.""" - result = parse_image_selection("1, 2, 3") - assert result == [1, 2, 3] - - -def test_parse_image_selection_bit_coded(): - """Test parsing bit-coded value.""" - result = parse_image_selection("14") # 0b1110 -> indices 1, 2, 3 - assert result == [1, 2, 3] - - -def test_parse_image_selection_empty_string(): - """Test parsing empty string returns None.""" - result = parse_image_selection("") - assert result is None - - -def test_parse_image_selection_invalid_comma_separated(): - """Test ValueError is raised for invalid comma-separated input.""" - with pytest.raises(ValueError, match="Invalid image_selection format"): - parse_image_selection("1, two, 3") - - -def test_parse_image_selection_invalid_bit_coded(): - """Test ValueError is raised for invalid bit-coded input.""" - with pytest.raises(ValueError, match="Invalid image_selection format"): - parse_image_selection("invalid")