From 70b6191799722f12bb52ecea8f333a69c8006d93 Mon Sep 17 00:00:00 2001 From: cdunn314 Date: Thu, 28 Aug 2025 17:59:25 -0400 Subject: [PATCH 01/12] Add efficiency error --- .../activation_foils/calibration.py | 31 +++++++++++++++++++ .../activation_foils/compass.py | 20 ++++++++++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/calibration.py b/libra_toolbox/neutron_detection/activation_foils/calibration.py index b98dfde..0f19dae 100644 --- a/libra_toolbox/neutron_detection/activation_foils/calibration.py +++ b/libra_toolbox/neutron_detection/activation_foils/calibration.py @@ -133,6 +133,7 @@ class CheckSource: nuclide: Nuclide activity_date: datetime.date activity: float + activity_error: float = 0.0 """ Class to hold the information of a check source. @@ -175,6 +176,36 @@ def get_expected_activity(self, date: datetime.date) -> float: time = (date - activity_datetime).total_seconds() act_expec = self.activity * np.exp(-decay_constant * time) return act_expec + + def get_expected_activity_error(self, date: datetime.date) -> float: + """Returns the expected activity error of the check source at a given date. + Time is assumed to have no error in its measurement. + + Args: + date: the date to calculate the expected activity error for. + + Returns: + the expected activity error of the check source in Bq + """ + + decay_constant = np.log(2) / self.nuclide.half_life + + # Convert date to datetime if needed + if isinstance(self.activity_date, datetime.date) and not isinstance( + self.activity_date, datetime.datetime + ): + + activity_datetime = datetime.datetime.combine( + self.activity_date, datetime.datetime.min.time() + ) + # add a timezone + activity_datetime = activity_datetime.replace(tzinfo=date.tzinfo) + else: + activity_datetime = self.activity_date + + time = (date - activity_datetime).total_seconds() + act_expec = self.activity_error * np.exp(-decay_constant * time) + return act_expec @dataclass diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index ddba183..5ae5735 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -415,6 +415,7 @@ def compute_detection_efficiency( calibration_coeffs: np.ndarray, channel_nb: int, search_width: float = 800, + compute_error: bool = False ) -> Union[np.ndarray, float]: """ Computes the detection efficiency of a check source given the @@ -456,7 +457,6 @@ def compute_detection_efficiency( ) nb_counts_measured = np.array(nb_counts_measured) - nb_counts_measured_err = np.sqrt(nb_counts_measured) # assert that all numbers in nb_counts_measured are > 0 assert np.all( @@ -483,7 +483,23 @@ def compute_detection_efficiency( detection_efficiency = nb_counts_measured / expected_nb_counts - return detection_efficiency + if compute_error: + nb_counts_measured_err = np.sqrt(nb_counts_measured) + act_expec_err = self.check_source.get_expected_activity_error(self.start_time) + gamma_rays_expected_err = act_expec_err * ( + np.array(self.check_source.nuclide.intensity) + ) + expected_nb_counts_err = gamma_rays_expected_err / decay_constant + expected_nb_counts_err *= ( + live_count_time_correction_factor * decay_counting_correction_factor + ) + detection_efficiency_err = detection_efficiency * np.sqrt( + (nb_counts_measured_err / nb_counts_measured) ** 2 + + (expected_nb_counts_err / expected_nb_counts) ** 2 + ) + return detection_efficiency, detection_efficiency_err + else: + return detection_efficiency def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: """Returns the peak indices of the histogram From d50a920d9f8a6be654f4f7c39ddc6b6dfc4adfa7 Mon Sep 17 00:00:00 2001 From: cdunn314 Date: Fri, 29 Aug 2025 12:38:08 -0400 Subject: [PATCH 02/12] Updated gamma_emitted_err with efficiency_err --- .../neutron_detection/activation_foils/compass.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 5ae5735..d8c3a70 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -485,6 +485,7 @@ def compute_detection_efficiency( if compute_error: nb_counts_measured_err = np.sqrt(nb_counts_measured) + act_expec_err = self.check_source.get_expected_activity_error(self.start_time) gamma_rays_expected_err = act_expec_err * ( np.array(self.check_source.nuclide.intensity) @@ -493,6 +494,7 @@ def compute_detection_efficiency( expected_nb_counts_err *= ( live_count_time_correction_factor * decay_counting_correction_factor ) + detection_efficiency_err = detection_efficiency * np.sqrt( (nb_counts_measured_err / nb_counts_measured) ** 2 + (expected_nb_counts_err / expected_nb_counts) ** 2 @@ -566,6 +568,8 @@ def get_gamma_emitted( calibration_coeffs, channel_nb: int, search_width: float = 800, + detection_efficiency: float = None, + detection_efficiency_err: float = 0.0, ): # find right background detector @@ -590,10 +594,15 @@ def get_gamma_emitted( nb_counts_measured = np.array(nb_counts_measured) nb_counts_measured_err = np.sqrt(nb_counts_measured) - detection_efficiency = np.polyval(efficiency_coeffs, energy) + if detection_efficiency is None: + detection_efficiency = np.polyval(efficiency_coeffs, energy) gamma_emmitted = nb_counts_measured / detection_efficiency - gamma_emmitted_err = nb_counts_measured_err / detection_efficiency + gamma_emmitted_err = gamma_emmitted * np.sqrt( + (nb_counts_measured_err / nb_counts_measured) ** 2 + + (detection_efficiency_err / detection_efficiency) ** 2 + ) + return gamma_emmitted, gamma_emmitted_err def get_neutron_flux( From ba06a53e1143927560763253b8be1b94b33a338c Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 15:33:56 -0500 Subject: [PATCH 03/12] Added bins from settings.xml file --- .../activation_foils/compass.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 853d064..8d57763 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -8,6 +8,8 @@ import uproot import glob import h5py +import xml.etree.ElementTree as ET +import re import warnings from libra_toolbox.neutron_detection.activation_foils.calibration import ( @@ -224,6 +226,19 @@ def from_directory( else: root_filename = None print("No root file found, assuming all counts are live") + + # Get energy channel bins for each detector from the settings.xml file if it exists, otherwise assume 4096 bins + # search for settings.xml file in source_dir + settings_file = Path(source_dir).parent / "settings.xml" + if os.path.isfile(settings_file): + energy_bins = get_spectrum_nbins(settings_file) + print(f"Found settings.xml file. Using energy bins from file: {energy_bins}") + else: + print("No settings.xml file found. Assuming 4096 energy bins.") + print(os.listdir(Path(source_dir).parent)) + energy_bins = None + if not energy_bins: + energy_bins = 4096 for detector in detectors: detector.events = np.column_stack( @@ -247,6 +262,7 @@ def from_directory( ) * ps_to_seconds detector.live_count_time = live_count_time detector.real_count_time = real_count_time + detector.nb_digitizer_bins = energy_bins measurement_object.detectors = detectors @@ -288,6 +304,7 @@ def to_h5(self, filename: str, mode: str = "w", spectrum_only=False) -> None: detector_group.attrs["live_count_time"] = detector.live_count_time detector_group.attrs["real_count_time"] = detector.real_count_time + detector_group.attrs["nb_digitizer_bins"] = detector.nb_digitizer_bins @classmethod def from_h5( @@ -346,6 +363,8 @@ def from_h5( detector.real_count_time = detector_group.attrs[ "real_count_time" ] + + detector.nb_digitizer_bins = detector_group.attrs.get("nb_digitizer_bins", 4096) if "spectrum" in detector_group: detector._spectrum = detector_group["spectrum"][:] @@ -1178,3 +1197,34 @@ def get_live_time_from_root(root_filename: str, channel: int) -> Tuple[float, fl live_count_time = root_file[f"LiveTime_{channel}"].members["fMilliSec"] / 1000 real_count_time = root_file[f"RealTime_{channel}"].members["fMilliSec"] / 1000 return live_count_time, real_count_time + + +def get_spectrum_nbins(settings_file: str) -> int: + """ + Read a settings.xml file and extract the number of spectrum bins + from the SRV_PARAM_CH_SPECTRUM_NBINS parameter. + + Args: + settings_file: Path to the settings.xml file + + Returns: + The number of bins as an integer (e.g., 16384 from "BINS_16384") + """ + tree = ET.parse(settings_file) + root = tree.getroot() + + # Find entry with the matching key + for entry in root.iter('entry'): + key_elem = entry.find('key') + if key_elem is not None and key_elem.text == 'SRV_PARAM_CH_SPECTRUM_NBINS': + # Get the first value element (contains the actual value) + value_container = entry.find('value') + if value_container is not None: + value_elem = value_container.find('value') + if value_elem is not None and value_elem.text: + # Extract number from "BINS_16384" format + match = re.search(r'BINS_(\d+)', value_elem.text) + if match: + return int(match.group(1)) + print(f"SRV_PARAM_CH_SPECTRUM_NBINS not found in settings file {settings_file}. Defaulting to None bins.") + return None From 2e3eae0c872b9d3a98556f65f84775c997a4a19a Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 15:49:28 -0500 Subject: [PATCH 04/12] initial labr category in get_peaks --- .../neutron_detection/activation_foils/compass.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 853d064..16bd565 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -597,6 +597,13 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: height = 0.7 * np.max(hist[start_index:]) prominence = 0.7 * np.max(hist[start_index:]) distance = 100 + elif self.detector_type.lower() == 'labr': + # peak finding parameters for LaBr3 detectors + start_index = 10 + prominence = 0.30 * np.max(hist[start_index:]) + height = 0.30 * np.max(hist[start_index:]) + width = [5, 100] + distance = 50 else: raise ValueError( f"Unknown detector type: {self.detector_type}. Supported types are 'NaI' and 'HPGe'." From 9095c8bce8f0f37c13e6c8a4908955b37597c72c Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 16:14:47 -0500 Subject: [PATCH 05/12] Changed labr peak width --- libra_toolbox/neutron_detection/activation_foils/compass.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index abc0761..0e24a3d 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -621,7 +621,7 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: start_index = 10 prominence = 0.30 * np.max(hist[start_index:]) height = 0.30 * np.max(hist[start_index:]) - width = [5, 100] + width = [5, 150] distance = 50 else: raise ValueError( @@ -638,6 +638,7 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: # run the peak finding algorithm # NOTE: the start_index is used to ignore the low energy region + print("start_index:", start_index, "prominence:", prominence, "height:", height, "width:", width, "distance:", distance) peaks, peak_data = find_peaks( hist[start_index:], prominence=prominence, From 8c69dae90e8abe230ce4448ee7e85346f7b8f2a7 Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 16:29:06 -0500 Subject: [PATCH 06/12] Added kwargs for check sources --- .../activation_foils/compass.py | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 0e24a3d..a11f168 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -19,6 +19,7 @@ co60, ba133, mn54, + cs137 ) from libra_toolbox.neutron_detection.activation_foils.explicit import get_chain @@ -618,11 +619,29 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: distance = 100 elif self.detector_type.lower() == 'labr': # peak finding parameters for LaBr3 detectors - start_index = 10 + start_index = 1000 prominence = 0.30 * np.max(hist[start_index:]) height = 0.30 * np.max(hist[start_index:]) width = [5, 150] - distance = 50 + distance = 20 + if self.check_source.nuclide == na22: + start_index = 1500 + height = 0.1 * np.max(hist[start_index:]) + prominence = 0.1 * np.max(hist[start_index:]) + width = None + elif self.check_source.nuclide == ba133: + start_index = 1300 + prominence = np.max(hist[start_index:]) * 0.05, + height = np.max(hist[start_index:]) * 0.1, + elif self.check_source.nuclide == co60: + start_index = 2000 + prominence = np.max(hist[start_index:]) * 0.5, + height = np.max(hist[start_index:]) * 0.5, + elif self.check_source.nuclide == cs137: + start_index = 1500 + prominence = np.max(hist[start_index:]) * 0.5 + height = np.max(hist[start_index:]) * 0.5 + width = [5, 200] else: raise ValueError( f"Unknown detector type: {self.detector_type}. Supported types are 'NaI' and 'HPGe'." From a652acfe492e6156a9f714b1a1e1159082949e59 Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 18:31:31 -0500 Subject: [PATCH 07/12] get_peaks changes with total channels --- .../activation_foils/compass.py | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index a11f168..d620830 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -569,79 +569,82 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: the peak indices in ``hist`` """ + # get total number of channels for scaling + total_channels = self.detectors[0].nb_digitizer_bins + channel_multiplier = int(total_channels / 4096) # Assuming 4096 channels is the base case if self.detector_type.lower() == 'nai': # peak finding parameters - start_index = 100 + start_index = int(100 * channel_multiplier) prominence = 0.10 * np.max(hist[start_index:]) height = 0.10 * np.max(hist[start_index:]) - width = [10, 150] - distance = 30 + width = [int(10 * channel_multiplier), int(150 * channel_multiplier)] + distance = int(30 * channel_multiplier) if self.check_source.nuclide == na22: - start_index = 100 + start_index = int(100 * channel_multiplier) height = 0.1 * np.max(hist[start_index:]) prominence = 0.1 * np.max(hist[start_index:]) - width = [10, 150] - distance = 30 + width = [int(10 * channel_multiplier), int(150 * channel_multiplier)] + distance = int(30 * channel_multiplier) elif self.check_source.nuclide == co60: - start_index = 400 + start_index = int(400 * channel_multiplier) height = 0.60 * np.max(hist[start_index:]) prominence = None elif self.check_source.nuclide == ba133: - start_index = 150 + start_index = int(150 * channel_multiplier) height = 0.10 * np.max(hist[start_index:]) prominence = 0.10 * np.max(hist[start_index:]) elif self.check_source.nuclide == mn54: height = 0.6 * np.max(hist[start_index:]) elif self.detector_type.lower() == 'hpge': # peak finding parameters for HPGe detectors - start_index = 10 + start_index = int(10 * channel_multiplier) prominence = 0.50 * np.max(hist[start_index:]) height = 0.50 * np.max(hist[start_index:]) - width = [2, 50] - distance = 100 + width = [int(2 * channel_multiplier), int(50 * channel_multiplier)] + distance = int(100 * channel_multiplier) if self.check_source.nuclide == na22: - start_index = 100 + start_index = int(100 * channel_multiplier) height = 0.4 * np.max(hist[start_index:]) prominence = 0.4 * np.max(hist[start_index:]) - distance = 100 + distance = int(100 * channel_multiplier) elif self.check_source.nuclide == co60: height = 0.5 * np.max(hist[start_index:]) prominence = 0.5 * np.max(hist[start_index:]) elif self.check_source.nuclide == ba133: - start_index = 150 + start_index = int(150 * channel_multiplier) height = 0.10 * np.max(hist[start_index:]) prominence = 0.10 * np.max(hist[start_index:]) - distance = 10 + distance = int(10 * channel_multiplier) elif self.check_source.nuclide == mn54: - start_index = 400 + start_index = int(400 * channel_multiplier) height = 0.7 * np.max(hist[start_index:]) prominence = 0.7 * np.max(hist[start_index:]) - distance = 100 + distance = int(100 * channel_multiplier) elif self.detector_type.lower() == 'labr': # peak finding parameters for LaBr3 detectors - start_index = 1000 + start_index = int(250 * channel_multiplier) prominence = 0.30 * np.max(hist[start_index:]) height = 0.30 * np.max(hist[start_index:]) - width = [5, 150] - distance = 20 + width = [int(1 * channel_multiplier), int(40 * channel_multiplier)] + distance = int(10 * channel_multiplier) if self.check_source.nuclide == na22: - start_index = 1500 + start_index = int(400 * channel_multiplier) height = 0.1 * np.max(hist[start_index:]) prominence = 0.1 * np.max(hist[start_index:]) width = None elif self.check_source.nuclide == ba133: - start_index = 1300 + start_index = int(300 * channel_multiplier) prominence = np.max(hist[start_index:]) * 0.05, height = np.max(hist[start_index:]) * 0.1, elif self.check_source.nuclide == co60: - start_index = 2000 + start_index = int(500 * channel_multiplier) prominence = np.max(hist[start_index:]) * 0.5, height = np.max(hist[start_index:]) * 0.5, elif self.check_source.nuclide == cs137: - start_index = 1500 + start_index = int(400 * channel_multiplier) prominence = np.max(hist[start_index:]) * 0.5 height = np.max(hist[start_index:]) * 0.5 - width = [5, 200] + width = [int(1 * channel_multiplier), int(50 * channel_multiplier)] else: raise ValueError( f"Unknown detector type: {self.detector_type}. Supported types are 'NaI' and 'HPGe'." From c2037f6c127fbad9540efdaf3d48afd33e34215a Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 18:38:01 -0500 Subject: [PATCH 08/12] Added comma --- libra_toolbox/neutron_detection/activation_foils/compass.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 9186895..267292a 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -473,7 +473,7 @@ def compute_detection_efficiency( calibration_coeffs: np.ndarray, channel_nb: int, search_width: float = 800, - compute_error: bool = False + compute_error: bool = False, threshold_overlap: float = 200, summing_method: str = 'sum_gaussian', ) -> Union[np.ndarray, float]: From bd841b88fb4b4a645e933dceacee3e8f2f837e0f Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Tue, 3 Mar 2026 18:48:21 -0500 Subject: [PATCH 09/12] removed print statement --- libra_toolbox/neutron_detection/activation_foils/compass.py | 1 - 1 file changed, 1 deletion(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 267292a..4191054 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -678,7 +678,6 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: # run the peak finding algorithm # NOTE: the start_index is used to ignore the low energy region - print("start_index:", start_index, "prominence:", prominence, "height:", height, "width:", width, "distance:", distance) peaks, peak_data = find_peaks( hist[start_index:], prominence=prominence, From dc35ff31da41b813ee665422094e3566bb64363b Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Wed, 4 Mar 2026 11:41:12 -0500 Subject: [PATCH 10/12] Added *_info.txt compatibility --- .../activation_foils/compass.py | 191 ++++++++++++++++-- 1 file changed, 170 insertions(+), 21 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index 4191054..d7066cb 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -10,6 +10,7 @@ import h5py import xml.etree.ElementTree as ET import re +from zoneinfo import ZoneInfo import warnings from libra_toolbox.neutron_detection.activation_foils.calibration import ( @@ -221,12 +222,22 @@ def from_directory( detectors = [Detector(channel_nb=nb) for nb in time_values.keys()] # Get live and real count times + # First check if root files are present in the source directory and get live and real count times from there all_root_filenames = glob.glob(os.path.join(source_dir, "*.root")) if len(all_root_filenames) == 1: root_filename = all_root_filenames[0] else: root_filename = None - print("No root file found, assuming all counts are live") + + # if root file is not present, check for *_info.txt files and get live and real count times from there + if root_filename is None: + info_txt_filename = source_dir.parent / f"{source_dir.name}_info.txt" + if not os.path.isfile(info_txt_filename): + # if the *_info.txt file is not present, set to None, + # which will assume that the live count time is the time between the first and last event, + # and the real count time is the time between the start and stop time (if they are available) + info_txt_filename = None + # Get energy channel bins for each detector from the settings.xml file if it exists, otherwise assume 4096 bins # search for settings.xml file in source_dir @@ -253,16 +264,23 @@ def from_directory( detector.live_count_time = live_count_time detector.real_count_time = real_count_time else: - real_count_time = (stop_time - start_time).total_seconds() - # Assume first and last event correspond to start and stop time of live counts - # and convert from picoseconds to seconds - ps_to_seconds = 1e-12 - live_count_time = ( - time_values[detector.channel_nb][-1] - - time_values[detector.channel_nb][0] - ) * ps_to_seconds - detector.live_count_time = live_count_time - detector.real_count_time = real_count_time + if info_txt_filename: + live_count_time, real_count_time = get_live_time_from_info_txt( + info_txt_filename, detector.channel_nb + ) + detector.live_count_time = live_count_time + detector.real_count_time = real_count_time + else: + real_count_time = (stop_time - start_time).total_seconds() + # Assume first and last event correspond to start and stop time of live counts + # and convert from picoseconds to seconds + ps_to_seconds = 1e-12 + live_count_time = ( + time_values[detector.channel_nb][-1] + - time_values[detector.channel_nb][0] + ) * ps_to_seconds + detector.live_count_time = live_count_time + detector.real_count_time = real_count_time detector.nb_digitizer_bins = energy_bins measurement_object.detectors = detectors @@ -476,6 +494,7 @@ def compute_detection_efficiency( compute_error: bool = False, threshold_overlap: float = 200, summing_method: str = 'sum_gaussian', + ax=None ) -> Union[np.ndarray, float]: """ Computes the detection efficiency of a check source given the @@ -526,6 +545,7 @@ def compute_detection_efficiency( search_width=search_width, threshold_overlap=threshold_overlap, summing_method=summing_method, + ax=ax ) nb_counts_measured = np.array(nb_counts_measured) @@ -960,7 +980,8 @@ def gauss(x, b, m, *args): def fit_peak_gauss(hist, xvals, peak_ergs, search_width=600, - threshold_overlap=200): + threshold_overlap=200, + ax=None): if len(peak_ergs) > 1: if np.max(peak_ergs) - np.min(peak_ergs) > threshold_overlap: @@ -990,6 +1011,11 @@ def fit_peak_gauss(hist, xvals, peak_ergs, search_width / (3 * len(peak_ergs)), ] + if ax: + print("Plotting initial guess...") + ax.plot(xvals[search_start:search_end], gauss(xvals[search_start:search_end], *guess_parameters), + '--', + label='Initial Guess') parameters, covariance = curve_fit( gauss, @@ -998,7 +1024,13 @@ def fit_peak_gauss(hist, xvals, peak_ergs, p0=guess_parameters, ) - print("Fitted parameters:", parameters) + if ax: + print("Plotting fitted curve...") + ax.plot(xvals[search_start:search_end], gauss(xvals[search_start:search_end], *parameters), + label='Fitted Curve') + ax.legend() + + return parameters, covariance @@ -1025,7 +1057,8 @@ def get_multipeak_area( [peak], search_width=search_width, threshold_overlap=threshold_overlap, - summing_method=summing_method + summing_method=summing_method, + ax=ax ) areas += area return areas @@ -1036,9 +1069,11 @@ def get_multipeak_area( parameters, covariance = fit_peak_gauss( - hist, xvals, peak_ergs, search_width=search_width + hist, xvals, peak_ergs, search_width=search_width, + ax=ax, ) + areas = [] peak_starts = [] peak_ends = [] @@ -1078,6 +1113,7 @@ def get_multipeak_area( areas += [area] + return areas @@ -1211,17 +1247,29 @@ def get_events(directory: str) -> Tuple[Dict[int, np.ndarray], Dict[int, np.ndar def get_start_stop_time(directory: str) -> Tuple[datetime.datetime, datetime.datetime]: - """Obtains count start and stop time from the run.info file.""" + """Obtains count start and stop time from the run.info or *_info.txt file. + Some versions of CoMPASS output a run.info file, while others output a *_info.txt file. + This function checks for both and reads the time information from the file that exists.""" + + run_info_file = Path(directory).parent / "run.info" + info_txt_file = Path(directory).parent / f"{Path(directory).stem}_info.txt" - info_file = Path(directory).parent / "run.info" - if info_file.exists(): - time_format = "%Y/%m/%d %H:%M:%S.%f%z" - with open(info_file, "r") as file: - lines = file.readlines() + if run_info_file.exists(): + start_time, stop_time = get_start_stop_time_from_run_info(run_info_file) + elif info_txt_file.exists(): + start_time, stop_time = get_start_stop_time_from_info_txt(info_txt_file) else: raise FileNotFoundError( f"Could not find run.info file in parent directory {Path(directory).parent}" ) + return start_time, stop_time + + +def get_start_stop_time_from_run_info(info_file: Path) -> Tuple[datetime.datetime, datetime.datetime]: + """Obtains count start and stop time from the run.info file.""" + time_format = "%Y/%m/%d %H:%M:%S.%f%z" + with open(info_file, "r") as file: + lines = file.readlines() start_time, stop_time = None, None for line in lines: @@ -1240,6 +1288,39 @@ def get_start_stop_time(directory: str) -> Tuple[datetime.datetime, datetime.dat return start_time, stop_time +def get_start_stop_time_from_info_txt(info_txt_file: Path, tz=ZoneInfo("America/New_York")) -> Tuple[datetime.datetime, datetime.datetime]: + """Obtains count start and stop time from the *_info.txt file. + + Args: + info_txt_file: Path to the info.txt file + tz: Timezone info for the datetime objects + + Returns: + Tuple of (start_time, stop_time) as timezone-aware datetime objects (America/New_York) + """ + + time_format = "%a %b %d %H:%M:%S %Y" + + with open(info_txt_file, "r") as file: + lines = file.readlines() + + start_time, stop_time = None, None + for line in lines: + if line.startswith("Start time = "): + time_string = line.split(" = ", 1)[1].strip() + start_time = datetime.datetime.strptime(time_string, time_format) + start_time = start_time.replace(tzinfo=tz) + elif line.startswith("Stop time = "): + time_string = line.split(" = ", 1)[1].strip() + stop_time = datetime.datetime.strptime(time_string, time_format) + stop_time = stop_time.replace(tzinfo=tz) + + if None in (start_time, stop_time): + raise ValueError(f"Could not find 'Start time' or 'Stop time' in file {info_txt_file}.") + + return start_time, stop_time + + def get_live_time_from_root(root_filename: str, channel: int) -> Tuple[float, float]: """ Gets live and real count time from Compass root file. @@ -1253,6 +1334,74 @@ def get_live_time_from_root(root_filename: str, channel: int) -> Tuple[float, fl return live_count_time, real_count_time +def get_live_time_from_info_txt(info_txt_file: Path, channel: int) -> Tuple[float, float]: + """ + Gets live and real count time in seconds from Compass *_info.txt file. + Live time is defined as the difference between the actual time that + a count is occurring and the "dead time," in which the output of detector + pulses is saturated such that additional signals cannot be processed. + + Args: + info_txt_file: Path to the info.txt file + channel: Channel number (e.g., 0 for CH0@, 1 for CH1@) + + Returns: + Tuple of (live_count_time, real_count_time) in seconds + """ + with open(info_txt_file, "r") as file: + lines = file.readlines() + + channel_header = f"CH{channel}@" + in_channel_section = False + live_count_time = None + real_count_time = None + + for line in lines: + if line.startswith(channel_header): + in_channel_section = True + continue + elif in_channel_section and line.startswith("CH") and "@" in line: + # We've reached the next channel section, stop searching + break + elif in_channel_section: + if "Real time = " in line: + # Extract time string like "0:03:14.665" + match = re.search(r"Real time = (\d+:\d{2}:\d{2}\.\d+)", line) + if match: + real_count_time = _parse_time_to_seconds(match.group(1)) + if "Live time = " in line: + match = re.search(r"Live time = (\d+:\d{2}:\d{2}\.\d+)", line) + if match: + live_count_time = _parse_time_to_seconds(match.group(1)) + + if live_count_time is not None and real_count_time is not None: + break + + if live_count_time is None or real_count_time is None: + raise ValueError( + f"Could not find Real time or Live time for channel {channel} in file {info_txt_file}." + ) + + return live_count_time, real_count_time + + +def _parse_time_to_seconds(time_str: str) -> float: + """ + Parse a time string in format "H:MM:SS.mmm" to seconds. + + Args: + time_str: Time string like "0:03:14.665" + + Returns: + Time in seconds as a float + """ + parts = time_str.split(":") + hours = int(parts[0]) + minutes = int(parts[1]) + seconds = float(parts[2]) + return hours * 3600 + minutes * 60 + seconds + + def get_spectrum_nbins(settings_file: str) -> int: """ Read a settings.xml file and extract the number of spectrum bins From 88d33233807f1f88bd7a1b5cb1adbe304a343c48 Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Wed, 4 Mar 2026 20:40:17 -0500 Subject: [PATCH 11/12] Added reaction type to reaction class --- libra_toolbox/neutron_detection/activation_foils/calibration.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libra_toolbox/neutron_detection/activation_foils/calibration.py b/libra_toolbox/neutron_detection/activation_foils/calibration.py index d2aacd2..114e990 100644 --- a/libra_toolbox/neutron_detection/activation_foils/calibration.py +++ b/libra_toolbox/neutron_detection/activation_foils/calibration.py @@ -103,6 +103,7 @@ class Reaction: reactant: Nuclide product: Nuclide cross_section: float + type: str = "(n,2n)" """ Class to hold the information of a reaction. Attributes @@ -113,6 +114,7 @@ class Reaction: The product of the reaction. cross_section : The cross section of the reaction in cm2. + type: The type of the reaction (default is "(n,2n)"). """ nb93_n2n = Reaction( From 1d554b06a0d194c35a9fe4f2f400f4c1105d7589 Mon Sep 17 00:00:00 2001 From: Collin Dunn Date: Wed, 4 Mar 2026 21:49:28 -0500 Subject: [PATCH 12/12] Added multipeak area plotter --- .../activation_foils/compass.py | 282 +++++++++++++++--- 1 file changed, 242 insertions(+), 40 deletions(-) diff --git a/libra_toolbox/neutron_detection/activation_foils/compass.py b/libra_toolbox/neutron_detection/activation_foils/compass.py index d7066cb..b001140 100644 --- a/libra_toolbox/neutron_detection/activation_foils/compass.py +++ b/libra_toolbox/neutron_detection/activation_foils/compass.py @@ -231,7 +231,7 @@ def from_directory( # if root file is not present, check for *_info.txt files and get live and real count times from there if root_filename is None: - info_txt_filename = source_dir.parent / f"{source_dir.name}_info.txt" + info_txt_filename = Path(source_dir).parent / f"{Path(source_dir).parent.stem}_info.txt" if not os.path.isfile(info_txt_filename): # if the *_info.txt file is not present, set to None, # which will assume that the live count time is the time between the first and last event, @@ -601,7 +601,16 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: Args: hist: a histogram kwargs: optional parameters for the peak finding algorithm - see scipy.signal.find_peaks for more information + see scipy.signal.find_peaks for more information like: + start_index: the index to start the peak finding from, to ignore the low energy region + + relative_prominence: fraction of maximum count in the histogram to set the prominence parameter for peak finding + + relative_height: fraction of maximum count in the histogram to set the height parameter for peak finding + + width: the width parameter for peak finding + + distance: the distance parameter for peak finding Returns: the peak indices in ``hist`` @@ -613,75 +622,75 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: if self.detector_type.lower() == 'nai': # peak finding parameters start_index = int(100 * channel_multiplier) - prominence = 0.10 * np.max(hist[start_index:]) - height = 0.10 * np.max(hist[start_index:]) + relative_prominence = 0.10 + relative_height = 0.10 width = [int(10 * channel_multiplier), int(150 * channel_multiplier)] distance = int(30 * channel_multiplier) if self.check_source.nuclide == na22: start_index = int(100 * channel_multiplier) - height = 0.1 * np.max(hist[start_index:]) - prominence = 0.1 * np.max(hist[start_index:]) + relative_height = 0.1 + relative_prominence = 0.1 width = [int(10 * channel_multiplier), int(150 * channel_multiplier)] distance = int(30 * channel_multiplier) elif self.check_source.nuclide == co60: start_index = int(400 * channel_multiplier) - height = 0.60 * np.max(hist[start_index:]) - prominence = None + relative_height = 0.60 + relative_prominence = None elif self.check_source.nuclide == ba133: start_index = int(150 * channel_multiplier) - height = 0.10 * np.max(hist[start_index:]) - prominence = 0.10 * np.max(hist[start_index:]) + relative_height = 0.10 + relative_prominence = 0.10 elif self.check_source.nuclide == mn54: - height = 0.6 * np.max(hist[start_index:]) + relative_height = 0.6 elif self.detector_type.lower() == 'hpge': # peak finding parameters for HPGe detectors start_index = int(10 * channel_multiplier) - prominence = 0.50 * np.max(hist[start_index:]) - height = 0.50 * np.max(hist[start_index:]) + relative_prominence = 0.50 + relative_height = 0.50 width = [int(2 * channel_multiplier), int(50 * channel_multiplier)] distance = int(100 * channel_multiplier) if self.check_source.nuclide == na22: start_index = int(100 * channel_multiplier) - height = 0.4 * np.max(hist[start_index:]) - prominence = 0.4 * np.max(hist[start_index:]) + relative_height = 0.4 + relative_prominence = 0.4 distance = int(100 * channel_multiplier) elif self.check_source.nuclide == co60: - height = 0.5 * np.max(hist[start_index:]) - prominence = 0.5 * np.max(hist[start_index:]) + relative_height = 0.5 + relative_prominence = 0.5 elif self.check_source.nuclide == ba133: start_index = int(150 * channel_multiplier) - height = 0.10 * np.max(hist[start_index:]) - prominence = 0.10 * np.max(hist[start_index:]) + relative_height = 0.10 + relative_prominence = 0.10 distance = int(10 * channel_multiplier) elif self.check_source.nuclide == mn54: start_index = int(400 * channel_multiplier) - height = 0.7 * np.max(hist[start_index:]) - prominence = 0.7 * np.max(hist[start_index:]) + relative_height = 0.7 + relative_prominence = 0.7 distance = int(100 * channel_multiplier) elif self.detector_type.lower() == 'labr': # peak finding parameters for LaBr3 detectors start_index = int(250 * channel_multiplier) - prominence = 0.30 * np.max(hist[start_index:]) - height = 0.30 * np.max(hist[start_index:]) + relative_prominence = 0.30 + relative_height = 0.30 width = [int(1 * channel_multiplier), int(40 * channel_multiplier)] distance = int(10 * channel_multiplier) if self.check_source.nuclide == na22: start_index = int(400 * channel_multiplier) - height = 0.1 * np.max(hist[start_index:]) - prominence = 0.1 * np.max(hist[start_index:]) + relative_height = 0.1 + relative_prominence = 0.1 width = None elif self.check_source.nuclide == ba133: start_index = int(300 * channel_multiplier) - prominence = np.max(hist[start_index:]) * 0.05, - height = np.max(hist[start_index:]) * 0.1, + relative_prominence = 0.05 + relative_height = 0.1 elif self.check_source.nuclide == co60: start_index = int(500 * channel_multiplier) - prominence = np.max(hist[start_index:]) * 0.5, - height = np.max(hist[start_index:]) * 0.5, + relative_prominence = 0.5 + relative_height = 0.5 elif self.check_source.nuclide == cs137: start_index = int(400 * channel_multiplier) - prominence = np.max(hist[start_index:]) * 0.5 - height = np.max(hist[start_index:]) * 0.5 + relative_prominence = 0.5 + relative_height = 0.5 width = [int(1 * channel_multiplier), int(50 * channel_multiplier)] else: raise ValueError( @@ -690,12 +699,15 @@ def get_peaks(self, hist: np.ndarray, **kwargs) -> np.ndarray: # update the parameters if kwargs are provided if kwargs: + print("kwargs provided, updating peak finding parameters with provided values") start_index = kwargs.get("start_index", start_index) - prominence = kwargs.get("prominence", prominence) - height = kwargs.get("height", height) + relative_prominence = kwargs.get("relative_prominence", relative_prominence) + relative_height = kwargs.get("relative_height", relative_height) width = kwargs.get("width", width) distance = kwargs.get("distance", distance) + prominence = relative_prominence * np.max(hist) if relative_prominence is not None else None + height = relative_height * np.max(hist) if relative_height is not None else None # run the peak finding algorithm # NOTE: the start_index is used to ignore the low energy region peaks, peak_data = find_peaks( @@ -910,9 +922,16 @@ def get_calibration_data( get_peaks() for each check source measurement, with the check source nuclide name as key. Example: peak_kwargs = { - 'Na-22': {'start_index': 100, 'height': 0.1 * np.max(hist)}, - 'Co-60': {'start_index': 400, 'height': 0.6 * np.max(hist)}, + 'Na-22': {'start_index': 100, 'height': 0.1}, + 'Co-60': {'start_index': 400, 'height': 0.6}, } + "The height and prominence parameters in the get_peaks() function are defined + as a factor of the maximum count in the histogram, starting from the start_index. + So for example, if the histogram has a maximum count of 1000 starting from the start_index, + and the height parameter is set to 0.1, then the height threshold for peak finding will be 100 counts. + This allows the peak finding parameters to be scaled according to the actual counts + in the histogram for each check source measurement, which can be useful if the check sources + have different activities and therefore different count rates. """ background_detector = [ detector @@ -933,8 +952,8 @@ def get_calibration_data( ) kwargs = {} if peak_kwargs is not None: - if measurement.check_source.nuclide in peak_kwargs.keys(): - kwargs = peak_kwargs[measurement.check_source.nuclide] + if measurement.check_source.nuclide.name in peak_kwargs.keys(): + kwargs = peak_kwargs[measurement.check_source.nuclide.name] found_a_nuclide = True peaks_ind = measurement.get_peaks(hist, **kwargs) @@ -1117,6 +1136,184 @@ def get_multipeak_area( return areas +def plot_histogram_with_peak_fit( + detector: Detector, + calibration_coeffs: np.ndarray, + peak_energies: List[float], + background_detector: Detector = None, + search_width: float = 600, + threshold_overlap: float = 200, + ax=None, + plot_initial_guess: bool = False, + plot_peak_area: bool = True, + hist_kwargs: dict = None, + fit_kwargs: dict = None, + fill_kwargs: dict = None, + plot_title: str = None, +): + """ + Plot the energy histogram of a detector with Gaussian peak fits. + + Args: + detector: Detector object containing the measurement data + calibration_coeffs: Polynomial coefficients for energy calibration + peak_energies: List of peak energies (in keV) to fit + background_detector: Optional background Detector for subtraction + search_width: Width around peaks to search for fitting + threshold_overlap: Threshold for considering peaks as overlapping + ax: Matplotlib axes object. If None, a new figure is created. + plot_initial_guess: If True, also plot the initial guess for the fit + plot_peak_area: If True, fill the area under the peak down to the baseline + hist_kwargs: Additional kwargs for histogram plotting (e.g., color, alpha) + fit_kwargs: Additional kwargs for fit curve plotting (e.g., color, linewidth) + fill_kwargs: Additional kwargs for area fill (e.g., color, alpha) + + Returns: + Tuple of (fig, ax, fit_parameters) where fit_parameters is a list of + fitted parameters for each peak group + """ + import matplotlib.pyplot as plt + + if ax is None: + fig, ax = plt.subplots(figsize=(10, 6)) + else: + fig = ax.get_figure() + + # Get histogram data + if background_detector is not None: + hist, bin_edges = detector.get_energy_hist_background_substract( + background_detector, bins=None + ) + else: + hist, bin_edges = detector.get_energy_hist(bins=None) + + # Calibrate bin edges + calibrated_bin_edges = np.polyval(calibration_coeffs, bin_edges) + xvals = np.diff(calibrated_bin_edges) / 2 + calibrated_bin_edges[:-1] + + # Plot histogram + default_hist_kwargs = {'alpha': 0.7, 'label': 'Histogram'} + if hist_kwargs: + default_hist_kwargs.update(hist_kwargs) + ax.stairs(hist, calibrated_bin_edges, **default_hist_kwargs) + + # Fit and plot peaks + peak_energies = np.atleast_1d(peak_energies) + all_fit_parameters = [] + + # Group peaks by overlap threshold + if len(peak_energies) > 1 and np.max(peak_energies) - np.min(peak_energies) > threshold_overlap: + # Process peaks individually + peak_groups = [[p] for p in peak_energies] + else: + # Process all peaks together + peak_groups = [list(peak_energies)] + + default_fit_kwargs = {'linewidth': 2} + if fit_kwargs: + default_fit_kwargs.update(fit_kwargs) + + default_fill_kwargs = {'alpha': 0.3, 'label': 'Peak Area'} + if fill_kwargs: + default_fill_kwargs.update(fill_kwargs) + + fill_label_added = False + + for i, peak_group in enumerate(peak_groups): + search_start = np.argmin( + np.abs((peak_group[0] - search_width / len(peak_group)) - xvals) + ) + search_end = np.argmin( + np.abs((peak_group[-1] + search_width / len(peak_group)) - xvals) + ) + + # Build initial guess + guess_parameters = [0, 0] + for peak_erg in peak_group: + peak_ind = np.argmin(np.abs(peak_erg - xvals)) + guess_parameters += [ + hist[peak_ind], + peak_erg, + search_width / (3 * len(peak_group)), + ] + + if plot_initial_guess: + ax.plot( + xvals[search_start:search_end], + gauss(xvals[search_start:search_end], *guess_parameters), + '--', + label=f'Initial Guess (Peak {i+1})' if len(peak_groups) > 1 else 'Initial Guess', + alpha=0.5 + ) + + # Fit the peak(s) + try: + parameters, covariance = curve_fit( + gauss, + xvals[search_start:search_end], + hist[search_start:search_end], + p0=guess_parameters, + ) + all_fit_parameters.append(parameters) + + # Plot fitted curve + ax.plot( + xvals[search_start:search_end], + gauss(xvals[search_start:search_end], *parameters), + label=f'Fitted Curve (Peak {i+1})' if len(peak_groups) > 1 else 'Fitted Curve', + **default_fit_kwargs + ) + + # Fill peak area for each peak in the group + if plot_peak_area: + for j in range(len(peak_group)): + # Extract individual peak parameters + mean = parameters[2 + 3 * j + 1] + sigma = np.abs(parameters[2 + 3 * j + 2]) + + # Peak bounds at ±3 sigma + peak_start = np.argmin(np.abs((mean - 3 * sigma) - xvals)) + peak_end = np.argmin(np.abs((mean + 3 * sigma) - xvals)) + + peak_x = xvals[peak_start:peak_end] + + # Individual peak Gaussian curve + peak_params = [parameters[0], parameters[1], parameters[2 + 3 * j], mean, sigma] + peak_curve = gauss(peak_x, *peak_params) + + # Linear baseline (trap_cutoff) + baseline = parameters[0] + parameters[1] * peak_x + + # Fill between the Gaussian and the baseline + current_fill_kwargs = default_fill_kwargs.copy() + if fill_label_added: + current_fill_kwargs.pop('label', None) + else: + fill_label_added = True + + ax.fill_between( + peak_x, + baseline, + peak_curve, + **current_fill_kwargs + ) + + # Plot the baseline + ax.plot(peak_x, baseline, 'k--', linewidth=1, alpha=0.5) + + except RuntimeError as e: + warnings.warn(f"Could not fit peak group {peak_group}: {e}") + all_fit_parameters.append(None) + + ax.set_xlabel('Energy (keV)') + ax.set_ylabel('Counts') + ax.legend() + if plot_title: + ax.set_title(plot_title) + + return fig, ax, all_fit_parameters + + def get_channel(filename): """ Extract the channel number from a given filename string. @@ -1252,11 +1449,16 @@ def get_start_stop_time(directory: str) -> Tuple[datetime.datetime, datetime.dat This function checks for both and reads the time information from the file that exists.""" run_info_file = Path(directory).parent / "run.info" - info_txt_file = Path(directory).parent / f"{Path(directory).stem}_info.txt" + info_txt_file = Path(directory).parent / f"{Path(directory).parent.stem}_info.txt" + + print("Hello world from get_start_stop_time!") + print(f"Looking for run.info file at {run_info_file}") + print(f"Looking for info.txt file at {info_txt_file}") if run_info_file.exists(): start_time, stop_time = get_start_stop_time_from_run_info(run_info_file) elif info_txt_file.exists(): + print("Hello world from get_start_stop_time_from_info_txt!") start_time, stop_time = get_start_stop_time_from_info_txt(info_txt_file) else: raise FileNotFoundError( @@ -1301,7 +1503,7 @@ def get_start_stop_time_from_info_txt(info_txt_file: Path, tz=ZoneInfo("America/ time_format = "%a %b %d %H:%M:%S %Y" - with open(info_txt_file, "r") as file: + with open(info_txt_file, "r", encoding="latin-1") as file: lines = file.readlines() start_time, stop_time = None, None @@ -1348,7 +1550,7 @@ def get_live_time_from_info_txt(info_txt_file: Path, channel: int) -> Tuple[floa Returns: Tuple of (live_count_time, real_count_time) in seconds """ - with open(info_txt_file, "r") as file: + with open(info_txt_file, "r", encoding="latin-1") as file: lines = file.readlines() channel_header = f"CH{channel}@"