Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3b88d2f
Add a new mod to nisar.antenna to compute RX imbalances from Raw/L0B
rad-eng-59 Dec 1, 2025
1759142
Modify pattern.py to compute and apply RX imbalances per freq band
rad-eng-59 Dec 1, 2025
d0ce1ab
Update beamformer to allow single scalar for RX channel adjustment
rad-eng-59 Dec 1, 2025
37df7fb
Pass frequency band to EAP block in focus.py
rad-eng-59 Dec 1, 2025
71dc922
Add a function to compute pulsewidth delay in sequential TX
rad-eng-59 Dec 2, 2025
ed83d0e
Modify pattern.py to adjustment RD for the second band in RX DBF
rad-eng-59 Dec 2, 2025
07fae4c
Fix the log in pattern.py
rad-eng-59 Dec 3, 2025
0e2df26
Correct the sign for RD correction of band B in RX DBF pattern
rad-eng-59 Dec 3, 2025
0dd72f1
Use slant range diff between A and B in place of pulsewidth delay
rad-eng-59 Dec 4, 2025
9d3dd59
Parse caltone frequency from DRT in L0B
rad-eng-59 Dec 4, 2025
f28dba7
Set default caltone frequency to none in pattern.py
rad-eng-59 Dec 4, 2025
bce7e7a
Add onboard DBF delay offset of 2.1474 us to RD in pattern.py
rad-eng-59 Dec 9, 2025
53a24d9
Use camel case naming convention for rx channel imbalance class
rad-eng-59 Jan 20, 2026
2938fc0
Fix docstrings, comments, typos, and some minor improvement
rad-eng-59 Jan 20, 2026
18d739f
Warn if array size in rx channel imbalance class is not 12
rad-eng-59 Jan 20, 2026
3bf3ae6
Merge remote-tracking branch 'upstream-pub/develop' into rximb-dbfpat…
rad-eng-59 Jan 20, 2026
81a83c0
Fix sign for delay_ofs_dbf in pattern.py
rad-eng-59 Jan 30, 2026
27bb15e
Add support for QP in computing RX channel imbalance
rad-eng-59 Feb 5, 2026
547999d
Get caltone freq from runconfig rather than DRT in focus.py for now
rad-eng-59 Feb 5, 2026
27e9c2e
Fix a bug in rx imbalance module for quasi case
rad-eng-59 Feb 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/packages/nisar/antenna/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
from .beamformer import TxBMF, RxDBF, compute_receive_pattern_weights, \
compute_transmit_pattern_weights, get_calib_range_line_idx
from .pattern import AntennaPattern
from .rx_channel_imbalance_helpers import (
compute_all_rx_channel_imbalances_from_l0b
)
57 changes: 37 additions & 20 deletions python/packages/nisar/antenna/beamformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None,
# check the pulse_time to be within time tag of TxTRM
if (pulse_time[0] < self._tm_first_trm or
pulse_time[-1] > self._tm_last_trm):
raise ValueError("Requested time interval "
raise ValueError(
"Requested time interval "
f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully "
"contained within expected TxTrmInfo time interval "
f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) "
f"relative to {str(self.orbit.reference_epoch)}")
f"relative to {str(self.orbit.reference_epoch)}"
)

# get total number of TX channels
_, num_chanl = self.trm_info.correlator_tap2.shape
Expand Down Expand Up @@ -380,9 +382,11 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None,
# Old method: compute monotonically increasing elevation angles
# at each successive slant range bin

# Compute the respective slant range for beamformed antenna pattern
# Compute the respective slant range for beamformed
# antenna pattern.
# Simply calculate slant range for every few pulses where
# S/C pos/vel and DEM barely changes. This speeds up the process!
# S/C pos/vel and DEM barely changes. This speeds up
# the process!
if (pp % self.num_pulse_skip == 0):
sr_ant = self._elaz2slantrange(tm)

Expand Down Expand Up @@ -580,11 +584,13 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None):
# check the pulse_time to be within time tag of RxTRM
if (pulse_time[0] < self._tm_first_trm or
pulse_time[-1] > self._tm_last_trm):
raise ValueError("Requested time interval "
raise ValueError(
"Requested time interval "
f"[{pulse_time[0]}, {pulse_time[-1]}] (s) is not fully "
"contained within expected RxTrmInfo time interval "
f"[{self.trm_info.time[0]}, {self.trm_info.time[-1]}] (s) "
f"relative to {str(self.orbit.reference_epoch)}")
f"relative to {str(self.orbit.reference_epoch)}"
)

