diff --git a/bilby/gw/detector/interferometer.py b/bilby/gw/detector/interferometer.py index 94267d30c..e77e7927c 100644 --- a/bilby/gw/detector/interferometer.py +++ b/bilby/gw/detector/interferometer.py @@ -43,7 +43,8 @@ class Interferometer(object): minimum_frequency = PropertyAccessor('strain_data', 'minimum_frequency') maximum_frequency = PropertyAccessor('strain_data', 'maximum_frequency') frequency_mask = PropertyAccessor('strain_data', 'frequency_mask') - frequency_domain_strain = PropertyAccessor('strain_data', 'frequency_domain_strain') + frequency_domain_strain = PropertyAccessor( + 'strain_data', 'frequency_domain_strain') time_domain_strain = PropertyAccessor('strain_data', 'time_domain_strain') def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency, length, latitude, longitude, @@ -108,10 +109,14 @@ def __repr__(self): 'maximum_frequency={}, length={}, latitude={}, longitude={}, elevation={}, ' \ 'xarm_azimuth={}, yarm_azimuth={}, xarm_tilt={}, yarm_tilt={})' \ .format(self.name, self.power_spectral_density, float(self.strain_data.minimum_frequency), - float(self.strain_data.maximum_frequency), float(self.geometry.length), - float(self.geometry.latitude), float(self.geometry.longitude), - float(self.geometry.elevation), float(self.geometry.xarm_azimuth), - float(self.geometry.yarm_azimuth), float(self.geometry.xarm_tilt), + float(self.strain_data.maximum_frequency), float( + self.geometry.length), + float(self.geometry.latitude), float( + self.geometry.longitude), + float(self.geometry.elevation), float( + self.geometry.xarm_azimuth), + float(self.geometry.yarm_azimuth), float( + self.geometry.xarm_tilt), float(self.geometry.yarm_tilt)) def set_strain_data_from_gwpy_timeseries(self, time_series): @@ -280,7 +285,8 @@ def antenna_response(self, ra, dec, time, psi, mode): """ if mode in ["plus", "cross", "x", "y", "breathing", "longitudinal"]: - polarization_tensor = get_polarization_tensor(ra, dec, time, psi, mode) + polarization_tensor = get_polarization_tensor( + ra, dec, time, psi, mode) return three_by_three_matrix_contraction(self.geometry.detector_tensor, polarization_tensor) elif mode == self.name: return 1 @@ -342,7 +348,8 @@ def get_detector_response(self, waveform_polarizations, parameters, frequencies= dt_geocent = parameters['geocent_time'] - self.strain_data.start_time dt = dt_geocent + time_shift - signal_ifo[mask] = signal_ifo[mask] * np.exp(-1j * 2 * np.pi * dt * frequencies) + signal_ifo[mask] = signal_ifo[mask] * \ + np.exp(-1j * 2 * np.pi * dt * frequencies) signal_ifo[mask] *= self.calibration_model.get_calibration_factor( frequencies, prefix='recalib_{}_'.format(self.name), **parameters @@ -371,7 +378,8 @@ def check_signal_duration(self, parameters, raise_error=True): if ("mass_1" not in parameters) and ("mass_2" not in parameters): if raise_error: - raise AttributeError("Unable to check signal duration as mass not given") + raise AttributeError( + "Unable to check signal duration as mass not given") else: return @@ -392,6 +400,111 @@ def check_signal_duration(self, parameters, raise_error=True): else: logger.warning(msg) + def adjust_optimal_snr(self, frequency_domain_strain, target_snr): + """ Rescale a frequency-domain strain array to a target optimal SNR. + + Parameters + ========== + frequency_domain_strain: numpy.array + Frequency-domain strain to rescale. + target_snr: float + Desired optimal SNR. + + Returns + ======= + numpy.ndarray + Frequency-domain strain rescaled so that its optimal SNR equals ``target_snr``. + """ + temporary_snr_squared = np.real( + self.optimal_snr_squared(signal=frequency_domain_strain)) + return frequency_domain_strain * target_snr / np.sqrt(temporary_snr_squared) + + def inject_glitch(self, glitch_parameters=None, glitch_time_domain_strain=None, + glitch_sample_times=None, glitch_waveform_generator=None): + """ Inject a glitch into the interferometer data. + + Parameters + ========== + glitch_parameters: dict, optional + Dictionary of glitch parameters. + Must contain ``onset_time`` (the GPS time at which the glitch peak should occur). + Must contain ``snr``. The glitch will be rescaled in such a + way that it's optimal SNR matches to this value. + glitch_time_domain_strain: numpy.array, optional + Time-domain strain data of the glitch to inject. + glitch_sample_times: numpy.array + Array of sample times corresponding to ``glitch_time_domain_strain``. + glitch_waveform_generator: bilby.gw.WaveformGenerator, + A waveform generator used to produce the glitch frequency-domain + strain. + """ + + if glitch_parameters is None: + glitch_parameters = {} + for key, val in [('ra', 0.0), ('dec', 0.0), ('psi', 0.0)]: + glitch_parameters.setdefault(key, val) + + if glitch_time_domain_strain is None and glitch_waveform_generator is None: + raise ValueError( + "inject_glitch needs one of glitch_waveform_generator or " + "glitch_time_domain_strain.") + elif glitch_time_domain_strain is not None: + # Populate ra, dec, psi. Necessary to make an injection. + glitch_sampling_frequency = 1.0 / (glitch_sample_times[1] - glitch_sample_times[0]) + # Resample the glitch to match with the interferometer's sampling frequency + if glitch_sampling_frequency != self.sampling_frequency: + temp_sample_times = np.arange( + len(self.time_array)) / self.sampling_frequency + glitch_time_domain_strain = np.interp( + temp_sample_times, glitch_sample_times, glitch_time_domain_strain) + glitch_sample_times = temp_sample_times + + # Padding the glitch to have the correct size for the fft. + padded_glitch = np.zeros(len(self.time_array)) + padded_glitch[:len(glitch_time_domain_strain) + ] += glitch_time_domain_strain + + # Convert to frequency domain glitch + glitch_frequency_domain_strain, _ = utils.nfft( + padded_glitch, self.sampling_frequency) + + # Adjust SNR + glitch_frequency_domain_strain = self.adjust_optimal_snr( + glitch_frequency_domain_strain, glitch_parameters['snr']) + injection_polarizations = { + self.name: glitch_frequency_domain_strain} + + # Roll the glitch since the glitch maxima of the glitch may not align in the same way as the signal maxima. + glitch_parameters['geocent_time'] = glitch_parameters["onset_time"] \ + - glitch_sample_times[np.argmax(glitch_time_domain_strain)] + + # Usual inject method + self.inject_signal_from_waveform_polarizations(parameters=glitch_parameters, + injection_polarizations=injection_polarizations) + + elif glitch_waveform_generator is not None: + glitch_frequency_domain_strain = glitch_waveform_generator.frequency_domain_strain( + glitch_parameters) + glitch_frequency_domain_strain = self.adjust_optimal_snr( + np.asarray(glitch_frequency_domain_strain[0]), glitch_parameters['snr']) + injection_polarizations = {self.name: glitch_frequency_domain_strain} + + # Populate ra, dec, psi. Necessary to make an injection. + for key, val in [('ra', 0.0), ('dec', 0.0), ('psi', 0.0)]: + glitch_parameters.setdefault(key, val) + # Roll the glitch since the glitch maxima of the glitch may not align in the same way as the signal maxima. + glitch_time_domain_strain = glitch_waveform_generator.time_domain_strain(glitch_parameters) + glitch_parameters['geocent_time'] = glitch_parameters["onset_time"] - \ + glitch_waveform_generator.time_array[np.argmax(glitch_time_domain_strain)] + + self.inject_signal_from_waveform_polarizations(parameters=glitch_parameters, + injection_polarizations=injection_polarizations) + + logger.info("Injected a glitch in: {}".format(self.name)) + logger.info("Optimal SNR of the glitch: {}".format(glitch_parameters['snr'])) + + return glitch_time_domain_strain + def inject_signal(self, parameters, injection_polarizations=None, waveform_generator=None, raise_error=True): """ General signal injection method. @@ -490,7 +603,8 @@ def inject_signal_from_waveform_polarizations(self, parameters, injection_polari 'Injecting signal outside segment, start_time={}, merger time={}.' .format(self.strain_data.start_time, parameters['geocent_time'])) - signal_ifo = self.get_detector_response(injection_polarizations, parameters) + signal_ifo = self.get_detector_response( + injection_polarizations, parameters) self.strain_data.frequency_domain_strain += signal_ifo self.meta_data['optimal_SNR'] = ( @@ -500,8 +614,10 @@ def inject_signal_from_waveform_polarizations(self, parameters, injection_polari self.meta_data['parameters'] = parameters logger.info("Injected signal in {}:".format(self.name)) - logger.info(" optimal SNR = {:.2f}".format(self.meta_data['optimal_SNR'])) - logger.info(" matched filter SNR = {:.2f}".format(self.meta_data['matched_filter_SNR'])) + logger.info(" optimal SNR = {:.2f}".format( + self.meta_data['optimal_SNR'])) + logger.info(" matched filter SNR = {:.2f}".format( + self.meta_data['matched_filter_SNR'])) for key in parameters: logger.info(' {} = {}'.format(key, parameters[key])) @@ -512,7 +628,8 @@ def _window_power_correction(self): using the :code:`BILBY_INCORRECT_PSD_NORMALIZATION` environment variable. """ if string_to_boolean( - os.environ.get("BILBY_INCORRECT_PSD_NORMALIZATION", "FALSE").upper() + os.environ.get("BILBY_INCORRECT_PSD_NORMALIZATION", + "FALSE").upper() ): return self.strain_data.window_factor else: @@ -657,11 +774,12 @@ def matched_filter_snr(self, signal): """ return gwutils.matched_filter_snr( signal=signal[self.strain_data.frequency_mask], - frequency_domain_strain=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask], + frequency_domain_strain=self.strain_data.frequency_domain_strain[ + self.strain_data.frequency_mask], power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask], duration=self.strain_data.duration) - def whiten_frequency_series(self, frequency_series : np.array) -> np.array: + def whiten_frequency_series(self, frequency_series: np.array) -> np.array: """Whitens a frequency series with the noise properties of the detector .. math:: @@ -684,7 +802,7 @@ def whiten_frequency_series(self, frequency_series : np.array) -> np.array: def get_whitened_time_series_from_whitened_frequency_series( self, - whitened_frequency_series : np.array + whitened_frequency_series: np.array ) -> np.array: """Gets the whitened time series from a whitened frequency series. @@ -769,10 +887,12 @@ def save_data(self, outdir, label=None): if label is None: filename_asd = '{}/{}_asd.dat'.format(outdir, self.name) - filename_data = '{}/{}_frequency_domain_data.dat'.format(outdir, self.name) + filename_data = '{}/{}_frequency_domain_data.dat'.format( + outdir, self.name) else: filename_asd = '{}/{}_{}_asd.dat'.format(outdir, self.name, label) - filename_data = '{}/{}_{}_frequency_domain_data.dat'.format(outdir, self.name, label) + filename_data = '{}/{}_{}_frequency_domain_data.dat'.format( + outdir, self.name, label) np.savetxt(filename_data, np.array( [self.strain_data.frequency_array, @@ -791,7 +911,8 @@ def plot_data(self, signal=None, outdir='.', label=None): return fig, ax = plt.subplots() - df = self.strain_data.frequency_array[1] - self.strain_data.frequency_array[0] + df = self.strain_data.frequency_array[1] - \ + self.strain_data.frequency_array[0] asd = gwutils.asd_from_freq_series( freq_data=self.strain_data.frequency_domain_strain, df=df) @@ -924,7 +1045,8 @@ def _filename_from_outdir_label_extension(outdir, label, extension="h5"): )) def to_pickle(self, outdir="outdir", label=None): utils.check_directory_exists_and_if_not_mkdir('outdir') - filename = self._filename_from_outdir_label_extension(outdir, label, extension="pkl") + filename = self._filename_from_outdir_label_extension( + outdir, label, extension="pkl") safe_file_dump(self, filename, "dill") @classmethod diff --git a/examples/gw_examples/injection_examples/glitch.py b/examples/gw_examples/injection_examples/glitch.py new file mode 100644 index 000000000..fdc89f58a --- /dev/null +++ b/examples/gw_examples/injection_examples/glitch.py @@ -0,0 +1,139 @@ +import bilby +import matplotlib.pyplot as plt +import numpy as np + +duration = 8.0 +sampling_frequency = 2048.0 +glitch_snr = 12 +injection_parameters = dict( + mass_1=36.0, + mass_2=29.0, + a_1=0.4, + a_2=0.3, + tilt_1=0.5, + tilt_2=1.0, + phi_12=1.7, + phi_jl=0.3, + luminosity_distance=2100.0, + theta_jn=0.4, + psi=2.659, + phase=1.3, + geocent_time=205.0, + ra=1.375, + dec=-1.2108, +) + +bilby.core.utils.random.seed(23) + + +def sine_gaussian_glitch( + time_array, amplitude=1e-22, f0=100.0, tau=0.1, t0=0.0, phase=0.0 +): + """Generate a sine-Gaussian waveform. + + Parameters + ---------- + time_array : array_like + Time samples. + amplitude : float + Peak amplitude. + f0 : float + Central frequency in Hz. + tau : float + Decay time (Gaussian width) in seconds. + t0 : float + Central time of the burst in seconds. + phase : float + Phase offset in radians. + """ + envelope = np.exp(-((time_array - t0) ** 2) / (2 * tau**2)) + return amplitude * envelope * np.sin(2 * np.pi * f0 * (time_array - t0) + phase) + + +# Fixed arguments passed into the source model +waveform_arguments = dict( + waveform_approximant="IMRPhenomXPHM", + reference_frequency=50.0, + minimum_frequency=20.0, +) + +# Create the waveform_generator using a LAL BinaryBlackHole source function +# the generator will convert all the parameters +waveform_generator = bilby.gw.WaveformGenerator( + duration=duration, + sampling_frequency=sampling_frequency, + frequency_domain_source_model=bilby.gw.source.lal_binary_black_hole, + parameter_conversion=bilby.gw.conversion.convert_to_lal_binary_black_hole_parameters, + waveform_arguments=waveform_arguments, +) + +# Set up interferometers. In this case we'll use two interferometers +# (LIGO-Hanford (H1), LIGO-Livingston (L1). These default to their design +# sensitivity +ifos = bilby.gw.detector.InterferometerList(["H1", "L1"]) +ifos.set_strain_data_from_zero_noise( + sampling_frequency=sampling_frequency, + duration=duration, + start_time=injection_parameters["geocent_time"] - 4, +) + +ifos.inject_signal( + waveform_generator=waveform_generator, parameters=injection_parameters +) + +inject_glitch_from_time_domain_strain = True + +if inject_glitch_from_time_domain_strain: + glitch_sample_times = np.arange(0, 2, 1 / 256) + from scipy.signal.windows import tukey + + glitch = sine_gaussian_glitch(glitch_sample_times, tau=0.1, t0=0.5) * tukey( + len(glitch_sample_times), alpha=0.2 + ) + + # Inject glitch + glitch_parameters = { + "onset_time": injection_parameters["geocent_time"] + 1, + "snr": glitch_snr, + } + ifos[0].inject_glitch( + glitch_snr, + glitch_parameters=glitch_parameters, + glitch_time_domain_strain=glitch, + glitch_sample_times=glitch_sample_times, + ) + + +inject_glitch_from_a_glitch_waveform_generator = False +if inject_glitch_from_a_glitch_waveform_generator: + + glitch_waveform_generator = bilby.gw.WaveformGenerator( + duration=duration, + sampling_frequency=sampling_frequency, + time_domain_source_model=sine_gaussian_glitch, + ) + glitch_parameters = { + "onset_time": injection_parameters["geocent_time"] + 1, + "amplitude": 1e-22, + "f0": 100.0, + "tau": 0.1, + "t0": 1.3, + "phase": 0.0, + "snr": glitch_snr, + } + + ifos[0].inject_glitch( + glitch_parameters=glitch_parameters, + glitch_waveform_generator=glitch_waveform_generator, + ) + + +fig, axes = plt.subplots(2, 1) +ax = axes[0] + +ax.axvline(x=injection_parameters["geocent_time"], color="black", label="Merger time") +ax.axvline(x=glitch_parameters["onset_time"], color="blue", label="glitch onset time") +ax.legend() + +ax.plot(ifos[0].time_array, ifos[0].time_domain_strain) +fig.savefig("./time_domain_strain.pdf")