From 3b88d2fc6f5aef73257dfae42978fc1f77a64ab8 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:34:30 -0800 Subject: [PATCH 01/19] Add a new mod to nisar.antenna to compute RX imbalances from Raw/L0B --- python/packages/nisar/antenna/__init__.py | 3 + .../antenna/rx_channel_imbalance_helpers.py | 379 ++++++++++++++++++ 2 files changed, 382 insertions(+) create mode 100644 python/packages/nisar/antenna/rx_channel_imbalance_helpers.py diff --git a/python/packages/nisar/antenna/__init__.py b/python/packages/nisar/antenna/__init__.py index 23134a4f9..afb9d4fea 100644 --- a/python/packages/nisar/antenna/__init__.py +++ b/python/packages/nisar/antenna/__init__.py @@ -2,3 +2,6 @@ from .beamformer import TxBMF, RxDBF, compute_receive_pattern_weights, \ compute_transmit_pattern_weights, get_calib_range_line_idx from .pattern import AntennaPattern +from .rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b +) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py new file mode 100644 index 000000000..d858b697c --- /dev/null +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -0,0 +1,379 @@ +from __future__ import annotations +from warnings import warn +from typing import Tuple, Dict +from dataclasses import dataclass + +import numpy as np +import h5py + +from nisar.products.readers.Raw import Raw +from nisar.antenna import get_calib_range_line_idx, CalPath + + +@dataclass(frozen=True) +class RX_CHANNEL_IMBALANCE_PRODUCT: + """ + RX channel imbalance product extracted from LNA/CALTONE ratio + for a certain frequency band and polarization. + + Attributes + ---------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all RXs + ntap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all RXs. + time_delays_sec: np.ndarray(float) + Time delays from the phase of outlier qFSP in seconds for all RXs. + max_amp_ratio: float + Max amplitude ratio used in peak normalizing `lna_caltone_ratio`. + + """ + lna_caltone_ratio: np.ndarray + ntap_dominant: np.ndarray + time_delays_sec: np.ndarray + max_amp_ratio: float + + def __post_init__(self): + # XXX Size of all arrays must be 12 for L-band NISAR but + # not enforced due to failure of special cases such as unit test + if (self.lna_caltone_ratio.size != self.ntap_dominant.size + != self.time_delays_sec.size): + raise ValueError('The size of all arrays must be equal!') + + +def compute_all_rx_channel_imbalances_from_l0b( + l0b_file: str | Raw, + *, + caltone_freq: float = 1214.88e6, + freq_band: str | None = None, + txrx_pol: str | None = None +) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for over all bands and polarizations. The bands and polarizations are + used as dictionary keys in the form of [freq_band, txrx_pol]. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Parameters + ---------- + l0b_file : str or nisar.products.readers.Raw + L0B filename or Raw object + caltone_freq : float, default=1214.88e6 + Caltone frequency in Hz. + freq_band : str, optional + "A" or "B". Default is all. + txrx_pol: str, optional + TR pol in `freq_band` such as "HH", "HV", etc. + Default is all. + + Returns + ------- + dict: + A dict with keys (freq_band, txrx_pol) and values of type + `RX_CHANNEL_IMBALANCE_PRODUCT` + + """ + if isinstance(l0b_file, str): + raw = Raw(hdf5file=l0b_file) + else: + raw = l0b_file + frq_pols = raw.polarizations + # get freq_bands and txrx_pols + if freq_band is not None: + frq_pols = {freq_band: frq_pols[freq_band]} + if txrx_pol is not None: + frq_pols = {f: [txrx_pol] for f in frq_pols if txrx_pol in frq_pols[f]} + + out = dict() + for freq_band in frq_pols: + for txrx_pol in frq_pols[freq_band]: + (lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + ) = compute_rx_channel_imbalance( + raw, + freq_band, + txrx_pol, + caltone_freq=caltone_freq + ) + out[freq_band, txrx_pol] = RX_CHANNEL_IMBALANCE_PRODUCT( + lna_caltone_ratio=lna_caltone_ratio, + ntap_dominant=n_tap_dominant, + time_delays_sec=time_delays, + max_amp_ratio=max_ratio + ) + return out + + +def compute_rx_channel_imbalance( + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float = 1214.88e6 +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: + """ + Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio + for a desired frequency band and TR polarization. + + Also report the dominant tap number our of 3 for LNA three-tap + correlator as well as detected relative time delays for all RX channels + for debugging purposes. + + Returns + ------- + lna_caltone_ratio: np.ndarray(complex) + Peak-normalized complex LNA/CALTONE ratio over all 12 RXs + n_tap_dominant: np.ndarray(int) + Dominant tap number, a value within [1,3] over all 12 RXs. + time_delays: np.ndarray(float) + Time delays from the phase of qFSP outlier + max_ratio : float + Report peak power among all channels used for amplitude + normalization of RX channel imbalances. + + """ + lna_mean, n_tap_dominant = get_lna_cal_mean( + raw, freq_band, txrx_pol) + # get caltone mean over all RX channels + caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) + # Get complex ratio LNA/Caltone over all channels + lna_caltone_ratio = lna_mean / caltone_mean + # correct the ratio for the second band if necessary + lna_caltone_ratio, time_delays = correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio, + raw, + freq_band, + txrx_pol, + caltone_freq=caltone_freq + ) + # peak normalized + max_ratio = np.nanmax(abs(lna_caltone_ratio)) + if not np.isclose(max_ratio, 0): + lna_caltone_ratio /= max_ratio + return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio + + +def parse_chirp_corr_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from HRT QFSP. + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # loop over three qfsp + for i_qfsp in range(3): + p_qfsp = f'{qfsp_path}{i_qfsp}' + # loop over 4 channels per qfsp: + for nn in range(4): + i_chn = nn + i_qfsp * 4 + n_rx = i_chn + 1 + # loop over 3 taps + for i_tap in range(3): + n_tap = i_tap + 1 + # form the path to the dataset per I and Q + # use RX pol! + p_ds_i = (f'{p_qfsp}/CHIRP_CORRELATOR_I{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' + f'{txrx_pol[1]}{n_rx:02d}') + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError: + warn(f'Missing dataset "{p_type}" in ' + f'"{raw.filename}". LNA CAL values ' + 'from co-pol will be used instead. ' + 'Results may be invalid!') + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator( + freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + else: + cal_type = ds_cal_type[()].astype(CalPath) + # initialize the 3-D array for chirp correlator + num_lines = f5[p_ds_i].size + chp_cor = np.ones((num_lines, 12, 3), dtype='c8') + chp_cor[:, i_chn, i_tap].real = f5[p_ds_i][()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor, cal_type + + +def get_lna_cal_mean( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Returns mean complex LNA values and dominant tap + numbers within [1, 2, 3] for all channels + """ + if txrx_pol[0] == txrx_pol[1]: + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + else: # get x-pol chirp correlator from HRT + chp_cor, cal_type = parse_chirp_corr_from_hrt_qfsp( + raw, + txrx_pol + ) + n_rxs = chp_cor.shape[1] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type) + if len(idx_lna) == 0: + warn('No LNA CAL to represent RX! Use BYPASS Cal instead!') + if len(idx_byp) == 0: + # XXX to avoid failure in unit test or very short L0B + # lacking LNA/BYP CAL datasets, a warning will be issued + # and the values will all be set to unity! + warn('No LNA or BYPASS CAL! LNA mean will be all unity. ' + 'The results will be invalid!') + lna_mean = np.ones(n_rxs, dtype='c8') + n_tap_dominant = np.full(n_rxs, fill_value=2) + return lna_mean, n_tap_dominant + idx_lna = idx_byp + # get LNA for all three taps (or BYPASS) + lna_mean_tap3 = np.zeros((3, n_rxs), dtype='c16') + for nn in range(3): + lna_cal = chp_cor[idx_lna, :, nn] + # get complex mean for all RX channels + lna_mean_tap3[nn] = _mean_2d(lna_cal) + # get dominat taps + abs_lna_mean_tap3 = abs(lna_mean_tap3) + idx_lna_taps = np.nanargmax(abs_lna_mean_tap3, axis=0) + amp_lna_mean = np.zeros(n_rxs) + for nn in range(n_rxs): + amp_lna_mean[nn] = abs_lna_mean_tap3[idx_lna_taps[nn], nn] + _check_if_zero(amp_lna_mean, msg=f'{txrx_pol[0]}-pol LNA Cal') + # get the phase part at a fixed common tap rather than dominant one + phs_lna_mean = np.angle(lna_mean_tap3[1]) + # form complex lna + lna_mean = amp_lna_mean * np.exp(1j * phs_lna_mean) + n_tap_dominant = idx_lna_taps + 1 + return lna_mean, n_tap_dominant + + +def correct_lna_caltone_ratio_for_second_band( + lna_caltone_ratio: np.ndarray, + raw: Raw, + freq_band: str, + txrx_pol: str, + caltone_freq: float = 1214.88e6 +) -> Tuple[np.ndarray, np.ndarray]: + # XXX check if product from the second band so we can modify + # the results from the first band only if there is a + # relative delay offset in one of qFSP vs others, that is + # one of the qFSP is an outlier due to ADC clock/delay issue + # check if there is delay anomaly among three qFSP + fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc_a - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) + if _is_product_from_second_band(raw, freq_band, txrx_pol): + warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') + # if there is then get diff of frequency bands A dn B + # to be used to correct phase from A for B + fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) + phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay + # correct the LNA/CALTONE by delay amount via phase if any. + lna_caltone_ratio *= np.exp(1j * phs_adj) + return lna_caltone_ratio, time_delay + + +def get_caltone_mean( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> np.ndarray: + # now get caltone always from swath + caltone = raw.getCaltone(freq_band, txrx_pol) + caltone_mean = _mean_2d(caltone) + _check_if_zero(caltone_mean, msg=f'{txrx_pol}-pol Caltone') + return caltone_mean + + +def _is_product_from_second_band( + raw: Raw, + freq_band: str, + txrx_pol: str): + """ + Determine whether the produt is avolable on both bands + and it is from the second band. + """ + if freq_band == "B" and len(raw.frequencies) == 2: + if txrx_pol in raw.polarizations['A']: + return True + return False + return False + + +def _get_qfsp_delay_anomaly( + lna_caltone_ratio: np.ndarray, + dif_chirp_caltone_freq: float, + adc_clock: float = 240e6) -> np.ndarray: + """ + get time delays for a qfSP with phase anomaly only for + 12 channel NISAR L-band product. + For other case, it will be set to zero! + """ + if lna_caltone_ratio.size == 12: + # group them into three 4-channels, one per qFSP + lna2cal_ratio = lna_caltone_ratio.reshape(3, 4) + # get unwrap phase across 4 channels per qFSP (radians) + lna2cal_phs = np.unwrap(np.angle(lna2cal_ratio), axis=1) + # get median phase per qfsp, total 3 phase values (radians) + # and then unwrap three values + qfps_phs = np.unwrap(np.nanmedian(lna2cal_phs, axis=1)) + # use median among all three to be used as a reference to + # catch a single outlier + phs_ref = np.median(qfps_phs) + # phase due to ADC delay + phs_adc_delay = 2 * np.pi * dif_chirp_caltone_freq / adc_clock + n_delay_qfsp = np.round((qfps_phs - phs_ref) / phs_adc_delay) + # now repeat sample delay 4x per qFSP + n_delays = np.repeat( + n_delay_qfsp[:, np.newaxis], repeats=4, axis=1).ravel() + time_delays = n_delays / adc_clock + else: + time_delays = np.zeros(lna_caltone_ratio.size) + return time_delays + + +def _mean_2d(data: np.ndarray, perc: float = 0.0) -> np.asarray: + """ + Compute mean within percentile [perc, 100-perc], + of a 2-D complex array with shape (rangelines, channels) + due to bad telemetry. + """ + # or simply np.nanmean(data, axis=0) + d = np.sort(np.abs(data), axis=0) + q1_all, q3_all = np.percentile(d, q=[perc, 100 - perc], axis=0) + mean_all = [] + for cc, (q1, q3) in enumerate(zip(q1_all, q3_all)): + data_q1_q3 = data[(d[:, cc] >= q1) & (d[:, cc] <= q3), cc] + mean_all.append(np.nanmean(data_q1_q3)) + return np.asarray(mean_all) + + +def _check_if_zero(arr: np.ndarray, msg: str): + is_zero = np.isclose(arr, 0) + if is_zero.all(): + # XXX to avoid unit test failure and old sim L0B + # a warning will be issued and all values will be set + # to unity! + warn(f'All values are zero for {msg}! They are set to untiy. ' + 'Result may be invalid!') + arr[...] = 1.0 + if is_zero.any(): + warn(f'Some values are zero for {msg}!') From 1759142aef8ba9c6860afd0843ac4982cd7d8a7f Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:36:15 -0800 Subject: [PATCH 02/19] Modify pattern.py to compute and apply RX imbalances per freq band --- python/packages/nisar/antenna/pattern.py | 120 ++++++++++++++--------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 9b6b65b44..cb0a2ff92 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -1,14 +1,15 @@ from collections import defaultdict -from enum import IntEnum, unique from isce3.core import Orbit, Attitude, Linspace from isce3.geometry import DEMInterpolator import logging -from nisar.mixed_mode.logic import PolChannelSet from nisar.products.readers.antenna import AntennaParser from nisar.products.readers.instrument import InstrumentParser from nisar.products.readers.Raw import Raw from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index +from nisar.antenna.rx_channel_imbalance_helpers import ( + compute_all_rx_channel_imbalances_from_l0b +) import numpy as np log = logging.getLogger("nisar.antenna.pattern") @@ -150,6 +151,11 @@ class AntennaPattern: vairations expected to be less than 0.05 dB and 0.25 deg, respectively. This can speed up the antenna pattern computation. If None, it will be ignored. + freq_band : {'A', 'B'} or None. Optional + If none, the very first frequency band will + be used. + caltone_freq: float, default=1214.88e6 + Caltone frequency in Hz. """ @@ -158,7 +164,9 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, orbit: Orbit, attitude: Attitude, *, el_lut=None, norm_weight=True, - el_spacing_min=8.72665e-5): + el_spacing_min=8.72665e-5, + freq_band=None, + caltone_freq=1214.88e6): self.orbit = orbit.copy() self.attitude = attitude.copy() @@ -167,12 +175,27 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.el_spacing_min = el_spacing_min self.el_lut = el_lut - # get pols - channels = PolChannelSet.from_raw(raw) - freqs = tuple({chan.freq_id for chan in channels}) - self.freq_band = "A" if "A" in freqs else freqs[0] - self.txrx_pols = tuple({chan.pol for chan in channels}) - + # get frequency band + freqs = np.sort(raw.frequencies) + if freq_band is None: + self.freq_band = freqs[0] + else: + if freq_band not in freqs: + raise ValueError( + f'freq_band {freq_band} is out of range {freqs}!') + self.freq_band = freq_band + # get all polarization for a frequency band + self.txrx_pols = raw.polarizations[self.freq_band] + # comput all RX channel imbalances over all + # txrx pols of a desired frequency band. + # This RX imbalanced is basically LNA/CALTONE ratio! + # XXX perhaps Caltone frequency can be parsed from L0B DRT + # rather than provided as an input! + self.rx_imb = compute_all_rx_channel_imbalances_from_l0b( + raw, + freq_band=self.freq_band, + caltone_freq=caltone_freq + ) # Parse ref epoch, pulse time, slant range, orbit and attitude from Raw # Except for quad-pol, pulse time is the same for all TX pols. # In case of quad-pol the time offset is half single-pol PRF and thus @@ -192,7 +215,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # parse active RX channels and fs_ta which are polarization # independent! - txrx_pol = raw.polarizations[self.freq_band][0] + txrx_pol = self.txrx_pols[0] self.rx_chanl = raw.getListOfRxTRMs(self.freq_band, txrx_pol) self.fs_ta = ins.sampling_rate_ta(txrx_pol[1]) @@ -205,14 +228,14 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # Loop over all freqs & pols since some RX pols may be found only on # freq B (e.q. the QQP case). Assume RD/WD/WL are the same for all # freqs/pols that have the same RX polarization. - for chan in channels: - rxpol = chan.pol[1] + for txrx_pol in self.txrx_pols: + rxpol = txrx_pol[1] if rxpol in rd_all: continue rd_all[rxpol], wd_all[rxpol], wl_all[rxpol] = raw.getRdWdWl( - chan.freq_id, chan.pol) + self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], - wd_all[rxpol], wl_all[rxpol]) + wd_all[rxpol], wl_all[rxpol]) # build RxTRMs and the first RxDBF for all possible RX # linear polarizations @@ -295,7 +318,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # instrument file per TX linear pol. self.channel_adj_fact_tx[tx_lp] = ( ins.channel_adjustment_factors_tx(tx_lp) - ) + ) # get tx el-cut patterns el_pat_tx = ant.el_cut_all(tx_lp) @@ -307,9 +330,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, el_lut=self.el_lut, norm_weight=self.norm_weight, rg_spacing_min=self.rg_spacing_min) - def form_pattern(self, tseq, slant_range: Linspace, - nearest: bool = False, txrx_pols = None): + nearest: bool = False, txrx_pols=None): """ Get the two-way antenna pattern at a given time and set of ranges for either all or specified polarization combinations if Tx/Rx pols are @@ -333,26 +355,33 @@ def form_pattern(self, tseq, slant_range: Linspace, ------- dict Two-way complex antenna patterns as a function of range bin - over either all or specified TxRx polarization products. The format of dict is - {pol: np.ndarray[complex]}. + over either all or specified TxRx polarization products. + The format of dict is {pol: np.ndarray[complex]}. """ if txrx_pols is None: txrx_pols = self.txrx_pols elif not set(txrx_pols).issubset(self.txrx_pols): raise ValueError(f"Specified txrx_pols {txrx_pols} is out of " - f"available pols {self.txrx_pols}!") + f"available pols {self.txrx_pols}!") tseq = np.atleast_1d(tseq) - rx_pols = {pol[1] for pol in txrx_pols} - # form one-way RX patterns for all linear pols rx_dbf_pat = dict() - for p in rx_pols: + for txrx_pol in txrx_pols: + rxp = txrx_pol[1] + # combine channel adjustment from both internally computed + # RX imbalance (LNA/CALTONE) and secondary correction from + # input INST HDF5 product + channel_adj_fact_rx = ( + self.rx_imb[self.freq_band, txrx_pol].lna_caltone_ratio) + if self.channel_adj_fact_rx[rxp] is not None: + channel_adj_fact_rx *= np.asarray( + self.channel_adj_fact_rx[rxp]) # Split up provided timespan into groups with the same range timing # (Adding one because get_pulse_index uses floor but we want ceil) change_indices = [ - get_pulse_index(tseq, t) + 1 for t in self.finder[p].time_changes + get_pulse_index(tseq, t) + 1 for t in self.finder[rxp].time_changes if t > tseq[0] and t < tseq[-1] ] tgroups = np.split(tseq, change_indices) @@ -361,41 +390,44 @@ def form_pattern(self, tseq, slant_range: Linspace, i0 = 0 for tgroup in tgroups: t = tgroup[0] - rd, wd, wl = self.finder[p].get_dbf_timing(t) + rd, wd, wl = self.finder[rxp].get_dbf_timing(t) - log.info(f'Updating {p}-pol RX antenna pattern because' + log.info(f'Updating {rxp}-pol RX antenna pattern because' ' change in RD/WD/WL') - self.rx_trm[p] = RxTrmInfo( + self.rx_trm[rxp] = RxTrmInfo( self.pulse_times, self.rx_chanl, rd, wd, wl, - self.dbf_coef[p], self.ta_switch[p], self.ela_dbf[p], + self.dbf_coef[rxp], self.ta_switch[rxp], self.ela_dbf[rxp], self.fs_win, self.fs_ta) - self.rx_dbf[p] = RxDBF( - self.orbit, self.attitude, self.dem, self.el_pat_rx[p], - self.rx_trm[p], self.reference_epoch, + self.rx_dbf[rxp] = RxDBF( + self.orbit, self.attitude, self.dem, self.el_pat_rx[rxp], + self.rx_trm[rxp], self.reference_epoch, el_lut=self.el_lut, - norm_weight=self.rx_dbf[p].norm_weight) + norm_weight=self.rx_dbf[rxp].norm_weight) - pat = self.rx_dbf[p].form_pattern( + pat = self.rx_dbf[rxp].form_pattern( tgroup, slant_range, - channel_adj_factors=self.channel_adj_fact_rx[p] + channel_adj_factors=channel_adj_fact_rx ) - # Initialize the pattern array so we can slice this range timing - # group into it - TODO move this outside the loop for clarity? - if p not in rx_dbf_pat: - rx_dbf_pat[p] = np.empty((len(tseq), slant_range.size), - dtype=np.complex64) + # Initialize the pattern array so we can slice this + # range timing group into it + # TODO move this outside the loop for clarity? + if rxp not in rx_dbf_pat: + rx_dbf_pat[rxp] = np.empty((len(tseq), slant_range.size), + dtype=np.complex64) # Slice it into the full array, and # bump up the index for the next slice iend = i0 + len(tgroup) - rx_dbf_pat[p][i0:iend] = pat + rx_dbf_pat[rxp][i0:iend] = pat i0 = iend # form one-way TX patterns for all TX pols - tx_bmf_pat = defaultdict(lambda: np.empty((len(tseq), slant_range.size), - dtype=np.complex64)) + tx_bmf_pat = defaultdict( + lambda: np.empty((len(tseq), slant_range.size), + dtype=np.complex64) + ) for tx_pol in {pol[0] for pol in txrx_pols}: if tx_pol == "L": tx_bmf_pat[tx_pol] = ( @@ -420,8 +452,8 @@ def form_pattern(self, tseq, slant_range: Linspace, else: # other non-compact pol types adj = self.channel_adj_fact_tx[tx_pol] tx_bmf_pat[tx_pol] = self.tx_bmf[tx_pol].form_pattern( - tseq, slant_range, nearest=nearest, channel_adj_factors=adj - ).astype(np.complex64) + tseq, slant_range, nearest=nearest, channel_adj_factors=adj + ).astype(np.complex64) # build two-way pattern for all unique TxRx products obtained from all # freq bands From d0ce1abc6ed506b6214e92f3ca767faa4053c8e4 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:37:32 -0800 Subject: [PATCH 03/19] Update beamformer to allow single scalar for RX channel adjustment --- python/packages/nisar/antenna/beamformer.py | 57 +++++++++++++-------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/python/packages/nisar/antenna/beamformer.py b/python/packages/nisar/antenna/beamformer.py index 300549a1d..4acc7cf9b 100644 --- a/python/packages/nisar/antenna/beamformer.py +++ b/python/packages/nisar/antenna/beamformer.py @@ -315,11 +315,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # check the pulse_time to be within time tag of TxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected TxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # get total number of TX channels _, num_chanl = self.trm_info.correlator_tap2.shape @@ -380,9 +382,11 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None, # Old method: compute monotonically increasing elevation angles # at each successive slant range bin - # Compute the respective slant range for beamformed antenna pattern + # Compute the respective slant range for beamformed + # antenna pattern. # Simply calculate slant range for every few pulses where - # S/C pos/vel and DEM barely changes. This speeds up the process! + # S/C pos/vel and DEM barely changes. This speeds up + # the process! if (pp % self.num_pulse_skip == 0): sr_ant = self._elaz2slantrange(tm) @@ -580,11 +584,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # check the pulse_time to be within time tag of RxTRM if (pulse_time[0] < self._tm_first_trm or pulse_time[-1] > self._tm_last_trm): - raise ValueError("Requested time interval " + raise ValueError( + "Requested time interval " f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully " "contained within expected RxTrmInfo time interval " f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) " - f"relative to {str(self.orbit.reference_epoch)}") + f"relative to {str(self.orbit.reference_epoch)}" + ) # EL-cut pattern with shape active beams by EL angles ant_pat_el = self.el_ant_info.copol_pattern[self.active_channel_idx] @@ -609,16 +615,27 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None): # if provided if channel_adj_factors is not None: # check the size of correction factor container - if len(channel_adj_factors) != num_chanl: - raise ValueError('Size of RX "channel adjustment factor" ' - f'must be {num_chanl}') - # check if the correction factor is zero for all active channels - cor_fact = np.asarray(channel_adj_factors)[self.active_channel_idx] - if np.isclose(abs(cor_fact).max(), 0): - raise ValueError('"channel_adj_factors" are zeros for all ' - 'active RX channels!') - rx_wgt *= cor_fact[:, None] - + if len(channel_adj_factors) == num_chanl: + # check if the correction factor is zero for all + # active channels + cor_fact = np.asarray(channel_adj_factors)[ + self.active_channel_idx] + if np.isclose(abs(cor_fact).max(), 0): + raise ValueError('"channel_adj_factors" are zeros for all ' + 'active RX channels!') + rx_wgt *= cor_fact[:, None] + elif len(channel_adj_factors) == 1: # fixed scalar + if np.isclose(channel_adj_factors[0], 0): + raise ValueError( + '"channel_adj_factors" is a zero-value scalar for all' + 'active RX channels!' + ) + rx_wgt *= channel_adj_factors[0] + else: # neither 1 (scalar) nor `num_chanl` + raise ValueError( + 'Size of RX "channel adjustment factor" must be either ' + f'{num_chanl} or 1 but got {len(channel_adj_factors)}!' + ) # initialize the RX DBF pattern rx_pat = np.zeros((len(pulse_time), slant_range.size), dtype='complex') num_active_chanl = len(self.active_channel_idx) @@ -779,7 +796,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): warnings.warn( 'HPA Cal contains some zero values. These will be replaced with' ' the nearest non-zero values.', category=BadHPACalWarning - ) + ) # replace zero values with nearest non-zero ones for n in range(active_tx_idx.size): mask_zr = np.isclose(hcal_abs[:, n], 0) @@ -789,7 +806,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False): f_nearest = interp1d( i_hpa_nz, tx_weights[i_hpa_nz, n], kind='nearest', fill_value='extrapolate', assume_sorted=True - ) + ) tx_weights[i_hpa_z, n] = f_nearest(i_hpa_z) # If BCAL exists compute ratio HCAL/(BCAL/BCAL[0]) @@ -975,8 +992,8 @@ def get_pulse_index(pulse_times, t, nearest=False, eps=5e-10): eps : float Tolerance for snapping time tags, e.g., when `abs(pulse_times[i] - t) <= eps` - then return `i`. This accommodates floating point precision issues like - `n * pri != n / prf`. + then return `i`. This accommodates floating point precision + issues like `n * pri != n / prf`. Returns ------- From 37df7fb7337a1117d32900a51476b7a0aca6b484 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Sun, 30 Nov 2025 16:38:38 -0800 Subject: [PATCH 04/19] Pass frequency band to EAP block in focus.py --- python/packages/nisar/workflows/focus.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index dde3c990a..1cd4f91ab 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -1911,7 +1911,8 @@ def temp(suffix): if cfg.processing.is_enabled.eap: antpat = AntennaPattern(raw, dem, antparser, instparser, orbit, attitude, - el_lut=el_lut) + el_lut=el_lut, + freq_band=frequency) log.info("Precomputing antenna patterns") i = np.arange(rc_grid.shape[0]) From 71dc922cee466cbb9d467a1e71b5ff71dc933d44 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 15:35:37 -0800 Subject: [PATCH 05/19] Add a function to compute pulsewidth delay in sequential TX --- .../antenna/rx_channel_imbalance_helpers.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index d858b697c..221b8c217 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -377,3 +377,23 @@ def _check_if_zero(arr: np.ndarray, msg: str): arr[...] = 1.0 if is_zero.any(): warn(f'Some values are zero for {msg}!') + + +def get_pulsewidth_delay_from_raw( + raw: Raw, + freq_band: str, + txrx_pol: str +) -> float: + """ + Get delay (seconds) of the second pulse wrt the pulsewidth + of the first TX pulse in sequential split-spectrum transmit + for a desired dataset in L0B. + """ + # check if band is B and it is split spectrum + if freq_band == 'B' and len(raw.frequencies) == 2: + pols = raw.polarizations + # check if this is sequential transmit + if txrx_pol in pols['A']: + _, _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) + return pw + return 0.0 From ed83d0e9dfba05929dc07343a3833621020dbbf7 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 15:36:10 -0800 Subject: [PATCH 06/19] Modify pattern.py to adjustment RD for the second band in RX DBF --- python/packages/nisar/antenna/pattern.py | 24 ++++++++++++++++++- .../antenna/rx_channel_imbalance_helpers.py | 2 +- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index cb0a2ff92..0f6a19f72 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -1,3 +1,4 @@ +from warnings import warn from collections import defaultdict from isce3.core import Orbit, Attitude, Linspace from isce3.geometry import DEMInterpolator @@ -8,7 +9,8 @@ from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF from nisar.antenna.beamformer import get_pulse_index from nisar.antenna.rx_channel_imbalance_helpers import ( - compute_all_rx_channel_imbalances_from_l0b + compute_all_rx_channel_imbalances_from_l0b, + get_pulsewidth_delay_from_raw ) import numpy as np @@ -236,6 +238,26 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) + # XXX get pulsewidth delay and number of samples to correct + # RDs @ `self.fs_win` used in forming RX DBF pattern. This + # will account for delay in onboard DBF process due to first + # pulsewidth in sequential split spectrum TX. + # Note that half pulse wdith delay shall also be incorporated + # per ferquency band for all modes. That is delay of 0.5 * pw_a + # for "A" and delay of "pw_a + 0.5 * pw_b" for B. + # The fdollowing does NOT take into account half pulsewidth + # per TX pulse! + tm_delay = get_pulsewidth_delay_from_raw( + raw, self.freq_band, txrx_pol) + n_samp_delay = round(tm_delay * self.fs_win) + if n_samp_delay != 0: + warn( + f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} ' + f'is corrected by {tm_delay * 1e6:.3f} (usec) or ' + f'equivalently # {n_samp_delay} samples @ ' + f'{self.fs_win} (MHz)!' + ) + rd_all[rxpol] += n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 221b8c217..e11041083 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -394,6 +394,6 @@ def get_pulsewidth_delay_from_raw( pols = raw.polarizations # check if this is sequential transmit if txrx_pol in pols['A']: - _, _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) + _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) return pw return 0.0 From 07fae4c6b0fe7e6821a75dd6758771ef58709441 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 16:14:01 -0800 Subject: [PATCH 07/19] Fix the log in pattern.py --- python/packages/nisar/antenna/pattern.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 0f6a19f72..bfde92a28 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -255,7 +255,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} ' f'is corrected by {tm_delay * 1e6:.3f} (usec) or ' f'equivalently # {n_samp_delay} samples @ ' - f'{self.fs_win} (MHz)!' + f'{self.fs_win * 1e-6} (MHz)!' ) rd_all[rxpol] += n_samp_delay From 0e2df26cd64231c14eca00f2917e469e61fa91ce Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Tue, 2 Dec 2025 19:06:15 -0800 Subject: [PATCH 08/19] Correct the sign for RD correction of band B in RX DBF pattern --- python/packages/nisar/antenna/pattern.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index bfde92a28..11eeb2bd7 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -257,7 +257,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'equivalently # {n_samp_delay} samples @ ' f'{self.fs_win * 1e-6} (MHz)!' ) - rd_all[rxpol] += n_samp_delay + rd_all[rxpol] -= n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations From 0dd72f14fe62c31961dae91a6653eb73634f3fa4 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 22:14:34 -0800 Subject: [PATCH 09/19] Use slant range diff between A and B in place of pulsewidth delay --- python/packages/nisar/antenna/pattern.py | 20 ++++++++----------- .../antenna/rx_channel_imbalance_helpers.py | 9 ++++++--- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 11eeb2bd7..ff11d140a 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -10,7 +10,7 @@ from nisar.antenna.beamformer import get_pulse_index from nisar.antenna.rx_channel_imbalance_helpers import ( compute_all_rx_channel_imbalances_from_l0b, - get_pulsewidth_delay_from_raw + get_range_delay_from_raw ) import numpy as np @@ -238,16 +238,12 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) - # XXX get pulsewidth delay and number of samples to correct - # RDs @ `self.fs_win` used in forming RX DBF pattern. This - # will account for delay in onboard DBF process due to first - # pulsewidth in sequential split spectrum TX. - # Note that half pulse wdith delay shall also be incorporated - # per ferquency band for all modes. That is delay of 0.5 * pw_a - # for "A" and delay of "pw_a + 0.5 * pw_b" for B. - # The fdollowing does NOT take into account half pulsewidth - # per TX pulse! - tm_delay = get_pulsewidth_delay_from_raw( + # XXX get range delay offset for band B in split-spectrum to + # correct RDs @ `self.fs_win` used in forming RX DBF pattern. + # This will account for delay after onboard DBF due to + # both the first pulsewidth in sequential TX chirps + # as well as the difference in filter group delays. + tm_delay = get_range_delay_from_raw( raw, self.freq_band, txrx_pol) n_samp_delay = round(tm_delay * self.fs_win) if n_samp_delay != 0: @@ -257,7 +253,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, f'equivalently # {n_samp_delay} samples @ ' f'{self.fs_win * 1e-6} (MHz)!' ) - rd_all[rxpol] -= n_samp_delay + rd_all[rxpol][...] = rd_all[rxpol].astype(int) + n_samp_delay # build RxTRMs and the first RxDBF for all possible RX # linear polarizations diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index e11041083..f6560d2cc 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -8,6 +8,7 @@ from nisar.products.readers.Raw import Raw from nisar.antenna import get_calib_range_line_idx, CalPath +from isce3.core import speed_of_light @dataclass(frozen=True) @@ -379,7 +380,7 @@ def _check_if_zero(arr: np.ndarray, msg: str): warn(f'Some values are zero for {msg}!') -def get_pulsewidth_delay_from_raw( +def get_range_delay_from_raw( raw: Raw, freq_band: str, txrx_pol: str @@ -394,6 +395,8 @@ def get_pulsewidth_delay_from_raw( pols = raw.polarizations # check if this is sequential transmit if txrx_pol in pols['A']: - _, _, _, pw = raw.getChirpParameters('A', txrx_pol[0]) - return pw + sr_b = raw.getRanges('B', txrx_pol[0]) + sr_a = raw.getRanges('A', txrx_pol[0]) + delay = 2 * (sr_b.first - sr_a.first) / speed_of_light + return delay return 0.0 From 9d3dd594900d4486610a30e44aa0e56b10f6a967 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 23:52:17 -0800 Subject: [PATCH 10/19] Parse caltone frequency from DRT in L0B --- .../antenna/rx_channel_imbalance_helpers.py | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index f6560d2cc..c8d5dac37 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -45,7 +45,7 @@ def __post_init__(self): def compute_all_rx_channel_imbalances_from_l0b( l0b_file: str | Raw, *, - caltone_freq: float = 1214.88e6, + caltone_freq: float | None = None, freq_band: str | None = None, txrx_pol: str | None = None ) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: @@ -62,11 +62,12 @@ def compute_all_rx_channel_imbalances_from_l0b( ---------- l0b_file : str or nisar.products.readers.Raw L0B filename or Raw object - caltone_freq : float, default=1214.88e6 + caltone_freq : float or None. Optional Caltone frequency in Hz. - freq_band : str, optional + If None (default), it will be extracted from DRT in L0B. + freq_band : str. Optional "A" or "B". Default is all. - txrx_pol: str, optional + txrx_pol : str. Optional TR pol in `freq_band` such as "HH", "HV", etc. Default is all. @@ -111,7 +112,7 @@ def compute_rx_channel_imbalance( raw: Raw, freq_band: str, txrx_pol: str, - caltone_freq: float = 1214.88e6 + caltone_freq: float | None = None, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: """ Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio @@ -269,8 +270,13 @@ def correct_lna_caltone_ratio_for_second_band( raw: Raw, freq_band: str, txrx_pol: str, - caltone_freq: float = 1214.88e6 + caltone_freq: float | None = None ) -> Tuple[np.ndarray, np.ndarray]: + # Get calton efrequency from DRT if not provided + if caltone_freq is None: + caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) + warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' + f'-> {caltone_freq * 1e-6:.3f} (MHz)') # XXX check if product from the second band so we can modify # the results from the first band only if there is a # relative delay offset in one of qFSP vs others, that is @@ -400,3 +406,29 @@ def get_range_delay_from_raw( delay = 2 * (sr_b.first - sr_a.first) / speed_of_light return delay return 0.0 + + +def parse_caltone_freq_from_drt( + raw: Raw, + txrx_pol: str +) -> float: + """get caltone frequency in Hz from low rate telemetry in L0B.""" + # default caltone if dataset is not available (Hz) + default = 1214.88e6 + # frequency of local oscillator (Hz) + lo = 1200e6 + # ADC clock (Hz) + clock = 240e6 + c_p = (f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_CALTONE_PHASE_STEP_' + f'{txrx_pol[1]}') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_caltone_phase = f5[c_p] + except KeyError: + warn(f'Missing path "{c_p}" in L0B! Caltone frequency will ' + f'be set to {default} (Hz)') + return default + else: + i_cal = np.median(ds_caltone_phase[()]).astype(int) + caltone_freq = (i_cal / 2**32) * clock + lo + return caltone_freq From f28dba70b14b3552f9b7812765438b6f48824086 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 3 Dec 2025 23:58:31 -0800 Subject: [PATCH 11/19] Set default caltone frequency to none in pattern.py --- python/packages/nisar/antenna/pattern.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index ff11d140a..cc1fd6fdd 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -156,8 +156,9 @@ class AntennaPattern: freq_band : {'A', 'B'} or None. Optional If none, the very first frequency band will be used. - caltone_freq: float, default=1214.88e6 + caltone_freq: float or None. Optional Caltone frequency in Hz. + If None (default), it will be extracted from telemetry DRT in L0B. """ @@ -168,7 +169,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, norm_weight=True, el_spacing_min=8.72665e-5, freq_band=None, - caltone_freq=1214.88e6): + caltone_freq=None): self.orbit = orbit.copy() self.attitude = attitude.copy() From bce7e7a7b63bdb40a7425a90c9ec59a67680432d Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 8 Dec 2025 22:04:52 -0800 Subject: [PATCH 12/19] Add onboard DBF delay offset of 2.1474 us to RD in pattern.py --- python/packages/nisar/antenna/pattern.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index cc1fd6fdd..5517557af 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -159,6 +159,9 @@ class AntennaPattern: caltone_freq: float or None. Optional Caltone frequency in Hz. If None (default), it will be extracted from telemetry DRT in L0B. + delay_ofs_dbf: float, default=2.1474e-6 + Delay offset (seconds) in data window position of onboard DBF + process applied to all bands and polarizations. """ @@ -169,7 +172,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, norm_weight=True, el_spacing_min=8.72665e-5, freq_band=None, - caltone_freq=None): + caltone_freq=None, + delay_ofs_dbf=2.1474e-6): self.orbit = orbit.copy() self.attitude = attitude.copy() @@ -246,6 +250,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, # as well as the difference in filter group delays. tm_delay = get_range_delay_from_raw( raw, self.freq_band, txrx_pol) + tm_delay += delay_ofs_dbf n_samp_delay = round(tm_delay * self.fs_win) if n_samp_delay != 0: warn( From 53a24d938f1e2c960386a181d9596d7eb952f9c5 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 16:58:57 -0800 Subject: [PATCH 13/19] Use camel case naming convention for rx channel imbalance class --- .../nisar/antenna/rx_channel_imbalance_helpers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index c8d5dac37..1cb9f14dd 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -12,7 +12,7 @@ @dataclass(frozen=True) -class RX_CHANNEL_IMBALANCE_PRODUCT: +class RxChannelImbalanceProduct: """ RX channel imbalance product extracted from LNA/CALTONE ratio for a certain frequency band and polarization. @@ -48,7 +48,7 @@ def compute_all_rx_channel_imbalances_from_l0b( caltone_freq: float | None = None, freq_band: str | None = None, txrx_pol: str | None = None -) -> Dict[Tuple[str, str], RX_CHANNEL_IMBALANCE_PRODUCT]: +) -> Dict[Tuple[str, str], RxChannelImbalanceProduct]: """ Compute 12 complex RX channel imbalance based on LNA/CALTONE ratio for over all bands and polarizations. The bands and polarizations are @@ -75,7 +75,7 @@ def compute_all_rx_channel_imbalances_from_l0b( ------- dict: A dict with keys (freq_band, txrx_pol) and values of type - `RX_CHANNEL_IMBALANCE_PRODUCT` + `RxChannelImbalanceProduct` """ if isinstance(l0b_file, str): @@ -99,7 +99,7 @@ def compute_all_rx_channel_imbalances_from_l0b( txrx_pol, caltone_freq=caltone_freq ) - out[freq_band, txrx_pol] = RX_CHANNEL_IMBALANCE_PRODUCT( + out[freq_band, txrx_pol] = RxChannelImbalanceProduct( lna_caltone_ratio=lna_caltone_ratio, ntap_dominant=n_tap_dominant, time_delays_sec=time_delays, From 2938fc04b455d59006020c8bd8f68dbb650fae37 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 17:15:22 -0800 Subject: [PATCH 14/19] Fix docstrings, comments, typos, and some minor improvement --- python/packages/nisar/antenna/pattern.py | 6 ++--- .../antenna/rx_channel_imbalance_helpers.py | 22 +++++++++---------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index 5517557af..e173175b8 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -195,9 +195,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.txrx_pols = raw.polarizations[self.freq_band] # comput all RX channel imbalances over all # txrx pols of a desired frequency band. - # This RX imbalanced is basically LNA/CALTONE ratio! - # XXX perhaps Caltone frequency can be parsed from L0B DRT - # rather than provided as an input! + # This RX imbalanced is basically LNA/CALTONE ratio. self.rx_imb = compute_all_rx_channel_imbalances_from_l0b( raw, freq_band=self.freq_band, @@ -243,7 +241,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, self.freq_band, txrx_pol) self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol], wd_all[rxpol], wl_all[rxpol]) - # XXX get range delay offset for band B in split-spectrum to + # Get range delay offset for band B in split-spectrum to # correct RDs @ `self.fs_win` used in forming RX DBF pattern. # This will account for delay after onboard DBF due to # both the first pulsewidth in sequential TX chirps diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 1cb9f14dd..7d4b74dfa 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -94,9 +94,9 @@ def compute_all_rx_channel_imbalances_from_l0b( for txrx_pol in frq_pols[freq_band]: (lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio ) = compute_rx_channel_imbalance( - raw, - freq_band, - txrx_pol, + raw=raw, + freq_band=freq_band, + txrx_pol=txrx_pol, caltone_freq=caltone_freq ) out[freq_band, txrx_pol] = RxChannelImbalanceProduct( @@ -135,8 +135,7 @@ def compute_rx_channel_imbalance( normalization of RX channel imbalances. """ - lna_mean, n_tap_dominant = get_lna_cal_mean( - raw, freq_band, txrx_pol) + lna_mean, n_tap_dominant = get_lna_cal_mean(raw, freq_band, txrx_pol) # get caltone mean over all RX channels caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) # Get complex ratio LNA/Caltone over all channels @@ -272,12 +271,12 @@ def correct_lna_caltone_ratio_for_second_band( txrx_pol: str, caltone_freq: float | None = None ) -> Tuple[np.ndarray, np.ndarray]: - # Get calton efrequency from DRT if not provided + # Get caltone frequency from DRT if not provided if caltone_freq is None: caltone_freq = parse_caltone_freq_from_drt(raw, txrx_pol) warn(f'Caltone frequency is extracted from {txrx_pol[1]}-pol DRT ' f'-> {caltone_freq * 1e-6:.3f} (MHz)') - # XXX check if product from the second band so we can modify + # Check if product from the second band so we can modify # the results from the first band only if there is a # relative delay offset in one of qFSP vs others, that is # one of the qFSP is an outlier due to ADC clock/delay issue @@ -289,7 +288,7 @@ def correct_lna_caltone_ratio_for_second_band( lna_caltone_ratio, dif_chirp_caltone_freq) if _is_product_from_second_band(raw, freq_band, txrx_pol): warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') - # if there is then get diff of frequency bands A dn B + # if there is then get diff of frequency bands A and B # to be used to correct phase from A for B fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay @@ -321,7 +320,6 @@ def _is_product_from_second_band( if freq_band == "B" and len(raw.frequencies) == 2: if txrx_pol in raw.polarizations['A']: return True - return False return False @@ -330,9 +328,9 @@ def _get_qfsp_delay_anomaly( dif_chirp_caltone_freq: float, adc_clock: float = 240e6) -> np.ndarray: """ - get time delays for a qfSP with phase anomaly only for - 12 channel NISAR L-band product. - For other case, it will be set to zero! + If the product is a 12-channel NISAR L-band product, + return the time delays for a qFSP with phase anomaly. + Else, return zeros. """ if lna_caltone_ratio.size == 12: # group them into three 4-channels, one per qFSP From 18d739fd6ceaae391e2390505c7a43b90d706eb0 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Mon, 19 Jan 2026 17:25:35 -0800 Subject: [PATCH 15/19] Warn if array size in rx channel imbalance class is not 12 --- python/packages/nisar/antenna/rx_channel_imbalance_helpers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 7d4b74dfa..54dc3e2f3 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -40,6 +40,9 @@ def __post_init__(self): if (self.lna_caltone_ratio.size != self.ntap_dominant.size != self.time_delays_sec.size): raise ValueError('The size of all arrays must be equal!') + if self.lna_caltone_ratio.size != 12: + warn('The size of LNA-CALTONE ratio is ' + f'{self.lna_caltone_ratio.size} instead of 12!') def compute_all_rx_channel_imbalances_from_l0b( From 81a83c08bdd6b1874d4ef3600213afadb5e43eb7 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Thu, 29 Jan 2026 16:20:53 -0800 Subject: [PATCH 16/19] Fix sign for delay_ofs_dbf in pattern.py --- python/packages/nisar/antenna/pattern.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/packages/nisar/antenna/pattern.py b/python/packages/nisar/antenna/pattern.py index e173175b8..0648e738e 100644 --- a/python/packages/nisar/antenna/pattern.py +++ b/python/packages/nisar/antenna/pattern.py @@ -159,7 +159,7 @@ class AntennaPattern: caltone_freq: float or None. Optional Caltone frequency in Hz. If None (default), it will be extracted from telemetry DRT in L0B. - delay_ofs_dbf: float, default=2.1474e-6 + delay_ofs_dbf: float, default=-2.1474e-6 Delay offset (seconds) in data window position of onboard DBF process applied to all bands and polarizations. @@ -173,7 +173,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator, el_spacing_min=8.72665e-5, freq_band=None, caltone_freq=None, - delay_ofs_dbf=2.1474e-6): + delay_ofs_dbf=-2.1474e-6): self.orbit = orbit.copy() self.attitude = attitude.copy() From 27bb15e4dc24c7a5fd30f5402abaa0ff8759987a Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 4 Feb 2026 16:22:47 -0800 Subject: [PATCH 17/19] Add support for QP in computing RX channel imbalance --- .../antenna/rx_channel_imbalance_helpers.py | 275 +++++++++++++++--- 1 file changed, 236 insertions(+), 39 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index 54dc3e2f3..eb53d346d 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -2,6 +2,7 @@ from warnings import warn from typing import Tuple, Dict from dataclasses import dataclass +from enum import IntEnum, unique import numpy as np import h5py @@ -45,6 +46,20 @@ def __post_init__(self): f'{self.lna_caltone_ratio.size} instead of 12!') +@unique +class PolarizationTypeId(IntEnum): + """Enumeration for polarization types of L-band NISAR""" + single_h = 0 + single_v = 1 + dual_h = 2 + dual_v = 3 + quad = 4 + compact = 5 + none = 6 + quasi_quad = 7 + quasi_dual = 8 + + def compute_all_rx_channel_imbalances_from_l0b( l0b_file: str | Raw, *, @@ -132,13 +147,13 @@ def compute_rx_channel_imbalance( n_tap_dominant: np.ndarray(int) Dominant tap number, a value within [1,3] over all 12 RXs. time_delays: np.ndarray(float) - Time delays from the phase of qFSP outlier + Time delays from the phase of qFSP outlier in seconds max_ratio : float Report peak power among all channels used for amplitude normalization of RX channel imbalances. """ - lna_mean, n_tap_dominant = get_lna_cal_mean(raw, freq_band, txrx_pol) + lna_mean, n_tap_dominant = get_lna_cal_mean(raw, txrx_pol) # get caltone mean over all RX channels caltone_mean = get_caltone_mean(raw, freq_band, txrx_pol) # Get complex ratio LNA/Caltone over all channels @@ -158,12 +173,93 @@ def compute_rx_channel_imbalance( return lna_caltone_ratio, n_tap_dominant, time_delays, max_ratio -def parse_chirp_corr_from_hrt_qfsp( +def polarization_type_from_drt(raw: Raw) -> PolarizationTypeId: + """Get polarization ID and type from L0B DRT""" + pol_path = f'{raw.TelemetryPath}/DRT/MISC/CP_IFSW_POLARIZATION' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_pol = f5[pol_path] + except KeyError: + warn(f'Missing dataset "{pol_path}" in "{raw.filename}"') + id_pol = 6 + else: + i_pol = ds_pol[()] + id_pol = np.nanmedian(i_pol) + return PolarizationTypeId(id_pol) + + +def is_raw_quad_pol(raw: Raw) -> bool: + """Determine whether raw L0B is Quad or not""" + return polarization_type_from_drt(raw) == PolarizationTypeId.quad + + +def parse_rangeline_index_from_hrt( raw: Raw, - txrx_pol: str) -> Tuple[np.ndarray, np.ndarray]: + txrx_pol: str = None) -> np.ndarray | None: + """ + Get range line index over all range lines from + HRT if exists otherwise None! + + Returns + ------- + np.ndarray(uint) or None + If not available in L0b, None will be returned. + + """ + hrt_path = raw.TelemetryPath.replace('low', 'high') + freq_band = sorted(raw.frequencies)[0] + pols = raw.polarizations[freq_band] + if txrx_pol is None: + txrx_pol = pols[0] + elif txrx_pol not in pols: + raise ValueError(f'Available pols {pols} but got {txrx_pol}!') + rgl_idx_path = (f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/' + 'RangeLine/RH_RANGELINE_INDEX') + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + try: + ds_rgl_idx = f5[rgl_idx_path] + except KeyError as err: + warn(f'Can not parse range line index from HRT. Error -> {err}') + return None + else: + return ds_rgl_idx[()] + + +def first_tx_pol_for_quad(raw: Raw) -> str: + """Get first TX polarization, H or V, from only Quad pol product""" + if not is_raw_quad_pol(raw): + raise ValueError('Not a quad pol!') + idx_rgl = parse_rangeline_index_from_hrt(raw)[0] + # if not in HRT parse single-pol version from swath path + if idx_rgl is None: + idx_rgl_h = raw.getRangeLineIndex('A', 'H')[0] + idx_rgl_v = raw.getRangeLineIndex('A', 'V')[0] + if idx_rgl_v < idx_rgl_h: + return 'V' + return 'H' + else: # odd range line is V pol first and even is H pol first! + return {0: 'H', 1: 'V'}.get(idx_rgl % 2) + + +def parse_chirpcorrelator_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: """ Parse three-tap chirp correlator array with shape (lines, 12, 3) as well ass cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) or None + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + If the field does not exist None will be returned. + """ # get HRT path hrt_path = raw.TelemetryPath.replace('low', 'high') @@ -176,7 +272,7 @@ def parse_chirp_corr_from_hrt_qfsp( for nn in range(4): i_chn = nn + i_qfsp * 4 n_rx = i_chn + 1 - # loop over 3 taps + # loop over 3 taps per channel for i_tap in range(3): n_tap = i_tap + 1 # form the path to the dataset per I and Q @@ -185,53 +281,154 @@ def parse_chirp_corr_from_hrt_qfsp( f'{txrx_pol[1]}{n_rx:02d}') p_ds_q = (f'{p_qfsp}/CHIRP_CORRELATOR_Q{n_tap}_' f'{txrx_pol[1]}{n_rx:02d}') - # initialize the 3-D array, lines by 12 by 3 - if i_qfsp == nn == i_tap == 0: - # XXX get caltype from the very first qFSP assuming - # it is qFSP independent! - p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' - # XXX Following Try/exception block is added to - # support old sim L0B products lacking HRT! - try: - ds_cal_type = f5[p_type] - except KeyError: - warn(f'Missing dataset "{p_type}" in ' - f'"{raw.filename}". LNA CAL values ' - 'from co-pol will be used instead. ' - 'Results may be invalid!') - freq_band = [f for f in raw.frequencies if - txrx_pol in raw.polarizations[f]][0] - chp_cor = raw.getChirpCorrelator( - freq_band, txrx_pol[0]) - cal_type = raw.getCalType(freq_band, txrx_pol[0]) - return chp_cor, cal_type - else: - cal_type = ds_cal_type[()].astype(CalPath) + try: + ds_i = f5[p_ds_i] + except KeyError as err: + warn( + f'Missing dataset {p_ds_i} in {raw.filename}.' + f' Detailed error -> {err}' + ) + return None + else: + # initialize the 3-D array, lines by 12 by 3 + if i_qfsp == nn == i_tap == 0: # initialize the 3-D array for chirp correlator - num_lines = f5[p_ds_i].size + num_lines = ds_i.size chp_cor = np.ones((num_lines, 12, 3), dtype='c8') - chp_cor[:, i_chn, i_tap].real = f5[p_ds_i][()] - chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + chp_cor[:, i_chn, i_tap].real = ds_i[()] + chp_cor[:, i_chn, i_tap].imag = f5[p_ds_q][()] + return chp_cor + + +def parse_caltype_from_hrt_qfsp( + raw: Raw, + txrx_pol: str) -> np.ndarray | None: + """ + Parse cal type with shape (lines,) from HRT QFSP. + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(uint8) or None + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and + INVALID=255. If the field does not exist None will be returned. + + """ + # get HRT path + hrt_path = raw.TelemetryPath.replace('low', 'high') + qfsp_path = f'{hrt_path}/tx{txrx_pol[0]}/rx{txrx_pol[1]}/QFSP' + with h5py.File(raw.filename, mode='r', swmr=True) as f5: + # XXX get caltype from the very first qFSP assuming + # it is qFSP independent! + i_qfsp = 0 + p_qfsp = f'{qfsp_path}{i_qfsp}' + p_type = f'{p_qfsp}/CP_CAL_TYPE_{txrx_pol[1]}{i_qfsp}' + # XXX Following Try/exception block is added to + # support old sim L0B products lacking HRT! + try: + ds_cal_type = f5[p_type] + except KeyError as err: + warn(f'Missing dataset "{p_type}" in ' + f'"{raw.filename}". Detailed error -> {err}') + return None + else: + return ds_cal_type[()].astype(CalPath) + + +def _opposite_pol(pol: str) -> str: + """Get the oppsoite pol""" + if pol == 'H': + return 'V' + elif pol == 'V': + return 'H' + else: + return pol + + +def chirpcorrelator_caltype_from_raw( + raw: Raw, + txrx_pol: str +) -> Tuple[np.ndarray, np.ndarray]: + """ + Parse three-tap chirp correlator array with shape (lines, 12, 3) + as well ass cal type with shape (lines,) from Raw L0B for a certain + TxRX pol + + Parameters + ---------- + raw : nisar.products.readers.Raw + txrx_pol : str + TxRx polarization such as HH, VH, etc + + Returns + ------- + np.ndarray(complex) + 3-D complex array of chirp correlator with shape (Lines, channels, 3) + np.ndarray(uint8) + 1-D array of cal type w/ values HPA=0, LNA=1, BYPASS=2, and INVALID=255 + + """ + chp_cor = parse_chirpcorrelator_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + cal_type = parse_caltype_from_hrt_qfsp(raw, txrx_pol=txrx_pol) + # XXX if the respective field does not exist then use co-pol under + # swath in L0B for the sake of backward compatibility + if chp_cor is None or cal_type is None: + freq_band = [f for f in raw.frequencies if + txrx_pol in raw.polarizations[f]][0] + chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) + cal_type = raw.getCalType(freq_band, txrx_pol[0]) + return chp_cor, cal_type + # Quad pol case + if is_raw_quad_pol(raw): + tx_pol_first = first_tx_pol_for_quad(raw) + if txrx_pol[0] == tx_pol_first: + chp_cor = chp_cor[::2] + cal_type = cal_type[::2] + else: # the second TX pol + # get data from the opssoite TX pol + x_pol = _opposite_pol(txrx_pol[0]) + txrx_pol[1] + chp_cor_x, cal_type_x = chirpcorrelator_caltype_from_raw( + raw, txrx_pol=x_pol) + # if co-pol get HPA value from same TX but + # fill in LNA/BYP from oppsoite TX + if txrx_pol[0] == txrx_pol[1]: + chp_cor = chp_cor[1::2] + cal_type = cal_type[1::2] + _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type_x) + chp_cor[idx_byp] = chp_cor_x[idx_byp] + chp_cor[idx_lna] = chp_cor_x[idx_lna] + cal_type[idx_byp] = CalPath.BYPASS + cal_type[idx_lna] = CalPath.LNA + else: # x-pol product + chp_cor = chp_cor_x + cal_type = cal_type_x + # set x-pol HPA to INVALID given they are the mix of + # LNA from co-pol and HPA from x-pol! + if txrx_pol in ('HV', 'VH'): + idx_hpa, _, _, _ = get_calib_range_line_idx(cal_type) + if idx_hpa.size > 0: + warn(f'Set HPA cal type for x-pol {txrx_pol} to INVALID!') + cal_type[idx_hpa] = CalPath.INVALID return chp_cor, cal_type def get_lna_cal_mean( raw: Raw, - freq_band: str, txrx_pol: str ) -> Tuple[np.ndarray, np.ndarray]: """ Returns mean complex LNA values and dominant tap numbers within [1, 2, 3] for all channels """ - if txrx_pol[0] == txrx_pol[1]: - chp_cor = raw.getChirpCorrelator(freq_band, txrx_pol[0]) - cal_type = raw.getCalType(freq_band, txrx_pol[0]) - else: # get x-pol chirp correlator from HRT - chp_cor, cal_type = parse_chirp_corr_from_hrt_qfsp( - raw, - txrx_pol - ) + chp_cor, cal_type = chirpcorrelator_caltype_from_raw( + raw=raw, + txrx_pol=txrx_pol + ) n_rxs = chp_cor.shape[1] _, idx_byp, idx_lna, _ = get_calib_range_line_idx(cal_type) if len(idx_lna) == 0: From 547999dfab65f7e82faecc7e6c861b9fdaa6b233 Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 4 Feb 2026 16:23:28 -0800 Subject: [PATCH 18/19] Get caltone freq from runconfig rather than DRT in focus.py for now --- python/packages/nisar/workflows/focus.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/packages/nisar/workflows/focus.py b/python/packages/nisar/workflows/focus.py index 1cd4f91ab..308ed0cfe 100644 --- a/python/packages/nisar/workflows/focus.py +++ b/python/packages/nisar/workflows/focus.py @@ -1909,10 +1909,14 @@ def temp(suffix): # Precompute antenna patterns at downsampled spacing if cfg.processing.is_enabled.eap: + # XXX Due to a bug in respective DRT of some L0B products + # (CRID=05007), caltone.frequency is extrated from runconfig + # otherwise, it shall be set to None to be determined from DRT! antpat = AntennaPattern(raw, dem, antparser, instparser, orbit, attitude, el_lut=el_lut, - freq_band=frequency) + freq_band=frequency, + caltone_freq=cfg.processing.caltone.frequency) log.info("Precomputing antenna patterns") i = np.arange(rc_grid.shape[0]) From 27e9c2e7ab35046e5ec0c355ee4c8a5946ff262f Mon Sep 17 00:00:00 2001 From: Hirad Ghaemi Date: Wed, 11 Feb 2026 15:31:47 -0800 Subject: [PATCH 19/19] Fix a bug in rx imbalance module for quasi case --- .../antenna/rx_channel_imbalance_helpers.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py index eb53d346d..4fe0d2f94 100644 --- a/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py +++ b/python/packages/nisar/antenna/rx_channel_imbalance_helpers.py @@ -481,19 +481,25 @@ def correct_lna_caltone_ratio_for_second_band( # relative delay offset in one of qFSP vs others, that is # one of the qFSP is an outlier due to ADC clock/delay issue # check if there is delay anomaly among three qFSP - fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) - # get diff of chirp (band=A) and caltone freq for delay detection - dif_chirp_caltone_freq = fc_a - caltone_freq - time_delay = _get_qfsp_delay_anomaly( - lna_caltone_ratio, dif_chirp_caltone_freq) if _is_product_from_second_band(raw, freq_band, txrx_pol): warn(f'correcting LNA/CALTONE for band={freq_band} and pol={txrx_pol}') + fc_a, _, _, _ = raw.getChirpParameters('A', txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc_a - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) # if there is then get diff of frequency bands A and B # to be used to correct phase from A for B fc_b, _, _, _ = raw.getChirpParameters('B', txrx_pol[0]) phs_adj = 2 * np.pi * (fc_b - fc_a) * time_delay # correct the LNA/CALTONE by delay amount via phase if any. lna_caltone_ratio *= np.exp(1j * phs_adj) + else: # simply first band either A or B! + fc, _, _, _ = raw.getChirpParameters(freq_band, txrx_pol[0]) + # get diff of chirp (band=A) and caltone freq for delay detection + dif_chirp_caltone_freq = fc - caltone_freq + time_delay = _get_qfsp_delay_anomaly( + lna_caltone_ratio, dif_chirp_caltone_freq) return lna_caltone_ratio, time_delay