diff --git a/python/packages/nisar/antenna/__init__.py b/python/packages/nisar/antenna/__init__.py index 23134a4f9..afb9d4fea 100644 --- a/python/packages/nisar/antenna/__init__.py +++ b/python/packages/nisar/antenna/__init__.py @@ -2,3 +2,6 @@ from .beamformer import TxBMF, RxDBF, compute_receive_pattern_weights, \ compute_transmit_pattern_weights, get_calib_range_line_idx from .pattern import AntennaPattern +from .rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b +) diff --git a/python/packages/nisar/antenna/beamformer.py b/python/packages/nisar/antenna/beamformer.py index 300549a1d..4acc7cf9b 100644 --- a/python/packages/nisar/antenna/beamformer.py +++ b/python/packages/nisar/antenna/beamformer.py @@ -315,11 +315,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # check the pulse_time to be within time tag of TxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected TxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # get total number of TX channels _, num_chanl = self.trm_info.correlator_tap2.shape @@ -380,9 +382,11 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # Old method: compute monotonically increasing elevation angles # at each successive slant range bin - # Compute the respective slant range for beamformed antenna pattern + # Compute the respective slant range for beamformed + # antenna pattern. # Simply calculate slant range for every few pulses where - # S/C pos/vel and DEM barely changes. This speeds up the process! + # S/C pos/vel and DEM barely changes. This speeds up + # the process! if (pp % self.num_pulse_skip == 0): sr_ant = self._elaz2slantrange(tm) @@ -580,11 +584,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # check the pulse_time to be within time tag of RxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected RxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # EL-cut pattern with shape active beams by EL angles ant_pat_el = self.el_ant_info.copol_pattern[self.active_channel_idx] @@ -609,16 +615,27 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # if provided if channel_adj_factors is not None: # check the size of correction factor container - if len(channel_adj_factors) != num_chanl: - raise ValueError('Size of RX "channel adjustment factor" ' - f'must be {num_chanl}') - # check if the correction factor is zero for all active channels - cor_fact = np.asarray(channel_adj_factors)[self.active_channel_idx] - if np.isclose(abs(cor_fact).max(), 0): - raise ValueError('"channel_adj_factors" are zeros for all ' - 'active RX channels!') - rx_wgt *= cor_fact[:, None] - + if len(channel_adj_factors) == num_chanl: + # check if the correction factor is zero for all + # active channels + cor_fact = np.asarray(channel_adj_factors)[ + self.active_channel_idx] + if np.isclose(abs(cor_fact).max(), 0): + raise ValueError('"channel_adj_factors" are zeros for all ' + 'active RX channels!') + rx_wgt *= cor_fact[:, None] + elif len(channel_adj_factors) == 1: # fixed scalar + if np.isclose(channel_adj_factors[0], 0): + raise ValueError( + '"channel_adj_factors" is a zero-value scalar for all' + 'active RX channels!' + ) + rx_wgt *= channel_adj_factors[0] + else: # neither 1 (scalar) nor `num_chanl` + raise ValueError( + 'Size of RX "channel adjustment factor" must be either ' + f'{num_chanl} or 1 but got {len(channel_adj_factors)}!' + ) # initialize the RX DBF pattern rx_pat = np.zeros((len(pulse_time), slant_range.size), dtype='complex') num_active_chanl = len(self.active_channel_idx) @@ -779,7 +796,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): warnings.warn( 'HPA Cal contains some zero values. These will be replaced with' ' the nearest non-zero values.', category=BadHPACalWarning - ) + ) # replace zero values with nearest non-zero ones for n in range(active_tx_idx.size): mask_zr = np.isclose(hcal_abs[:, n], 0) @@ -789,7 +806,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): f_nearest = interp1d( i_hpa_nz, tx_weights[i_hpa_nz, n], kind='nearest', fill_value='extrapolate', assume_sorted=True - ) + ) tx_weights[i_hpa_z, n] = f_nearest(i_hpa_z) # If BCAL exists compute ratio HCAL/(BCAL/BCAL[0]) @@ -975,8 +992,8 @@ def get_pulse_index(pulse_times, t, nearest=False, eps=5e-10): eps : float Tolerance for snapping time tags, e.g., when `abs(pulse_times[i] - t) <= eps` - then return `i`. This accommodates floating point precision issues like - `n * pri != n / prf`. + then return `i`. This accommodates floating point precision + issues like `n * pri != n / prf`. Returns ------- diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 9b6b65b44..0648e738e 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -1,14 +1,17 @@ +from warnings import warn from collections import defaultdict -from enum import IntEnum, unique from isce3.core import Orbit, Attitude, Linspace from isce3.geometry import DEMInterpolator import logging -from nisar.mixed_mode.logic import PolChannelSet from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser from nisar.products.readers.Raw import Raw from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index +from nisar.antenna.rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b, + get_range_delay_from_raw +) import numpy as np log = logging.getLogger("nisar.antenna.pattern") @@ -150,6 +153,15 @@ class AntennaPattern: vairations expected to be less than 0.05 dB and 0.25 deg, respectively. This can speed up the antenna pattern computation. If None, it will be ignored. + freq_band : {'A', 'B'} or None. Optional + If none, the very first frequency band will + be used. + caltone_freq: float or None. Optional + Caltone frequency in Hz. + If None (default), it will be extracted from telemetry DRT in L0B. + delay_ofs_dbf: float, default=-2.1474e-6 + Delay offset (seconds) in data window position of onboard DBF + process applied to all bands and polarizations. """ @@ -158,7 +170,10 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, orbit: Orbit, attitude: Attitude, *, el_lut=None, norm_weight=True, - el_spacing_min=8.72665e-5): + el_spacing_min=8.72665e-5, + freq_band=None, + caltone_freq=None, + delay_ofs_dbf=-2.1474e-6): self.orbit = orbit.copy() self.attitude = attitude.copy() @@ -167,12 +182,25 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.el_spacing_min = el_spacing_min self.el_lut = el_lut - # get pols - channels = PolChannelSet.from_raw(raw) - freqs = tuple({chan.freq_id for chan in channels}) - self.freq_band = "A" if "A" in freqs else freqs[0] - self.txrx_pols = tuple({chan.pol for chan in channels}) - + # get frequency band + freqs = np.sort(raw.frequencies) + if freq_band is None: + self.freq_band = freqs[0] + else: + if freq_band not in freqs: + raise ValueError( + f'freq_band {freq_band} is out of range {freqs}!') + self.freq_band = freq_band + # get all polarization for a frequency band + self.txrx_pols = raw.polarizations[self.freq_band] + # comput all RX channel imbalances over all + # txrx pols of a desired frequency band. + # This RX imbalanced is basically LNA/CALTONE ratio. + self.rx_imb = compute_all_rx_channel_imbalances_from_l0b( + raw, + freq_band=self.freq_band, + caltone_freq=caltone_freq + ) # Parse ref epoch, pulse time, slant range, orbit and attitude from Raw # Except for quad-pol, pulse time is the same for all TX pols. # In case of quad-pol the time offset is half single-pol PRF and thus @@ -192,7 +220,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # parse active RX channels and fs_ta which are polarization # independent! - txrx_pol = raw.polarizations[self.freq_band][0] + txrx_pol = self.txrx_pols[0] self.rx_chanl = raw.getListOfRxTRMs(self.freq_band, txrx_pol) self.fs_ta = ins.sampling_rate_ta(txrx_pol[1]) @@ -205,14 +233,31 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # Loop over all freqs & pols since some RX pols may be found only on # freq B (e.q. the QQP case). Assume RD/WD/WL are the same for all # freqs/pols that have the same RX polarization. - for chan in channels: - rxpol = chan.pol[1] + for txrx_pol in self.txrx_pols: + rxpol = txrx_pol[1] if rxpol in rd_all: continue rd_all[rxpol], wd_all[rxpol], wl_all[rxpol] = raw.getRdWdWl( - chan.freq_id, chan.pol) + self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], - wd_all[rxpol], wl_all[rxpol]) + wd_all[rxpol], wl_all[rxpol]) + # Get range delay offset for band B in split-spectrum to + # correct RDs @ `self.fs_win` used in forming RX DBF pattern. + # This will account for delay after onboard DBF due to + # both the first pulsewidth in sequential TX chirps + # as well as the difference in filter group delays. + tm_delay = get_range_delay_from_raw( + raw, self.freq_band, txrx_pol) + tm_delay += delay_ofs_dbf + n_samp_delay = round(tm_delay * self.fs_win) + if n_samp_delay != 0: + warn( + f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} ' + f'is corrected by {tm_delay * 1e6:.3f} (usec) or ' + f'equivalently # {n_samp_delay} samples @ ' + f'{self.fs_win * 1e-6} (MHz)!' + ) + rd_all[rxpol][...] = rd_all[rxpol].astype(int) + n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations @@ -295,7 +340,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # instrument file per TX linear pol. self.channel_adj_fact_tx[tx_lp] = ( ins.channel_adjustment_factors_tx(tx_lp) - ) + ) # get tx el-cut patterns el_pat_tx = ant.el_cut_all(tx_lp) @@ -307,9 +352,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, el_lut=self.el_lut, norm_weight=self.norm_weight, rg_spacing_min=self.rg_spacing_min) - def form_pattern(self, tseq, slant_range: Linspace, - nearest: bool = False, txrx_pols = None): + nearest: bool = False, txrx_pols=None): """ Get the two-way antenna pattern at a given time and set of ranges for either all or specified polarization combinations if Tx/Rx pols are @@ -333,26 +377,33 @@ def form_pattern(self, tseq, slant_range: Linspace, ------- dict Two-way complex antenna patterns as a function of range bin - over either all or specified TxRx polarization products. The format of dict is - {pol: np.ndarray[complex]}. + over either all or specified TxRx polarization products. + The format of dict is {pol: np.ndarray[complex]}. """ if txrx_pols is None: txrx_pols = self.txrx_pols elif not set(txrx_pols).issubset(self.txrx_pols): raise ValueError(f"Specified txrx_pols {txrx_pols} is out of " - f"available pols {self.txrx_pols}!") + f"available pols {self.txrx_pols}!") tseq = np.atleast_1d(tseq) - rx_pols = {pol[1] for pol in txrx_pols} - # form one-way RX patterns for all linear pols rx_dbf_pat = dict() - for p in rx_pols: + for txrx_pol in txrx_pols: + rxp = txrx_pol[1] + # combine channel adjustment from both internally computed + # RX imbalance (LNA/CALTONE) and secondary correction from + # input INST HDF5 product + channel_adj_fact_rx = ( + self.rx_imb[self.freq_band, txrx_pol].lna_caltone_ratio) + if self.channel_adj_fact_rx[rxp] is not None: + channel_adj_fact_rx *= np.asarray( + self.channel_adj_fact_rx[rxp]) # Split up provided timespan into groups with the same range timing # (Adding one because get_pulse_index uses floor but we want ceil) change_indices = [ - get_pulse_index(tseq, t) + 1 for t in self.finder[p].time_changes + get_pulse_index(tseq, t) + 1 for t in self.finder[rxp].time_changes if t > tseq[0] and t < tseq[-1] ] tgroups = np.split(tseq, change_indices) @@ -361,41 +412,44 @@ def form_pattern(self, tseq, slant_range: Linspace, i0 = 0 for tgroup in tgroups: t = tgroup[0] - rd, wd, wl = self.finder[p].get_dbf_timing(t) + rd, wd, wl = self.finder[rxp].get_dbf_timing(t) - log.info(f'Updating {p}-pol RX antenna pattern because' + log.info(f'Updating {rxp}-pol RX antenna pattern because' ' change in RD/WD/WL') - self.rx_trm[p] = RxTrmInfo( + self.rx_trm[rxp] = RxTrmInfo( self.pulse_times, self.rx_chanl, rd, wd, wl, - self.dbf_coef[p], self.ta_switch[p], self.ela_dbf[p], + self.dbf_coef[rxp], self.ta_switch[rxp], self.ela_dbf[rxp], self.fs_win, self.fs_ta) - self.rx_dbf[p] = RxDBF( - self.orbit, self.attitude, self.dem, self.el_pat_rx[p], - self.rx_trm[p], self.reference_epoch, + self.rx_dbf[rxp] = RxDBF( + self.orbit, self.attitude, self.dem, self.el_pat_rx[rxp], + self.rx_trm[rxp], self.reference_epoch, el_lut=self.el_lut, - norm_weight=self.rx_dbf[p].norm_weight) + norm_weight=self.rx_dbf[rxp].norm_weight) - pat = self.rx_dbf[p].form_pattern( + pat = self.rx_dbf[rxp].form_pattern( tgroup, slant_range, - channel_adj_factors=self.channel_adj_fact_rx[p] + channel_adj_factors=channel_adj_fact_rx ) - # Initialize the pattern array so we can slice this range timing - # group into it - TODO move this outside the loop for clarity? - if p not in rx_dbf_pat: - rx_dbf_pat[p] = np.empty((len(tseq), slant_range.size), - dtype=np.complex64) + # Initialize the pattern array so we can slice this + # range timing group into it + # TODO move this outside the loop for clarity? + if rxp not in rx_dbf_pat: + rx_dbf_pat[rxp] = np.empty((len(tseq), slant_range.size), + dtype=np.complex64) # Slice it into the full array, and # bump up the index for the next slice iend = i0 + len(tgroup) - rx_dbf_pat[p][i0:iend] = pat + rx_dbf_pat[rxp][i0:iend] = pat i0 = iend # form one-way TX patterns for all TX pols - tx_bmf_pat = defaultdict(lambda: np.empty((len(tseq), slant_range.size), - dtype=np.complex64)) + tx_bmf_pat = defaultdict( + lambda: np.empty((len(tseq), slant_range.size), + dtype=np.complex64) + ) for tx_pol in {pol[0] for pol in txrx_pols}: if tx_pol == "L": tx_bmf_pat[tx_pol] = ( @@ -420,8 +474,8 @@ def form_pattern(self, tseq, slant_range: Linspace, else: # other non-compact pol types adj = self.channel_adj_fact_tx[tx_pol] tx_bmf_pat[tx_pol] = self.tx_bmf[tx_pol].form_pattern( - tseq, slant_range, nearest=nearest, channel_adj_factors=adj - ).astype(np.complex64) + tseq, slant_range, nearest=nearest, channel_adj_factors=adj + ).astype(np.complex64) # build two-way pattern for all unique TxRx products obtained from all # freq bands diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py new file mode 100644 index 000000000..4fe0d2f94 --- /dev/null +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -0,0 +1,638 @@ +from __future__ import annotations +from warnings import warn +from typing import Tuple, Dict +from dataclasses import dataclass +from enum import IntEnum, unique + +import numpy as np +import h5py + +from nisar.products.readers.Raw import Raw +from nisar.antenna import get_calib_range_line_idx, CalPath +from isce3.core import speed_of_light + + +@dataclass(frozen=True) +class RxChannelImbalanceProduct: + """ + RX channel imbalance product extracted from LNA/CALTONE ratio + for a certain frequency band and polarization. + + Attributes + ---------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all RXs + ntap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all RXs. + time_delays_sec: np.ndarray(float) + Time delays from the phase of outlier qFSP in seconds for all RXs. + max_amp_ratio: float + Max amplitude ratio used in peak normalizing `lna_caltone_ratio`. + + """ + lna_caltone_ratio: np.ndarray + ntap_dominant: np.ndarray + time_delays_sec: np.ndarray + max_amp_ratio: float + + def __post_init__(self): + # XXX Size of all arrays must be 12 for L-band NISAR but + # not enforced due to failure of special cases such as unit test + if (self.lna_caltone_ratio.size != self.ntap_dominant.size + != self.time_delays_sec.size): + raise ValueError('The size of all arrays must be equal!') + if self.lna_caltone_ratio.size != 12: + warn('The size of LNA-CALTONE ratio is ' + f'{self.lna_caltone_ratio.size} instead of 12!') + + +@unique +class PolarizationTypeId(IntEnum): + """Enumeration for polarization types of L-band NISAR""" + single_h = 0 + single_v = 1 + dual_h = 2 + dual_v = 3 + quad = 4 + compact = 5 + none = 6 + quasi_quad = 7 + quasi_dual = 8 + + +def compute_all_rx_channel_imbalances_from_l0b( + l0b_file: str | Raw, + *, + caltone_freq: float | None = None, + freq_band: str | None = None, + txrx_pol: str | None = None +) -> Dict[Tuple[str, str], RxChannelImbalanceProduct]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for over all bands and polarizations. The bands and polarizations are + used as dictionary keys in the form of [freq_band, txrx_pol]. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Parameters + ---------- + l0b_file : str or nisar.products.readers.Raw + L0B filename or Raw object + caltone_freq : float or None. Optional + Caltone frequency in Hz. + If None (default), it will be extracted from DRT in L0B. + freq_band : str. Optional + "A" or "B". Default is all. + txrx_pol : str. Optional + TR pol in `freq_band` such as "HH", "HV", etc. + Default is all. + + Returns + ------- + dict: + A dict with keys (freq_band, txrx_pol) and values of type + `RxChannelImbalanceProduct` + + """ + if isinstance(l0b_file, str): + raw = Raw(hdf5file=l0b_file) + else: + raw = l0b_file + frq_pols = raw.polarizations + # get freq_bands and txrx_pols + if freq_band is not None: + frq_pols = {freq_band: frq_pols[freq_band]} + if txrx_pol is not None: + frq_pols = {f: [txrx_pol] for f in frq_pols if txrx_pol in frq_pols[f]} + + out = dict() + for freq_band in frq_pols: + for txrx_pol in frq_pols[freq_band]: + (lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + ) = compute_rx_channel_imbalance( + raw=raw, + freq_band=freq_band, + txrx_pol=txrx_pol, + caltone_freq=caltone_freq + ) + out[freq_band, txrx_pol] = RxChannelImbalanceProduct( + lna_caltone_ratio=lna_caltone_ratio, + ntap_dominant=n_tap_dominant, + time_delays_sec=time_delays, + max_amp_ratio=max_ratio + ) + return out + + +def compute_rx_channel_imbalance( + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float | None = None, +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for a desired frequency band and TR polarization. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Returns + ------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all 12 RXs + n_tap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all 12 RXs. + time_delays: np.ndarray(float) + Time delays from the phase of qFSP outlier in seconds + max_ratio : float + Report peak power among all channels used for amplitude + normalization of RX channel imbalances. + + """ + lna_mean, n_tap_dominant = get_lna_cal_mean(raw, txrx_pol) + # get caltone mean over all RX channels + caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) + # Get complex ratio LNA/Caltone over all channels + lna_caltone_ratio = lna_mean / caltone_mean + # correct the ratio for the second band if necessary + lna_caltone_ratio, time_delays = correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio, + raw, + freq_band, + txrx_pol, + caltone_freq=caltone_freq + ) + # peak normalized + max_ratio = np.nanmax(abs(lna_caltone_ratio)) + if not np.isclose(max_ratio, 0): + lna_caltone_ratio /= max_ratio + return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + + +def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: + """Get polarization ID and type from L0B DRT""" + pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_pol = f5[pol_path] + except KeyError: + warn(f'Missing dataset "{pol_path}" in "{raw.filename}"') + id_pol = 6 + else: + i_pol = ds_pol[()] + id_pol = np.nanmedian(i_pol) + return PolarizationTypeId(id_pol) + + +def is_raw_quad_pol(raw: Raw) -> bool: + """Determine whether raw L0B is Quad or not""" + return polarization_type_from_drt(raw) == PolarizationTypeId.quad + + +def parse_rangeline_index_from_hrt( + raw: Raw, + txrx_pol: str = None) -> np.ndarray | None: + """ + Get range line index over all range lines from + HRT if exists otherwise None! + + Returns + ------- + np.ndarray(uint) or None + If not available in L0b, None will be returned. + + """ + hrt_path = raw.TelemetryPath.replace('low', 'high') + freq_band = sorted(raw.frequencies)[0] + pols = raw.polarizations[freq_band] + if txrx_pol is None: + txrx_pol = pols[0] + elif txrx_pol not in pols: + raise ValueError(f'Available pols {pols} but got {txrx_pol}!') + rgl_idx_path = (f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/' + 'RangeLine/RH_RANGELINE_INDEX') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_rgl_idx = f5[rgl_idx_path] + except KeyError as err: + warn(f'Can not parse range line index from HRT. Error -> {err}') + return None + else: + return ds_rgl_idx[()] + + +def first_tx_pol_for_quad(raw: Raw) -> str: + """Get first TX polarization, H or V, from only Quad pol product""" + if not is_raw_quad_pol(raw): + raise ValueError('Not a quad pol!') + idx_rgl = parse_rangeline_index_from_hrt(raw)[0] + # if not in HRT parse single-pol version from swath path + if idx_rgl is None: + idx_rgl_h = raw.getRangeLineIndex('A', 'H')[0] + idx_rgl_v = raw.getRangeLineIndex('A', 'V')[0] + if idx_rgl_v < idx_rgl_h: + return 'V' + return 'H' + else: # odd range line is V pol first and even is H pol first! + return {0: 'H', 1: 'V'}.get(idx_rgl % 2) + + +def parse_chirpcorrelator_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) or None + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # loop over three qfsp + for i_qfsp in range(3): + p_qfsp = f'{qfsp_path}{i_qfsp}' + # loop over 4 channels per qfsp: + for nn in range(4): + i_chn = nn + i_qfsp * 4 + n_rx = i_chn + 1 + # loop over 3 taps per channel + for i_tap in range(3): + n_tap = i_tap + 1 + # form the path to the dataset per I and Q + # use RX pol! + p_ds_i = (f'{p_qfsp}/CHIRP_CORRELATOR_I{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + try: + ds_i = f5[p_ds_i] + except KeyError as err: + warn( + f'Missing dataset {p_ds_i} in {raw.filename}.' + f' Detailed error -> {err}' + ) + return None + else: + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: + # initialize the 3-D array for chirp correlator + num_lines = ds_i.size + chp_cor = np.ones((num_lines, 12, 3), dtype='c8') + chp_cor[:, i_chn, i_tap].real = ds_i[()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor + + +def parse_caltype_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: + """ + Parse cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(uint8) or None + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and + INVALID=255. If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + i_qfsp = 0 + p_qfsp = f'{qfsp_path}{i_qfsp}' + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError as err: + warn(f'Missing dataset "{p_type}" in ' + f'"{raw.filename}". Detailed error -> {err}') + return None + else: + return ds_cal_type[()].astype(CalPath) + + +def _opposite_pol(pol: str) -> str: + """Get the oppsoite pol""" + if pol == 'H': + return 'V' + elif pol == 'V': + return 'H' + else: + return pol + + +def chirpcorrelator_caltype_from_raw( + raw: Raw, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from Raw L0B for a certain + TxRX pol + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + np.ndarray(uint8) + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and INVALID=255 + + """ + chp_cor = parse_chirpcorrelator_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + cal_type = parse_caltype_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + # XXX if the respective field does not exist then use co-pol under + # swath in L0B for the sake of backward compatibility + if chp_cor is None or cal_type is None: + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + # Quad pol case + if is_raw_quad_pol(raw): + tx_pol_first = first_tx_pol_for_quad(raw) + if txrx_pol[0] == tx_pol_first: + chp_cor = chp_cor[::2] + cal_type = cal_type[::2] + else: # the second TX pol + # get data from the opssoite TX pol + x_pol = _opposite_pol(txrx_pol[0]) + txrx_pol[1] + chp_cor_x, cal_type_x = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=x_pol) + # if co-pol get HPA value from same TX but + # fill in LNA/BYP from oppsoite TX + if txrx_pol[0] == txrx_pol[1]: + chp_cor = chp_cor[1::2] + cal_type = cal_type[1::2] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type_x) + chp_cor[idx_byp] = chp_cor_x[idx_byp] + chp_cor[idx_lna] = chp_cor_x[idx_lna] + cal_type[idx_byp] = CalPath.BYPASS + cal_type[idx_lna] = CalPath.LNA + else: # x-pol product + chp_cor = chp_cor_x + cal_type = cal_type_x + # set x-pol HPA to INVALID given they are the mix of + # LNA from co-pol and HPA from x-pol! + if txrx_pol in ('HV', 'VH'): + idx_hpa, _, _, _ = get_calib_range_line_idx(cal_type) + if idx_hpa.size > 0: + warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') + cal_type[idx_hpa] = CalPath.INVALID + return chp_cor, cal_type + + +def get_lna_cal_mean( + raw: Raw, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Returns mean complex LNA values and dominant tap + numbers within [1, 2, 3] for all channels + """ + chp_cor, cal_type = chirpcorrelator_caltype_from_raw( + raw=raw, + txrx_pol=txrx_pol + ) + n_rxs = chp_cor.shape[1] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type) + if len(idx_lna) == 0: + warn('No LNA CAL to represent RX! Use BYPASS Cal instead!') + if len(idx_byp) == 0: + # XXX to avoid failure in unit test or very short L0B + # lacking LNA/BYP CAL datasets, a warning will be issued + # and the values will all be set to unity! + warn('No LNA or BYPASS CAL! LNA mean will be all unity. ' + 'The results will be invalid!') + lna_mean = np.ones(n_rxs, dtype='c8') + n_tap_dominant = np.full(n_rxs, fill_value=2) + return lna_mean, n_tap_dominant + idx_lna = idx_byp + # get LNA for all three taps (or BYPASS) + lna_mean_tap3 = np.zeros((3, n_rxs), dtype='c16') + for nn in range(3): + lna_cal = chp_cor[idx_lna, :, nn] + # get complex mean for all RX channels + lna_mean_tap3[nn] = _mean_2d(lna_cal) + # get dominat taps + abs_lna_mean_tap3 = abs(lna_mean_tap3) + idx_lna_taps = np.nanargmax(abs_lna_mean_tap3, axis=0) + amp_lna_mean = np.zeros(n_rxs) + for nn in range(n_rxs): + amp_lna_mean[nn] = abs_lna_mean_tap3[idx_lna_taps[nn], nn] + _check_if_zero(amp_lna_mean, msg=f'{txrx_pol[0]}-pol LNA Cal') + # get the phase part at a fixed common tap rather than dominant one + phs_lna_mean = np.angle(lna_mean_tap3[1]) + # form complex lna + lna_mean = amp_lna_mean * np.exp(1j * phs_lna_mean) + n_tap_dominant = idx_lna_taps + 1 + return lna_mean, n_tap_dominant + + +def correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio: np.ndarray, + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float | None = None +) -> Tuple[np.ndarray, np.ndarray]: + # Get caltone frequency from DRT if not provided + if caltone_freq is None: + caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) + warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' + f'-> {caltone_freq * 1e-6:.3f} (MHz)') + # Check if product from the second band so we can modify + # the results from the first band only if there is a + # relative delay offset in one of qFSP vs others, that is + # one of the qFSP is an outlier due to ADC clock/delay issue + # check if there is delay anomaly among three qFSP + if _is_product_from_second_band(raw, freq_band, txrx_pol): + warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') + fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc_a - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) + # if there is then get diff of frequency bands A and B + # to be used to correct phase from A for B + fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) + phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay + # correct the LNA/CALTONE by delay amount via phase if any. + lna_caltone_ratio *= np.exp(1j * phs_adj) + else: # simply first band either A or B! + fc, _, _, _ = raw.getChirpParameters(freq_band, txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) + return lna_caltone_ratio, time_delay + + +def get_caltone_mean( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> np.ndarray: + # now get caltone always from swath + caltone = raw.getCaltone(freq_band, txrx_pol) + caltone_mean = _mean_2d(caltone) + _check_if_zero(caltone_mean, msg=f'{txrx_pol}-pol Caltone') + return caltone_mean + + +def _is_product_from_second_band( + raw: Raw, + freq_band: str, + txrx_pol: str): + """ + Determine whether the produt is avolable on both bands + and it is from the second band. + """ + if freq_band == "B" and len(raw.frequencies) == 2: + if txrx_pol in raw.polarizations['A']: + return True + return False + + +def _get_qfsp_delay_anomaly( + lna_caltone_ratio: np.ndarray, + dif_chirp_caltone_freq: float, + adc_clock: float = 240e6) -> np.ndarray: + """ + If the product is a 12-channel NISAR L-band product, + return the time delays for a qFSP with phase anomaly. + Else, return zeros. + """ + if lna_caltone_ratio.size == 12: + # group them into three 4-channels, one per qFSP + lna2cal_ratio = lna_caltone_ratio.reshape(3, 4) + # get unwrap phase across 4 channels per qFSP (radians) + lna2cal_phs = np.unwrap(np.angle(lna2cal_ratio), axis=1) + # get median phase per qfsp, total 3 phase values (radians) + # and then unwrap three values + qfps_phs = np.unwrap(np.nanmedian(lna2cal_phs, axis=1)) + # use median among all three to be used as a reference to + # catch a single outlier + phs_ref = np.median(qfps_phs) + # phase due to ADC delay + phs_adc_delay = 2 * np.pi * dif_chirp_caltone_freq / adc_clock + n_delay_qfsp = np.round((qfps_phs - phs_ref) / phs_adc_delay) + # now repeat sample delay 4x per qFSP + n_delays = np.repeat( + n_delay_qfsp[:, np.newaxis], repeats=4, axis=1).ravel() + time_delays = n_delays / adc_clock + else: + time_delays = np.zeros(lna_caltone_ratio.size) + return time_delays + + +def _mean_2d(data: np.ndarray, perc: float = 0.0) -> np.asarray: + """ + Compute mean within percentile [perc, 100-perc], + of a 2-D complex array with shape (rangelines, channels) + due to bad telemetry. + """ + # or simply np.nanmean(data, axis=0) + d = np.sort(np.abs(data), axis=0) + q1_all, q3_all = np.percentile(d, q=[perc, 100 - perc], axis=0) + mean_all = [] + for cc, (q1, q3) in enumerate(zip(q1_all, q3_all)): + data_q1_q3 = data[(d[:, cc] >= q1) & (d[:, cc] <= q3), cc] + mean_all.append(np.nanmean(data_q1_q3)) + return np.asarray(mean_all) + + +def _check_if_zero(arr: np.ndarray, msg: str): + is_zero = np.isclose(arr, 0) + if is_zero.all(): + # XXX to avoid unit test failure and old sim L0B + # a warning will be issued and all values will be set + # to unity! + warn(f'All values are zero for {msg}! They are set to untiy. ' + 'Result may be invalid!') + arr[...] = 1.0 + if is_zero.any(): + warn(f'Some values are zero for {msg}!') + + +def get_range_delay_from_raw( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> float: + """ + Get delay (seconds) of the second pulse wrt the pulsewidth + of the first TX pulse in sequential split-spectrum transmit + for a desired dataset in L0B. + """ + # check if band is B and it is split spectrum + if freq_band == 'B' and len(raw.frequencies) == 2: + pols = raw.polarizations + # check if this is sequential transmit + if txrx_pol in pols['A']: + sr_b = raw.getRanges('B', txrx_pol[0]) + sr_a = raw.getRanges('A', txrx_pol[0]) + delay = 2 * (sr_b.first - sr_a.first) / speed_of_light + return delay + return 0.0 + + +def parse_caltone_freq_from_drt( + raw: Raw, + txrx_pol: str +) -> float: + """get caltone frequency in Hz from low rate telemetry in L0B.""" + # default caltone if dataset is not available (Hz) + default = 1214.88e6 + # frequency of local oscillator (Hz) + lo = 1200e6 + # ADC clock (Hz) + clock = 240e6 + c_p = (f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_CALTONE_PHASE_STEP_' + f'{txrx_pol[1]}') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_caltone_phase = f5[c_p] + except KeyError: + warn(f'Missing path "{c_p}" in L0B! Caltone frequency will ' + f'be set to {default} (Hz)') + return default + else: + i_cal = np.median(ds_caltone_phase[()]).astype(int) + caltone_freq = (i_cal / 2**32) * clock + lo + return caltone_freq diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index dde3c990a..308ed0cfe 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -1909,9 +1909,14 @@ def temp(suffix): # Precompute antenna patterns at downsampled spacing if cfg.processing.is_enabled.eap: + # XXX Due to a bug in respective DRT of some L0B products + # (CRID=05007), caltone.frequency is extrated from runconfig + # otherwise, it shall be set to None to be determined from DRT! antpat = AntennaPattern(raw, dem, antparser, instparser, orbit, attitude, - el_lut=el_lut) + el_lut=el_lut, + freq_band=frequency, + caltone_freq=cfg.processing.caltone.frequency) log.info("Precomputing antenna patterns") i = np.arange(rc_grid.shape[0])