feat: Added JPL Horizons, and TLE Support#136
Conversation
There was a problem hiding this comment.
Pull request overview
This PR extends Astra’s non-sidereal tracking framework to support JPL Horizons-resolved minor bodies and Earth satellites via raw Two-Line Element (TLE) strings, including pre-pointing ahead of sequence start and (optionally) using Horizons-provided angular rates.
Changes:
- Add JPL Horizons + TLE handling to ephemeris/coordinate utilities, including optional precomputed RA/Dec rate interpolators.
- Update non-sidereal orchestration to support “pre-point then wait then apply rates”, and refresh tracking rates during long image-save operations.
- Expand scheduling config and tests to cover TLE input, lead-time activation, and additional non-sidereal scenarios; pin
cabaretto0.5.1.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Pins cabaret to 0.5.1 in the lockfile. |
| pyproject.toml | Pins cabaret dependency version. |
| src/astra/utils.py | Adds Horizons/TLE support, optional rate interpolators, and Horizons-output logging/persistence. |
| src/astra/action_configs.py | Adds tle and nonsidereal_start_lead_time_seconds; wires ephemeris precompute to return rates. |
| src/astra/nonsidereal.py | Uses precomputed rate interpolators when available; adds pre-point/activation helpers. |
| src/astra/observatory.py | Adds pre-point slew + delayed activation for non-sidereal; refreshes rates while saving images. |
| tests/test_utils.py | Adds unit/network tests for Horizons rates + TLE ephemeris behavior. |
| tests/test_nonsidereal.py | Adds tests for rate-interpolator usage and pre-point/activation behavior. |
| tests/test_subframe_config.py | Adds validation tests for nonsidereal_start_lead_time_seconds. |
| tests/test_observatory_running_schedule.py | Adds schedule/integration tests for Horizons + TLE tracking and pre-pointing behavior. |
Comments suppressed due to low confidence (3)
src/astra/utils.py:494
precompute_ephemeris()no longer retries Horizons queries withid_type='smallbody'. Many comet/asteroid names require this (and there is a test in this PR expecting the fallback). Please restore aValueError(or specific Horizons exception) fallback that retries withid_type='smallbody'before raisingNotMovingBodyError.
try:
from astroquery.jplhorizons import Horizons
location = {
"lon": obs_location.lon.deg,
"lat": obs_location.lat.deg,
"elevation": obs_location.height.to(u.km).value,
}
epochs = {
"start": start_time.iso,
"stop": stop_time.iso,
"step": str(n_points - 1), # Horizons returns n+1 rows for n steps
}
# Handle TLE data
if body_name.upper() == "TLE" or tle_data is not None:
if tle_data is None:
raise ValueError(
"tle_data parameter is required when body_name is 'TLE'"
)
call_input = {
"id": "TLE",
"location": location,
"epochs": epochs,
"optional_settings": {"TLE": tle_data},
}
obj = Horizons(id='TLE', location=location, epochs=epochs)
eph = obj.ephemerides(optional_settings={"TLE": tle_data})
else:
call_input = {"id": body_name, "location": location, "epochs": epochs}
obj = Horizons(id=body_name, location=location, epochs=epochs)
eph = obj.ephemerides()
_save_and_log_horizons_output(body_name, "precompute_ephemeris", eph, call_input)
bodies = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg)
seconds = (Time(eph["datetime_jd"], format="jd") - start_time).to(u.s).value
if "RA_rate" in eph.colnames and "DEC_rate" in eph.colnames:
ra_rate_as_per_hour = np.asarray(eph["RA_rate"], dtype=float)
dec_rate_as_per_hour = np.asarray(eph["DEC_rate"], dtype=float)
except ConnectionError:
raise
except Exception as e:
raise NotMovingBodyError(
f"'{body_name}' could not be resolved as a solar system or minor body: {e}"
) from e
src/astra/utils.py:494
- The intended
ValueErrorfor missingtle_data(whenbody_nameis 'TLE') is currently caught by the broadexcept Exceptionand re-raised asNotMovingBodyError, which contradicts the docstring and the tests added in this PR. Handle this case explicitly (e.g., let thatValueErrorpropagate) so callers get a clear configuration error.
# Handle TLE data
if body_name.upper() == "TLE" or tle_data is not None:
if tle_data is None:
raise ValueError(
"tle_data parameter is required when body_name is 'TLE'"
)
call_input = {
"id": "TLE",
"location": location,
"epochs": epochs,
"optional_settings": {"TLE": tle_data},
}
obj = Horizons(id='TLE', location=location, epochs=epochs)
eph = obj.ephemerides(optional_settings={"TLE": tle_data})
else:
call_input = {"id": body_name, "location": location, "epochs": epochs}
obj = Horizons(id=body_name, location=location, epochs=epochs)
eph = obj.ephemerides()
_save_and_log_horizons_output(body_name, "precompute_ephemeris", eph, call_input)
bodies = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg)
seconds = (Time(eph["datetime_jd"], format="jd") - start_time).to(u.s).value
if "RA_rate" in eph.colnames and "DEC_rate" in eph.colnames:
ra_rate_as_per_hour = np.asarray(eph["RA_rate"], dtype=float)
dec_rate_as_per_hour = np.asarray(eph["DEC_rate"], dtype=float)
except ConnectionError:
raise
except Exception as e:
raise NotMovingBodyError(
f"'{body_name}' could not be resolved as a solar system or minor body: {e}"
) from e
src/astra/utils.py:562
compute_nonsidereal_rates_from_interp()convertsdtto sidereal seconds usingdt / _SOLAR_TO_SIDEREAL, but_SOLAR_TO_SIDEREALis defined as1 dayinsday(~1.0027379), i.e., sidereal-seconds-per-solar-second. To express rates per sidereal second,dt_in_sidereal_sshould increase relative to solardt(multiply by_SOLAR_TO_SIDEREAL), not decrease. As written, RA/Dec rates will be systematically high by ~0.27%.
# Scale dt from solar seconds to sidereal seconds for correct per-sidereal-second rates.
# One sidereal second is shorter than one solar second.
dt_in_sidereal_s = dt / _SOLAR_TO_SIDEREAL
delta_ra_deg = float(ra_interp(t_seconds + dt)) - float(ra_interp(t_seconds))
delta_dec_deg = float(dec_interp(t_seconds + dt)) - float(dec_interp(t_seconds))
# Convert to ASCOM units (RA: s/s_sidereal, Dec: as/s_sidereal)
ra_rate = (delta_ra_deg * 240.0) / dt_in_sidereal_s
dec_rate = (delta_dec_deg * 3600.0) / dt_in_sidereal_s
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| import logging | ||
| import time | ||
| from datetime import datetime | ||
| from pathlib import Path |
| """Persist raw Horizons output locally and mirror the API call input into the logger.""" | ||
| try: | ||
| from astra.config import Config | ||
|
|
||
| horizons_dir = Config().paths.logs / "horizons" | ||
| horizons_dir.mkdir(parents=True, exist_ok=True) | ||
| safe_name = body_name.replace(" ", "_").replace("/", "_") | ||
| output_path = horizons_dir / f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv" | ||
| eph.write(output_path, format="ascii.ecsv", overwrite=True) | ||
| logger.warning("Saved raw Horizons output for %s (%s) to %s", body_name, context, output_path) | ||
| except Exception as exc: | ||
| logger.warning("Failed to save raw Horizons output for %s (%s): %s", body_name, context, exc) | ||
|
|
||
| try: | ||
| logger.warning("Horizons API call input for %s (%s): %s", body_name, context, call_input) | ||
| except Exception as exc: | ||
| logger.warning("Failed to log Horizons API call input for %s (%s): %s", body_name, context, exc) |
| elif near and tle is None: | ||
| location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} | ||
| call_input = {"id": body_name, "location": location, "epochs": obs_time.jd} | ||
| obj = Horizons(**call_input) | ||
| eph = obj.ephemerides() | ||
| _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) | ||
| return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] | ||
| elif near: | ||
| location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} | ||
| call_input = {"id": "TLE", "location": location, "epochs": obs_time.jd, "optional_settings": {"TLE": tle}} | ||
| obj = Horizons(id='TLE', location=location, epochs=obs_time.jd) | ||
| eph = obj.ephemerides(optional_settings={"TLE": tle}) | ||
| _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) | ||
| return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] |
There was a problem hiding this comment.
jpl_horizons checks for objects with id_type='smallbody' by default when called and no match is found among major bodies
| duration_hours = (end_time - start_time).to_value("hr") + 0.5 | ||
| try: | ||
| self._ra_interp, self._dec_interp = precompute_ephemeris( | ||
| self.lookup_name, start_time, duration_hours, observatory_location | ||
| ( | ||
| self._ra_interp, | ||
| self._dec_interp, | ||
| self._ra_rate_interp, | ||
| self._dec_rate_interp, | ||
| ) = precompute_ephemeris( | ||
| self.lookup_name, | ||
| start_time, | ||
| duration_hours, | ||
| observatory_location, | ||
| self.nonsidereal_recenter_interval / 60, | ||
| self.tle, | ||
| return_rates=True, | ||
| ) |
| def test_compute_nonsidereal_rates_uses_sidereal_second_conversion(self): | ||
| ra_interp = _make_interp(slope=1.0 / 60.0, intercept=0.0) | ||
| dec_interp = _make_interp(slope=0.0, intercept=0.0) | ||
|
|
||
| ra_rate, dec_rate = compute_nonsidereal_rates_from_interp( | ||
| ra_interp, | ||
| dec_interp, | ||
| t_seconds=0.0, | ||
| dt=60.0, | ||
| ) | ||
|
|
||
| assert ra_rate == pytest.approx(4.010951637, rel=1e-9) | ||
| assert dec_rate == pytest.approx(0.0, abs=1e-12) |
…real target coordinates
…e calculation for accuracy
…e assertion accuracy
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
src/astra/utils.py:628
compute_nonsidereal_rates_from_interpconvertsdtfrom solar seconds to sidereal seconds usingdt / _SOLAR_TO_SIDEREAL, but_SOLAR_TO_SIDEREAL(solar day → sidereal day) is > 1, so this makes the sidereal interval smaller than the solar interval. Since sidereal seconds are shorter, the number of sidereal seconds elapsed overdtsolar seconds should be larger (i.e., multiply by_SOLAR_TO_SIDEREAL), otherwise the returned ASCOM rates will be systematically too high.
# Scale dt from solar seconds to sidereal seconds for correct per-sidereal-second rates.
# One sidereal second is shorter than one solar second.
dt_in_sidereal_s = dt / _SOLAR_TO_SIDEREAL
delta_ra_deg = float(ra_interp(t_seconds + dt)) - float(ra_interp(t_seconds))
delta_dec_deg = float(dec_interp(t_seconds + dt)) - float(dec_interp(t_seconds))
# Convert to ASCOM units (RA: s/s_sidereal, Dec: as/s_sidereal)
ra_rate = (delta_ra_deg * 240.0) / dt_in_sidereal_s
dec_rate = (delta_dec_deg * 3600.0) / dt_in_sidereal_s
| elif near and tle is None: | ||
| location = { | ||
| "lon": obs_location.lon.deg, | ||
| "lat": obs_location.lat.deg, | ||
| "elevation": obs_location.height.to(u.km).value, | ||
| } | ||
| call_input = {"id": body_name, "location": location, "epochs": obs_time.jd} | ||
| obj = Horizons(**call_input) | ||
| eph = obj.ephemerides() | ||
| _save_and_log_horizons_output( | ||
| body_name, "get_body_coordinates", eph, call_input | ||
| ) | ||
| return SkyCoord( | ||
| ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time | ||
| ).transform_to("gcrs")[0] |
| # Handle TLE data | ||
| if body_name.upper() == "TLE" or tle_data is not None: | ||
| if tle_data is None: | ||
| raise ValueError( | ||
| "tle_data parameter is required when body_name is 'TLE'" | ||
| ) | ||
| call_input = { | ||
| "id": "TLE", | ||
| "location": location, | ||
| "epochs": epochs, | ||
| "optional_settings": {"TLE": tle_data}, | ||
| } | ||
| obj = Horizons(id="TLE", location=location, epochs=epochs) | ||
| eph = obj.ephemerides(optional_settings={"TLE": tle_data}) | ||
| else: | ||
| call_input = {"id": body_name, "location": location, "epochs": epochs} | ||
| obj = Horizons(id=body_name, location=location, epochs=epochs) | ||
| eph = obj.ephemerides() |
| elif tle is None: | ||
| obj = Horizons(id=body_name, location="500", epochs=t.jd) | ||
| eph = obj.ephemerides() | ||
| body = SkyCoord( | ||
| ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t | ||
| ).transform_to("gcrs") | ||
| latitude = body.dec.deg |
| self.pre_sequence( | ||
| action, | ||
| paired_devices, | ||
| near=nonsidereal.is_active, # not sure I like the 'near' terminology - PPP |
| try: | ||
| obs = OBSERVATORY | ||
| if ( | ||
| obs is None | ||
| or not hasattr(obs, "schedule_manager") | ||
| or obs.schedule_manager is None | ||
| ): | ||
| logger.warning( | ||
| "Schedule request but OBSERVATORY not initialized or has no schedule_manager" | ||
| ) | ||
| return [] |
| if self.nonsidereal_start_lead_time_seconds < 0: | ||
| raise ValueError( | ||
| "nonsidereal_start_lead_time_seconds must be >= 0, " | ||
| f"got {self.nonsidereal_start_lead_time_seconds}" | ||
| ) |
| 1.0, | ||
| self.tle, |
Pull Request
Description
This PR builds upon the non-sidereal framework introduced in #130 to enable direct tracking of Earth satellites via Two-Line Element (TLE) datasets. By extending the ephemeris and coordinate utilities to process TLE formats directly strings, Astra can now dynamically compute and apply the extreme differential tracking rates required for passing satellites in real-time.
Note: Tracking-rate setting in tests will only be functional after the alpaca-simulator is updated per its [#7 PR].
Changes Made
How to Test
Checklist