# EL-cut pattern with shape active beams by EL angles
ant_pat_el = self.el_ant_info.copol_pattern[self.active_channel_idx]
Expand All @@ -609,16 +615,27 @@ def form_pattern(self, pulse_time, slant_range, channel_adj_factors=None):
# if provided
if channel_adj_factors is not None:
# check the size of correction factor container
if len(channel_adj_factors) != num_chanl:
raise ValueError('Size of RX "channel adjustment factor" '
f'must be {num_chanl}')
# check if the correction factor is zero for all active channels
cor_fact = np.asarray(channel_adj_factors)[self.active_channel_idx]
if np.isclose(abs(cor_fact).max(), 0):
raise ValueError('"channel_adj_factors" are zeros for all '
'active RX channels!')
rx_wgt *= cor_fact[:, None]

if len(channel_adj_factors) == num_chanl:
# check if the correction factor is zero for all
# active channels
cor_fact = np.asarray(channel_adj_factors)[
self.active_channel_idx]
if np.isclose(abs(cor_fact).max(), 0):
raise ValueError('"channel_adj_factors" are zeros for all '
'active RX channels!')
rx_wgt *= cor_fact[:, None]
elif len(channel_adj_factors) == 1: # fixed scalar
if np.isclose(channel_adj_factors[0], 0):
raise ValueError(
'"channel_adj_factors" is a zero-value scalar for all'
'active RX channels!'
)
rx_wgt *= channel_adj_factors[0]
else: # neither 1 (scalar) nor `num_chanl`
raise ValueError(
'Size of RX "channel adjustment factor" must be either '
f'{num_chanl} or 1 but got {len(channel_adj_factors)}!'
)
# initialize the RX DBF pattern
rx_pat = np.zeros((len(pulse_time), slant_range.size), dtype='complex')
num_active_chanl = len(self.active_channel_idx)
Expand Down Expand Up @@ -779,7 +796,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False):
warnings.warn(
'HPA Cal contains some zero values. These will be replaced with'
' the nearest non-zero values.', category=BadHPACalWarning
)
)
# replace zero values with nearest non-zero ones
for n in range(active_tx_idx.size):
mask_zr = np.isclose(hcal_abs[:, n], 0)
Expand All @@ -789,7 +806,7 @@ def compute_transmit_pattern_weights(tx_trm_info, norm=False):
f_nearest = interp1d(
i_hpa_nz, tx_weights[i_hpa_nz, n], kind='nearest',
fill_value='extrapolate', assume_sorted=True
)
)
tx_weights[i_hpa_z, n] = f_nearest(i_hpa_z)

# If BCAL exists compute ratio HCAL/(BCAL/BCAL[0])
Expand Down Expand Up @@ -975,8 +992,8 @@ def get_pulse_index(pulse_times, t, nearest=False, eps=5e-10):
eps : float
Tolerance for snapping time tags, e.g., when
`abs(pulse_times[i] - t) <= eps`
then return `i`. This accommodates floating point precision issues like
`n * pri != n / prf`.
then return `i`. This accommodates floating point precision
issues like `n * pri != n / prf`.

Returns
-------
Expand Down
142 changes: 98 additions & 44 deletions python/packages/nisar/antenna/pattern.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from warnings import warn
from collections import defaultdict
from enum import IntEnum, unique
from isce3.core import Orbit, Attitude, Linspace
from isce3.geometry import DEMInterpolator
import logging
from nisar.mixed_mode.logic import PolChannelSet
from nisar.products.readers.antenna import AntennaParser
from nisar.products.readers.instrument import InstrumentParser
from nisar.products.readers.Raw import Raw
from nisar.antenna import TxTrmInfo, RxTrmInfo, TxBMF, RxDBF
from nisar.antenna.beamformer import get_pulse_index
from nisar.antenna.rx_channel_imbalance_helpers import (
compute_all_rx_channel_imbalances_from_l0b,
get_range_delay_from_raw
)
import numpy as np

log = logging.getLogger("nisar.antenna.pattern")
Expand Down Expand Up @@ -150,6 +153,15 @@ class AntennaPattern:
vairations expected to be less than 0.05 dB and 0.25 deg, respectively.
This can speed up the antenna pattern computation. If None, it will be
ignored.
freq_band : {'A', 'B'} or None. Optional
If none, the very first frequency band will
be used.
caltone_freq: float or None. Optional
Caltone frequency in Hz.
If None (default), it will be extracted from telemetry DRT in L0B.
delay_ofs_dbf: float, default=-2.1474e-6
Delay offset (seconds) in data window position of onboard DBF
process applied to all bands and polarizations.

