From 3b88d2fc6f5aef73257dfae42978fc1f77a64ab8 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:34:30 -0800 Subject: [PATCH 01/26] Add a new mod to nisar.antenna to compute RX imbalances from Raw/L0B --- python/packages/nisar/antenna/__init__.py | 3 + .../antenna/rx_channel_imbalance_helpers.py | 379 ++++++++++++++++++ 2 files changed, 382 insertions(+) create mode 100644 python/packages/nisar/antenna/rx_channel_imbalance_helpers.py diff --git a/python/packages/nisar/antenna/__init__.py b/python/packages/nisar/antenna/__init__.py index 23134a4f9..afb9d4fea 100644 --- a/python/packages/nisar/antenna/__init__.py +++ b/python/packages/nisar/antenna/__init__.py @@ -2,3 +2,6 @@ from .beamformer import TxBMF, RxDBF, compute_receive_pattern_weights, \ compute_transmit_pattern_weights, get_calib_range_line_idx from .pattern import AntennaPattern +from .rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b +) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py new file mode 100644 index 000000000..d858b697c --- /dev/null +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -0,0 +1,379 @@ +from __future__ import annotations +from warnings import warn +from typing import Tuple, Dict +from dataclasses import dataclass + +import numpy as np +import h5py + +from nisar.products.readers.Raw import Raw +from nisar.antenna import get_calib_range_line_idx, CalPath + + +@dataclass(frozen=True) +class RX_CHANNEL_IMBALANCE_PRODUCT: + """ + RX channel imbalance product extracted from LNA/CALTONE ratio + for a certain frequency band and polarization. + + Attributes + ---------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all RXs + ntap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all RXs. + time_delays_sec: np.ndarray(float) + Time delays from the phase of outlier qFSP in seconds for all RXs. + max_amp_ratio: float + Max amplitude ratio used in peak normalizing `lna_caltone_ratio`. + + """ + lna_caltone_ratio: np.ndarray + ntap_dominant: np.ndarray + time_delays_sec: np.ndarray + max_amp_ratio: float + + def __post_init__(self): + # XXX Size of all arrays must be 12 for L-band NISAR but + # not enforced due to failure of special cases such as unit test + if (self.lna_caltone_ratio.size != self.ntap_dominant.size + != self.time_delays_sec.size): + raise ValueError('The size of all arrays must be equal!') + + +def compute_all_rx_channel_imbalances_from_l0b( + l0b_file: str | Raw, + *, + caltone_freq: float = 1214.88e6, + freq_band: str | None = None, + txrx_pol: str | None = None +) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for over all bands and polarizations. The bands and polarizations are + used as dictionary keys in the form of [freq_band, txrx_pol]. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Parameters + ---------- + l0b_file : str or nisar.products.readers.Raw + L0B filename or Raw object + caltone_freq : float, default=1214.88e6 + Caltone frequency in Hz. + freq_band : str, optional + "A" or "B". Default is all. + txrx_pol: str, optional + TR pol in `freq_band` such as "HH", "HV", etc. + Default is all. + + Returns + ------- + dict: + A dict with keys (freq_band, txrx_pol) and values of type + `RX_CHANNEL_IMBALANCE_PRODUCT` + + """ + if isinstance(l0b_file, str): + raw = Raw(hdf5file=l0b_file) + else: + raw = l0b_file + frq_pols = raw.polarizations + # get freq_bands and txrx_pols + if freq_band is not None: + frq_pols = {freq_band: frq_pols[freq_band]} + if txrx_pol is not None: + frq_pols = {f: [txrx_pol] for f in frq_pols if txrx_pol in frq_pols[f]} + + out = dict() + for freq_band in frq_pols: + for txrx_pol in frq_pols[freq_band]: + (lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + ) = compute_rx_channel_imbalance( + raw, + freq_band, + txrx_pol, + caltone_freq=caltone_freq + ) + out[freq_band, txrx_pol] = RX_CHANNEL_IMBALANCE_PRODUCT( + lna_caltone_ratio=lna_caltone_ratio, + ntap_dominant=n_tap_dominant, + time_delays_sec=time_delays, + max_amp_ratio=max_ratio + ) + return out + + +def compute_rx_channel_imbalance( + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float = 1214.88e6 +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for a desired frequency band and TR polarization. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Returns + ------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all 12 RXs + n_tap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all 12 RXs. + time_delays: np.ndarray(float) + Time delays from the phase of qFSP outlier + max_ratio : float + Report peak power among all channels used for amplitude + normalization of RX channel imbalances. + + """ + lna_mean, n_tap_dominant = get_lna_cal_mean( + raw, freq_band, txrx_pol) + # get caltone mean over all RX channels + caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) + # Get complex ratio LNA/Caltone over all channels + lna_caltone_ratio = lna_mean / caltone_mean + # correct the ratio for the second band if necessary + lna_caltone_ratio, time_delays = correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio, + raw, + freq_band, + txrx_pol, + caltone_freq=caltone_freq + ) + # peak normalized + max_ratio = np.nanmax(abs(lna_caltone_ratio)) + if not np.isclose(max_ratio, 0): + lna_caltone_ratio /= max_ratio + return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + + +def parse_chirp_corr_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from HRT QFSP. + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # loop over three qfsp + for i_qfsp in range(3): + p_qfsp = f'{qfsp_path}{i_qfsp}' + # loop over 4 channels per qfsp: + for nn in range(4): + i_chn = nn + i_qfsp * 4 + n_rx = i_chn + 1 + # loop over 3 taps + for i_tap in range(3): + n_tap = i_tap + 1 + # form the path to the dataset per I and Q + # use RX pol! + p_ds_i = (f'{p_qfsp}/CHIRP_CORRELATOR_I{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError: + warn(f'Missing dataset "{p_type}" in ' + f'"{raw.filename}". LNA CAL values ' + 'from co-pol will be used instead. ' + 'Results may be invalid!') + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator( + freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + else: + cal_type = ds_cal_type[()].astype(CalPath) + # initialize the 3-D array for chirp correlator + num_lines = f5[p_ds_i].size + chp_cor = np.ones((num_lines, 12, 3), dtype='c8') + chp_cor[:, i_chn, i_tap].real = f5[p_ds_i][()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor, cal_type + + +def get_lna_cal_mean( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Returns mean complex LNA values and dominant tap + numbers within [1, 2, 3] for all channels + """ + if txrx_pol[0] == txrx_pol[1]: + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + else: # get x-pol chirp correlator from HRT + chp_cor, cal_type = parse_chirp_corr_from_hrt_qfsp( + raw, + txrx_pol + ) + n_rxs = chp_cor.shape[1] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type) + if len(idx_lna) == 0: + warn('No LNA CAL to represent RX! Use BYPASS Cal instead!') + if len(idx_byp) == 0: + # XXX to avoid failure in unit test or very short L0B + # lacking LNA/BYP CAL datasets, a warning will be issued + # and the values will all be set to unity! + warn('No LNA or BYPASS CAL! LNA mean will be all unity. ' + 'The results will be invalid!') + lna_mean = np.ones(n_rxs, dtype='c8') + n_tap_dominant = np.full(n_rxs, fill_value=2) + return lna_mean, n_tap_dominant + idx_lna = idx_byp + # get LNA for all three taps (or BYPASS) + lna_mean_tap3 = np.zeros((3, n_rxs), dtype='c16') + for nn in range(3): + lna_cal = chp_cor[idx_lna, :, nn] + # get complex mean for all RX channels + lna_mean_tap3[nn] = _mean_2d(lna_cal) + # get dominat taps + abs_lna_mean_tap3 = abs(lna_mean_tap3) + idx_lna_taps = np.nanargmax(abs_lna_mean_tap3, axis=0) + amp_lna_mean = np.zeros(n_rxs) + for nn in range(n_rxs): + amp_lna_mean[nn] = abs_lna_mean_tap3[idx_lna_taps[nn], nn] + _check_if_zero(amp_lna_mean, msg=f'{txrx_pol[0]}-pol LNA Cal') + # get the phase part at a fixed common tap rather than dominant one + phs_lna_mean = np.angle(lna_mean_tap3[1]) + # form complex lna + lna_mean = amp_lna_mean * np.exp(1j * phs_lna_mean) + n_tap_dominant = idx_lna_taps + 1 + return lna_mean, n_tap_dominant + + +def correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio: np.ndarray, + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float = 1214.88e6 +) -> Tuple[np.ndarray, np.ndarray]: + # XXX check if product from the second band so we can modify + # the results from the first band only if there is a + # relative delay offset in one of qFSP vs others, that is + # one of the qFSP is an outlier due to ADC clock/delay issue + # check if there is delay anomaly among three qFSP + fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc_a - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) + if _is_product_from_second_band(raw, freq_band, txrx_pol): + warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') + # if there is then get diff of frequency bands A dn B + # to be used to correct phase from A for B + fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) + phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay + # correct the LNA/CALTONE by delay amount via phase if any. + lna_caltone_ratio *= np.exp(1j * phs_adj) + return lna_caltone_ratio, time_delay + + +def get_caltone_mean( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> np.ndarray: + # now get caltone always from swath + caltone = raw.getCaltone(freq_band, txrx_pol) + caltone_mean = _mean_2d(caltone) + _check_if_zero(caltone_mean, msg=f'{txrx_pol}-pol Caltone') + return caltone_mean + + +def _is_product_from_second_band( + raw: Raw, + freq_band: str, + txrx_pol: str): + """ + Determine whether the produt is avolable on both bands + and it is from the second band. + """ + if freq_band == "B" and len(raw.frequencies) == 2: + if txrx_pol in raw.polarizations['A']: + return True + return False + return False + + +def _get_qfsp_delay_anomaly( + lna_caltone_ratio: np.ndarray, + dif_chirp_caltone_freq: float, + adc_clock: float = 240e6) -> np.ndarray: + """ + get time delays for a qfSP with phase anomaly only for + 12 channel NISAR L-band product. + For other case, it will be set to zero! + """ + if lna_caltone_ratio.size == 12: + # group them into three 4-channels, one per qFSP + lna2cal_ratio = lna_caltone_ratio.reshape(3, 4) + # get unwrap phase across 4 channels per qFSP (radians) + lna2cal_phs = np.unwrap(np.angle(lna2cal_ratio), axis=1) + # get median phase per qfsp, total 3 phase values (radians) + # and then unwrap three values + qfps_phs = np.unwrap(np.nanmedian(lna2cal_phs, axis=1)) + # use median among all three to be used as a reference to + # catch a single outlier + phs_ref = np.median(qfps_phs) + # phase due to ADC delay + phs_adc_delay = 2 * np.pi * dif_chirp_caltone_freq / adc_clock + n_delay_qfsp = np.round((qfps_phs - phs_ref) / phs_adc_delay) + # now repeat sample delay 4x per qFSP + n_delays = np.repeat( + n_delay_qfsp[:, np.newaxis], repeats=4, axis=1).ravel() + time_delays = n_delays / adc_clock + else: + time_delays = np.zeros(lna_caltone_ratio.size) + return time_delays + + +def _mean_2d(data: np.ndarray, perc: float = 0.0) -> np.asarray: + """ + Compute mean within percentile [perc, 100-perc], + of a 2-D complex array with shape (rangelines, channels) + due to bad telemetry. + """ + # or simply np.nanmean(data, axis=0) + d = np.sort(np.abs(data), axis=0) + q1_all, q3_all = np.percentile(d, q=[perc, 100 - perc], axis=0) + mean_all = [] + for cc, (q1, q3) in enumerate(zip(q1_all, q3_all)): + data_q1_q3 = data[(d[:, cc] >= q1) & (d[:, cc] <= q3), cc] + mean_all.append(np.nanmean(data_q1_q3)) + return np.asarray(mean_all) + + +def _check_if_zero(arr: np.ndarray, msg: str): + is_zero = np.isclose(arr, 0) + if is_zero.all(): + # XXX to avoid unit test failure and old sim L0B + # a warning will be issued and all values will be set + # to unity! + warn(f'All values are zero for {msg}! They are set to untiy. ' + 'Result may be invalid!') + arr[...] = 1.0 + if is_zero.any(): + warn(f'Some values are zero for {msg}!') From 1759142aef8ba9c6860afd0843ac4982cd7d8a7f Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:36:15 -0800 Subject: [PATCH 02/26] Modify pattern.py to compute and apply RX imbalances per freq band --- python/packages/nisar/antenna/pattern.py | 120 ++++++++++++++--------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 9b6b65b44..cb0a2ff92 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -1,14 +1,15 @@ from collections import defaultdict -from enum import IntEnum, unique from isce3.core import Orbit, Attitude, Linspace from isce3.geometry import DEMInterpolator import logging -from nisar.mixed_mode.logic import PolChannelSet from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser from nisar.products.readers.Raw import Raw from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index +from nisar.antenna.rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b +) import numpy as np log = logging.getLogger("nisar.antenna.pattern") @@ -150,6 +151,11 @@ class AntennaPattern: vairations expected to be less than 0.05 dB and 0.25 deg, respectively. This can speed up the antenna pattern computation. If None, it will be ignored. + freq_band : {'A', 'B'} or None. Optional + If none, the very first frequency band will + be used. + caltone_freq: float, default=1214.88e6 + Caltone frequency in Hz. """ @@ -158,7 +164,9 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, orbit: Orbit, attitude: Attitude, *, el_lut=None, norm_weight=True, - el_spacing_min=8.72665e-5): + el_spacing_min=8.72665e-5, + freq_band=None, + caltone_freq=1214.88e6): self.orbit = orbit.copy() self.attitude = attitude.copy() @@ -167,12 +175,27 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.el_spacing_min = el_spacing_min self.el_lut = el_lut - # get pols - channels = PolChannelSet.from_raw(raw) - freqs = tuple({chan.freq_id for chan in channels}) - self.freq_band = "A" if "A" in freqs else freqs[0] - self.txrx_pols = tuple({chan.pol for chan in channels}) - + # get frequency band + freqs = np.sort(raw.frequencies) + if freq_band is None: + self.freq_band = freqs[0] + else: + if freq_band not in freqs: + raise ValueError( + f'freq_band {freq_band} is out of range {freqs}!') + self.freq_band = freq_band + # get all polarization for a frequency band + self.txrx_pols = raw.polarizations[self.freq_band] + # comput all RX channel imbalances over all + # txrx pols of a desired frequency band. + # This RX imbalanced is basically LNA/CALTONE ratio! + # XXX perhaps Caltone frequency can be parsed from L0B DRT + # rather than provided as an input! + self.rx_imb = compute_all_rx_channel_imbalances_from_l0b( + raw, + freq_band=self.freq_band, + caltone_freq=caltone_freq + ) # Parse ref epoch, pulse time, slant range, orbit and attitude from Raw # Except for quad-pol, pulse time is the same for all TX pols. # In case of quad-pol the time offset is half single-pol PRF and thus @@ -192,7 +215,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # parse active RX channels and fs_ta which are polarization # independent! - txrx_pol = raw.polarizations[self.freq_band][0] + txrx_pol = self.txrx_pols[0] self.rx_chanl = raw.getListOfRxTRMs(self.freq_band, txrx_pol) self.fs_ta = ins.sampling_rate_ta(txrx_pol[1]) @@ -205,14 +228,14 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # Loop over all freqs & pols since some RX pols may be found only on # freq B (e.q. the QQP case). Assume RD/WD/WL are the same for all # freqs/pols that have the same RX polarization. - for chan in channels: - rxpol = chan.pol[1] + for txrx_pol in self.txrx_pols: + rxpol = txrx_pol[1] if rxpol in rd_all: continue rd_all[rxpol], wd_all[rxpol], wl_all[rxpol] = raw.getRdWdWl( - chan.freq_id, chan.pol) + self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], - wd_all[rxpol], wl_all[rxpol]) + wd_all[rxpol], wl_all[rxpol]) # build RxTRMs and the first RxDBF for all possible RX # linear polarizations @@ -295,7 +318,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # instrument file per TX linear pol. self.channel_adj_fact_tx[tx_lp] = ( ins.channel_adjustment_factors_tx(tx_lp) - ) + ) # get tx el-cut patterns el_pat_tx = ant.el_cut_all(tx_lp) @@ -307,9 +330,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, el_lut=self.el_lut, norm_weight=self.norm_weight, rg_spacing_min=self.rg_spacing_min) - def form_pattern(self, tseq, slant_range: Linspace, - nearest: bool = False, txrx_pols = None): + nearest: bool = False, txrx_pols=None): """ Get the two-way antenna pattern at a given time and set of ranges for either all or specified polarization combinations if Tx/Rx pols are @@ -333,26 +355,33 @@ def form_pattern(self, tseq, slant_range: Linspace, ------- dict Two-way complex antenna patterns as a function of range bin - over either all or specified TxRx polarization products. The format of dict is - {pol: np.ndarray[complex]}. + over either all or specified TxRx polarization products. + The format of dict is {pol: np.ndarray[complex]}. """ if txrx_pols is None: txrx_pols = self.txrx_pols elif not set(txrx_pols).issubset(self.txrx_pols): raise ValueError(f"Specified txrx_pols {txrx_pols} is out of " - f"available pols {self.txrx_pols}!") + f"available pols {self.txrx_pols}!") tseq = np.atleast_1d(tseq) - rx_pols = {pol[1] for pol in txrx_pols} - # form one-way RX patterns for all linear pols rx_dbf_pat = dict() - for p in rx_pols: + for txrx_pol in txrx_pols: + rxp = txrx_pol[1] + # combine channel adjustment from both internally computed + # RX imbalance (LNA/CALTONE) and secondary correction from + # input INST HDF5 product + channel_adj_fact_rx = ( + self.rx_imb[self.freq_band, txrx_pol].lna_caltone_ratio) + if self.channel_adj_fact_rx[rxp] is not None: + channel_adj_fact_rx *= np.asarray( + self.channel_adj_fact_rx[rxp]) # Split up provided timespan into groups with the same range timing # (Adding one because get_pulse_index uses floor but we want ceil) change_indices = [ - get_pulse_index(tseq, t) + 1 for t in self.finder[p].time_changes + get_pulse_index(tseq, t) + 1 for t in self.finder[rxp].time_changes if t > tseq[0] and t < tseq[-1] ] tgroups = np.split(tseq, change_indices) @@ -361,41 +390,44 @@ def form_pattern(self, tseq, slant_range: Linspace, i0 = 0 for tgroup in tgroups: t = tgroup[0] - rd, wd, wl = self.finder[p].get_dbf_timing(t) + rd, wd, wl = self.finder[rxp].get_dbf_timing(t) - log.info(f'Updating {p}-pol RX antenna pattern because' + log.info(f'Updating {rxp}-pol RX antenna pattern because' ' change in RD/WD/WL') - self.rx_trm[p] = RxTrmInfo( + self.rx_trm[rxp] = RxTrmInfo( self.pulse_times, self.rx_chanl, rd, wd, wl, - self.dbf_coef[p], self.ta_switch[p], self.ela_dbf[p], + self.dbf_coef[rxp], self.ta_switch[rxp], self.ela_dbf[rxp], self.fs_win, self.fs_ta) - self.rx_dbf[p] = RxDBF( - self.orbit, self.attitude, self.dem, self.el_pat_rx[p], - self.rx_trm[p], self.reference_epoch, + self.rx_dbf[rxp] = RxDBF( + self.orbit, self.attitude, self.dem, self.el_pat_rx[rxp], + self.rx_trm[rxp], self.reference_epoch, el_lut=self.el_lut, - norm_weight=self.rx_dbf[p].norm_weight) + norm_weight=self.rx_dbf[rxp].norm_weight) - pat = self.rx_dbf[p].form_pattern( + pat = self.rx_dbf[rxp].form_pattern( tgroup, slant_range, - channel_adj_factors=self.channel_adj_fact_rx[p] + channel_adj_factors=channel_adj_fact_rx ) - # Initialize the pattern array so we can slice this range timing - # group into it - TODO move this outside the loop for clarity? - if p not in rx_dbf_pat: - rx_dbf_pat[p] = np.empty((len(tseq), slant_range.size), - dtype=np.complex64) + # Initialize the pattern array so we can slice this + # range timing group into it + # TODO move this outside the loop for clarity? + if rxp not in rx_dbf_pat: + rx_dbf_pat[rxp] = np.empty((len(tseq), slant_range.size), + dtype=np.complex64) # Slice it into the full array, and # bump up the index for the next slice iend = i0 + len(tgroup) - rx_dbf_pat[p][i0:iend] = pat + rx_dbf_pat[rxp][i0:iend] = pat i0 = iend # form one-way TX patterns for all TX pols - tx_bmf_pat = defaultdict(lambda: np.empty((len(tseq), slant_range.size), - dtype=np.complex64)) + tx_bmf_pat = defaultdict( + lambda: np.empty((len(tseq), slant_range.size), + dtype=np.complex64) + ) for tx_pol in {pol[0] for pol in txrx_pols}: if tx_pol == "L": tx_bmf_pat[tx_pol] = ( @@ -420,8 +452,8 @@ def form_pattern(self, tseq, slant_range: Linspace, else: # other non-compact pol types adj = self.channel_adj_fact_tx[tx_pol] tx_bmf_pat[tx_pol] = self.tx_bmf[tx_pol].form_pattern( - tseq, slant_range, nearest=nearest, channel_adj_factors=adj - ).astype(np.complex64) + tseq, slant_range, nearest=nearest, channel_adj_factors=adj + ).astype(np.complex64) # build two-way pattern for all unique TxRx products obtained from all # freq bands From d0ce1abc6ed506b6214e92f3ca767faa4053c8e4 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:37:32 -0800 Subject: [PATCH 03/26] Update beamformer to allow single scalar for RX channel adjustment --- python/packages/nisar/antenna/beamformer.py | 57 +++++++++++++-------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/python/packages/nisar/antenna/beamformer.py b/python/packages/nisar/antenna/beamformer.py index 300549a1d..4acc7cf9b 100644 --- a/python/packages/nisar/antenna/beamformer.py +++ b/python/packages/nisar/antenna/beamformer.py @@ -315,11 +315,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # check the pulse_time to be within time tag of TxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected TxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # get total number of TX channels _, num_chanl = self.trm_info.correlator_tap2.shape @@ -380,9 +382,11 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # Old method: compute monotonically increasing elevation angles # at each successive slant range bin - # Compute the respective slant range for beamformed antenna pattern + # Compute the respective slant range for beamformed + # antenna pattern. # Simply calculate slant range for every few pulses where - # S/C pos/vel and DEM barely changes. This speeds up the process! + # S/C pos/vel and DEM barely changes. This speeds up + # the process! if (pp % self.num_pulse_skip == 0): sr_ant = self._elaz2slantrange(tm) @@ -580,11 +584,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # check the pulse_time to be within time tag of RxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected RxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # EL-cut pattern with shape active beams by EL angles ant_pat_el = self.el_ant_info.copol_pattern[self.active_channel_idx] @@ -609,16 +615,27 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # if provided if channel_adj_factors is not None: # check the size of correction factor container - if len(channel_adj_factors) != num_chanl: - raise ValueError('Size of RX "channel adjustment factor" ' - f'must be {num_chanl}') - # check if the correction factor is zero for all active channels - cor_fact = np.asarray(channel_adj_factors)[self.active_channel_idx] - if np.isclose(abs(cor_fact).max(), 0): - raise ValueError('"channel_adj_factors" are zeros for all ' - 'active RX channels!') - rx_wgt *= cor_fact[:, None] - + if len(channel_adj_factors) == num_chanl: + # check if the correction factor is zero for all + # active channels + cor_fact = np.asarray(channel_adj_factors)[ + self.active_channel_idx] + if np.isclose(abs(cor_fact).max(), 0): + raise ValueError('"channel_adj_factors" are zeros for all ' + 'active RX channels!') + rx_wgt *= cor_fact[:, None] + elif len(channel_adj_factors) == 1: # fixed scalar + if np.isclose(channel_adj_factors[0], 0): + raise ValueError( + '"channel_adj_factors" is a zero-value scalar for all' + 'active RX channels!' + ) + rx_wgt *= channel_adj_factors[0] + else: # neither 1 (scalar) nor `num_chanl` + raise ValueError( + 'Size of RX "channel adjustment factor" must be either ' + f'{num_chanl} or 1 but got {len(channel_adj_factors)}!' + ) # initialize the RX DBF pattern rx_pat = np.zeros((len(pulse_time), slant_range.size), dtype='complex') num_active_chanl = len(self.active_channel_idx) @@ -779,7 +796,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): warnings.warn( 'HPA Cal contains some zero values. These will be replaced with' ' the nearest non-zero values.', category=BadHPACalWarning - ) + ) # replace zero values with nearest non-zero ones for n in range(active_tx_idx.size): mask_zr = np.isclose(hcal_abs[:, n], 0) @@ -789,7 +806,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): f_nearest = interp1d( i_hpa_nz, tx_weights[i_hpa_nz, n], kind='nearest', fill_value='extrapolate', assume_sorted=True - ) + ) tx_weights[i_hpa_z, n] = f_nearest(i_hpa_z) # If BCAL exists compute ratio HCAL/(BCAL/BCAL[0]) @@ -975,8 +992,8 @@ def get_pulse_index(pulse_times, t, nearest=False, eps=5e-10): eps : float Tolerance for snapping time tags, e.g., when `abs(pulse_times[i] - t) <= eps` - then return `i`. This accommodates floating point precision issues like - `n * pri != n / prf`. + then return `i`. This accommodates floating point precision + issues like `n * pri != n / prf`. Returns ------- From 37df7fb7337a1117d32900a51476b7a0aca6b484 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:38:38 -0800 Subject: [PATCH 04/26] Pass frequency band to EAP block in focus.py --- python/packages/nisar/workflows/focus.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index dde3c990a..1cd4f91ab 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -1911,7 +1911,8 @@ def temp(suffix): if cfg.processing.is_enabled.eap: antpat = AntennaPattern(raw, dem, antparser, instparser, orbit, attitude, - el_lut=el_lut) + el_lut=el_lut, + freq_band=frequency) log.info("Precomputing antenna patterns") i = np.arange(rc_grid.shape[0]) From 71dc922cee466cbb9d467a1e71b5ff71dc933d44 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 15:35:37 -0800 Subject: [PATCH 05/26] Add a function to compute pulsewidth delay in sequential TX --- .../antenna/rx_channel_imbalance_helpers.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index d858b697c..221b8c217 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -377,3 +377,23 @@ def _check_if_zero(arr: np.ndarray, msg: str): arr[...] = 1.0 if is_zero.any(): warn(f'Some values are zero for {msg}!') + + +def get_pulsewidth_delay_from_raw( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> float: + """ + Get delay (seconds) of the second pulse wrt the pulsewidth + of the first TX pulse in sequential split-spectrum transmit + for a desired dataset in L0B. + """ + # check if band is B and it is split spectrum + if freq_band == 'B' and len(raw.frequencies) == 2: + pols = raw.polarizations + # check if this is sequential transmit + if txrx_pol in pols['A']: + _, _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) + return pw + return 0.0 From ed83d0e9dfba05929dc07343a3833621020dbbf7 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 15:36:10 -0800 Subject: [PATCH 06/26] Modify pattern.py to adjustment RD for the second band in RX DBF --- python/packages/nisar/antenna/pattern.py | 24 ++++++++++++++++++- .../antenna/rx_channel_imbalance_helpers.py | 2 +- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index cb0a2ff92..0f6a19f72 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -1,3 +1,4 @@ +from warnings import warn from collections import defaultdict from isce3.core import Orbit, Attitude, Linspace from isce3.geometry import DEMInterpolator @@ -8,7 +9,8 @@ from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index from nisar.antenna.rx_channel_imbalance_helpers import ( - compute_all_rx_channel_imbalances_from_l0b + compute_all_rx_channel_imbalances_from_l0b, + get_pulsewidth_delay_from_raw ) import numpy as np @@ -236,6 +238,26 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) + # XXX get pulsewidth delay and number of samples to correct + # RDs @ `self.fs_win` used in forming RX DBF pattern. This + # will account for delay in onboard DBF process due to first + # pulsewidth in sequential split spectrum TX. + # Note that half pulse wdith delay shall also be incorporated + # per ferquency band for all modes. That is delay of 0.5 * pw_a + # for "A" and delay of "pw_a + 0.5 * pw_b" for B. + # The fdollowing does NOT take into account half pulsewidth + # per TX pulse! + tm_delay = get_pulsewidth_delay_from_raw( + raw, self.freq_band, txrx_pol) + n_samp_delay = round(tm_delay * self.fs_win) + if n_samp_delay != 0: + warn( + f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} ' + f'is corrected by {tm_delay * 1e6:.3f} (usec) or ' + f'equivalently # {n_samp_delay} samples @ ' + f'{self.fs_win} (MHz)!' + ) + rd_all[rxpol] += n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 221b8c217..e11041083 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -394,6 +394,6 @@ def get_pulsewidth_delay_from_raw( pols = raw.polarizations # check if this is sequential transmit if txrx_pol in pols['A']: - _, _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) + _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) return pw return 0.0 From 07fae4c6b0fe7e6821a75dd6758771ef58709441 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 16:14:01 -0800 Subject: [PATCH 07/26] Fix the log in pattern.py --- python/packages/nisar/antenna/pattern.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 0f6a19f72..bfde92a28 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -255,7 +255,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} ' f'is corrected by {tm_delay * 1e6:.3f} (usec) or ' f'equivalently # {n_samp_delay} samples @ ' - f'{self.fs_win} (MHz)!' + f'{self.fs_win * 1e-6} (MHz)!' ) rd_all[rxpol] += n_samp_delay From 0e2df26cd64231c14eca00f2917e469e61fa91ce Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 19:06:15 -0800 Subject: [PATCH 08/26] Correct the sign for RD correction of band B in RX DBF pattern --- python/packages/nisar/antenna/pattern.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index bfde92a28..11eeb2bd7 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -257,7 +257,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'equivalently # {n_samp_delay} samples @ ' f'{self.fs_win * 1e-6} (MHz)!' ) - rd_all[rxpol] += n_samp_delay + rd_all[rxpol] -= n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations From 0dd72f14fe62c31961dae91a6653eb73634f3fa4 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 22:14:34 -0800 Subject: [PATCH 09/26] Use slant range diff between A and B in place of pulsewidth delay --- python/packages/nisar/antenna/pattern.py | 20 ++++++++----------- .../antenna/rx_channel_imbalance_helpers.py | 9 ++++++--- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 11eeb2bd7..ff11d140a 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -10,7 +10,7 @@ from nisar.antenna.beamformer import get_pulse_index from nisar.antenna.rx_channel_imbalance_helpers import ( compute_all_rx_channel_imbalances_from_l0b, - get_pulsewidth_delay_from_raw + get_range_delay_from_raw ) import numpy as np @@ -238,16 +238,12 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) - # XXX get pulsewidth delay and number of samples to correct - # RDs @ `self.fs_win` used in forming RX DBF pattern. This - # will account for delay in onboard DBF process due to first - # pulsewidth in sequential split spectrum TX. - # Note that half pulse wdith delay shall also be incorporated - # per ferquency band for all modes. That is delay of 0.5 * pw_a - # for "A" and delay of "pw_a + 0.5 * pw_b" for B. - # The fdollowing does NOT take into account half pulsewidth - # per TX pulse! - tm_delay = get_pulsewidth_delay_from_raw( + # XXX get range delay offset for band B in split-spectrum to + # correct RDs @ `self.fs_win` used in forming RX DBF pattern. + # This will account for delay after onboard DBF due to + # both the first pulsewidth in sequential TX chirps + # as well as the difference in filter group delays. + tm_delay = get_range_delay_from_raw( raw, self.freq_band, txrx_pol) n_samp_delay = round(tm_delay * self.fs_win) if n_samp_delay != 0: @@ -257,7 +253,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'equivalently # {n_samp_delay} samples @ ' f'{self.fs_win * 1e-6} (MHz)!' ) - rd_all[rxpol] -= n_samp_delay + rd_all[rxpol][...] = rd_all[rxpol].astype(int) + n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index e11041083..f6560d2cc 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -8,6 +8,7 @@ from nisar.products.readers.Raw import Raw from nisar.antenna import get_calib_range_line_idx, CalPath +from isce3.core import speed_of_light @dataclass(frozen=True) @@ -379,7 +380,7 @@ def _check_if_zero(arr: np.ndarray, msg: str): warn(f'Some values are zero for {msg}!') -def get_pulsewidth_delay_from_raw( +def get_range_delay_from_raw( raw: Raw, freq_band: str, txrx_pol: str @@ -394,6 +395,8 @@ def get_pulsewidth_delay_from_raw( pols = raw.polarizations # check if this is sequential transmit if txrx_pol in pols['A']: - _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) - return pw + sr_b = raw.getRanges('B', txrx_pol[0]) + sr_a = raw.getRanges('A', txrx_pol[0]) + delay = 2 * (sr_b.first - sr_a.first) / speed_of_light + return delay return 0.0 From 9d3dd594900d4486610a30e44aa0e56b10f6a967 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 23:52:17 -0800 Subject: [PATCH 10/26] Parse caltone frequency from DRT in L0B --- .../antenna/rx_channel_imbalance_helpers.py | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index f6560d2cc..c8d5dac37 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -45,7 +45,7 @@ def __post_init__(self): def compute_all_rx_channel_imbalances_from_l0b( l0b_file: str | Raw, *, - caltone_freq: float = 1214.88e6, + caltone_freq: float | None = None, freq_band: str | None = None, txrx_pol: str | None = None ) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: @@ -62,11 +62,12 @@ def compute_all_rx_channel_imbalances_from_l0b( ---------- l0b_file : str or nisar.products.readers.Raw L0B filename or Raw object - caltone_freq : float, default=1214.88e6 + caltone_freq : float or None. Optional Caltone frequency in Hz. - freq_band : str, optional + If None (default), it will be extracted from DRT in L0B. + freq_band : str. Optional "A" or "B". Default is all. - txrx_pol: str, optional + txrx_pol : str. Optional TR pol in `freq_band` such as "HH", "HV", etc. Default is all. @@ -111,7 +112,7 @@ def compute_rx_channel_imbalance( raw: Raw, freq_band: str, txrx_pol: str, - caltone_freq: float = 1214.88e6 + caltone_freq: float | None = None, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: """ Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio @@ -269,8 +270,13 @@ def correct_lna_caltone_ratio_for_second_band( raw: Raw, freq_band: str, txrx_pol: str, - caltone_freq: float = 1214.88e6 + caltone_freq: float | None = None ) -> Tuple[np.ndarray, np.ndarray]: + # Get calton efrequency from DRT if not provided + if caltone_freq is None: + caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) + warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' + f'-> {caltone_freq * 1e-6:.3f} (MHz)') # XXX check if product from the second band so we can modify # the results from the first band only if there is a # relative delay offset in one of qFSP vs others, that is @@ -400,3 +406,29 @@ def get_range_delay_from_raw( delay = 2 * (sr_b.first - sr_a.first) / speed_of_light return delay return 0.0 + + +def parse_caltone_freq_from_drt( + raw: Raw, + txrx_pol: str +) -> float: + """get caltone frequency in Hz from low rate telemetry in L0B.""" + # default caltone if dataset is not available (Hz) + default = 1214.88e6 + # frequency of local oscillator (Hz) + lo = 1200e6 + # ADC clock (Hz) + clock = 240e6 + c_p = (f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_CALTONE_PHASE_STEP_' + f'{txrx_pol[1]}') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_caltone_phase = f5[c_p] + except KeyError: + warn(f'Missing path "{c_p}" in L0B! Caltone frequency will ' + f'be set to {default} (Hz)') + return default + else: + i_cal = np.median(ds_caltone_phase[()]).astype(int) + caltone_freq = (i_cal / 2**32) * clock + lo + return caltone_freq From f28dba70b14b3552f9b7812765438b6f48824086 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 23:58:31 -0800 Subject: [PATCH 11/26] Set default caltone frequency to none in pattern.py --- python/packages/nisar/antenna/pattern.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index ff11d140a..cc1fd6fdd 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -156,8 +156,9 @@ class AntennaPattern: freq_band : {'A', 'B'} or None. Optional If none, the very first frequency band will be used. - caltone_freq: float, default=1214.88e6 + caltone_freq: float or None. Optional Caltone frequency in Hz. + If None (default), it will be extracted from telemetry DRT in L0B. """ @@ -168,7 +169,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, norm_weight=True, el_spacing_min=8.72665e-5, freq_band=None, - caltone_freq=1214.88e6): + caltone_freq=None): self.orbit = orbit.copy() self.attitude = attitude.copy() From bce7e7a7b63bdb40a7425a90c9ec59a67680432d Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 8 Dec 2025 22:04:52 -0800 Subject: [PATCH 12/26] Add onboard DBF delay offset of 2.1474 us to RD in pattern.py --- python/packages/nisar/antenna/pattern.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index cc1fd6fdd..5517557af 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -159,6 +159,9 @@ class AntennaPattern: caltone_freq: float or None. Optional Caltone frequency in Hz. If None (default), it will be extracted from telemetry DRT in L0B. + delay_ofs_dbf: float, default=2.1474e-6 + Delay offset (seconds) in data window position of onboard DBF + process applied to all bands and polarizations. """ @@ -169,7 +172,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, norm_weight=True, el_spacing_min=8.72665e-5, freq_band=None, - caltone_freq=None): + caltone_freq=None, + delay_ofs_dbf=2.1474e-6): self.orbit = orbit.copy() self.attitude = attitude.copy() @@ -246,6 +250,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # as well as the difference in filter group delays. tm_delay = get_range_delay_from_raw( raw, self.freq_band, txrx_pol) + tm_delay += delay_ofs_dbf n_samp_delay = round(tm_delay * self.fs_win) if n_samp_delay != 0: warn( From 53a24d938f1e2c960386a181d9596d7eb952f9c5 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 16:58:57 -0800 Subject: [PATCH 13/26] Use camel case naming convention for rx channel imbalance class --- .../nisar/antenna/rx_channel_imbalance_helpers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index c8d5dac37..1cb9f14dd 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -12,7 +12,7 @@ @dataclass(frozen=True) -class RX_CHANNEL_IMBALANCE_PRODUCT: +class RxChannelImbalanceProduct: """ RX channel imbalance product extracted from LNA/CALTONE ratio for a certain frequency band and polarization. @@ -48,7 +48,7 @@ def compute_all_rx_channel_imbalances_from_l0b( caltone_freq: float | None = None, freq_band: str | None = None, txrx_pol: str | None = None -) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: +) -> Dict[Tuple[str, str], RxChannelImbalanceProduct]: """ Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio for over all bands and polarizations. The bands and polarizations are @@ -75,7 +75,7 @@ def compute_all_rx_channel_imbalances_from_l0b( ------- dict: A dict with keys (freq_band, txrx_pol) and values of type - `RX_CHANNEL_IMBALANCE_PRODUCT` + `RxChannelImbalanceProduct` """ if isinstance(l0b_file, str): @@ -99,7 +99,7 @@ def compute_all_rx_channel_imbalances_from_l0b( txrx_pol, caltone_freq=caltone_freq ) - out[freq_band, txrx_pol] = RX_CHANNEL_IMBALANCE_PRODUCT( + out[freq_band, txrx_pol] = RxChannelImbalanceProduct( lna_caltone_ratio=lna_caltone_ratio, ntap_dominant=n_tap_dominant, time_delays_sec=time_delays, From 2938fc04b455d59006020c8bd8f68dbb650fae37 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 17:15:22 -0800 Subject: [PATCH 14/26] Fix docstrings, comments, typos, and some minor improvement --- python/packages/nisar/antenna/pattern.py | 6 ++--- .../antenna/rx_channel_imbalance_helpers.py | 22 +++++++++---------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 5517557af..e173175b8 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -195,9 +195,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.txrx_pols = raw.polarizations[self.freq_band] # comput all RX channel imbalances over all # txrx pols of a desired frequency band. - # This RX imbalanced is basically LNA/CALTONE ratio! - # XXX perhaps Caltone frequency can be parsed from L0B DRT - # rather than provided as an input! + # This RX imbalanced is basically LNA/CALTONE ratio. self.rx_imb = compute_all_rx_channel_imbalances_from_l0b( raw, freq_band=self.freq_band, @@ -243,7 +241,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) - # XXX get range delay offset for band B in split-spectrum to + # Get range delay offset for band B in split-spectrum to # correct RDs @ `self.fs_win` used in forming RX DBF pattern. # This will account for delay after onboard DBF due to # both the first pulsewidth in sequential TX chirps diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 1cb9f14dd..7d4b74dfa 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -94,9 +94,9 @@ def compute_all_rx_channel_imbalances_from_l0b( for txrx_pol in frq_pols[freq_band]: (lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio ) = compute_rx_channel_imbalance( - raw, - freq_band, - txrx_pol, + raw=raw, + freq_band=freq_band, + txrx_pol=txrx_pol, caltone_freq=caltone_freq ) out[freq_band, txrx_pol] = RxChannelImbalanceProduct( @@ -135,8 +135,7 @@ def compute_rx_channel_imbalance( normalization of RX channel imbalances. """ - lna_mean, n_tap_dominant = get_lna_cal_mean( - raw, freq_band, txrx_pol) + lna_mean, n_tap_dominant = get_lna_cal_mean(raw, freq_band, txrx_pol) # get caltone mean over all RX channels caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) # Get complex ratio LNA/Caltone over all channels @@ -272,12 +271,12 @@ def correct_lna_caltone_ratio_for_second_band( txrx_pol: str, caltone_freq: float | None = None ) -> Tuple[np.ndarray, np.ndarray]: - # Get calton efrequency from DRT if not provided + # Get caltone frequency from DRT if not provided if caltone_freq is None: caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' f'-> {caltone_freq * 1e-6:.3f} (MHz)') - # XXX check if product from the second band so we can modify + # Check if product from the second band so we can modify # the results from the first band only if there is a # relative delay offset in one of qFSP vs others, that is # one of the qFSP is an outlier due to ADC clock/delay issue @@ -289,7 +288,7 @@ def correct_lna_caltone_ratio_for_second_band( lna_caltone_ratio, dif_chirp_caltone_freq) if _is_product_from_second_band(raw, freq_band, txrx_pol): warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') - # if there is then get diff of frequency bands A dn B + # if there is then get diff of frequency bands A and B # to be used to correct phase from A for B fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay @@ -321,7 +320,6 @@ def _is_product_from_second_band( if freq_band == "B" and len(raw.frequencies) == 2: if txrx_pol in raw.polarizations['A']: return True - return False return False @@ -330,9 +328,9 @@ def _get_qfsp_delay_anomaly( dif_chirp_caltone_freq: float, adc_clock: float = 240e6) -> np.ndarray: """ - get time delays for a qfSP with phase anomaly only for - 12 channel NISAR L-band product. - For other case, it will be set to zero! + If the product is a 12-channel NISAR L-band product, + return the time delays for a qFSP with phase anomaly. + Else, return zeros. """ if lna_caltone_ratio.size == 12: # group them into three 4-channels, one per qFSP From 18d739fd6ceaae391e2390505c7a43b90d706eb0 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 17:25:35 -0800 Subject: [PATCH 15/26] Warn if array size in rx channel imbalance class is not 12 --- python/packages/nisar/antenna/rx_channel_imbalance_helpers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 7d4b74dfa..54dc3e2f3 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -40,6 +40,9 @@ def __post_init__(self): if (self.lna_caltone_ratio.size != self.ntap_dominant.size != self.time_delays_sec.size): raise ValueError('The size of all arrays must be equal!') + if self.lna_caltone_ratio.size != 12: + warn('The size of LNA-CALTONE ratio is ' + f'{self.lna_caltone_ratio.size} instead of 12!') def compute_all_rx_channel_imbalances_from_l0b( From 81a83c08bdd6b1874d4ef3600213afadb5e43eb7 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Thu, 29 Jan 2026 16:20:53 -0800 Subject: [PATCH 16/26] Fix sign for delay_ofs_dbf in pattern.py --- python/packages/nisar/antenna/pattern.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index e173175b8..0648e738e 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -159,7 +159,7 @@ class AntennaPattern: caltone_freq: float or None. Optional Caltone frequency in Hz. If None (default), it will be extracted from telemetry DRT in L0B. - delay_ofs_dbf: float, default=2.1474e-6 + delay_ofs_dbf: float, default=-2.1474e-6 Delay offset (seconds) in data window position of onboard DBF process applied to all bands and polarizations. @@ -173,7 +173,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, el_spacing_min=8.72665e-5, freq_band=None, caltone_freq=None, - delay_ofs_dbf=2.1474e-6): + delay_ofs_dbf=-2.1474e-6): self.orbit = orbit.copy() self.attitude = attitude.copy() From 1080837a05b23eef9e4950bfbc9619a0ff1802c7 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 3 Feb 2026 00:16:54 -0800 Subject: [PATCH 17/26] Augment Raw w/ QP-support chirp correlator and caltype functions --- .../nisar/products/readers/Raw/Raw.py | 268 +++++++++++++++++- .../nisar/products/readers/Raw/__init__.py | 10 +- 2 files changed, 275 insertions(+), 3 deletions(-) diff --git a/python/packages/nisar/products/readers/Raw/Raw.py b/python/packages/nisar/products/readers/Raw/Raw.py index c68113dbb..16c68f9c6 100644 --- a/python/packages/nisar/products/readers/Raw/Raw.py +++ b/python/packages/nisar/products/readers/Raw/Raw.py @@ -1,3 +1,4 @@ +from __future__ import annotations from .DataDecoder import DataDecoder import h5py import isce3 @@ -9,8 +10,9 @@ import journal import re from warnings import warn -from scipy.interpolate import interp1d -from nisar.antenna import CalPath +from typing import Tuple +from enum import IntEnum, unique +from nisar.antenna import CalPath, get_calib_range_line_idx # TODO some CSV logger log = logging.getLogger("Raw") @@ -399,6 +401,140 @@ def getRangeLineIndex(self, frequency: str = 'A', tx: str = None): return f[path]["rangeLineIndex"][()] + def _parse_chirpcorrelator_from_hrt_qfsp( + self, + txrx_pol: str) -> np.ndarray | None: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) or None + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = self.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(self.filename, mode='r', swmr=True) as f5: + # loop over three qfsp + for i_qfsp in range(3): + p_qfsp = f'{qfsp_path}{i_qfsp}' + # loop over 4 channels per qfsp: + for nn in range(4): + i_chn = nn + i_qfsp * 4 + n_rx = i_chn + 1 + # loop over 3 taps per channel + for i_tap in range(3): + n_tap = i_tap + 1 + # form the path to the dataset per I and Q + # use RX pol! + p_ds_i = (f'{p_qfsp}/CHIRP_CORRELATOR_I{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + try: + ds_i = f5[p_ds_i] + except KeyError as err: + warn( + f'Missing dataset {p_ds_i} in {self.filename}.' + f' Detailed error -> {err}' + ) + return None + else: + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: + # initialize the 3-D array for chirp correlator + num_lines = ds_i.size + chp_cor = np.ones((num_lines, 12, 3), dtype='c8') + chp_cor[:, i_chn, i_tap].real = ds_i[()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor + + + def _parse_caltype_from_hrt_qfsp( + self, + txrx_pol: str) -> np.ndarray | None: + """ + Parse cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(uint8) or None + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and + INVALID=255. If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = self.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(self.filename, mode='r', swmr=True) as f5: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + i_qfsp = 0 + p_qfsp = f'{qfsp_path}{i_qfsp}' + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError as err: + warn(f'Missing dataset "{p_type}" in ' + f'"{self.filename}". Detailed error -> {err}') + return None + else: + return ds_cal_type[()].astype(CalPath) + + + def _parse_rangeline_index_from_hrt( + self, + txrx_pol: str = None) -> np.ndarray | None: + """ + Get range line index over all range lines from + HRT if exists otherwise None! + + Parameters + ---------- + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(uint) or None + If not available in L0b, None will be returned. + + """ + hrt_path = self.TelemetryPath.replace('low', 'high') + freq_band = sorted(self.frequencies)[0] + pols = self.polarizations[freq_band] + if txrx_pol is None: + txrx_pol = pols[0] + elif txrx_pol not in pols: + raise ValueError(f'Available pols {pols} but got {txrx_pol}!') + rgl_idx_path = (f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/' + 'RangeLine/RH_RANGELINE_INDEX') + with h5py.File(self.filename, mode='r', swmr=True) as f5: + try: + ds_rgl_idx = f5[rgl_idx_path] + except KeyError as err: + warn(f'Can not parse range line index from HRT. Error -> {err}') + return None + else: + return ds_rgl_idx[()] + + def getCalType(self, frequency: str = 'A', tx: str = None): """ Extract Tx Calibration mask for each range line. @@ -918,3 +1054,131 @@ def open_rrsd(filename) -> RawBase: if "/science/LSAR/RRSD/telemetry" in f: return LegacyRaw(hdf5file=filename) return Raw(hdf5file=filename) + + + +@unique +class PolarizationTypeId(IntEnum): + """Enumeration for polarization types of L-band NISAR""" + single_h = 0 + single_v = 1 + dual_h = 2 + dual_v = 3 + quad = 4 + compact = 5 + none = 6 + quasi_quad = 7 + quasi_dual = 8 + + +def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: + """Get polarization ID and type from L0B DRT""" + pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_pol = f5[pol_path] + except KeyError: + warn(f'Missing dataset "{pol_path}" in "{raw.filename}"') + id_pol = 6 + else: + i_pol = ds_pol[()] + id_pol = np.nanmedian(i_pol) + return PolarizationTypeId(id_pol) + + +def is_raw_quad_pol(raw: Raw) -> bool: + """Determine whether raw L0B is Quad or not""" + return polarization_type_from_drt(raw) == PolarizationTypeId.quad + + +def first_tx_pol_for_quad(raw: Raw) -> str: + """Get first TX polarization, H or V, from only Quad pol product""" + if not is_raw_quad_pol(raw): + raise ValueError('Not a quad pol!') + idx_rgl = raw._parse_rangeline_index_from_hrt()[0] + # if not in HRT parse single-pol version from swath path + if idx_rgl is None: + idx_rgl_h = raw.getRangeLineIndex('A', 'H')[0] + idx_rgl_v = raw.getRangeLineIndex('A', 'V')[0] + if idx_rgl_v < idx_rgl_h: + return 'V' + return 'H' + else: # odd range line is V pol first and even is H pol first! + return {0: 'H', 1: 'V'}.get(idx_rgl % 2) + + +def opposite_pol(pol: str) -> str: + """Get the oppsoite pol""" + if pol == 'H': + return 'V' + elif pol == 'V': + return 'H' + else: + return pol + + +def chirpcorrelator_caltype_from_raw( + raw: Raw, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from Raw L0B for a certain + TxRX pol + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + np.ndarray(uint8) + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and INVALID=255 + + """ + chp_cor = raw._parse_chirpcorrelator_from_hrt_qfsp(txrx_pol=txrx_pol) + cal_type = raw._parse_caltype_from_hrt_qfsp(txrx_pol=txrx_pol) + # XXX if the respective field does not exist then use co-pol under + # swath in L0B for the sake of backward compatibility + if chp_cor is None or cal_type is None: + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + # Quad pol case + if is_raw_quad_pol(raw): + tx_pol_first = first_tx_pol_for_quad(raw) + if txrx_pol[0] == tx_pol_first: + chp_cor = chp_cor[::2] + cal_type = cal_type[::2] + else: # the second TX pol + # get data from the opssoite TX pol + x_pol = opposite_pol(txrx_pol[0]) + txrx_pol[1] + chp_cor_x, cal_type_x = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=x_pol) + # if co-pol get HPA value from same TX but + # fill in LNA/BYP from oppsoite TX + if txrx_pol[0] == txrx_pol[1]: + chp_cor = chp_cor[1::2] + cal_type = cal_type[1::2] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type_x) + chp_cor[idx_byp] = chp_cor_x[idx_byp] + chp_cor[idx_lna] = chp_cor_x[idx_lna] + cal_type[idx_byp] = CalPath.BYPASS + cal_type[idx_lna] = CalPath.LNA + else: # x-pol product + chp_cor = chp_cor_x + cal_type = cal_type_x + # set x-pol HPA to INVALID given they are the mix of + # LNA from co-pol and HPA from x-pol! + if txrx_pol in ('HV', 'VH'): + idx_hpa, _, _, _ = get_calib_range_line_idx(cal_type) + if idx_hpa.size > 0: + warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') + cal_type[idx_hpa] = CalPath.INVALID + return chp_cor, cal_type \ No newline at end of file diff --git a/python/packages/nisar/products/readers/Raw/__init__.py b/python/packages/nisar/products/readers/Raw/__init__.py index 67b7cb788..a073287e7 100644 --- a/python/packages/nisar/products/readers/Raw/__init__.py +++ b/python/packages/nisar/products/readers/Raw/__init__.py @@ -1,2 +1,10 @@ -from .Raw import Raw, open_rrsd +from .Raw import ( + Raw, + open_rrsd, + chirpcorrelator_caltype_from_raw, + PolarizationTypeId, + is_raw_quad_pol, + first_tx_pol_for_quad, + opposite_pol +) from .DataDecoder import complex32, DataDecoder From 0b3b97cef64e8959395a6a797f5fc1af4ff3f47a Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 3 Feb 2026 00:20:13 -0800 Subject: [PATCH 18/26] Fix noise estimator module to support full QP noise products --- .../nisar/noise/noise_estimation_from_raw.py | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/python/packages/nisar/noise/noise_estimation_from_raw.py b/python/packages/nisar/noise/noise_estimation_from_raw.py index d31df2855..fa24cb1b2 100644 --- a/python/packages/nisar/noise/noise_estimation_from_raw.py +++ b/python/packages/nisar/noise/noise_estimation_from_raw.py @@ -16,6 +16,12 @@ from nisar.antenna import get_calib_range_line_idx from nisar.log import set_logger from isce3.core import DateTime, TimeDelta +from nisar.products.readers.Raw import ( + chirpcorrelator_caltype_from_raw, + is_raw_quad_pol, + first_tx_pol_for_quad, + opposite_pol +) # Global Noise-related Constants # Min number of range bins recommended per noise range block @@ -203,7 +209,7 @@ def extract_noise_only_lines(raw, freq_band, txrx_pol, max_lines=18944): # special RCIDs (NISAR mode numbers) to be treated as noise-only product rcid_special = (1, 2, 3) # get noise-only range lines if any - cal_path_mask = raw.getCalType(freq_band, tx=txrx_pol[0]) + _, cal_path_mask = chirpcorrelator_caltype_from_raw(raw, txrx_pol=txrx_pol) _, _, _, noise_index = get_calib_range_line_idx(cal_path_mask) # check if it is a special case with RCID=1,2,3 where there is no # noise-only range line and TX=OFF. @@ -456,7 +462,11 @@ def est_noise_power_from_raw( f'Number of range blocks is smaller than min {n_rg_blk_min}' ) logger.info(f'Number of range blocks -> {num_rng_block}') - + # check if product is quad + is_quad_pol = is_raw_quad_pol(raw) + if is_quad_pol: + first_tx_pol = first_tx_pol_for_quad(raw) + logger.info(f'Quad pol product w/ first {first_tx_pol}-pol TX!') # container for all noise products noise_prods = [] # if quad pol, then do MVE or MEE @@ -467,8 +477,9 @@ def est_noise_power_from_raw( # L-band NISAR. # loop over freq bands for freq_band in frq_pol: - # check if it is QP and product differentiation is set to True - if dif_quad and _is_quad_pol(frq_pol[freq_band]): + # check if it contains all linear pol combination and + # product differentiation is set to True + if dif_quad and _contains_all_linear_pols(frq_pol[freq_band]): logger.info('The difference of co-pol and cx-pol with' ' the same RX pol will be used in Noise est!') # let's combine datasets with the same RX Pol @@ -559,7 +570,9 @@ def est_noise_power_from_raw( ) noise_prods.append(ns_prod) - else: # other pol types than QP + else: # no polarimetric diff! + # For qaud pol, use noise range lines of the + # first TX pol for the other TX pol. for txrx_pol in frq_pol[freq_band]: logger.info( 'Processing individually frequency band ' @@ -575,12 +588,20 @@ def est_noise_power_from_raw( # calculate approximate ENBW for relatively white noise! enbw = enbw_from_raw(raw, freq_band, txrx_pol[0]) logger.info(f'Approximate ENBW in (MHz) -> {enbw * 1e-6}') + # check if quad pol per telemetry then use the oppsoite + # TX pol for noise range lines if that pol is not the + # first TX pol. + txrx_p = txrx_pol + if is_quad_pol and txrx_pol[0] != first_tx_pol: + txrx_p = opposite_pol(txrx_pol[0]) + txrx_pol[1] + logger.warning(f'Use noise-only range lines from {txrx_p} ' + f'for {txrx_pol}!') # parse one noise dataset # loop over several AZ blocks of noise-only range lines noise_power_azblk = [] az_dt_utc = [] for (dset_noise, idx_rgl_ns) in extract_noise_only_lines( - raw, freq_band, txrx_pol, max_lines): + raw, freq_band, txrx_p, max_lines): if exclude_first_last: logger.info( 'Exclude the first and last noise range lines.') @@ -641,10 +662,10 @@ def _pow2db(p: float) -> float: return 10 * np.log10(p) -def _is_quad_pol(txrx_pols): +def _contains_all_linear_pols(txrx_pols): """ - Whether the list of two-char TxRx Pols represents linear quad - polarization or not. + Whether the list of two-char TxRx Pols represents all + combinations of linear polarizations or not. Parameters ---------- From de15d58a4b04a44d4a7f4c961760b4f03aa23a63 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 3 Feb 2026 00:22:39 -0800 Subject: [PATCH 19/26] Fix TxTRM of TX pattern formaton in pattern.py in case of QP --- python/packages/nisar/antenna/pattern.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 9b6b65b44..0ab3acee5 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -6,7 +6,7 @@ from nisar.mixed_mode.logic import PolChannelSet from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser -from nisar.products.readers.Raw import Raw +from nisar.products.readers.Raw import Raw, chirpcorrelator_caltype_from_raw from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index import numpy as np @@ -114,8 +114,10 @@ def build_tx_trm(raw: Raw, pulse_times: np.ndarray, freq_band: str, """Build TxTrmInfo object """ # Parse Tx-related Cal stuff used for Tx BMF tx_chanl = raw.getListOfTxTRMs(freq_band, tx_pol) - corr_tap2 = raw.getChirpCorrelator(freq_band, tx_pol)[..., 1] - cal_type = raw.getCalType(freq_band, tx_pol) + # get chirp correlator and cal type for co-pol product + chp_corr, cal_type = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=2 * tx_pol) + corr_tap2 = chp_corr[..., 1] # build TxTRM from Tx Cal stuff w/o optional "tx_phase" return TxTrmInfo(pulse_times, tx_chanl, corr_tap2, cal_type) From 30c70a941d2143ad0f7b44e29c94cd73145faea2 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 3 Feb 2026 00:25:38 -0800 Subject: [PATCH 20/26] Fix full NESZ estimation in focus.py for QP --- python/packages/nisar/workflows/focus.py | 40 +++++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index dde3c990a..d66f25d23 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -16,7 +16,14 @@ find_overlapping_channel) from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser -from nisar.products.readers.Raw import Raw, open_rrsd +from nisar.products.readers.Raw import ( + Raw, + open_rrsd, + chirpcorrelator_caltype_from_raw, + is_raw_quad_pol, + first_tx_pol_for_quad, + opposite_pol +) from nisar.products.readers.rslc_cal import (RslcCalibration, parse_rslc_calibration, get_scale_and_delay, check_cal_validity_dates) from nisar.products.writers import SLC @@ -1931,9 +1938,10 @@ def temp(suffix): # Compute NESZ if there exist noise-only range lines # get noise only range line indexes within processing interval - cal_path_mask = raw.getCalType( - channel_in.freq_id, pol[0])[pulse_begin:pulse_end] - _, _, _, idx_noise = get_calib_range_line_idx(cal_path_mask) + _, cal_path_mask = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=pol) + _, _, _, idx_noise = get_calib_range_line_idx( + cal_path_mask[pulse_begin:pulse_end]) # form output slant range vector for all noise products if cfg.processing.noise_equivalent_backscatter.fill_nan_ends: @@ -1962,7 +1970,29 @@ def temp(suffix): data_noise = np.memmap( fid_noise, mode='w+', shape=(nrgl_noise, rc.output_size), dtype=np.complex64) - rc.rangecompress(data_noise, raw_clean[idx_noise]) + # Check if raw is quad pol and the TX pol is not the + # first TX pol. Then extract noise only range line + # from the opposite TX pol w/ the same RX pol. + # XXX No RFI/caltone clean up of noise-only range lines + # for second TX pol products of quad pol! + raw_ns = np.copy(raw_clean[idx_noise]) + if is_raw_quad_pol(raw): + first_tx_pol = first_tx_pol_for_quad(raw) + log.info(f'Quad pol w/ first {first_tx_pol} pol!') + if pol[0] != first_tx_pol: + pol_ns = opposite_pol(pol[0]) + pol[1] + log.warning('Get noise-only range lines from ' + f'{pol_ns} for {pol} of quad pol!') + ds_ns = raw.getRawDataset(channel_in.freq_id, pol_ns) + idx_ns = np.arange(pulse_begin, pulse_end)[idx_noise] + # decode simply noise-only range lines and + # thus no need for memmap + raw_ns = ds_ns[idx_ns] + raw_ns *= bb_phasor[idx_noise, np.newaxis] + raw_ns[np.isnan(raw_ns)] = 0.0 + if cfg.processing.zero_fill_gaps: + fill_gaps(raw_ns, swaths[:, idx_noise, :], 0.0) + rc.rangecompress(data_noise, raw_ns) # build and apply antenna pattern correction for noise # pulses if EAP is True if cfg.processing.is_enabled.eap: From 27bb15e4dc24c7a5fd30f5402abaa0ff8759987a Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 4 Feb 2026 16:22:47 -0800 Subject: [PATCH 21/26] Add support for QP in computing RX channel imbalance --- .../antenna/rx_channel_imbalance_helpers.py | 275 +++++++++++++++--- 1 file changed, 236 insertions(+), 39 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 54dc3e2f3..eb53d346d 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -2,6 +2,7 @@ from warnings import warn from typing import Tuple, Dict from dataclasses import dataclass +from enum import IntEnum, unique import numpy as np import h5py @@ -45,6 +46,20 @@ def __post_init__(self): f'{self.lna_caltone_ratio.size} instead of 12!') +@unique +class PolarizationTypeId(IntEnum): + """Enumeration for polarization types of L-band NISAR""" + single_h = 0 + single_v = 1 + dual_h = 2 + dual_v = 3 + quad = 4 + compact = 5 + none = 6 + quasi_quad = 7 + quasi_dual = 8 + + def compute_all_rx_channel_imbalances_from_l0b( l0b_file: str | Raw, *, @@ -132,13 +147,13 @@ def compute_rx_channel_imbalance( n_tap_dominant: np.ndarray(int) Dominant tap number, a value within [1,3] over all 12 RXs. time_delays: np.ndarray(float) - Time delays from the phase of qFSP outlier + Time delays from the phase of qFSP outlier in seconds max_ratio : float Report peak power among all channels used for amplitude normalization of RX channel imbalances. """ - lna_mean, n_tap_dominant = get_lna_cal_mean(raw, freq_band, txrx_pol) + lna_mean, n_tap_dominant = get_lna_cal_mean(raw, txrx_pol) # get caltone mean over all RX channels caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) # Get complex ratio LNA/Caltone over all channels @@ -158,12 +173,93 @@ def compute_rx_channel_imbalance( return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio -def parse_chirp_corr_from_hrt_qfsp( +def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: + """Get polarization ID and type from L0B DRT""" + pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_pol = f5[pol_path] + except KeyError: + warn(f'Missing dataset "{pol_path}" in "{raw.filename}"') + id_pol = 6 + else: + i_pol = ds_pol[()] + id_pol = np.nanmedian(i_pol) + return PolarizationTypeId(id_pol) + + +def is_raw_quad_pol(raw: Raw) -> bool: + """Determine whether raw L0B is Quad or not""" + return polarization_type_from_drt(raw) == PolarizationTypeId.quad + + +def parse_rangeline_index_from_hrt( raw: Raw, - txrx_pol: str) -> Tuple[np.ndarray, np.ndarray]: + txrx_pol: str = None) -> np.ndarray | None: + """ + Get range line index over all range lines from + HRT if exists otherwise None! + + Returns + ------- + np.ndarray(uint) or None + If not available in L0b, None will be returned. + + """ + hrt_path = raw.TelemetryPath.replace('low', 'high') + freq_band = sorted(raw.frequencies)[0] + pols = raw.polarizations[freq_band] + if txrx_pol is None: + txrx_pol = pols[0] + elif txrx_pol not in pols: + raise ValueError(f'Available pols {pols} but got {txrx_pol}!') + rgl_idx_path = (f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/' + 'RangeLine/RH_RANGELINE_INDEX') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_rgl_idx = f5[rgl_idx_path] + except KeyError as err: + warn(f'Can not parse range line index from HRT. Error -> {err}') + return None + else: + return ds_rgl_idx[()] + + +def first_tx_pol_for_quad(raw: Raw) -> str: + """Get first TX polarization, H or V, from only Quad pol product""" + if not is_raw_quad_pol(raw): + raise ValueError('Not a quad pol!') + idx_rgl = parse_rangeline_index_from_hrt(raw)[0] + # if not in HRT parse single-pol version from swath path + if idx_rgl is None: + idx_rgl_h = raw.getRangeLineIndex('A', 'H')[0] + idx_rgl_v = raw.getRangeLineIndex('A', 'V')[0] + if idx_rgl_v < idx_rgl_h: + return 'V' + return 'H' + else: # odd range line is V pol first and even is H pol first! + return {0: 'H', 1: 'V'}.get(idx_rgl % 2) + + +def parse_chirpcorrelator_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: """ Parse three-tap chirp correlator array with shape (lines, 12, 3) as well ass cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) or None + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + If the field does not exist None will be returned. + """ # get HRT path hrt_path = raw.TelemetryPath.replace('low', 'high') @@ -176,7 +272,7 @@ def parse_chirp_corr_from_hrt_qfsp( for nn in range(4): i_chn = nn + i_qfsp * 4 n_rx = i_chn + 1 - # loop over 3 taps + # loop over 3 taps per channel for i_tap in range(3): n_tap = i_tap + 1 # form the path to the dataset per I and Q @@ -185,53 +281,154 @@ def parse_chirp_corr_from_hrt_qfsp( f'{txrx_pol[1]}{n_rx:02d}') p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' f'{txrx_pol[1]}{n_rx:02d}') - # initialize the 3-D array, lines by 12 by 3 - if i_qfsp == nn == i_tap == 0: - # XXX get caltype from the very first qFSP assuming - # it is qFSP independent! - p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' - # XXX Following Try/exception block is added to - # support old sim L0B products lacking HRT! - try: - ds_cal_type = f5[p_type] - except KeyError: - warn(f'Missing dataset "{p_type}" in ' - f'"{raw.filename}". LNA CAL values ' - 'from co-pol will be used instead. ' - 'Results may be invalid!') - freq_band = [f for f in raw.frequencies if - txrx_pol in raw.polarizations[f]][0] - chp_cor = raw.getChirpCorrelator( - freq_band, txrx_pol[0]) - cal_type = raw.getCalType(freq_band, txrx_pol[0]) - return chp_cor, cal_type - else: - cal_type = ds_cal_type[()].astype(CalPath) + try: + ds_i = f5[p_ds_i] + except KeyError as err: + warn( + f'Missing dataset {p_ds_i} in {raw.filename}.' + f' Detailed error -> {err}' + ) + return None + else: + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: # initialize the 3-D array for chirp correlator - num_lines = f5[p_ds_i].size + num_lines = ds_i.size chp_cor = np.ones((num_lines, 12, 3), dtype='c8') - chp_cor[:, i_chn, i_tap].real = f5[p_ds_i][()] - chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + chp_cor[:, i_chn, i_tap].real = ds_i[()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor + + +def parse_caltype_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: + """ + Parse cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(uint8) or None + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and + INVALID=255. If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + i_qfsp = 0 + p_qfsp = f'{qfsp_path}{i_qfsp}' + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError as err: + warn(f'Missing dataset "{p_type}" in ' + f'"{raw.filename}". Detailed error -> {err}') + return None + else: + return ds_cal_type[()].astype(CalPath) + + +def _opposite_pol(pol: str) -> str: + """Get the oppsoite pol""" + if pol == 'H': + return 'V' + elif pol == 'V': + return 'H' + else: + return pol + + +def chirpcorrelator_caltype_from_raw( + raw: Raw, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from Raw L0B for a certain + TxRX pol + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + np.ndarray(uint8) + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and INVALID=255 + + """ + chp_cor = parse_chirpcorrelator_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + cal_type = parse_caltype_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + # XXX if the respective field does not exist then use co-pol under + # swath in L0B for the sake of backward compatibility + if chp_cor is None or cal_type is None: + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + # Quad pol case + if is_raw_quad_pol(raw): + tx_pol_first = first_tx_pol_for_quad(raw) + if txrx_pol[0] == tx_pol_first: + chp_cor = chp_cor[::2] + cal_type = cal_type[::2] + else: # the second TX pol + # get data from the opssoite TX pol + x_pol = _opposite_pol(txrx_pol[0]) + txrx_pol[1] + chp_cor_x, cal_type_x = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=x_pol) + # if co-pol get HPA value from same TX but + # fill in LNA/BYP from oppsoite TX + if txrx_pol[0] == txrx_pol[1]: + chp_cor = chp_cor[1::2] + cal_type = cal_type[1::2] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type_x) + chp_cor[idx_byp] = chp_cor_x[idx_byp] + chp_cor[idx_lna] = chp_cor_x[idx_lna] + cal_type[idx_byp] = CalPath.BYPASS + cal_type[idx_lna] = CalPath.LNA + else: # x-pol product + chp_cor = chp_cor_x + cal_type = cal_type_x + # set x-pol HPA to INVALID given they are the mix of + # LNA from co-pol and HPA from x-pol! + if txrx_pol in ('HV', 'VH'): + idx_hpa, _, _, _ = get_calib_range_line_idx(cal_type) + if idx_hpa.size > 0: + warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') + cal_type[idx_hpa] = CalPath.INVALID return chp_cor, cal_type def get_lna_cal_mean( raw: Raw, - freq_band: str, txrx_pol: str ) -> Tuple[np.ndarray, np.ndarray]: """ Returns mean complex LNA values and dominant tap numbers within [1, 2, 3] for all channels """ - if txrx_pol[0] == txrx_pol[1]: - chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) - cal_type = raw.getCalType(freq_band, txrx_pol[0]) - else: # get x-pol chirp correlator from HRT - chp_cor, cal_type = parse_chirp_corr_from_hrt_qfsp( - raw, - txrx_pol - ) + chp_cor, cal_type = chirpcorrelator_caltype_from_raw( + raw=raw, + txrx_pol=txrx_pol + ) n_rxs = chp_cor.shape[1] _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type) if len(idx_lna) == 0: From 547999dfab65f7e82faecc7e6c861b9fdaa6b233 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 4 Feb 2026 16:23:28 -0800 Subject: [PATCH 22/26] Get caltone freq from runconfig rather than DRT in focus.py for now --- python/packages/nisar/workflows/focus.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index 1cd4f91ab..308ed0cfe 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -1909,10 +1909,14 @@ def temp(suffix): # Precompute antenna patterns at downsampled spacing if cfg.processing.is_enabled.eap: + # XXX Due to a bug in respective DRT of some L0B products + # (CRID=05007), caltone.frequency is extrated from runconfig + # otherwise, it shall be set to None to be determined from DRT! antpat = AntennaPattern(raw, dem, antparser, instparser, orbit, attitude, el_lut=el_lut, - freq_band=frequency) + freq_band=frequency, + caltone_freq=cfg.processing.caltone.frequency) log.info("Precomputing antenna patterns") i = np.arange(rc_grid.shape[0]) From 27e9c2e7ab35046e5ec0c355ee4c8a5946ff262f Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 11 Feb 2026 15:31:47 -0800 Subject: [PATCH 23/26] Fix a bug in rx imbalance module for quasi case --- .../antenna/rx_channel_imbalance_helpers.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index eb53d346d..4fe0d2f94 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -481,19 +481,25 @@ def correct_lna_caltone_ratio_for_second_band( # relative delay offset in one of qFSP vs others, that is # one of the qFSP is an outlier due to ADC clock/delay issue # check if there is delay anomaly among three qFSP - fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) - # get diff of chirp (band=A) and caltone freq for delay detection - dif_chirp_caltone_freq = fc_a - caltone_freq - time_delay = _get_qfsp_delay_anomaly( - lna_caltone_ratio, dif_chirp_caltone_freq) if _is_product_from_second_band(raw, freq_band, txrx_pol): warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') + fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc_a - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) # if there is then get diff of frequency bands A and B # to be used to correct phase from A for B fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay # correct the LNA/CALTONE by delay amount via phase if any. lna_caltone_ratio *= np.exp(1j * phs_adj) + else: # simply first band either A or B! + fc, _, _, _ = raw.getChirpParameters(freq_band, txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) return lna_caltone_ratio, time_delay From 5b2a9919d7db67d4986a011728d8e467f340d893 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Thu, 26 Feb 2026 12:40:15 -0800 Subject: [PATCH 24/26] Move l0b-related parsers from rx channel imbalance to raw --- python/packages/nisar/antenna/pattern.py | 9 +- .../antenna/rx_channel_imbalance_helpers.py | 317 +----------------- .../nisar/products/readers/Raw/Raw.py | 98 +++++- .../nisar/products/readers/Raw/__init__.py | 4 +- 4 files changed, 103 insertions(+), 325 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 9d95262ec..b9646b659 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -5,12 +5,13 @@ import logging from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser -from nisar.products.readers.Raw import Raw, chirpcorrelator_caltype_from_raw +from nisar.products.readers.Raw import ( + Raw, chirpcorrelator_caltype_from_raw, range_delay_sequential_tx_from_raw +) from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index from nisar.antenna.rx_channel_imbalance_helpers import ( - compute_all_rx_channel_imbalances_from_l0b, - get_range_delay_from_raw + compute_all_rx_channel_imbalances_from_l0b ) import numpy as np @@ -248,7 +249,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # This will account for delay after onboard DBF due to # both the first pulsewidth in sequential TX chirps # as well as the difference in filter group delays. - tm_delay = get_range_delay_from_raw( + tm_delay = range_delay_sequential_tx_from_raw( raw, self.freq_band, txrx_pol) tm_delay += delay_ofs_dbf n_samp_delay = round(tm_delay * self.fs_win) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 4fe0d2f94..8175dc5ad 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -2,14 +2,13 @@ from warnings import warn from typing import Tuple, Dict from dataclasses import dataclass -from enum import IntEnum, unique import numpy as np -import h5py -from nisar.products.readers.Raw import Raw -from nisar.antenna import get_calib_range_line_idx, CalPath -from isce3.core import speed_of_light +from nisar.products.readers.Raw import ( + Raw, chirpcorrelator_caltype_from_raw, caltone_frequency_from_raw +) +from nisar.antenna import get_calib_range_line_idx @dataclass(frozen=True) @@ -46,20 +45,6 @@ def __post_init__(self): f'{self.lna_caltone_ratio.size} instead of 12!') -@unique -class PolarizationTypeId(IntEnum): - """Enumeration for polarization types of L-band NISAR""" - single_h = 0 - single_v = 1 - dual_h = 2 - dual_v = 3 - quad = 4 - compact = 5 - none = 6 - quasi_quad = 7 - quasi_dual = 8 - - def compute_all_rx_channel_imbalances_from_l0b( l0b_file: str | Raw, *, @@ -173,250 +158,6 @@ def compute_rx_channel_imbalance( return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio -def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: - """Get polarization ID and type from L0B DRT""" - pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' - with h5py.File(raw.filename, mode='r', swmr=True) as f5: - try: - ds_pol = f5[pol_path] - except KeyError: - warn(f'Missing dataset "{pol_path}" in "{raw.filename}"') - id_pol = 6 - else: - i_pol = ds_pol[()] - id_pol = np.nanmedian(i_pol) - return PolarizationTypeId(id_pol) - - -def is_raw_quad_pol(raw: Raw) -> bool: - """Determine whether raw L0B is Quad or not""" - return polarization_type_from_drt(raw) == PolarizationTypeId.quad - - -def parse_rangeline_index_from_hrt( - raw: Raw, - txrx_pol: str = None) -> np.ndarray | None: - """ - Get range line index over all range lines from - HRT if exists otherwise None! - - Returns - ------- - np.ndarray(uint) or None - If not available in L0b, None will be returned. - - """ - hrt_path = raw.TelemetryPath.replace('low', 'high') - freq_band = sorted(raw.frequencies)[0] - pols = raw.polarizations[freq_band] - if txrx_pol is None: - txrx_pol = pols[0] - elif txrx_pol not in pols: - raise ValueError(f'Available pols {pols} but got {txrx_pol}!') - rgl_idx_path = (f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/' - 'RangeLine/RH_RANGELINE_INDEX') - with h5py.File(raw.filename, mode='r', swmr=True) as f5: - try: - ds_rgl_idx = f5[rgl_idx_path] - except KeyError as err: - warn(f'Can not parse range line index from HRT. Error -> {err}') - return None - else: - return ds_rgl_idx[()] - - -def first_tx_pol_for_quad(raw: Raw) -> str: - """Get first TX polarization, H or V, from only Quad pol product""" - if not is_raw_quad_pol(raw): - raise ValueError('Not a quad pol!') - idx_rgl = parse_rangeline_index_from_hrt(raw)[0] - # if not in HRT parse single-pol version from swath path - if idx_rgl is None: - idx_rgl_h = raw.getRangeLineIndex('A', 'H')[0] - idx_rgl_v = raw.getRangeLineIndex('A', 'V')[0] - if idx_rgl_v < idx_rgl_h: - return 'V' - return 'H' - else: # odd range line is V pol first and even is H pol first! - return {0: 'H', 1: 'V'}.get(idx_rgl % 2) - - -def parse_chirpcorrelator_from_hrt_qfsp( - raw: Raw, - txrx_pol: str) -> np.ndarray | None: - """ - Parse three-tap chirp correlator array with shape (lines, 12, 3) - as well ass cal type with shape (lines,) from HRT QFSP. - - Parameters - ---------- - raw : nisar.products.readers.Raw - txrx_pol : str - TxRx polarization such as HH, VH, etc - - Returns - ------- - np.ndarray(complex) or None - 3-D complex array of chirp correlator with shape (Lines, channels, 3) - If the field does not exist None will be returned. - - """ - # get HRT path - hrt_path = raw.TelemetryPath.replace('low', 'high') - qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' - with h5py.File(raw.filename, mode='r', swmr=True) as f5: - # loop over three qfsp - for i_qfsp in range(3): - p_qfsp = f'{qfsp_path}{i_qfsp}' - # loop over 4 channels per qfsp: - for nn in range(4): - i_chn = nn + i_qfsp * 4 - n_rx = i_chn + 1 - # loop over 3 taps per channel - for i_tap in range(3): - n_tap = i_tap + 1 - # form the path to the dataset per I and Q - # use RX pol! - p_ds_i = (f'{p_qfsp}/CHIRP_CORRELATOR_I{n_tap}_' - f'{txrx_pol[1]}{n_rx:02d}') - p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' - f'{txrx_pol[1]}{n_rx:02d}') - try: - ds_i = f5[p_ds_i] - except KeyError as err: - warn( - f'Missing dataset {p_ds_i} in {raw.filename}.' - f' Detailed error -> {err}' - ) - return None - else: - # initialize the 3-D array, lines by 12 by 3 - if i_qfsp == nn == i_tap == 0: - # initialize the 3-D array for chirp correlator - num_lines = ds_i.size - chp_cor = np.ones((num_lines, 12, 3), dtype='c8') - chp_cor[:, i_chn, i_tap].real = ds_i[()] - chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] - return chp_cor - - -def parse_caltype_from_hrt_qfsp( - raw: Raw, - txrx_pol: str) -> np.ndarray | None: - """ - Parse cal type with shape (lines,) from HRT QFSP. - - Parameters - ---------- - raw : nisar.products.readers.Raw - txrx_pol : str - TxRx polarization such as HH, VH, etc - - Returns - ------- - np.ndarray(uint8) or None - 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and - INVALID=255. If the field does not exist None will be returned. - - """ - # get HRT path - hrt_path = raw.TelemetryPath.replace('low', 'high') - qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' - with h5py.File(raw.filename, mode='r', swmr=True) as f5: - # XXX get caltype from the very first qFSP assuming - # it is qFSP independent! - i_qfsp = 0 - p_qfsp = f'{qfsp_path}{i_qfsp}' - p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' - # XXX Following Try/exception block is added to - # support old sim L0B products lacking HRT! - try: - ds_cal_type = f5[p_type] - except KeyError as err: - warn(f'Missing dataset "{p_type}" in ' - f'"{raw.filename}". Detailed error -> {err}') - return None - else: - return ds_cal_type[()].astype(CalPath) - - -def _opposite_pol(pol: str) -> str: - """Get the oppsoite pol""" - if pol == 'H': - return 'V' - elif pol == 'V': - return 'H' - else: - return pol - - -def chirpcorrelator_caltype_from_raw( - raw: Raw, - txrx_pol: str -) -> Tuple[np.ndarray, np.ndarray]: - """ - Parse three-tap chirp correlator array with shape (lines, 12, 3) - as well ass cal type with shape (lines,) from Raw L0B for a certain - TxRX pol - - Parameters - ---------- - raw : nisar.products.readers.Raw - txrx_pol : str - TxRx polarization such as HH, VH, etc - - Returns - ------- - np.ndarray(complex) - 3-D complex array of chirp correlator with shape (Lines, channels, 3) - np.ndarray(uint8) - 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and INVALID=255 - - """ - chp_cor = parse_chirpcorrelator_from_hrt_qfsp(raw, txrx_pol=txrx_pol) - cal_type = parse_caltype_from_hrt_qfsp(raw, txrx_pol=txrx_pol) - # XXX if the respective field does not exist then use co-pol under - # swath in L0B for the sake of backward compatibility - if chp_cor is None or cal_type is None: - freq_band = [f for f in raw.frequencies if - txrx_pol in raw.polarizations[f]][0] - chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) - cal_type = raw.getCalType(freq_band, txrx_pol[0]) - return chp_cor, cal_type - # Quad pol case - if is_raw_quad_pol(raw): - tx_pol_first = first_tx_pol_for_quad(raw) - if txrx_pol[0] == tx_pol_first: - chp_cor = chp_cor[::2] - cal_type = cal_type[::2] - else: # the second TX pol - # get data from the opssoite TX pol - x_pol = _opposite_pol(txrx_pol[0]) + txrx_pol[1] - chp_cor_x, cal_type_x = chirpcorrelator_caltype_from_raw( - raw, txrx_pol=x_pol) - # if co-pol get HPA value from same TX but - # fill in LNA/BYP from oppsoite TX - if txrx_pol[0] == txrx_pol[1]: - chp_cor = chp_cor[1::2] - cal_type = cal_type[1::2] - _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type_x) - chp_cor[idx_byp] = chp_cor_x[idx_byp] - chp_cor[idx_lna] = chp_cor_x[idx_lna] - cal_type[idx_byp] = CalPath.BYPASS - cal_type[idx_lna] = CalPath.LNA - else: # x-pol product - chp_cor = chp_cor_x - cal_type = cal_type_x - # set x-pol HPA to INVALID given they are the mix of - # LNA from co-pol and HPA from x-pol! - if txrx_pol in ('HV', 'VH'): - idx_hpa, _, _, _ = get_calib_range_line_idx(cal_type) - if idx_hpa.size > 0: - warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') - cal_type[idx_hpa] = CalPath.INVALID - return chp_cor, cal_type - - def get_lna_cal_mean( raw: Raw, txrx_pol: str @@ -473,7 +214,7 @@ def correct_lna_caltone_ratio_for_second_band( ) -> Tuple[np.ndarray, np.ndarray]: # Get caltone frequency from DRT if not provided if caltone_freq is None: - caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) + caltone_freq = caltone_frequency_from_raw(raw, txrx_pol) warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' f'-> {caltone_freq * 1e-6:.3f} (MHz)') # Check if product from the second band so we can modify @@ -588,51 +329,3 @@ def _check_if_zero(arr: np.ndarray, msg: str): arr[...] = 1.0 if is_zero.any(): warn(f'Some values are zero for {msg}!') - - -def get_range_delay_from_raw( - raw: Raw, - freq_band: str, - txrx_pol: str -) -> float: - """ - Get delay (seconds) of the second pulse wrt the pulsewidth - of the first TX pulse in sequential split-spectrum transmit - for a desired dataset in L0B. - """ - # check if band is B and it is split spectrum - if freq_band == 'B' and len(raw.frequencies) == 2: - pols = raw.polarizations - # check if this is sequential transmit - if txrx_pol in pols['A']: - sr_b = raw.getRanges('B', txrx_pol[0]) - sr_a = raw.getRanges('A', txrx_pol[0]) - delay = 2 * (sr_b.first - sr_a.first) / speed_of_light - return delay - return 0.0 - - -def parse_caltone_freq_from_drt( - raw: Raw, - txrx_pol: str -) -> float: - """get caltone frequency in Hz from low rate telemetry in L0B.""" - # default caltone if dataset is not available (Hz) - default = 1214.88e6 - # frequency of local oscillator (Hz) - lo = 1200e6 - # ADC clock (Hz) - clock = 240e6 - c_p = (f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_CALTONE_PHASE_STEP_' - f'{txrx_pol[1]}') - with h5py.File(raw.filename, mode='r', swmr=True) as f5: - try: - ds_caltone_phase = f5[c_p] - except KeyError: - warn(f'Missing path "{c_p}" in L0B! Caltone frequency will ' - f'be set to {default} (Hz)') - return default - else: - i_cal = np.median(ds_caltone_phase[()]).astype(int) - caltone_freq = (i_cal / 2**32) * clock + lo - return caltone_freq diff --git a/python/packages/nisar/products/readers/Raw/Raw.py b/python/packages/nisar/products/readers/Raw/Raw.py index 16c68f9c6..12b8d3a5b 100644 --- a/python/packages/nisar/products/readers/Raw/Raw.py +++ b/python/packages/nisar/products/readers/Raw/Raw.py @@ -13,12 +13,14 @@ from typing import Tuple from enum import IntEnum, unique from nisar.antenna import CalPath, get_calib_range_line_idx +from isce3.core import speed_of_light # TODO some CSV logger log = logging.getLogger("Raw") PRODUCT = "RRSD" + def find_case_insensitive(group: h5py.Group, name: str) -> str: for key in group: if key.lower() == name.lower(): @@ -178,12 +180,12 @@ def getRangeBandwidth(self, frequency: str = 'A', tx: str = 'H'): ------- float Bandwidth in Hz. - + """ tx_path = self._pulseMetaPath(frequency=frequency, tx=tx) with h5py.File(self.filename, 'r', libver='latest', swmr=True) as f: return f[tx_path]["rangeBandwidth"][()] - + @property def TelemetryPath(self): return f"{self.ProductPath}/lowRateTelemetry" @@ -272,7 +274,7 @@ def getNominalPRF(self, frequency='A', tx='H'): Returns ------- float - PRF in Hz. + PRF in Hz. """ _, az_time = self.getPulseTimes(frequency, tx) @@ -376,7 +378,7 @@ def getRangeLineIndex(self, frequency: str = 'A', tx: str = None): which starts at 1 at the beginning of a datatake and increases sequentially. Except for the first observation within a datatake, the first index will be some value other than 1. - + If a rangeline was missed due to corrupted data, for example, that would be reflected as a skipped value in the index sequence. @@ -630,7 +632,7 @@ def getCaltone(self, frequency='A', polarization=None): np.ndarray(complex) 2-D complex float of caltone (CW) coefficients, size = [rangelines x channels]. - + """ if polarization is None: polarization = self.polarizations[frequency][0] @@ -1036,7 +1038,6 @@ def getAttitude(self): return isce3.core.Attitude(old.time, qs, old.reference_epoch) - class Raw(RawBase, family='nisar.productreader.raw'): # TODO methods for new telemetry fields. pass @@ -1056,7 +1057,6 @@ def open_rrsd(filename) -> RawBase: return Raw(hdf5file=filename) - @unique class PolarizationTypeId(IntEnum): """Enumeration for polarization types of L-band NISAR""" @@ -1071,6 +1071,8 @@ class PolarizationTypeId(IntEnum): quasi_dual = 8 +# helper functions that uses Raw as input + def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: """Get polarization ID and type from L0B DRT""" pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' @@ -1181,4 +1183,84 @@ def chirpcorrelator_caltype_from_raw( if idx_hpa.size > 0: warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') cal_type[idx_hpa] = CalPath.INVALID - return chp_cor, cal_type \ No newline at end of file + return chp_cor, cal_type + + +def caltone_frequency_from_raw( + raw: Raw, + txrx_pol: str +) -> float: + """get caltone frequency in Hz from low rate telemetry in L0B + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + float + Caltone frequency in Hz. + + Notes + ----- + If the resepctive DRT field is not found in L0B, caltone frequency + will be set to 1214.883 MHz. + + """ + # default caltone if dataset is not available (Hz) + default = 1214.883e6 + # frequency of local oscillator (Hz) + lo = 1200e6 + # ADC clock (Hz) + clock = 240e6 + c_p = (f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_CALTONE_PHASE_STEP_' + f'{txrx_pol[1]}') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_caltone_phase = f5[c_p] + except KeyError: + warn(f'Missing path "{c_p}" in L0B! Caltone frequency will ' + f'be set to {default} (Hz)') + return default + else: + i_cal = np.median(ds_caltone_phase[()]).astype(int) + caltone_freq = (i_cal / 2**32) * clock + lo + return caltone_freq + + +def range_delay_sequential_tx_from_raw( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> float: + """ + Get range delay (seconds) of the second pulse wrt the pulsewidth + of the first TX pulse in sequential split-spectrum transmit + for L0B for specific frequency band and polarization if exists. + + Parameters + ---------- + raw : nisar.products.readers.Raw + freq_band: str + Frequency band such A or B. + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + float + Delay in seconds + + """ + # check if band is B and it is split spectrum + if freq_band == 'B' and len(raw.frequencies) == 2: + pols = raw.polarizations + # check if this is sequential transmit + if txrx_pol in pols['A']: + sr_b = raw.getRanges('B', txrx_pol[0]) + sr_a = raw.getRanges('A', txrx_pol[0]) + delay = 2 * (sr_b.first - sr_a.first) / speed_of_light + return delay + return 0.0 diff --git a/python/packages/nisar/products/readers/Raw/__init__.py b/python/packages/nisar/products/readers/Raw/__init__.py index a073287e7..c8d9e938e 100644 --- a/python/packages/nisar/products/readers/Raw/__init__.py +++ b/python/packages/nisar/products/readers/Raw/__init__.py @@ -5,6 +5,8 @@ PolarizationTypeId, is_raw_quad_pol, first_tx_pol_for_quad, - opposite_pol + opposite_pol, + caltone_frequency_from_raw, + range_delay_sequential_tx_from_raw ) from .DataDecoder import complex32, DataDecoder From 2ef1466e51aaa3cf5980742a10c178ca740c5a1f Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Thu, 26 Feb 2026 13:08:34 -0800 Subject: [PATCH 25/26] Fix handling of CPI update in MEE noise estimator --- .../nisar/noise/noise_estimation_from_raw.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/packages/nisar/noise/noise_estimation_from_raw.py b/python/packages/nisar/noise/noise_estimation_from_raw.py index fa24cb1b2..0ee1124f0 100644 --- a/python/packages/nisar/noise/noise_estimation_from_raw.py +++ b/python/packages/nisar/noise/noise_estimation_from_raw.py @@ -910,19 +910,19 @@ def _noise_product_rng_blocks( if cpi is None: # if not set, set CPI to max possible value equal or # greater than 3 with at least two CPI blocks if possible. - cpi = min(max( + cpi_out = min(max( min(nrgl_valid, 3), np.ceil(nrgl_valid / max_num_cpi_blocks).astype(int) ), MAX_CPI_LEN) else: - cpi = min(nrgl_valid, cpi) - if cpi > MAX_CPI_LEN: + cpi_out = min(nrgl_valid, cpi) + if cpi_out > MAX_CPI_LEN: logger.warning( f'Too large CPI value! It exceeds max {MAX_CPI_LEN}!' ) - logger.info(f'MEE CPI size -> {cpi}') + logger.info(f'MEE CPI size -> {cpi_out}') pow_noise[nn] = noise_pow_min_eigval_est( - noise_rng_blk[idx_valid], cpi, scalar=scalar, + noise_rng_blk[idx_valid], cpi_out, scalar=scalar, remove_mean=remove_mean, median_ev=median_ev) elif algorithm == 'MVE': pow_noise[nn] = noise_pow_min_var_est( @@ -1092,6 +1092,7 @@ def est_noise_power_in_focus( category=InvalidNoiseRangeBlockWarning) continue # run noise estimator per range block + cpi_out = cpi if algorithm == 'MEE': if nrgl_valid < 2: # skip a range block if not enough number of valid noise-only @@ -1104,14 +1105,14 @@ def est_noise_power_in_focus( logger.warning( f'CPI={cpi} is larger than valid noise-only range lines ' f'{nrgl_valid}. CPI is set to {nrgl_valid}!') - cpi = nrgl_valid - if cpi > MAX_CPI_LEN: + cpi_out = nrgl_valid + if cpi_out > MAX_CPI_LEN: logger.warning( f'Too large CPI value! It exceeds max {MAX_CPI_LEN}!' ) - logger.info(f'MEE CPI size -> {cpi}') + logger.info(f'MEE CPI size -> {cpi_out}') pow_noise[nn] = noise_pow_min_eigval_est( - noise_rng_blk[idx_valid], cpi, scalar=scalar, + noise_rng_blk[idx_valid], cpi_out, scalar=scalar, remove_mean=remove_mean, median_ev=median_ev) elif algorithm == 'MVE': pow_noise[nn] = noise_pow_min_var_est( From 76da65597eb53f520d319bcb302b497b31110497 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 2 Mar 2026 14:10:53 -0800 Subject: [PATCH 26/26] Improve docstring for module "rx_channel_imbalance_helpers" --- .../antenna/rx_channel_imbalance_helpers.py | 176 +++++++++++++++++- 1 file changed, 167 insertions(+), 9 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 8175dc5ad..9fc1f0f18 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -125,6 +125,18 @@ def compute_rx_channel_imbalance( correlator as well as detected relative time delays for all RX channels for debugging purposes. + Parameters + ---------- + aw : Raw + ISCE3 NISAR L0B product parser + freq_band: str, + A or B + txrx_pol : str + HH, HV, etc + caltone_freq : float or None, optional. + Caltone frequency in Hz. If None, it will be parsed from DRT + field of L0B product. + Returns ------- lna_caltone_ratio: np.ndarray(complex) @@ -164,7 +176,26 @@ def get_lna_cal_mean( ) -> Tuple[np.ndarray, np.ndarray]: """ Returns mean complex LNA values and dominant tap - numbers within [1, 2, 3] for all channels + numbers within [1, 2, 3] for all channels of a + desired polarization. + + Parameters + ---------- + raw : Raw + ISCE3 NISAR L0B product parser + txrx_pol : str + HH, HV, etc + + Returns + ------- + np.ndarray(complex) + 1-D array of slow-time averaged LNA CAL for all RX channels. + The size is the number of RX channels. + np.ndarray(int) + 1-D array of dominant tap number of LNA three-tap chirp-correlator, + a value within [1, 3] for all RX channels. + The size is the number of RX channels. + """ chp_cor, cal_type = chirpcorrelator_caltype_from_raw( raw=raw, @@ -212,6 +243,37 @@ def correct_lna_caltone_ratio_for_second_band( txrx_pol: str, caltone_freq: float | None = None ) -> Tuple[np.ndarray, np.ndarray]: + """ + Correct complex LNA/CALTONE ratio already obtained from the + first frequency band for the second frequency band if exists + in raw L0B product per desired polarization + and band. + It also return the respective time delays for all three qFSP + (12 RX channels). + + Parameters + ---------- + lna_caltone_ratio: np.ndarray(complex) + Complex LNA/CALTONE ratio over all 12 RXs + raw : Raw + ISCE3 NISAR L0B product parser + freq_band: str, + A or B + txrx_pol : str + HH, HV, etc + caltone_freq : float or None, optional. + Caltone frequency in Hz. If None, it will be parsed from DRT + field of L0B product. + + Returns + ------- + np.ndarray(complex) + 1-D array of slow-time averaged Caltone for all RX channels + np.ndarray(float) + 1-D array of relative time delays of three qFSP (12 RX channels) + in seconds. + + """ # Get caltone frequency from DRT if not provided if caltone_freq is None: caltone_freq = caltone_frequency_from_raw(raw, txrx_pol) @@ -249,6 +311,25 @@ def get_caltone_mean( freq_band: str, txrx_pol: str ) -> np.ndarray: + """ + Get average Caltone across all range lines for a desired + band and polarization from L0B. + + Parameters + ---------- + raw : Raw + ISCE3 NISAR L0B product parser + freq_band: str, + A or B + txrx_pol : str + HH, HV, etc + + Returns + ------- + np.ndarray(complex) + 1-D array of slow-time averaged Caltone for all RX channels + + """ # now get caltone always from swath caltone = raw.getCaltone(freq_band, txrx_pol) caltone_mean = _mean_2d(caltone) @@ -259,10 +340,26 @@ def get_caltone_mean( def _is_product_from_second_band( raw: Raw, freq_band: str, - txrx_pol: str): + txrx_pol: str) -> bool: """ - Determine whether the produt is avolable on both bands - and it is from the second band. + Determine whether the raw prodcut with dersied polarization + is available on both frequency bands and it is representing + the second frequency band "B". + + Parameters + ---------- + raw : Raw + ISCE3 NISAR L0B product parser + freq_band: str, + A or B + txrx_pol : str + HH, HV, etc + + Returns + ------- + bool + True if the product band/pol represent the second frequency band + in split-spectrum case, otherwise false. """ if freq_band == "B" and len(raw.frequencies) == 2: if txrx_pol in raw.polarizations['A']: @@ -278,6 +375,28 @@ def _get_qfsp_delay_anomaly( If the product is a 12-channel NISAR L-band product, return the time delays for a qFSP with phase anomaly. Else, return zeros. + + Parameters + ---------- + lna_caltone_ratio : np.ndarray(complex) + 1-D array of complex LNA/CALTONE ratio with size equals + to the number of RX channels + dif_chirp_caltone_freq: float + Frequency difference between chirp and caltone in Hz. + adc_clock : float, default=240e6 + Analogue-to-digital (ADC) clock rate of NISAR in Hz. + + Returns + ------- + time_delays : np.ndarray(float) + Rleative time delays of all three qFSP (12 RX channels) in seconds. + Same size as `lna_caltone_ratio`. + + Notes + ----- + In case of non-NISAR case with less than 12 channels, all + delays are zero to zeros. + """ if lna_caltone_ratio.size == 12: # group them into three 4-channels, one per qFSP @@ -302,24 +421,63 @@ def _get_qfsp_delay_anomaly( return time_delays -def _mean_2d(data: np.ndarray, perc: float = 0.0) -> np.asarray: +def _mean_2d(data: np.ndarray, perc: float = 5.0) -> np.asarray: """ - Compute mean within percentile [perc, 100-perc], + Compute mean within percentile [perc, 100-perc] along range lines, of a 2-D complex array with shape (rangelines, channels) due to bad telemetry. + + Parameters + ---------- + data : np.ndarray(complex) + 2-D complex float data with shape (rangelines, channels) + perc : float, default=5 + Percentile, a value within [0, 100]. + The values within [perc, 100 - perc] are used in the mean + calculation. If no value in `data` that fullfills this, + the median of `data` will be used instead. + Note that, `perc` and `100-perc` will lead to the same + exact outcome. + Also, `perc=50` is equivalent to `np.nanmedian`. + + Returns + ------- + np.ndarray(complex) + 1-D array of size equal to number of channels representing + the mean across range lines. + """ # or simply np.nanmean(data, axis=0) d = np.sort(np.abs(data), axis=0) q1_all, q3_all = np.percentile(d, q=[perc, 100 - perc], axis=0) mean_all = [] for cc, (q1, q3) in enumerate(zip(q1_all, q3_all)): + q1, q3 = np.sort([q1, q3]) data_q1_q3 = data[(d[:, cc] >= q1) & (d[:, cc] <= q3), cc] - mean_all.append(np.nanmean(data_q1_q3)) + if data_q1_q3.size == 0: + # use median (perc=50) if no values within desired percentiles. + mean_all.append(np.nanmedian(data[:, cc])) + else: # there is at least one value within desired percentiles + mean_all.append(np.nanmean(data_q1_q3)) return np.asarray(mean_all) -def _check_if_zero(arr: np.ndarray, msg: str): - is_zero = np.isclose(arr, 0) +def _check_if_zero(arr: np.ndarray, msg: str) -> None: + """ + Check a telemetry array to see if all or some of its values are zero + and then issue an appropriate warning with message `msg` + If all values are zero, then input wiill be filled with unity to avoid + failure for older L0B products. + + Parameters + ---------- + arr : np.ndarray + Input array to be modified in place only if all its elements are zero!. + msg : str + Message to be used as part of a warning message. + + """ + is_zero = np.isclose(arr, 0, atol=1e-9) if is_zero.all(): # XXX to avoid unit test failure and old sim L0B # a warning will be issued and all values will be set