"""

Expand All @@ -158,7 +170,10 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,
orbit: Orbit, attitude: Attitude,
*, el_lut=None,
norm_weight=True,
el_spacing_min=8.72665e-5):
el_spacing_min=8.72665e-5,
freq_band=None,
caltone_freq=None,
delay_ofs_dbf=-2.1474e-6):

self.orbit = orbit.copy()
self.attitude = attitude.copy()
Expand All @@ -167,12 +182,25 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,
self.el_spacing_min = el_spacing_min
self.el_lut = el_lut

# get pols
channels = PolChannelSet.from_raw(raw)
freqs = tuple({chan.freq_id for chan in channels})
self.freq_band = "A" if "A" in freqs else freqs[0]
self.txrx_pols = tuple({chan.pol for chan in channels})

# get frequency band
freqs = np.sort(raw.frequencies)
if freq_band is None:
self.freq_band = freqs[0]
else:
if freq_band not in freqs:
raise ValueError(
f'freq_band {freq_band} is out of range {freqs}!')
self.freq_band = freq_band
# get all polarization for a frequency band
self.txrx_pols = raw.polarizations[self.freq_band]
# comput all RX channel imbalances over all
# txrx pols of a desired frequency band.
# This RX imbalanced is basically LNA/CALTONE ratio.
self.rx_imb = compute_all_rx_channel_imbalances_from_l0b(
raw,
freq_band=self.freq_band,
caltone_freq=caltone_freq
)
# Parse ref epoch, pulse time, slant range, orbit and attitude from Raw
# Except for quad-pol, pulse time is the same for all TX pols.
# In case of quad-pol the time offset is half single-pol PRF and thus
Expand All @@ -192,7 +220,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,

# parse active RX channels and fs_ta which are polarization
# independent!
txrx_pol = raw.polarizations[self.freq_band][0]
txrx_pol = self.txrx_pols[0]
self.rx_chanl = raw.getListOfRxTRMs(self.freq_band, txrx_pol)
self.fs_ta = ins.sampling_rate_ta(txrx_pol[1])

Expand All @@ -205,14 +233,31 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,
# Loop over all freqs & pols since some RX pols may be found only on
# freq B (e.q. the QQP case). Assume RD/WD/WL are the same for all
# freqs/pols that have the same RX polarization.
for chan in channels:
rxpol = chan.pol[1]
for txrx_pol in self.txrx_pols:
rxpol = txrx_pol[1]
if rxpol in rd_all:
continue
rd_all[rxpol], wd_all[rxpol], wl_all[rxpol] = raw.getRdWdWl(
chan.freq_id, chan.pol)
self.freq_band, txrx_pol)
self.finder[rxpol] = TimingFinder(self.pulse_times, rd_all[rxpol],
wd_all[rxpol], wl_all[rxpol])
wd_all[rxpol], wl_all[rxpol])
# Get range delay offset for band B in split-spectrum to
# correct RDs @ `self.fs_win` used in forming RX DBF pattern.
# This will account for delay after onboard DBF due to
# both the first pulsewidth in sequential TX chirps
# as well as the difference in filter group delays.
tm_delay = get_range_delay_from_raw(
raw, self.freq_band, txrx_pol)
tm_delay += delay_ofs_dbf
n_samp_delay = round(tm_delay * self.fs_win)
if n_samp_delay != 0:
warn(
f'RD of RxDBF for band={self.freq_band} & pol={txrx_pol} '
f'is corrected by {tm_delay * 1e6:.3f} (usec) or '
f'equivalently # {n_samp_delay} samples @ '
f'{self.fs_win * 1e-6} (MHz)!'
)
rd_all[rxpol][...] = rd_all[rxpol].astype(int) + n_samp_delay

# build RxTRMs and the first RxDBF for all possible RX
# linear polarizations
Expand Down Expand Up @@ -295,7 +340,7 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,
# instrument file per TX linear pol.
self.channel_adj_fact_tx[tx_lp] = (
ins.channel_adjustment_factors_tx(tx_lp)
)
)

# get tx el-cut patterns
el_pat_tx = ant.el_cut_all(tx_lp)
Expand All @@ -307,9 +352,8 @@ def __init__(self, raw: Raw, dem: DEMInterpolator,
el_lut=self.el_lut, norm_weight=self.norm_weight,
rg_spacing_min=self.rg_spacing_min)


def form_pattern(self, tseq, slant_range: Linspace,
nearest: bool = False, txrx_pols = None):
nearest: bool = False, txrx_pols=None):
"""
Get the two-way antenna pattern at a given time and set of ranges for
either all or specified polarization combinations if Tx/Rx pols are
Expand All @@ -333,26 +377,33 @@ def form_pattern(self, tseq, slant_range: Linspace,
-------
dict
Two-way complex antenna patterns as a function of range bin
over either all or specified TxRx polarization products. The format of dict is
{pol: np.ndarray[complex]}.
over either all or specified TxRx polarization products.
The format of dict is {pol: np.ndarray[complex]}.
"""
if txrx_pols is None:
txrx_pols = self.txrx_pols
elif not set(txrx_pols).issubset(self.txrx_pols):
raise ValueError(f"Specified txrx_pols {txrx_pols} is out of "
f"available pols {self.txrx_pols}!")
f"available pols {self.txrx_pols}!")

tseq = np.atleast_1d(tseq)
rx_pols = {pol[1] for pol in txrx_pols}

# form one-way RX patterns for all linear pols
rx_dbf_pat = dict()
for p in rx_pols:
for txrx_pol in txrx_pols:
rxp = txrx_pol[1]
# combine channel adjustment from both internally computed
# RX imbalance (LNA/CALTONE) and secondary correction from
# input INST HDF5 product
channel_adj_fact_rx = (
self.rx_imb[self.freq_band, txrx_pol].lna_caltone_ratio)
if self.channel_adj_fact_rx[rxp] is not None:
channel_adj_fact_rx *= np.asarray(
self.channel_adj_fact_rx[rxp])

# Split up provided timespan into groups with the same range timing
# (Adding one because get_pulse_index uses floor but we want ceil)
change_indices = [
get_pulse_index(tseq, t) + 1 for t in self.finder[p].time_changes
get_pulse_index(tseq, t) + 1 for t in self.finder[rxp].time_changes
if t > tseq[0] and t < tseq[-1]
]
tgroups = np.split(tseq, change_indices)
Expand All @@ -361,41 +412,44 @@ def form_pattern(self, tseq, slant_range: Linspace,
i0 = 0
for tgroup in tgroups:
t = tgroup[0]
rd, wd, wl = self.finder[p].get_dbf_timing(t)
rd, wd, wl = self.finder[rxp].get_dbf_timing(t)

log.info(f'Updating {p}-pol RX antenna pattern because'
log.info(f'Updating {rxp}-pol RX antenna pattern because'
' change in RD/WD/WL')

self.rx_trm[p] = RxTrmInfo(
self.rx_trm[rxp] = RxTrmInfo(
self.pulse_times, self.rx_chanl, rd, wd, wl,
self.dbf_coef[p], self.ta_switch[p], self.ela_dbf[p],
self.dbf_coef[rxp], self.ta_switch[rxp], self.ela_dbf[rxp],
self.fs_win, self.fs_ta)

self.rx_dbf[p] = RxDBF(
self.orbit, self.attitude, self.dem, self.el_pat_rx[p],
self.rx_trm[p], self.reference_epoch,
self.rx_dbf[rxp] = RxDBF(
self.orbit, self.attitude, self.dem, self.el_pat_rx[rxp],
self.rx_trm[rxp], self.reference_epoch,
el_lut=self.el_lut,
norm_weight=self.rx_dbf[p].norm_weight)
norm_weight=self.rx_dbf[rxp].norm_weight)

pat = self.rx_dbf[p].form_pattern(
pat = self.rx_dbf[rxp].form_pattern(
tgroup, slant_range,
channel_adj_factors=self.channel_adj_fact_rx[p]
channel_adj_factors=channel_adj_fact_rx
)
# Initialize the pattern array so we can slice this range timing
# group into it - TODO move this outside the loop for clarity?
if p not in rx_dbf_pat:
rx_dbf_pat[p] = np.empty((len(tseq), slant_range.size),
dtype=np.complex64)
# Initialize the pattern array so we can slice this
# range timing group into it
# TODO move this outside the loop for clarity?
if rxp not in rx_dbf_pat:
rx_dbf_pat[rxp] = np.empty((len(tseq), slant_range.size),
dtype=np.complex64)

# Slice it into the full array, and
# bump up the index for the next slice
iend = i0 + len(tgroup)
rx_dbf_pat[p][i0:iend] = pat
rx_dbf_pat[rxp][i0:iend] = pat
i0 = iend

# form one-way TX patterns for all TX pols
tx_bmf_pat = defaultdict(lambda: np.empty((len(tseq), slant_range.size),
dtype=np.complex64))
tx_bmf_pat = defaultdict(
lambda: np.empty((len(tseq), slant_range.size),
dtype=np.complex64)
)
for tx_pol in {pol[0] for pol in txrx_pols}:
if tx_pol == "L":
tx_bmf_pat[tx_pol] = (
Expand All @@ -420,8 +474,8 @@ def form_pattern(self, tseq, slant_range: Linspace,
else: # other non-compact pol types
adj = self.channel_adj_fact_tx[tx_pol]
tx_bmf_pat[tx_pol] = self.tx_bmf[tx_pol].form_pattern(
tseq, slant_range, nearest=nearest, channel_adj_factors=adj
).astype(np.complex64)
tseq, slant_range, nearest=nearest, channel_adj_factors=adj
).astype(np.complex64)

# build two-way pattern for all unique TxRx products obtained from all
# freq bands
Expand Down
Loading
Loading