From 66cbcb198c04ed6d2246b74358c9cfe902056e84 Mon Sep 17 00:00:00 2001 From: SaltyPotatoe Date: Fri, 8 May 2026 18:38:09 +0200 Subject: [PATCH 01/10] feat: jpl horizons and tle support --- pyproject.toml | 4 +- src/astra/action_configs.py | 37 +- src/astra/nonsidereal.py | 49 +- src/astra/observatory.py | 152 +++- src/astra/utils.py | 148 +++- tests/test_nonsidereal.py | 75 ++ tests/test_observatory_running_schedule.py | 837 ++++++++++++++++++++- tests/test_subframe_config.py | 19 + tests/test_utils.py | 151 ++++ uv.lock | 8 +- 10 files changed, 1415 insertions(+), 65 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 924f8b08..62c67fbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,8 +54,8 @@ dependencies = [ "fastapi[standard]", "pyyaml", "ruamel.yaml", - "cabaret", - "astroquery>=0.4.11" + "cabaret==0.5.1", + "astroquery>=0.4.11", ] description = "Automated Survey observaTory Robotised with Alpaca" license = {text = "GPL-3.0"} diff --git a/src/astra/action_configs.py b/src/astra/action_configs.py index 092f2d60..9279adf4 100644 --- a/src/astra/action_configs.py +++ b/src/astra/action_configs.py @@ -494,9 +494,12 @@ class ObjectActionConfig(BaseActionConfig): ``nonsidereal_recenter_interval``. The interval controls how often the telescope re-slews to the updated ephemeris position (in seconds). Autoguiding is incompatible with non-sidereal tracking and will be disabled. + If using TLE's for Earth-orbiting objects, provide the ``tle`` and use + "TLE" as the ``lookup_name``. - Currently supports astropy built-in bodies (planets, Moon, Sun). - Support for comets and asteroids (via JPL Horizons) is planned. + Currently supports astropy built-in bodies (planets, Moon, Sun) and small + bodies (asteroids, comets) that have ephemerides in JPL Horizons, and + Earth-orbiting objects through TLEs. **Schedule example for tracking Saturn**:: @@ -509,6 +512,7 @@ class ObjectActionConfig(BaseActionConfig): "exptime": 30, "filter": "Clear", "nonsidereal_recenter_interval": 300, + "nonsidereal_start_lead_time_seconds": 60, }, "start_time":"2025-01-01 00:00:00.000", "end_time":"2025-01-01 01:00:00.000", @@ -523,6 +527,7 @@ class ObjectActionConfig(BaseActionConfig): alt: Optional[float] = None az: Optional[float] = None lookup_name: Optional[str] = None + tle: Optional[str] = None filter: Optional[str] = None focus_shift: Optional[float] = None focus_position: Optional[float] = None @@ -539,9 +544,12 @@ class ObjectActionConfig(BaseActionConfig): subframe_center_x: float = 0.5 subframe_center_y: float = 0.5 nonsidereal_recenter_interval: int = 0 + nonsidereal_start_lead_time_seconds: float = 60.0 _nonsidereal: bool = field(default=False, init=False, repr=False) _ra_interp: Any = field(default=None, init=False, repr=False) _dec_interp: Any = field(default=None, init=False, repr=False) + _ra_rate_interp: Any = field(default=None, init=False, repr=False) + _dec_rate_interp: Any = field(default=None, init=False, repr=False) FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = { "object": "Target name.", @@ -551,12 +559,14 @@ class ObjectActionConfig(BaseActionConfig): "alt": "Altitude coordinate when issuing Alt/Az pointings.", "az": "Azimuth coordinate when issuing Alt/Az pointings.", "lookup_name": "Instead of specifying ra/dec or alt/az, use SIMBAD/Astropy to look up coordinates for celestial body to observe (e.g., 'mars', 'M31').", + "tle": "TLE data for Earth-orbiting objects", "filter": "Filter name to load before imaging.", "focus_shift": "Focus offset relative to the stored best focus.", "focus_position": "Absolute focus position override.", "n": "Number of exposures in the sequence. If not specified, defaults to infinite exposures until end_time.", "guiding": "Start autoguiding with Donuts before imaging. Should be False for solar system objects using non-sidereal tracking, as the star field drifts relative to the guide reference.", "nonsidereal_recenter_interval": "For solar system objects (resolved via lookup_name): re-slew to the updated ephemeris position and refresh tracking rates every N seconds. Set to 0 to disable. Ignored for non-solar-system targets.", + "nonsidereal_start_lead_time_seconds": "For non-sidereal targets: initial lead time in seconds used to pre-point before sequence start. The telescope slews to the predicted target position at this offset, waits until the offset time is reached, then starts imaging and applies tracking rates. Set to 0 to start immediately.", "pointing": "Perform pointing correction with twirl before imaging.", "bin": "Camera binning factor.", "dir": "Base directory path for saving images.", @@ -625,6 +635,12 @@ def validate(self): # Subframe validation self.validate_subframe() + if self.nonsidereal_start_lead_time_seconds < 0: + raise ValueError( + "nonsidereal_start_lead_time_seconds must be >= 0, " + f"got {self.nonsidereal_start_lead_time_seconds}" + ) + def _resolve_lookup_name( self, start_time: Time, @@ -643,14 +659,27 @@ def _resolve_lookup_name( """ duration_hours = (end_time - start_time).to_value("hr") + 0.5 try: - self._ra_interp, self._dec_interp = precompute_ephemeris( - self.lookup_name, start_time, duration_hours, observatory_location + ( + self._ra_interp, + self._dec_interp, + self._ra_rate_interp, + self._dec_rate_interp, + ) = precompute_ephemeris( + self.lookup_name, + start_time, + duration_hours, + observatory_location, + self.nonsidereal_recenter_interval / 60, + self.tle, + return_rates=True, ) self._nonsidereal = True ra = float(self._ra_interp(0.0)) % 360.0 dec = float(self._dec_interp(0.0)) except NotMovingBodyError: self._nonsidereal = False + self._ra_rate_interp = None + self._dec_rate_interp = None target_coord = get_body_coordinates( body_name=self.lookup_name, diff --git a/src/astra/nonsidereal.py b/src/astra/nonsidereal.py index a3c41f26..c66764c4 100644 --- a/src/astra/nonsidereal.py +++ b/src/astra/nonsidereal.py @@ -1,12 +1,13 @@ """Non-sidereal (solar system body) tracking for observatory imaging sequences. This module provides the machinery for tracking solar system objects (comets, -asteroids, planets) whose celestial coordinates change significantly over short -timescales. It uses high-precision cubic interpolation of pre-computed ephemerides -to provide smooth, differential tracking rates to the telescope mount via ASCOM. +asteroids, planets, Earth-orbiting objects) whose celestial coordinates change +significantly over short timescales. It uses high-precision cubic interpolation +of pre-computed ephemerides to provide smooth, differential tracking rates to +the telescope mount via ASCOM. Key Capabilities: - - Ephemeris Pre-computation: Uses Astropy to generate a sequence of + - Ephemeris Pre-computation: Uses Astropy or JPL Horizons to generate a sequence of positions for a given object over the duration of an observation. - Cubic Interpolation: Provides sub-second precision for RA/Dec coordinates without requiring repeated, expensive lookups. @@ -55,6 +56,8 @@ class _NonSiderealState: body_name: str ra_interp: Any dec_interp: Any + ra_rate_interp: Any + dec_rate_interp: Any sequence_start_time: Time recenter_interval: int last_recenter_time: float = field(default_factory=time.time) @@ -99,6 +102,30 @@ def apply_rates(self, telescope: AlpacaDevice) -> None: return self._apply_rates(telescope, self._state) + def prepoint_coordinates(self, lead_time_seconds: float = 60.0) -> tuple[float, float] | None: + """Return RA/Dec for an initial lead-pointing slew. + + Args: + lead_time_seconds: Seconds after sequence start used as the pre-pointing + target. Default is 60 seconds. + + Returns: + Tuple of (ra_deg, dec_deg) or None when non-sidereal tracking is inactive. + """ + if self._state is None: + return None + + state = self._state + ra_deg = float(state.ra_interp(lead_time_seconds)) % 360.0 + dec_deg = float(state.dec_interp(lead_time_seconds)) + return (ra_deg, dec_deg) + + def tracking_activation_time(self, lead_time_seconds: float = 60.0) -> Time | None: + """Return the time when non-sidereal rates and imaging should begin.""" + if self._state is None: + return None + return self._state.sequence_start_time + lead_time_seconds * u.s + def should_recenter(self) -> bool: """Return True if the recenter interval has elapsed.""" if self._state is None or self._state.recenter_interval <= 0: @@ -181,6 +208,8 @@ def _setup(self, action: Action) -> _NonSiderealState | None: lname = action.action_value.get("lookup_name") ra_interp = action.action_value.get("_ra_interp") dec_interp = action.action_value.get("_dec_interp") + ra_rate_interp = action.action_value.get("_ra_rate_interp") + dec_rate_interp = action.action_value.get("_dec_rate_interp") recenter_interval = int( action.action_value.get("nonsidereal_recenter_interval", 0) ) @@ -203,6 +232,8 @@ def _setup(self, action: Action) -> _NonSiderealState | None: body_name=lname, ra_interp=ra_interp, dec_interp=dec_interp, + ra_rate_interp=ra_rate_interp, + dec_rate_interp=dec_rate_interp, sequence_start_time=Time(action.start_time), recenter_interval=recenter_interval, ) @@ -211,9 +242,13 @@ def _apply_rates(self, telescope: AlpacaDevice, state: _NonSiderealState) -> Non """Set ASCOM RightAscensionRate / DeclinationRate from the interpolated ephemeris.""" try: t_seconds = (Time.now() - state.sequence_start_time).to(u.s).value - ra_rate, dec_rate = astra.utils.compute_nonsidereal_rates_from_interp( - state.ra_interp, state.dec_interp, t_seconds - ) + if state.ra_rate_interp is not None and state.dec_rate_interp is not None: + ra_rate = float(state.ra_rate_interp(t_seconds)) + dec_rate = float(state.dec_rate_interp(t_seconds)) + else: + ra_rate, dec_rate = astra.utils.compute_nonsidereal_rates_from_interp( + state.ra_interp, state.dec_interp, t_seconds + ) telescope.set("RightAscensionRate", ra_rate) telescope.set("DeclinationRate", dec_rate) self.logger.info( diff --git a/src/astra/observatory.py b/src/astra/observatory.py index 1460d03e..a85188e6 100644 --- a/src/astra/observatory.py +++ b/src/astra/observatory.py @@ -35,6 +35,7 @@ import logging import math import os +import threading import time from datetime import UTC, datetime, timedelta from pathlib import Path @@ -1370,7 +1371,11 @@ def cool_camera( ) def pre_sequence( - self, action: Action, paired_devices: dict | PairedDevices + self, + action: Action, + paired_devices: dict | PairedDevices, + near: bool = False, + slew_target_radec_deg: tuple[float, float] | None = None, ) -> None: """ Prepare the observatory and metadata for a sequence. @@ -1393,7 +1398,12 @@ def pre_sequence( self.logger.debug(f"Running pre_sequence for {action.summary_string()}") # prepare observatory for sequence - self.setup_observatory(paired_devices, action.action_value) + self.setup_observatory( + paired_devices, + action.action_value, + near=near, + slew_target_radec_deg=slew_target_radec_deg, + ) # Create image handler self.setup_image_handler(action=action, paired_devices=paired_devices) @@ -1460,6 +1470,8 @@ def setup_observatory( paired_devices: PairedDevices | dict, action_value: BaseActionConfig, filter_list_index: int = 0, + near: bool = False, + slew_target_radec_deg: tuple[float, float] | None = None, ) -> None: """ Prepares the observatory for a sequence by performing necessary setup actions. @@ -1517,6 +1529,8 @@ def setup_observatory( body_name=lookup_name, obs_time=now, obs_location=obs_location, + tle=action_value.get("tle"), + near=near ) ra = target_coord.ra.deg # type: ignore @@ -1550,10 +1564,15 @@ def setup_observatory( f"Converted Alt/Az ({alt:.2f}°, {az:.2f}°) to RA/Dec ({ra:.2f}°, {dec:.2f}°)" ) + slew_ra = ra + slew_dec = dec + if slew_target_radec_deg is not None: + slew_ra, slew_dec = slew_target_radec_deg + # Slew to target coordinates, open observatory if needed if ( - (ra is not None) - and (dec is not None) + (slew_ra is not None) + and (slew_dec is not None) and (action_value.get("disable_telescope_movement", False) is False) and self.check_conditions() ): @@ -1576,14 +1595,16 @@ def setup_observatory( # slew to target # Convert RA from degrees to hours (RA in deg / 360 * 24 = RA in hours) - ra_hours = ra / 15.0 # 360 degrees / 24 hours = 15 degrees per hour + ra_hours = ( + slew_ra / 15.0 + ) # 360 degrees / 24 hours = 15 degrees per hour self.logger.info( - f"Slewing Telescope {telescope_name} to RA/Dec {ra:.2f}°/{dec:.2f}°" + f"Slewing Telescope {telescope_name} to RA/Dec {slew_ra:.2f}°/{slew_dec:.2f}°" ) telescope.get( "SlewToCoordinatesAsync", RightAscension=ra_hours, # RA in hours - Declination=dec, # Dec in degrees + Declination=slew_dec, # Dec in degrees ) time.sleep(1) @@ -1959,6 +1980,8 @@ def perform_exposure( maximal_sleep_time=0.1, sequence_counter: int = 0, wcs=None, + nonsidereal_manager: NonSiderealManager | None = None, + telescope: AlpacaDevice | None = None, ) -> tuple[bool, Path | None]: """ Execute a camera exposure and handle image acquisition. @@ -1982,6 +2005,11 @@ def perform_exposure( in a series. Defaults to 0. wcs (WCS, optional): World Coordinate System solution for the image. Defaults to None. + nonsidereal_manager (NonSiderealManager | None, optional): + Active non-sidereal manager used to refresh tracking rates during + long image save operations. Defaults to None. + telescope (AlpacaDevice | None, optional): Telescope device used to + apply non-sidereal rates while saving. Defaults to None. Returns: tuple: A tuple containing: @@ -2057,6 +2085,12 @@ def perform_exposure( exception=e, ) + if ( + nonsidereal_manager is not None + and telescope is not None + and nonsidereal_manager.is_active + ): + nonsidereal_manager.apply_rates(telescope) time.sleep(min(maximal_sleep_time, exptime / 10)) exposure_end_time = time.time() @@ -2076,15 +2110,49 @@ def perform_exposure( image = camera.get("ImageArray") image_info = camera.get("ImageArrayInfo") - filepath = image_handler.save_image( - image=image, - image_info=image_info, - maxadu=maxadu, - device_name=camera.device_name, - exposure_start_datetime=exposure_start_datetime, - wcs=wcs, - sequence_counter=sequence_counter, - ) + if ( + nonsidereal_manager is not None + and telescope is not None + and nonsidereal_manager.is_active + ): + save_result: dict[str, Path] = {} + save_error: dict[str, Exception] = {} + + def _save_image() -> None: + try: + save_result["filepath"] = image_handler.save_image( + image=image, + image_info=image_info, + maxadu=maxadu, + device_name=camera.device_name, + exposure_start_datetime=exposure_start_datetime, + wcs=wcs, + sequence_counter=sequence_counter, + ) + except Exception as e: + save_error["error"] = e + + save_thread = threading.Thread(target=_save_image, daemon=True) + save_thread.start() + + while save_thread.is_alive(): + nonsidereal_manager.apply_rates(telescope) + save_thread.join(timeout=0.1) + + if "error" in save_error: + raise save_error["error"] + + filepath = save_result["filepath"] + else: + filepath = image_handler.save_image( + image=image, + image_info=image_info, + maxadu=maxadu, + device_name=camera.device_name, + exposure_start_datetime=exposure_start_datetime, + wcs=wcs, + sequence_counter=sequence_counter, + ) self.logger.info( f"Image saved as {os.path.basename(filepath)}. " @@ -2224,12 +2292,22 @@ def image_sequence(self, action: Action, paired_devices: PairedDevices) -> None: ) nonsidereal = NonSiderealManager(action, self.logger) + nonsidereal_lead_seconds = float( + action.action_value.get("nonsidereal_start_lead_time_seconds", 60.0) + ) + nonsidereal_prepoint = nonsidereal.prepoint_coordinates( + lead_time_seconds=nonsidereal_lead_seconds + ) + nonsidereal_activation_time = nonsidereal.tracking_activation_time( + lead_time_seconds=nonsidereal_lead_seconds + ) - self.pre_sequence(action, paired_devices) - - # Set initial non-sidereal tracking rates right after the slew completes - if nonsidereal.is_active and "Telescope" in paired_devices: - nonsidereal.apply_rates(paired_devices.telescope) + self.pre_sequence( + action, + paired_devices, + near=nonsidereal.is_active, + slew_target_radec_deg=nonsidereal_prepoint, + ) action_value = action.action_value @@ -2275,6 +2353,32 @@ def image_sequence(self, action: Action, paired_devices: PairedDevices) -> None: last_flip_check_time = 0 sequence_counter = 0 try: + if ( + nonsidereal.is_active + and "Telescope" in paired_devices + and nonsidereal_activation_time is not None + ): + initial_wait_seconds = ( + nonsidereal_activation_time - Time.now() + ).to_value("s") + if initial_wait_seconds > 0: + self.logger.info( + "Non-sidereal pre-point complete. Waiting " + f"{initial_wait_seconds:.1f}s to start imaging and tracking rates." + ) + while Time.now() < nonsidereal_activation_time: + if not self.check_conditions(action): + break + remaining_seconds = ( + nonsidereal_activation_time - Time.now() + ).to_value("s") + if remaining_seconds <= 0: + break + time.sleep(min(1.0, remaining_seconds)) + + if self.check_conditions(action): + nonsidereal.apply_rates(paired_devices.telescope) + for i, exptime in enumerate(exptime_list): if not self.check_conditions(action): break @@ -2327,6 +2431,12 @@ def image_sequence(self, action: Action, paired_devices: PairedDevices) -> None: log_option=log_option, wcs=wcs_solve, sequence_counter=sequence_counter, + nonsidereal_manager=nonsidereal, + telescope=( + paired_devices.telescope + if "Telescope" in paired_devices + else None + ), ) sequence_counter += 1 diff --git a/src/astra/utils.py b/src/astra/utils.py index 84dfbff7..e708b97a 100644 --- a/src/astra/utils.py +++ b/src/astra/utils.py @@ -14,8 +14,10 @@ """ import math +import logging import time from datetime import datetime +from pathlib import Path from typing import Any, Tuple import astropy.units as u @@ -32,11 +34,14 @@ ) from astropy.stats import SigmaClip, sigma_clipped_stats from astropy.time import Time +from astroquery.jplhorizons import Horizons from donuts.image import Image from photutils.background import Background2D, MedianBackground from scipy import ndimage from scipy.interpolate import interp1d +logger = logging.getLogger(__name__) + _SOLAR_SYSTEM_BODIES: frozenset[str] = frozenset(solar_system_ephemeris.bodies) _SOLAR_TO_SIDEREAL = u.Quantity(1, "day").to("sday").value @@ -45,6 +50,28 @@ class NotMovingBodyError(ValueError): """Raised when a lookup_name cannot be resolved as a solar system or minor body.""" +def _save_and_log_horizons_output( + body_name: str, context: str, eph: Any, call_input: dict[str, Any] +) -> None: + """Persist raw Horizons output locally and mirror the API call input into the logger.""" + try: + from astra.config import Config + + horizons_dir = Config().paths.logs / "horizons" + horizons_dir.mkdir(parents=True, exist_ok=True) + safe_name = body_name.replace(" ", "_").replace("/", "_") + output_path = horizons_dir / f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv" + eph.write(output_path, format="ascii.ecsv", overwrite=True) + logger.warning("Saved raw Horizons output for %s (%s) to %s", body_name, context, output_path) + except Exception as exc: + logger.warning("Failed to save raw Horizons output for %s (%s): %s", body_name, context, exc) + + try: + logger.warning("Horizons API call input for %s (%s): %s", body_name, context, call_input) + except Exception as exc: + logger.warning("Failed to log Horizons API call input for %s (%s): %s", body_name, context, exc) + + class CustomImageClass(Image): """Enhanced image processing class with background subtraction and cleaning.""" @@ -303,7 +330,7 @@ def clean_image(data: np.ndarray) -> np.ndarray: ## planet or SIMBAD positions def get_body_coordinates( - body_name: str, obs_time: Time, obs_location: EarthLocation + body_name: str, obs_time: Time, obs_location: EarthLocation, tle: str = None, near: bool = False ) -> SkyCoord: """Get the position of a celestial body (Solar System or Deep Sky). @@ -322,6 +349,20 @@ def get_body_coordinates( # solar_system_ephemeris.bodies normally contains lowercase strings if body_name.lower() in _SOLAR_SYSTEM_BODIES: return get_body(body_name, obs_time, obs_location) + elif near and tle is None: + location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} + call_input = {"id": body_name, "location": location, "epochs": obs_time.jd} + obj = Horizons(**call_input) + eph = obj.ephemerides() + _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) + return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] + elif near: + location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} + call_input = {"id": "TLE", "location": location, "epochs": obs_time.jd, "optional_settings": {"TLE": tle}} + obj = Horizons(id='TLE', location=location, epochs=obs_time.jd) + eph = obj.ephemerides(optional_settings={"TLE": tle}) + _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) + return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] # Otherwise, try to resolve as a deep sky object (ICRS) return SkyCoord.from_name(body_name) @@ -341,44 +382,66 @@ def precompute_ephemeris( duration_hours: float, obs_location: EarthLocation, interval_minutes: float = 1.0, -) -> tuple["interp1d", "interp1d"]: + tle_data: str | None = None, + return_rates: bool = False, +) -> tuple["interp1d", "interp1d"] | tuple["interp1d", "interp1d", "interp1d", "interp1d"]: """Pre-compute a moving body's sky positions over a time window. Performs a single vectorised get_body() call and returns cubic interpolation functions keyed on seconds since start_time. Querying the interpolators is orders of magnitude faster than repeated positional lookups at runtime. - Currently supports astropy built-in bodies (planets, Moon, Sun). To add - support for comets/asteroids, extend this function to detect bodies not in - ``_SOLAR_SYSTEM_BODIES`` and delegate to an alternative source (e.g. - ``astroquery.jplhorizons``). + Currently supports astropy built-in bodies (planets, Moon, Sun), minor bodies + via JPL Horizons, and Two-Line Element (TLE) sets for satellites and debris. Args: body_name: Name of the body (e.g. 'mars', 'moon'). Must be present in - astropy's built-in solar system ephemeris. + astropy's built-in solar system ephemeris, resolvable by JPL Horizons, + or 'TLE' if tle_data is provided. start_time: Start of the observation window. duration_hours: Length of the window in hours. obs_location: Observer EarthLocation. interval_minutes: Ephemeris sampling interval (default 1 min). + tle_data: Two-line element (TLE) data as a string with two lines separated + by newline. Required when body_name is 'TLE'. Example format: + "1 25544U 98067A 08264.51782528 -.00002182 00000-0 -11606-4 0 2927\\n + 2 25544 51.6416 247.4627 0006703 130.5360 325.0288 15.72125391563537" Returns: - (ra_interp, dec_interp): Two callables mapping elapsed seconds to degrees. - RA is unwrapped (continuous, not modulo 360) to avoid discontinuities at wrap boundaries. + If return_rates is False (default): + (ra_interp, dec_interp): Two callables mapping elapsed seconds to degrees. + RA is unwrapped (continuous, not modulo 360) to avoid discontinuities at wrap boundaries. + + If return_rates is True: + (ra_interp, dec_interp, ra_rate_interp, dec_rate_interp), where + ra_rate_interp and dec_rate_interp map elapsed seconds to ASCOM tracking + units (RightAscensionRate in s/s_sidereal and DeclinationRate in as/s_sidereal). + + Raises: + NotMovingBodyError: If body cannot be resolved as a solar system body, + minor body, or TLE. + ValueError: If body_name is 'TLE' but tle_data is not provided. Example usage: - Examples -------- Get the position of Mars as observed from Greenwich at the current time: from astropy.coordinates import get_body, EarthLocation, solar_system_ephemeris from astropy.time import Time location = EarthLocation.of_site('greenwich') ra_interp, dec_interp = precompute_ephemeris('mars', Time.now(), 4, location) + + Get position of ISS using TLE data: + tle = "1 25544U 98067A 23001.00000000 .00016717 00000-0 29641-3 0 9991\n2 25544 51.6416 339.8014 0002571 235.7582 1.5976 15.54178122381131" + ra_interp, dec_interp = precompute_ephemeris('TLE', start_time, 4, location, tle_data=tle) """ n_points = int(duration_hours * 60 / interval_minutes) + 1 minutes = np.linspace(0, duration_hours * 60, n_points) times = start_time + u.Quantity(minutes, "min") stop_time = start_time + duration_hours * u.hour + ra_rate_as_per_hour = None + dec_rate_as_per_hour = None + if body_name.lower() in _SOLAR_SYSTEM_BODIES: with solar_system_ephemeris.set("builtin"): bodies = get_body(body_name, times, obs_location) @@ -397,19 +460,32 @@ def precompute_ephemeris( "stop": stop_time.iso, "step": str(n_points - 1), # Horizons returns n+1 rows for n steps } - try: + + # Handle TLE data + if body_name.upper() == "TLE" or tle_data is not None: + if tle_data is None: + raise ValueError( + "tle_data parameter is required when body_name is 'TLE'" + ) + call_input = { + "id": "TLE", + "location": location, + "epochs": epochs, + "optional_settings": {"TLE": tle_data}, + } + obj = Horizons(id='TLE', location=location, epochs=epochs) + eph = obj.ephemerides(optional_settings={"TLE": tle_data}) + else: + call_input = {"id": body_name, "location": location, "epochs": epochs} obj = Horizons(id=body_name, location=location, epochs=epochs) eph = obj.ephemerides() - except ValueError: - obj = Horizons( - id=body_name, - location=location, - epochs=epochs, - id_type="smallbody", - ) - eph = obj.ephemerides() + _save_and_log_horizons_output(body_name, "precompute_ephemeris", eph, call_input) + bodies = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg) seconds = (Time(eph["datetime_jd"], format="jd") - start_time).to(u.s).value + if "RA_rate" in eph.colnames and "DEC_rate" in eph.colnames: + ra_rate_as_per_hour = np.asarray(eph["RA_rate"], dtype=float) + dec_rate_as_per_hour = np.asarray(eph["DEC_rate"], dtype=float) except ConnectionError: raise except Exception as e: @@ -422,7 +498,34 @@ def precompute_ephemeris( ra_interp = interp1d(seconds, ra_coords, kind="cubic", fill_value="extrapolate") dec_interp = interp1d(seconds, dec_coords, kind="cubic", fill_value="extrapolate") - return ra_interp, dec_interp + + if not return_rates: + return ra_interp, dec_interp + + if ra_rate_as_per_hour is not None and dec_rate_as_per_hour is not None: + # Horizons rates are in arcsec/hour (solar seconds). Convert to ASCOM units. + ra_rates = (ra_rate_as_per_hour / (15.0 * 3600.0)) / _SOLAR_TO_SIDEREAL + dec_rates = (dec_rate_as_per_hour / 3600.0) / _SOLAR_TO_SIDEREAL + else: + # Fallback for astropy bodies: derive rates from sampled sky positions. + ra_rate_deg_per_solar_s = np.gradient(ra_coords, seconds) + dec_rate_deg_per_solar_s = np.gradient(dec_coords, seconds) + ra_rates = (ra_rate_deg_per_solar_s * 240.0) / _SOLAR_TO_SIDEREAL + dec_rates = (dec_rate_deg_per_solar_s * 3600.0) / _SOLAR_TO_SIDEREAL + + ra_rate_interp = interp1d( + seconds, + ra_rates, + kind="linear", + fill_value="extrapolate", + ) + dec_rate_interp = interp1d( + seconds, + dec_rates, + kind="linear", + fill_value="extrapolate", + ) + return ra_interp, dec_interp, ra_rate_interp, dec_rate_interp def compute_nonsidereal_rates_from_interp( @@ -447,8 +550,9 @@ def compute_nonsidereal_rates_from_interp( ra_rate - seconds of time per sidereal second (ASCOM RightAscensionRate) dec_rate - arcseconds per sidereal second (ASCOM DeclinationRate) """ - # Scale dt from solar seconds to sidereal seconds for correct per-sidereal-second rates - dt_in_sidereal_s = dt * _SOLAR_TO_SIDEREAL + # Scale dt from solar seconds to sidereal seconds for correct per-sidereal-second rates. + # One sidereal second is shorter than one solar second. + dt_in_sidereal_s = dt / _SOLAR_TO_SIDEREAL delta_ra_deg = float(ra_interp(t_seconds + dt)) - float(ra_interp(t_seconds)) delta_dec_deg = float(dec_interp(t_seconds + dt)) - float(dec_interp(t_seconds)) diff --git a/tests/test_nonsidereal.py b/tests/test_nonsidereal.py index 21e038bf..83832500 100644 --- a/tests/test_nonsidereal.py +++ b/tests/test_nonsidereal.py @@ -15,9 +15,13 @@ from unittest.mock import MagicMock, patch import numpy as np +import pytest +import astropy.units as u from scipy.interpolate import interp1d +from astropy.time import Time from astra.nonsidereal import NonSiderealManager +from astra.utils import compute_nonsidereal_rates_from_interp from astra.scheduler import Action @@ -34,6 +38,8 @@ def _make_action( disable_telescope_movement=False, ra_interp=None, dec_interp=None, + ra_rate_interp=None, + dec_rate_interp=None, recenter_interval=0, lookup_name="mars", start_time=None, @@ -49,6 +55,8 @@ def _make_action( "disable_telescope_movement": disable_telescope_movement, "_ra_interp": ra_interp, "_dec_interp": dec_interp, + "_ra_rate_interp": ra_rate_interp, + "_dec_rate_interp": dec_rate_interp, "nonsidereal_recenter_interval": recenter_interval, "lookup_name": lookup_name, }.get(key, default) @@ -146,6 +154,73 @@ def test_warns_and_does_not_raise_on_telescope_error(self): mgr.apply_rates(telescope) mgr.logger.warning.assert_called_once() + @patch("astra.utils.compute_nonsidereal_rates_from_interp") + def test_uses_precomputed_rate_interpolators_when_available(self, rate_fn): + action = _make_action( + ra_interp=_make_interp(slope=1e-4, intercept=100.0), + dec_interp=_make_interp(slope=0.0, intercept=20.0), + ra_rate_interp=_make_interp(slope=0.0, intercept=0.123), + dec_rate_interp=_make_interp(slope=0.0, intercept=-4.56), + ) + mgr = NonSiderealManager(action, MagicMock()) + telescope = MagicMock() + + mgr.apply_rates(telescope) + + telescope.set.assert_any_call("RightAscensionRate", 0.123) + telescope.set.assert_any_call("DeclinationRate", -4.56) + rate_fn.assert_not_called() + + +class TestNonsiderealRateHelper: + def test_compute_nonsidereal_rates_uses_sidereal_second_conversion(self): + ra_interp = _make_interp(slope=1.0 / 60.0, intercept=0.0) + dec_interp = _make_interp(slope=0.0, intercept=0.0) + + ra_rate, dec_rate = compute_nonsidereal_rates_from_interp( + ra_interp, + dec_interp, + t_seconds=0.0, + dt=60.0, + ) + + assert ra_rate == pytest.approx(4.010951637, rel=1e-9) + assert dec_rate == pytest.approx(0.0, abs=1e-12) + + +class TestPrepointAndActivation: + def test_prepoint_coordinates_return_offset_position(self): + mgr = _make_active_manager(ra_slope=1e-3, dec_slope=2e-3) + + ra_deg, dec_deg = mgr.prepoint_coordinates(lead_time_seconds=60.0) + + assert ra_deg == pytest.approx((100.0 + 0.001 * 60.0) % 360.0) + assert dec_deg == pytest.approx(20.0 + 0.002 * 60.0) + + def test_prepoint_coordinates_none_when_inactive(self): + action = _make_action(nonsidereal=False) + mgr = NonSiderealManager(action, MagicMock()) + assert mgr.prepoint_coordinates() is None + + def test_tracking_activation_time_is_start_plus_offset(self): + start = datetime(2025, 6, 1, 0, 0, 0, tzinfo=UTC) + action = _make_action( + start_time=start, + ra_interp=_make_interp(slope=1e-4, intercept=100.0), + dec_interp=_make_interp(slope=0.0, intercept=20.0), + ) + mgr = NonSiderealManager(action, MagicMock()) + + activation_time = mgr.tracking_activation_time(lead_time_seconds=60.0) + + assert activation_time is not None + assert activation_time.unix == pytest.approx((Time(start) + 60.0 * u.s).unix) + + def test_tracking_activation_time_none_when_inactive(self): + action = _make_action(nonsidereal=False) + mgr = NonSiderealManager(action, MagicMock()) + assert mgr.tracking_activation_time() is None + class TestResetRates: def test_zeros_both_rates(self): diff --git a/tests/test_observatory_running_schedule.py b/tests/test_observatory_running_schedule.py index f0be1d05..a712b697 100644 --- a/tests/test_observatory_running_schedule.py +++ b/tests/test_observatory_running_schedule.py @@ -6,6 +6,7 @@ import json import logging import time +import threading from contextlib import contextmanager from datetime import UTC, datetime, timedelta @@ -15,6 +16,23 @@ from astra.observatory import Observatory, ObservatoryConfig logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +logger.propagate = False +if not any(getattr(handler, "_astra_test_handler", False) for handler in logger.handlers): + test_handler = logging.StreamHandler() + test_handler.setLevel(logging.INFO) + test_handler.setFormatter(logging.Formatter("%(message)s")) + test_handler._astra_test_handler = True + logger.addHandler(test_handler) +tracking_logger = logging.getLogger(f"{__name__}.tracking") +tracking_logger.setLevel(logging.WARNING) +tracking_logger.propagate = False +if not any(getattr(handler, "_astra_tracking_handler", False) for handler in tracking_logger.handlers): + tracking_handler = logging.StreamHandler() + tracking_handler.setLevel(logging.WARNING) + tracking_handler.setFormatter(logging.Formatter("%(message)s")) + tracking_handler._astra_tracking_handler = True + tracking_logger.addHandler(tracking_handler) def check_simulators_available(server_url="http://localhost:11111"): @@ -126,6 +144,36 @@ def create_schedule_data( "guiding": False, "pointing": False, "nonsidereal_recenter_interval": 10, + "nonsidereal_start_lead_time_seconds": 15 + }, + "duration": 1, + }, + "asteroid": { + "action_type": "object", + "action_value": { + "object": "433 Eros", + "lookup_name": "A898 PA", + "exptime": 1, + "filter": "Clear", + "guiding": False, + "pointing": False, + "nonsidereal_recenter_interval": 100, + "nonsidereal_start_lead_time_seconds": 15, + }, + "duration": 1, + }, + "tle": { + "action_type": "object", + "action_value": { + "object": "ISS", + "lookup_name": "TLE", + "tle": "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209", + "exptime": 1, + "filter": "Clear", + "guiding": False, + "pointing": False, + "nonsidereal_recenter_interval": 100, + "nonsidereal_start_lead_time_seconds": 15, }, "duration": 1, }, @@ -352,18 +400,30 @@ def wait_for_schedule_completion( return final_completed > 0, final_completed, error_free_maintained -def set_location_for_body_visibility(server_url, body_name: str, latitude: float = 0.0): +def set_location_for_body_visibility(server_url, body_name: str, latitude: float = 0.0, tle: str = None): """Set telescope simulator location so that body is near the meridian and above the horizon.""" - from astropy.coordinates import AltAz, EarthLocation, get_body + from astropy.coordinates import AltAz, EarthLocation, get_body, solar_system_ephemeris, SkyCoord from astropy.time import Time + from astroquery.jplhorizons import Horizons + import astropy.units as u t = Time.now() - body = get_body(body_name, t) + if body_name.lower() in solar_system_ephemeris.bodies: + body = get_body(body_name, t) + + elif tle is None: + obj = Horizons(id=body_name, location='500', epochs=t.jd) + eph = obj.ephemerides() + body = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t).transform_to('gcrs') + latitude = body.dec.deg + else: + obj = Horizons(id='TLE', location='500', epochs=t.jd) + eph = obj.ephemerides(optional_settings={"TLE": tle}) + body = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t).transform_to('gcrs') + latitude = body.dec.deg gmst = t.sidereal_time("mean", "greenwich").deg transit_lon = ((body.ra.deg - gmst + 180) % 360) - 180 - import astropy.units as u - loc = EarthLocation(lat=latitude * u.deg, lon=transit_lon * u.deg, height=0 * u.m) alt = body.transform_to(AltAz(obstime=t, location=loc)).alt.deg assert alt > 0, ( @@ -380,6 +440,12 @@ def set_location_for_body_visibility(server_url, body_name: str, latitude: float ) assert r.status_code == 200, "Failed to set observatory longitude." + r = requests.put( + f"{server_url}/api/v1/telescope/0/siteelevation", + data={"SiteElevation": 1000.0}, + ) + assert r.status_code == 200, "Failed to set observatory elevation." + def set_safety_monitor_safe(server_url): """Set the safety monitor to safe.""" @@ -391,6 +457,274 @@ def set_safety_monitor_safe(server_url): assert False, "Failed to set safety monitor to safe." +def _wait_for_schedule_running(observatory: Observatory, timeout_s: float = 60.0) -> None: + """Wait until scheduler thread reports a running state.""" + t0 = time.time() + while time.time() - t0 < timeout_s: + if observatory.schedule_manager.running: + return + time.sleep(0.5) + raise TimeoutError("Schedule did not enter running state in time.") + + +def _get_telescope_radec(server_url: str) -> tuple[float, float]: + """Return telescope (RA deg, Dec deg) from Alpaca endpoints.""" + ra_hours = ( + requests.get(f"{server_url}/api/v1/telescope/0/rightascension", timeout=5) + .json() + .get("Value") + ) + dec_deg = ( + requests.get(f"{server_url}/api/v1/telescope/0/declination", timeout=5) + .json() + .get("Value") + ) + return float(ra_hours) * 15.0, float(dec_deg) + + +def _assert_telescope_position_matches_expected( + server_url: str, + expected_ra_deg: float, + expected_dec_deg: float, + *, + max_separation_deg: float = 0.1, +) -> None: + """Assert the telescope is pointed near the expected RA/Dec.""" + import astropy.units as u + from astropy.coordinates import SkyCoord + + actual_ra_deg, actual_dec_deg = _get_telescope_radec(server_url) + actual_coord = SkyCoord(ra=actual_ra_deg * u.deg, dec=actual_dec_deg * u.deg) + expected_coord = SkyCoord( + ra=expected_ra_deg * u.deg, + dec=expected_dec_deg * u.deg, + ) + separation_deg = actual_coord.separation(expected_coord).deg + + assert separation_deg <= max_separation_deg, ( + "Telescope was not pre-pointed to the expected non-sidereal position: " + f"actual=({actual_ra_deg:.6f}, {actual_dec_deg:.6f}) deg, " + f"expected=({expected_ra_deg:.6f}, {expected_dec_deg:.6f}) deg, " + f"separation={separation_deg:.4f} deg" + ) + + +def _get_site_location(server_url: str): + """Build EarthLocation from simulator site coordinates.""" + import astropy.units as u + from astropy.coordinates import EarthLocation + + lat = ( + requests.get(f"{server_url}/api/v1/telescope/0/sitelatitude", timeout=5) + .json() + .get("Value") + ) + lon = ( + requests.get(f"{server_url}/api/v1/telescope/0/sitelongitude", timeout=5) + .json() + .get("Value") + ) + elevation = ( + requests.get(f"{server_url}/api/v1/telescope/0/siteelevation", timeout=5) + .json() + .get("Value") + ) + return EarthLocation( + lat=float(lat) * u.deg, + lon=float(lon) * u.deg, + height=float(elevation) * u.m, + ) + + +def _precompute_tracking_path( + body_name: str, + schedule_data: dict, + obs_location, + sample_interval_s: float, + *, + tle: str | None = None, +): + """Precompute ephemeris interpolation functions for tracking validation. + + Mirrors schedule-side setup so the predicted path is directly comparable to + the interpolators prepared during schedule loading. + """ + from astropy.time import Time + + from astra.utils import precompute_ephemeris + + schedule_start = datetime.fromisoformat(schedule_data["start_time"]) + schedule_end = datetime.fromisoformat(schedule_data["end_time"]) + if schedule_start.tzinfo is None: + schedule_start = schedule_start.replace(tzinfo=UTC) + if schedule_end.tzinfo is None: + schedule_end = schedule_end.replace(tzinfo=UTC) + + # Schedule validation precomputes from action start through action end + 0.5 h. + duration_hours = (schedule_end - schedule_start).total_seconds() / 3600.0 + 0.5 + + recenter_interval_s = float( + schedule_data.get("action_value", {}).get("nonsidereal_recenter_interval", 0) + ) + if recenter_interval_s > 0: + interval_minutes = recenter_interval_s / 60.0 + else: + interval_minutes = max(sample_interval_s / 60.0, 0.01) + + ephem_start = Time(schedule_start) + logger.info( + "Precomputing tracking path: ephem_start=%s, duration_hours=%.3f, interval_minutes=%.3f", + ephem_start.isot, + duration_hours, + interval_minutes, + ) + ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = precompute_ephemeris( + body_name=body_name, + start_time=ephem_start, + duration_hours=duration_hours, + obs_location=obs_location, + interval_minutes=interval_minutes, + tle_data=tle, + return_rates=True, + ) + return ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp + + +def _get_tracking_activation_time(schedule_data: dict) -> datetime: + """Return when non-sidereal tracking rates should become active.""" + lead_time_seconds = float( + schedule_data["action_value"].get("nonsidereal_start_lead_time_seconds", 60.0) + ) + start_time = datetime.fromisoformat(schedule_data["start_time"]) + if start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=UTC) + return start_time + timedelta(seconds=lead_time_seconds) + + +def _wait_until_datetime(target_time: datetime, timeout_s: float = 120.0) -> None: + """Wait until the requested UTC timestamp is reached.""" + if target_time.tzinfo is None: + target_time = target_time.replace(tzinfo=UTC) + deadline = time.time() + timeout_s + while True: + remaining_s = (target_time - datetime.now(UTC)).total_seconds() + if remaining_s <= 0: + return + if time.time() > deadline: + raise TimeoutError(f"Timed out waiting for {target_time.isoformat()}.") + time.sleep(min(1.0, remaining_s)) + + +def _max_sample_count_for_interval( + schedule_data: dict, + sample_interval_s: float, + *, + start_time: datetime | None = None, + safety_margin_s: float = 5.0, +) -> int: + """Return maximum feasible sample count for the schedule and interval.""" + end_time = datetime.fromisoformat(schedule_data["end_time"]) + if start_time is None: + start_time = datetime.now(UTC) + elif start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=UTC) + remaining_s = (end_time - start_time).total_seconds() - safety_margin_s + if remaining_s <= 0: + return 1 + return max(1, int(remaining_s // sample_interval_s) + 1) + + +def _tracking_direction(rate: float, *, tolerance: float = 1e-6) -> str: + """Translate a numeric rate into a readable direction label.""" + if rate > tolerance: + return "positive" + if rate < -tolerance: + return "negative" + return "stationary" + + +def _sample_tracking_rate_alignment( + server_url: str, + ephem_start, + ra_interp, + dec_interp, + ra_rate_interp, + dec_rate_interp, + sample_count: int, + sample_interval_s: float, + end_time: datetime | None = None, +) -> list[dict[str, float | str]]: + """Sample actual and predicted rate directions over the tracking window.""" + import astropy.units as u + from astropy.coordinates import SkyCoord + from astropy.time import Time + + samples: list[dict[str, float | str]] = [] + for i in range(sample_count): + if end_time is not None: + if end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=UTC) + if datetime.now(UTC) >= end_time: + break + if i > 0: + if end_time is not None: + remaining_s = (end_time - datetime.now(UTC)).total_seconds() + if remaining_s <= 0: + break + time.sleep(min(sample_interval_s, remaining_s)) + else: + time.sleep(sample_interval_s) + + obs_time = Time.now() + elapsed_s = (obs_time - ephem_start).to_value("s") + expected_ra_deg = float(ra_interp(elapsed_s)) % 360.0 + expected_dec_deg = float(dec_interp(elapsed_s)) + expected_ra_rate = float(ra_rate_interp(elapsed_s)) + expected_dec_rate = float(dec_rate_interp(elapsed_s)) + exp_coord = SkyCoord(ra=expected_ra_deg * u.deg, dec=expected_dec_deg * u.deg) + + ra_deg, dec_deg = _get_telescope_radec(server_url) + tel_coord = SkyCoord(ra=ra_deg * u.deg, dec=dec_deg * u.deg) + + tracking_logger.warning( + "Tracking comparison t+%.1fs: actual RA=%.6f deg, Dec=%.6f deg; " + "predicted RA=%.6f deg, Dec=%.6f deg", + float(elapsed_s), + ra_deg, + dec_deg, + expected_ra_deg, + expected_dec_deg, + ) + + actual_ra_rate = float( + requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate", timeout=5) + .json() + .get("Value", 0.0) + ) + actual_dec_rate = float( + requests.get(f"{server_url}/api/v1/telescope/0/declinationrate", timeout=5) + .json() + .get("Value", 0.0) + ) + separation = tel_coord.separation(exp_coord).deg + + samples.append( + { + "elapsed_s": float(elapsed_s), + "expected_ra_rate": expected_ra_rate, + "expected_dec_rate": expected_dec_rate, + "actual_ra_rate": actual_ra_rate, + "actual_dec_rate": actual_dec_rate, + "separation_deg": float(separation), + "expected_ra_direction": _tracking_direction(expected_ra_rate), + "expected_dec_direction": _tracking_direction(expected_dec_rate), + "actual_ra_direction": _tracking_direction(actual_ra_rate), + "actual_dec_direction": _tracking_direction(actual_dec_rate), + } + ) + + return samples + def prepare_flats(server_url, sunset=True): """Prepare flats by setting sunlight conditions and placing observatory where the sun is setting or rising.""" @@ -481,6 +815,7 @@ def get_sun_terminator_longitude( @pytest.mark.slow +@pytest.mark.network class TestScheduleActionTypes: """Test each schedule action type individually.""" @@ -704,6 +1039,498 @@ def test_nonsidereal_object_action( f"DeclinationRate was not reset after sequence: {dec_rate}" ) + def test_nonsidereal_object_action_with_weather_alert( + self, observatory, schedule_manager, server_url, temp_config + ): + """Non-sidereal object tracking must stop safely during weather alert. + + Verifies that when safety monitor switches to unsafe mid-sequence, the + schedule is interrupted and mount tracking/rates are stopped/reset. + """ + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "mars") + schedule_data = create_schedule_data( + "nonsidereal_object", + temp_config, + inject_weather_alert=True, + inject_weather_alert_delay=30, + ) + + with schedule_manager(schedule_data): + success, completed, error_free_maintained = wait_for_schedule_completion( + observatory, schedule_data, server_url, temp_config + ) + + assert error_free_maintained, ( + "error_free became False during nonsidereal weather alert test. " + f"Error sources: {observatory.logger.error_source}" + ) + assert success, ( + "nonsidereal_object action did not complete successfully under " + f"weather alert. Error sources: {observatory.logger.error_source}" + ) + assert completed > 0, "No actions were completed" + + tracking = ( + requests.get(f"{server_url}/api/v1/telescope/0/tracking") + .json() + .get("Value") + ) + ra_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate") + .json() + .get("Value") + ) + dec_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/declinationrate") + .json() + .get("Value") + ) + + assert tracking is False, "Tracking should be stopped after weather alert" + assert ra_rate == 0.0, ( + f"RightAscensionRate was not reset after weather alert: {ra_rate}" + ) + assert dec_rate == 0.0, ( + f"DeclinationRate was not reset after weather alert: {dec_rate}" + ) + + def test_jpl_horizons_object_action( + self, observatory, schedule_manager, server_url, temp_config + ): + """Test object action with non-sidereal tracking via JPL Horizons api. + + Verifies that a sequence using lookup_name='mars' completes without errors, + takes at least one image, and that tracking rates are reset on teardown. + """ + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "A898 PA") + schedule_data = create_schedule_data("asteroid", temp_config) + + with schedule_manager(schedule_data): + success, completed, error_free_maintained = wait_for_schedule_completion( + observatory, schedule_data, server_url, temp_config + ) + + assert error_free_maintained, ( + "error_free became False during nonsidereal_object action. " + f"Error sources: {observatory.logger.error_source}" + ) + assert success, ( + "nonsidereal_object action did not complete successfully. " + f"Error sources: {observatory.logger.error_source}" + ) + assert completed > 0, "No actions were completed" + + # Verify tracking rates were reset via the simulator endpoint + ra_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate") + .json() + .get("Value") + ) + dec_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/declinationrate") + .json() + .get("Value") + ) + assert ra_rate == 0.0, ( + f"RightAscensionRate was not reset after sequence: {ra_rate}" + ) + assert dec_rate == 0.0, ( + f"DeclinationRate was not reset after sequence: {dec_rate}" + ) + def test_tle_object_action( + self, observatory, schedule_manager, server_url, temp_config + ): + """Test object action with non-sidereal tracking via TLE. + + Verifies that a sequence using lookup_name='tle' completes without errors, + takes at least one image, and that tracking rates are reset on teardown. + """ + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "tle", tle="1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209") + schedule_data = create_schedule_data("tle", temp_config) + + with schedule_manager(schedule_data): + success, completed, error_free_maintained = wait_for_schedule_completion( + observatory, schedule_data, server_url, temp_config + ) + + assert error_free_maintained, ( + "error_free became False during nonsidereal_object action. " + f"Error sources: {observatory.logger.error_source}" + ) + assert success, ( + "nonsidereal_object action did not complete successfully. " + f"Error sources: {observatory.logger.error_source}" + ) + assert completed > 0, "No actions were completed" + + # Verify tracking rates were reset via the simulator endpoint + ra_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate") + .json() + .get("Value") + ) + dec_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/declinationrate") + .json() + .get("Value") + ) + assert ra_rate == 0.0, ( + f"RightAscensionRate was not reset after sequence: {ra_rate}" + ) + assert dec_rate == 0.0, ( + f"DeclinationRate was not reset after sequence: {dec_rate}" + ) + + def test_tle_object_action_with_weather_alert( + self, observatory, schedule_manager, server_url, temp_config + ): + """TLE non-sidereal tracking must stop safely during weather alert.""" + tle = ( + "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n" + "2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + ) + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "tle", tle=tle) + schedule_data = create_schedule_data( + "tle", + temp_config, + inject_weather_alert=True, + inject_weather_alert_delay=30, + ) + + with schedule_manager(schedule_data): + success, completed, error_free_maintained = wait_for_schedule_completion( + observatory, schedule_data, server_url, temp_config + ) + + assert error_free_maintained, ( + "error_free became False during TLE weather alert test. " + f"Error sources: {observatory.logger.error_source}" + ) + assert success, ( + "tle action did not complete successfully under weather alert. " + f"Error sources: {observatory.logger.error_source}" + ) + assert completed > 0, "No actions were completed" + + tracking = ( + requests.get(f"{server_url}/api/v1/telescope/0/tracking") + .json() + .get("Value") + ) + ra_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate") + .json() + .get("Value") + ) + dec_rate = ( + requests.get(f"{server_url}/api/v1/telescope/0/declinationrate") + .json() + .get("Value") + ) + + assert tracking is False, "Tracking should be stopped after weather alert" + assert ra_rate == 0.0, ( + f"RightAscensionRate was not reset after weather alert: {ra_rate}" + ) + assert dec_rate == 0.0, ( + f"DeclinationRate was not reset after weather alert: {dec_rate}" + ) + + def test_precompute_tracking_path_matches_schedule_interpolators( + self, observatory, schedule_manager, server_url, temp_config + ): + """Helper precomputed path should closely match schedule-generated ephemeris.""" + tle = ( + "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n" + "2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + ) + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "TLE", tle=tle) + obs_location = _get_site_location(server_url) + schedule_data = create_schedule_data("tle", temp_config) + start_time = datetime.fromisoformat(schedule_data["start_time"]) + schedule_data["end_time"] = (start_time + timedelta(minutes=10)).isoformat() + schedule_data["_duration"] = 10 + + lead_time_s = float( + schedule_data["action_value"].get("nonsidereal_start_lead_time_seconds", 60.0) + ) + recenter_interval_s = float( + schedule_data["action_value"].get("nonsidereal_recenter_interval", 60.0) + ) + action_duration_s = ( + datetime.fromisoformat(schedule_data["end_time"]) + - datetime.fromisoformat(schedule_data["start_time"]) + ).total_seconds() + + with schedule_manager(schedule_data): + schedule = observatory.schedule_manager.read() + assert schedule is not None and len(schedule) > 0, "Failed to load schedule." + action = schedule[0] + + schedule_ra_interp = action.action_value.get("_ra_interp") + schedule_dec_interp = action.action_value.get("_dec_interp") + schedule_ra_rate_interp = action.action_value.get("_ra_rate_interp") + schedule_dec_rate_interp = action.action_value.get("_dec_rate_interp") + + assert schedule_ra_interp is not None, "Schedule RA interpolator missing." + assert schedule_dec_interp is not None, "Schedule Dec interpolator missing." + assert schedule_ra_rate_interp is not None, "Schedule RA rate interpolator missing." + assert schedule_dec_rate_interp is not None, "Schedule Dec rate interpolator missing." + + _, helper_ra_interp, helper_dec_interp, helper_ra_rate_interp, helper_dec_rate_interp = _precompute_tracking_path( + body_name="TLE", + schedule_data=schedule_data, + obs_location=obs_location, + sample_interval_s=recenter_interval_s, + tle=tle, + ) + + max_helper_time_s = max(0.0, action_duration_s - lead_time_s) + sample_times_s = [ + 0.0, + max_helper_time_s * 0.25, + max_helper_time_s * 0.5, + max_helper_time_s * 0.75, + max_helper_time_s, + ] + + def angular_sep_deg(a_deg: float, b_deg: float) -> float: + return abs((a_deg - b_deg + 180.0) % 360.0 - 180.0) + + for t_s in sample_times_s: + + helper_ra = float(helper_ra_interp(t_s)) % 360.0 + helper_dec = float(helper_dec_interp(t_s)) + helper_ra_rate = float(helper_ra_rate_interp(t_s)) + helper_dec_rate = float(helper_dec_rate_interp(t_s)) + + schedule_ra = float(schedule_ra_interp(t_s)) % 360.0 + schedule_dec = float(schedule_dec_interp(t_s)) + schedule_ra_rate = float(schedule_ra_rate_interp(t_s)) + schedule_dec_rate = float(schedule_dec_rate_interp( t_s)) + + assert angular_sep_deg(helper_ra, schedule_ra) < 0.2, ( + f"RA interpolator mismatch at t={t_s:.1f}s: " + f"helper={helper_ra:.6f}, schedule={schedule_ra:.6f}" + ) + assert abs(helper_dec - schedule_dec) < 0.2, ( + f"Dec interpolator mismatch at t={t_s:.1f}s: " + f"helper={helper_dec:.6f}, schedule={schedule_dec:.6f}" + ) + assert abs(helper_ra_rate - schedule_ra_rate) < 50.0, ( + f"RA rate interpolator mismatch at t={t_s:.1f}s: " + f"helper={helper_ra_rate:.6f}, schedule={schedule_ra_rate:.6f}" + ) + assert abs(helper_dec_rate - schedule_dec_rate) < 200.0, ( + f"Dec rate interpolator mismatch at t={t_s:.1f}s: " + f"helper={helper_dec_rate:.6f}, schedule={schedule_dec_rate:.6f}" + ) + + def test_jpl_horizons_rates_keep_pointing_on_target_over_time( + self, observatory, schedule_manager, server_url, temp_config + ): + """Mount pointing should remain close to JPL-Horizons target at multiple times.""" + set_safety_monitor_safe(server_url) + body_name = "A898 PA" + set_location_for_body_visibility(server_url, body_name) + obs_location = _get_site_location(server_url) + schedule_data = create_schedule_data("asteroid", temp_config) + sample_interval_s = 2.0 + end_time = datetime.fromisoformat(schedule_data["end_time"]) + if end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=UTC) + + with schedule_manager(schedule_data): + ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = _precompute_tracking_path( + body_name=body_name, + schedule_data=schedule_data, + obs_location=obs_location, + sample_interval_s=sample_interval_s, + ) + observatory.start_schedule() + _wait_for_schedule_running(observatory) + _wait_until_datetime(_get_tracking_activation_time(schedule_data)) + sample_count = _max_sample_count_for_interval( + schedule_data, + sample_interval_s, + start_time=datetime.now(UTC), + ) + + samples = _sample_tracking_rate_alignment( + server_url=server_url, + ephem_start=ephem_start, + ra_interp=ra_interp, + dec_interp=dec_interp, + ra_rate_interp=ra_rate_interp, + dec_rate_interp=dec_rate_interp, + sample_count=sample_count, + sample_interval_s=sample_interval_s, + end_time=end_time, + ) + + observatory.schedule_manager.stop_schedule( + thread_manager=observatory.thread_manager + ) + + assert samples, "No tracking-rate samples were collected." + assert max(sample["separation_deg"] for sample in samples) < 0.5, ( + "JPL Horizons non-sidereal tracking drifted too far from expected " + f"target coordinates. Separations (deg): {[sample['separation_deg'] for sample in samples]}" + ) + + def test_tle_rates_keep_pointing_on_target_over_time( + self, observatory, schedule_manager, server_url, temp_config + ): + """Mount pointing should remain close to TLE target at multiple times.""" + tle = ( + "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + ) + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "TLE", tle=tle) + obs_location = _get_site_location(server_url) + schedule_data = create_schedule_data("tle", temp_config) + sample_interval_s = 2.0 + end_time = datetime.fromisoformat(schedule_data["end_time"]) + if end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=UTC) + + with schedule_manager(schedule_data): + ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = _precompute_tracking_path( + body_name="TLE", + schedule_data=schedule_data, + obs_location=obs_location, + sample_interval_s=sample_interval_s, + tle=tle, + ) + observatory.start_schedule() + _wait_for_schedule_running(observatory) + _wait_until_datetime(_get_tracking_activation_time(schedule_data)) + sample_count = _max_sample_count_for_interval( + schedule_data, + sample_interval_s, + start_time=datetime.now(UTC), + ) + + samples = _sample_tracking_rate_alignment( + server_url=server_url, + ephem_start=ephem_start, + ra_interp=ra_interp, + dec_interp=dec_interp, + ra_rate_interp=ra_rate_interp, + dec_rate_interp=dec_rate_interp, + sample_count=sample_count, + sample_interval_s=sample_interval_s, + end_time=end_time, + ) + + observatory.schedule_manager.stop_schedule( + thread_manager=observatory.thread_manager + ) + + assert samples, "No tracking-rate samples were collected." + assert max(sample["separation_deg"] for sample in samples) < 0.5, ( + "TLE non-sidereal tracking drifted too far from expected target " + f"coordinates. Separations (deg): {[sample['separation_deg'] for sample in samples]}" + ) + + def test_tle_prepointed_before_tracking_rates_apply( + self, observatory, schedule_manager, server_url, temp_config, monkeypatch + ): + """The telescope should be on the pre-point target before rates turn on.""" + tle = ( + "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + ) + set_safety_monitor_safe(server_url) + set_location_for_body_visibility(server_url, "TLE", tle=tle) + obs_location = _get_site_location(server_url) + schedule_data = create_schedule_data("tle", temp_config) + + from astra.nonsidereal import NonSiderealManager + + apply_rates_called = threading.Event() + rate_assertion_errors: list[str] = [] + captured_prepoint: dict[str, float] = {} + original_apply_rates = NonSiderealManager.apply_rates + original_prepoint_coordinates = NonSiderealManager.prepoint_coordinates + + def wrapped_prepoint_coordinates(self, lead_time_seconds: float = 60.0): + result = original_prepoint_coordinates(self, lead_time_seconds=lead_time_seconds) + if result is not None: + captured_prepoint["ra_deg"] = float(result[0]) + captured_prepoint["dec_deg"] = float(result[1]) + return result + + def wrapped_apply_rates(self, telescope): + try: + assert "ra_deg" in captured_prepoint and "dec_deg" in captured_prepoint, ( + "Pre-point coordinates were not captured before rates were applied." + ) + + _assert_telescope_position_matches_expected( + server_url, + captured_prepoint["ra_deg"], + captured_prepoint["dec_deg"], + max_separation_deg=0.5, + ) + + ra_rate_before = float( + requests.get( + f"{server_url}/api/v1/telescope/0/rightascensionrate", + timeout=5, + ) + .json() + .get("Value", 0.0) + ) + dec_rate_before = float( + requests.get( + f"{server_url}/api/v1/telescope/0/declinationrate", + timeout=5, + ) + .json() + .get("Value", 0.0) + ) + assert ra_rate_before == 0.0, ( + f"RightAscensionRate should still be zero before activation: {ra_rate_before}" + ) + assert dec_rate_before == 0.0, ( + f"DeclinationRate should still be zero before activation: {dec_rate_before}" + ) + + original_apply_rates(self, telescope) + + ra_rate_after = float(telescope.get("RightAscensionRate")) + dec_rate_after = float(telescope.get("DeclinationRate")) + assert abs(ra_rate_after) > 0.0 or abs(dec_rate_after) > 0.0, ( + "Non-sidereal rates were not applied after the pre-point slew." + ) + except Exception as exc: + rate_assertion_errors.append(str(exc)) + raise + finally: + apply_rates_called.set() + + monkeypatch.setattr(NonSiderealManager, "prepoint_coordinates", wrapped_prepoint_coordinates) + monkeypatch.setattr(NonSiderealManager, "apply_rates", wrapped_apply_rates) + + with schedule_manager(schedule_data): + observatory.start_schedule() + _wait_for_schedule_running(observatory) + + assert apply_rates_called.wait(timeout=120), ( + "Non-sidereal rates were never applied during the schedule." + ) + if rate_assertion_errors: + pytest.fail(rate_assertion_errors[0]) + + observatory.schedule_manager.stop_schedule( + thread_manager=observatory.thread_manager + ) + def test_autofocus_action( self, observatory, schedule_manager, server_url, temp_config ): diff --git a/tests/test_subframe_config.py b/tests/test_subframe_config.py index a2b3d600..6bbcd550 100644 --- a/tests/test_subframe_config.py +++ b/tests/test_subframe_config.py @@ -134,6 +134,25 @@ def test_subframe_only_height_specified_raises(self): subframe_height=100, ) + def test_nonsidereal_start_lead_time_defaults_to_60_seconds(self): + """Test default lead wait used for non-sidereal pre-pointing.""" + config = ObjectActionConfig(object="M31", exptime=300.0, ra=10.68, dec=41.27) + assert config.nonsidereal_start_lead_time_seconds == 60.0 + + def test_nonsidereal_start_lead_time_negative_raises(self): + """Negative lead wait is invalid.""" + with pytest.raises( + ValueError, + match="nonsidereal_start_lead_time_seconds must be >= 0", + ): + ObjectActionConfig( + object="M31", + exptime=300.0, + ra=10.68, + dec=41.27, + nonsidereal_start_lead_time_seconds=-1, + ) + class TestSubframeInAllImagingConfigs: """Test that subframe works across all imaging action configs.""" diff --git a/tests/test_utils.py b/tests/test_utils.py index 54f2de05..d09c9efc 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,9 +6,11 @@ import pandas as pd import pytest from astropy.coordinates import EarthLocation, SkyCoord +from astropy.table import Table from astropy.time import Time, TimeDelta from astra.utils import ( + NotMovingBodyError, __to_format, compute_nonsidereal_rates_from_interp, get_body_coordinates, @@ -357,6 +359,47 @@ def test_precompute_ephemeris_returns_callables(location): assert callable(dec_interp) +def test_precompute_ephemeris_return_rates_from_horizons(location, monkeypatch): + obs_time = Time("2025-06-01T00:00:00", format="isot", scale="utc") + + class FakeHorizons: + def __init__(self, id, location, epochs): + self.id = id + + def ephemerides(self, optional_settings=None): + return Table( + { + "RA": [10.0, 10.1, 10.2, 10.3], + "DEC": [20.0, 20.1, 20.2, 20.3], + "datetime_jd": [ + obs_time.jd, + obs_time.jd + 1.0 / 24.0, + obs_time.jd + 2.0 / 24.0, + obs_time.jd + 3.0 / 24.0, + ], + "RA_rate": [5400.0, 5400.0, 5400.0, 5400.0], + "DEC_rate": [7200.0, 7200.0, 7200.0, 7200.0], + } + ) + + monkeypatch.setattr("astroquery.jplhorizons.Horizons", FakeHorizons) + + ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = precompute_ephemeris( + "90000033", + obs_time, + 3.0, + location, + return_rates=True, + ) + + assert callable(ra_interp) + assert callable(dec_interp) + assert callable(ra_rate_interp) + assert callable(dec_rate_interp) + assert float(ra_rate_interp(0.0)) == pytest.approx(0.0997, abs=1e-3) + assert float(dec_rate_interp(0.0)) == pytest.approx(1.9945, abs=1e-3) + + def test_precompute_ephemeris_ra_dec_ranges(location): obs_time = Time("2025-06-01T00:00:00", format="isot", scale="utc") ra_interp, dec_interp = precompute_ephemeris("mars", obs_time, 1.0, location) @@ -424,3 +467,111 @@ def test_precompute_ephemeris_minor_body(location): # Adjacent-second values should be smooth (no discontinuity) assert abs(float(ra_interp(1)) - float(ra_interp(0))) < 0.01 assert abs(float(dec_interp(1)) - float(dec_interp(0))) < 0.01 + +@pytest.mark.network +def test_precompute_ephemeris_tle(location): + """precompute_ephemeris should resolve a TLE via astroquery.jplhorizons.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" + ra_interp, dec_interp = precompute_ephemeris( + "TLE", obs_time, duration_hours=1.0, obs_location=location, tle_data=tle_data) + assert callable(ra_interp) + assert callable(dec_interp) + + # Spot-check a few points across the window + for t in [0, 1800, 3600]: + ra = float(ra_interp(t)) + dec = float(dec_interp(t)) + assert -360.0 <= ra <= 360.0, f"RA out of range at t={t}: {ra}" + assert -90.0 <= dec <= 90.0, f"Dec out of range at t={t}: {dec}" + +@pytest.mark.network +def test_precompute_ephemeris_tle_missing_data(location): + """Should raise ValueError if body_name is 'TLE' but tle_data is None.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + + with pytest.raises(ValueError, match="tle_data parameter is required"): + precompute_ephemeris("TLE", obs_time, 1.0, location) + +@pytest.mark.network +def test_precompute_ephemeris_tle_case_insensitive(location): + """Should accept 'TLE', 'tle', 'Tle' etc.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" + + for name_variant in ["TLE", "tle", "Tle"]: + ra_interp, dec_interp = precompute_ephemeris( + name_variant, obs_time, 1.0, location, tle_data=tle_data + ) + assert callable(ra_interp) + assert callable(dec_interp) + +@pytest.mark.network +def test_precompute_ephemeris_tle_invalid_data(location): + """Should raise NotMovingBodyError for malformed TLE data.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + invalid_tle = "this is not valid tle data" + + with pytest.raises(NotMovingBodyError): + precompute_ephemeris("TLE", obs_time, 1.0, location, tle_data=invalid_tle) + +@pytest.mark.network +def test_precompute_ephemeris_tle_rates_plausible(location): + """ISS (TLE) should have appreciable tracking rates.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" + + ra_interp, dec_interp = precompute_ephemeris( + "TLE", obs_time, 1.0, location, tle_data=tle_data + ) + ra_rate, dec_rate = compute_nonsidereal_rates_from_interp(ra_interp, dec_interp, 0) + + # ISS moves much faster than planets + assert abs(ra_rate) > 0.01, "ISS should have significant RA rate" + +@pytest.mark.network +def test_precompute_ephemeris_minor_body_fallback(location): + """Should fallback to id_type='smallbody' for objects like comets.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + ra_interp, dec_interp = precompute_ephemeris( + "C/2023 A3", obs_time, 1.0, location # Comet example + ) + assert callable(ra_interp) + assert callable(dec_interp) + +@pytest.mark.network +def test_precompute_ephemeris_unresolvable_body(location): + """Should raise NotMovingBodyError with descriptive message.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + + with pytest.raises(NotMovingBodyError, match="could not be resolved"): + precompute_ephemeris("12345INVALID", obs_time, 1.0, location) + +@pytest.mark.network +def test_precompute_ephemeris_short_interval(location): + """Should handle very short observation windows with fine resolution.""" + obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") + ra_interp, dec_interp = precompute_ephemeris( + "mars", obs_time, duration_hours=0.1, interval_minutes=0.5, obs_location=location + ) + assert callable(ra_interp) + assert callable(dec_interp) + + # Should produce consistent values + t1 = float(ra_interp(0)) + t2 = float(ra_interp(60)) + assert abs(t2 - t1) < 0.1 # Should change smoothly + +@pytest.mark.network +def test_precompute_ephemeris_different_tle_objects(location): + """Should work with different satellite TLEs.""" + obs_time = Time("2010-01-01T00:00:00", format="isot", scale="utc") + + # Example: Hubble Space Telescope TLE + tle_hst = "1 20580U 90037B 10001.00000000 .00001524 00000-0 75821-5 0 8017\n2 20580 28.4698 279.0467 0002853 247.4627 112.6160 15.09299652680691" + + ra_interp, dec_interp = precompute_ephemeris( + "TLE", obs_time, 1.0, location, tle_data=tle_hst + ) + assert callable(ra_interp) + assert callable(dec_interp) \ No newline at end of file diff --git a/uv.lock b/uv.lock index 156827b4..5e315dff 100644 --- a/uv.lock +++ b/uv.lock @@ -150,7 +150,7 @@ requires-dist = [ { name = "astrafocus", specifier = ">=0.1.1" }, { name = "astropy" }, { name = "astroquery", specifier = ">=0.4.11" }, - { name = "cabaret" }, + { name = "cabaret", specifier = "==0.5.1" }, { name = "donuts" }, { name = "fastapi", extras = ["standard"] }, { name = "jinja2" }, @@ -310,16 +310,16 @@ wheels = [ [[package]] name = "cabaret" -version = "0.4.5" +version = "0.5.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "astropy" }, { name = "astroquery" }, { name = "numpy" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/06/0c/b94ab530441e1e39e986cb8174236f85139a2a7f94a621732c7ea435d6f2/cabaret-0.4.5.tar.gz", hash = "sha256:b1c8d2353c40882c9653588b25a713f7f9aebadff939c251e350f59d9ac62dbb", size = 502276, upload-time = "2025-12-20T17:54:56.603Z" } +sdist = { url = "https://files.pythonhosted.org/packages/30/65/ec99cbfb378298ec3e753dc2114fa98d79c56d9e8b9b568967dbcb822400/cabaret-0.5.1.tar.gz", hash = "sha256:8511a70be0788a4571fd86b23135bd09fd446a9a0997660c6cc28a9e2b1239e2", size = 656103, upload-time = "2026-04-24T13:21:20.91Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/18/11/d7e35e1d7b43cd8490b1bd4684995afafd800dedcba29e69d6001efaefaa/cabaret-0.4.5-py3-none-any.whl", hash = "sha256:096d3c1ee24049b6845a3ace096bcabaa4e7d676f1bee700fec3fa2874ce9826", size = 28441, upload-time = "2025-12-20T17:54:55.076Z" }, + { url = "https://files.pythonhosted.org/packages/50/b4/8ae0040f1762988b88ced06d80a4bba55c06089770e78a733174c6f01507/cabaret-0.5.1-py3-none-any.whl", hash = "sha256:91bb232645e7462ad4811c142a12def12b639de429c33a82f2864ced1e037041", size = 40064, upload-time = "2026-04-24T13:21:19.447Z" }, ] [[package]] From 9d0666968be99435e848408fa1ff9574c539c2be Mon Sep 17 00:00:00 2001 From: SaltyPotatoe Date: Thu, 21 May 2026 15:51:14 +0200 Subject: [PATCH 02/10] fix: Update TLE for non-sidereal tracking tests --- tests/test_observatory_running_schedule.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/test_observatory_running_schedule.py b/tests/test_observatory_running_schedule.py index a712b697..ae014406 100644 --- a/tests/test_observatory_running_schedule.py +++ b/tests/test_observatory_running_schedule.py @@ -167,7 +167,7 @@ def create_schedule_data( "action_value": { "object": "ISS", "lookup_name": "TLE", - "tle": "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209", + "tle": "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565", "exptime": 1, "filter": "Clear", "guiding": False, @@ -1148,7 +1148,7 @@ def test_tle_object_action( takes at least one image, and that tracking rates are reset on teardown. """ set_safety_monitor_safe(server_url) - set_location_for_body_visibility(server_url, "tle", tle="1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209") + set_location_for_body_visibility(server_url, "tle", tle="1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565") schedule_data = create_schedule_data("tle", temp_config) with schedule_manager(schedule_data): @@ -1189,8 +1189,7 @@ def test_tle_object_action_with_weather_alert( ): """TLE non-sidereal tracking must stop safely during weather alert.""" tle = ( - "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n" - "2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" ) set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "tle", tle=tle) @@ -1245,8 +1244,7 @@ def test_precompute_tracking_path_matches_schedule_interpolators( ): """Helper precomputed path should closely match schedule-generated ephemeris.""" tle = ( - "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n" - "2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" ) set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) @@ -1388,7 +1386,7 @@ def test_tle_rates_keep_pointing_on_target_over_time( ): """Mount pointing should remain close to TLE target at multiple times.""" tle = ( - "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" ) set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) @@ -1443,7 +1441,7 @@ def test_tle_prepointed_before_tracking_rates_apply( ): """The telescope should be on the pre-point target before rates turn on.""" tle = ( - "1 25544U 98067A 26119.49027627 .00007115 00000-0 13705-3 0 9999\n2 25544 51.6319 181.1364 0007113 4.2135 355.8912 15.49020532564209" + "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" ) set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) From f927b8fbd27caf64dd2db44a8b62723790194969 Mon Sep 17 00:00:00 2001 From: SaltyPotatoe Date: Thu, 21 May 2026 17:22:30 +0200 Subject: [PATCH 03/10] fix: addressed copilot comments --- src/astra/action_configs.py | 2 ++ src/astra/utils.py | 40 ++++++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/astra/action_configs.py b/src/astra/action_configs.py index 9279adf4..7cd78701 100644 --- a/src/astra/action_configs.py +++ b/src/astra/action_configs.py @@ -658,6 +658,8 @@ def _resolve_lookup_name( (ra_deg, dec_deg) at start_time. """ duration_hours = (end_time - start_time).to_value("hr") + 0.5 + if (self.nonsidereal_recenter_interval == 0): + self.nonsidereal_recenter_interval = duration_hours * 3600 try: ( self._ra_interp, diff --git a/src/astra/utils.py b/src/astra/utils.py index e708b97a..4be49c60 100644 --- a/src/astra/utils.py +++ b/src/astra/utils.py @@ -17,7 +17,6 @@ import logging import time from datetime import datetime -from pathlib import Path from typing import Any, Tuple import astropy.units as u @@ -53,23 +52,46 @@ class NotMovingBodyError(ValueError): def _save_and_log_horizons_output( body_name: str, context: str, eph: Any, call_input: dict[str, Any] ) -> None: - """Persist raw Horizons output locally and mirror the API call input into the logger.""" + """Persist Horizons diagnostics only when debug logging is enabled.""" + if not logger.isEnabledFor(logging.DEBUG): + return + try: from astra.config import Config horizons_dir = Config().paths.logs / "horizons" horizons_dir.mkdir(parents=True, exist_ok=True) safe_name = body_name.replace(" ", "_").replace("/", "_") - output_path = horizons_dir / f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv" + output_path = horizons_dir / (f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv") eph.write(output_path, format="ascii.ecsv", overwrite=True) - logger.warning("Saved raw Horizons output for %s (%s) to %s", body_name, context, output_path) - except Exception as exc: - logger.warning("Failed to save raw Horizons output for %s (%s): %s", body_name, context, exc) + logger.debug( + "Saved raw Horizons output for %s (%s) to %s", + body_name, + context, + output_path, + ) + except BaseException as exc: + logger.debug( + "Failed to save raw Horizons output for %s (%s): %s", + body_name, + context, + exc, + ) try: - logger.warning("Horizons API call input for %s (%s): %s", body_name, context, call_input) - except Exception as exc: - logger.warning("Failed to log Horizons API call input for %s (%s): %s", body_name, context, exc) + logger.debug( + "Horizons API call input for %s (%s): %s", + body_name, + context, + call_input, + ) + except BaseException as exc: + logger.debug( + "Failed to log Horizons API call input for %s (%s): %s", + body_name, + context, + exc, + ) class CustomImageClass(Image): From 7f476477e7330bc954edf5b4c3bfb327de156028 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sat, 23 May 2026 21:35:02 +0200 Subject: [PATCH 04/10] fix: api: improve schedule function error handling and sanitize action values --- src/astra/main.py | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/astra/main.py b/src/astra/main.py index 71aa6fb0..636a99a7 100644 --- a/src/astra/main.py +++ b/src/astra/main.py @@ -624,21 +624,21 @@ async def schedule(): list: Schedule items with start/end times formatted as HH:MM:SS, or empty list if no schedule exists. """ - obs = OBSERVATORY - if ( - obs is None - or not hasattr(obs, "schedule_manager") - or obs.schedule_manager is None - ): - logger.warning( - "Schedule request but OBSERVATORY not initialized or has no schedule_manager" - ) - return [] + try: + obs = OBSERVATORY + if ( + obs is None + or not hasattr(obs, "schedule_manager") + or obs.schedule_manager is None + ): + logger.warning( + "Schedule request but OBSERVATORY not initialized or has no schedule_manager" + ) + return [] - if getattr(obs.schedule_manager, "schedule_mtime", 0) == 0: - return [] + if getattr(obs.schedule_manager, "schedule_mtime", 0) == 0: + return [] - try: schedule_obj = obs.schedule_manager.get_schedule() schedule = schedule_obj.to_dataframe() @@ -649,8 +649,16 @@ async def schedule(): schedule["end_HHMMSS"] = pd.to_datetime( schedule["end_time"], errors="coerce" ).apply(lambda x: x.strftime("%H:%M:%S") if pd.notna(x) else "") + + # remove private keys from action_value + schedule["action_value"] = schedule["action_value"].apply( + lambda x: {k: v for k, v in x.items() if not k.startswith("_")} + if isinstance(x, dict) + else x + ) + obs.logger.debug("Schedule read for frontend") - result = schedule.to_dict(orient="records") + result = json.loads(schedule.to_json(orient="records", date_format="iso")) return result From de8befad4a82d063862f488cdbcb7ceb9e0574f9 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sat, 23 May 2026 22:25:06 +0200 Subject: [PATCH 05/10] fix: enhance ObjectActionConfig visibility checker to handle non-sidereal target coordinates --- src/astra/action_configs.py | 45 +++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/astra/action_configs.py b/src/astra/action_configs.py index 7cd78701..cfc95ad4 100644 --- a/src/astra/action_configs.py +++ b/src/astra/action_configs.py @@ -494,11 +494,11 @@ class ObjectActionConfig(BaseActionConfig): ``nonsidereal_recenter_interval``. The interval controls how often the telescope re-slews to the updated ephemeris position (in seconds). Autoguiding is incompatible with non-sidereal tracking and will be disabled. - If using TLE's for Earth-orbiting objects, provide the ``tle`` and use + If using TLE's for Earth-orbiting objects, provide the ``tle`` and use "TLE" as the ``lookup_name``. - Currently supports astropy built-in bodies (planets, Moon, Sun) and small - bodies (asteroids, comets) that have ephemerides in JPL Horizons, and + Currently supports astropy built-in bodies (planets, Moon, Sun) and small + bodies (asteroids, comets) that have ephemerides in JPL Horizons, and Earth-orbiting objects through TLEs. **Schedule example for tracking Saturn**:: @@ -658,7 +658,7 @@ def _resolve_lookup_name( (ra_deg, dec_deg) at start_time. """ duration_hours = (end_time - start_time).to_value("hr") + 0.5 - if (self.nonsidereal_recenter_interval == 0): + if self.nonsidereal_recenter_interval == 0: self.nonsidereal_recenter_interval = duration_hours * 3600 try: ( @@ -745,13 +745,6 @@ def validate_visibility( if ra is None or dec is None: return - # Create target coordinate - target = SkyCoord( - ra=u.Quantity(ra, "deg"), - dec=u.Quantity(dec, "deg"), - frame="icrs", - ) - # Check times: start, middle, end mid_time = Time( (start_time.unix + end_time.unix) / 2, @@ -766,6 +759,29 @@ def validate_visibility( visibility_issues = [] for label, check_time in check_times: + # For non-sidereal targets, query the ephemeris interpolators at each + # check time so that the object's actual position is used rather than + # its start-time coordinates. + if ( + self._nonsidereal + and self._ra_interp is not None + and self._dec_interp is not None + ): + elapsed_s = check_time.unix - start_time.unix + check_ra = float(self._ra_interp(elapsed_s)) % 360.0 + check_dec = float(self._dec_interp(elapsed_s)) + target = SkyCoord( + ra=u.Quantity(check_ra, "deg"), + dec=u.Quantity(check_dec, "deg"), + frame="icrs", + ) + else: + target = SkyCoord( + ra=u.Quantity(ra, "deg"), + dec=u.Quantity(dec, "deg"), + frame="icrs", + ) + # Transform to horizontal coordinates altaz_frame = AltAz(obstime=check_time, location=observatory_location) target_altaz = target.transform_to(altaz_frame) @@ -778,8 +794,13 @@ def validate_visibility( ) if visibility_issues: + coord_str = ( + f"(non-sidereal, lookup_name='{self.lookup_name}')" + if self._nonsidereal + else f"at RA={ra:.2f}°, Dec={dec:.2f}°" + ) raise ValueError( - f"Target '{self.object}' at RA={ra:.2f}°, Dec={dec:.2f}° " + f"Target '{self.object}' {coord_str} " f"is not visible during observation window:\n " + "\n ".join(visibility_issues) ) From c5b225c60887b99ddc55244a7be84ae8b1de6d93 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sat, 23 May 2026 23:02:35 +0200 Subject: [PATCH 06/10] fix: adjust nonsidereal recenter interval handling and improve RA rate calculation for accuracy --- src/astra/action_configs.py | 8 ++-- src/astra/utils.py | 86 +++++++++++++++++++++++++++---------- 2 files changed, 69 insertions(+), 25 deletions(-) diff --git a/src/astra/action_configs.py b/src/astra/action_configs.py index cfc95ad4..9c77ee83 100644 --- a/src/astra/action_configs.py +++ b/src/astra/action_configs.py @@ -658,8 +658,6 @@ def _resolve_lookup_name( (ra_deg, dec_deg) at start_time. """ duration_hours = (end_time - start_time).to_value("hr") + 0.5 - if self.nonsidereal_recenter_interval == 0: - self.nonsidereal_recenter_interval = duration_hours * 3600 try: ( self._ra_interp, @@ -671,7 +669,11 @@ def _resolve_lookup_name( start_time, duration_hours, observatory_location, - self.nonsidereal_recenter_interval / 60, + # Always use a fine sampling interval so that rate interpolation + # is accurate throughout the observation. nonsidereal_recenter_interval + # controls only when the telescope physically re-slews, and must not + # be conflated with the ephemeris resolution. + 1.0, self.tle, return_rates=True, ) diff --git a/src/astra/utils.py b/src/astra/utils.py index 4be49c60..1ddc0e93 100644 --- a/src/astra/utils.py +++ b/src/astra/utils.py @@ -62,14 +62,16 @@ def _save_and_log_horizons_output( horizons_dir = Config().paths.logs / "horizons" horizons_dir.mkdir(parents=True, exist_ok=True) safe_name = body_name.replace(" ", "_").replace("/", "_") - output_path = horizons_dir / (f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv") + output_path = horizons_dir / ( + f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv" + ) eph.write(output_path, format="ascii.ecsv", overwrite=True) logger.debug( - "Saved raw Horizons output for %s (%s) to %s", - body_name, - context, - output_path, - ) + "Saved raw Horizons output for %s (%s) to %s", + body_name, + context, + output_path, + ) except BaseException as exc: logger.debug( "Failed to save raw Horizons output for %s (%s): %s", @@ -352,7 +354,11 @@ def clean_image(data: np.ndarray) -> np.ndarray: ## planet or SIMBAD positions def get_body_coordinates( - body_name: str, obs_time: Time, obs_location: EarthLocation, tle: str = None, near: bool = False + body_name: str, + obs_time: Time, + obs_location: EarthLocation, + tle: str = None, + near: bool = False, ) -> SkyCoord: """Get the position of a celestial body (Solar System or Deep Sky). @@ -372,19 +378,40 @@ def get_body_coordinates( if body_name.lower() in _SOLAR_SYSTEM_BODIES: return get_body(body_name, obs_time, obs_location) elif near and tle is None: - location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} + location = { + "lon": obs_location.lon.deg, + "lat": obs_location.lat.deg, + "elevation": obs_location.height.to(u.km).value, + } call_input = {"id": body_name, "location": location, "epochs": obs_time.jd} obj = Horizons(**call_input) eph = obj.ephemerides() - _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) - return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] + _save_and_log_horizons_output( + body_name, "get_body_coordinates", eph, call_input + ) + return SkyCoord( + ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time + ).transform_to("gcrs")[0] elif near: - location = {'lon': obs_location.lon.deg, 'lat': obs_location.lat.deg, 'elevation': obs_location.height.to(u.km).value} - call_input = {"id": "TLE", "location": location, "epochs": obs_time.jd, "optional_settings": {"TLE": tle}} - obj = Horizons(id='TLE', location=location, epochs=obs_time.jd) + location = { + "lon": obs_location.lon.deg, + "lat": obs_location.lat.deg, + "elevation": obs_location.height.to(u.km).value, + } + call_input = { + "id": "TLE", + "location": location, + "epochs": obs_time.jd, + "optional_settings": {"TLE": tle}, + } + obj = Horizons(id="TLE", location=location, epochs=obs_time.jd) eph = obj.ephemerides(optional_settings={"TLE": tle}) - _save_and_log_horizons_output(body_name, "get_body_coordinates", eph, call_input) - return SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time).transform_to('gcrs')[0] + _save_and_log_horizons_output( + body_name, "get_body_coordinates", eph, call_input + ) + return SkyCoord( + ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=obs_time + ).transform_to("gcrs")[0] # Otherwise, try to resolve as a deep sky object (ICRS) return SkyCoord.from_name(body_name) @@ -406,7 +433,10 @@ def precompute_ephemeris( interval_minutes: float = 1.0, tle_data: str | None = None, return_rates: bool = False, -) -> tuple["interp1d", "interp1d"] | tuple["interp1d", "interp1d", "interp1d", "interp1d"]: +) -> ( + tuple["interp1d", "interp1d"] + | tuple["interp1d", "interp1d", "interp1d", "interp1d"] +): """Pre-compute a moving body's sky positions over a time window. Performs a single vectorised get_body() call and returns cubic interpolation @@ -482,7 +512,7 @@ def precompute_ephemeris( "stop": stop_time.iso, "step": str(n_points - 1), # Horizons returns n+1 rows for n steps } - + # Handle TLE data if body_name.upper() == "TLE" or tle_data is not None: if tle_data is None: @@ -495,14 +525,16 @@ def precompute_ephemeris( "epochs": epochs, "optional_settings": {"TLE": tle_data}, } - obj = Horizons(id='TLE', location=location, epochs=epochs) + obj = Horizons(id="TLE", location=location, epochs=epochs) eph = obj.ephemerides(optional_settings={"TLE": tle_data}) else: call_input = {"id": body_name, "location": location, "epochs": epochs} obj = Horizons(id=body_name, location=location, epochs=epochs) eph = obj.ephemerides() - _save_and_log_horizons_output(body_name, "precompute_ephemeris", eph, call_input) - + _save_and_log_horizons_output( + body_name, "precompute_ephemeris", eph, call_input + ) + bodies = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg) seconds = (Time(eph["datetime_jd"], format="jd") - start_time).to(u.s).value if "RA_rate" in eph.colnames and "DEC_rate" in eph.colnames: @@ -525,8 +557,18 @@ def precompute_ephemeris( return ra_interp, dec_interp if ra_rate_as_per_hour is not None and dec_rate_as_per_hour is not None: - # Horizons rates are in arcsec/hour (solar seconds). Convert to ASCOM units. - ra_rates = (ra_rate_as_per_hour / (15.0 * 3600.0)) / _SOLAR_TO_SIDEREAL + # Horizons RA_rate is dRA*cos(D) in arcsec/hr — the angular velocity projected + # onto the sky, not the RA coordinate rate. Divide by cos(Dec) to recover + # d(RA_coord)/dt before converting to ASCOM RightAscensionRate units + # (seconds of RA per sidereal second). Without this factor the mount tracks at + # cos(Dec) of the required rate, causing steady RA drift between recenters that + # manifests as a visible jump when each recenter slew corrects the error. + cos_dec = np.cos(np.radians(dec_coords)) + # Guard against division by zero within ~0.003° of the celestial poles. + cos_dec = np.where(np.abs(cos_dec) < 5e-5, 5e-5, cos_dec) + ra_rates = ( + ra_rate_as_per_hour / (15.0 * 3600.0 * cos_dec) + ) / _SOLAR_TO_SIDEREAL dec_rates = (dec_rate_as_per_hour / 3600.0) / _SOLAR_TO_SIDEREAL else: # Fallback for astropy bodies: derive rates from sampled sky positions. From be382157cc5fafd58a819ed9e324bf955844be96 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sat, 23 May 2026 23:02:49 +0200 Subject: [PATCH 07/10] imports --- src/astra/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/astra/utils.py b/src/astra/utils.py index 1ddc0e93..fb5e4dc3 100644 --- a/src/astra/utils.py +++ b/src/astra/utils.py @@ -13,8 +13,8 @@ - SPECULOOS telescope error checking and acknowledgement """ -import math import logging +import math import time from datetime import datetime from typing import Any, Tuple From 614e2b67444cd4b906d591abbb990cbc0d851e97 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sat, 23 May 2026 23:47:15 +0200 Subject: [PATCH 08/10] added docstrings --- src/astra/observatory.py | 10 +++++++--- src/astra/utils.py | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/astra/observatory.py b/src/astra/observatory.py index a85188e6..d023f59d 100644 --- a/src/astra/observatory.py +++ b/src/astra/observatory.py @@ -65,10 +65,10 @@ FileHandler, ObservatoryLogger, ) +from astra.nonsidereal import NonSiderealManager from astra.paired_devices import PairedDevices from astra.pointer import calculate_pointing_correction_from_fits from astra.queue_manager import QueueManager -from astra.nonsidereal import NonSiderealManager from astra.safety_monitor import SafetyMonitor from astra.scheduler import Action, BaseActionConfig, ScheduleManager from astra.thread_manager import ThreadManager @@ -1388,6 +1388,8 @@ def pre_sequence( Parameters: action (Action): An Action object containing information about the action to be performed. paired_devices (dict): A list of paired devices required for the sequence. + near (bool, optional): If True, use JPL Horizons for near-Earth objects or TLEs. Defaults to False. + slew_target_radec_deg (tuple[float, float] | None, optional): The target RA/Dec coordinates for slewing the telescope, in degrees. Defaults to None. """ if not isinstance(paired_devices, PairedDevices): paired_devices = PairedDevices.from_observatory( @@ -1480,6 +1482,8 @@ def setup_observatory( paired_devices (dict): A dictionary specifying paired devices for the sequence. action_value (dict): A dictionary containing information about the action to be performed. filter_list_index (int, optional): The index of the filter in the filter list (default is 0). + near (bool, optional): If True, use JPL Horizons for near-Earth objects or TLEs. + slew_target_radec_deg (tuple[float, float] | None, optional): The target RA/Dec coordinates for slewing the telescope. This method prepares the observatory for a sequence by performing the following steps: @@ -1530,7 +1534,7 @@ def setup_observatory( obs_time=now, obs_location=obs_location, tle=action_value.get("tle"), - near=near + near=near, ) ra = target_coord.ra.deg # type: ignore @@ -2305,7 +2309,7 @@ def image_sequence(self, action: Action, paired_devices: PairedDevices) -> None: self.pre_sequence( action, paired_devices, - near=nonsidereal.is_active, + near=nonsidereal.is_active, # not sure I like the 'near' terminology - PPP slew_target_radec_deg=nonsidereal_prepoint, ) diff --git a/src/astra/utils.py b/src/astra/utils.py index fb5e4dc3..5bb9280c 100644 --- a/src/astra/utils.py +++ b/src/astra/utils.py @@ -16,7 +16,7 @@ import logging import math import time -from datetime import datetime +from datetime import UTC, datetime from typing import Any, Tuple import astropy.units as u @@ -63,7 +63,7 @@ def _save_and_log_horizons_output( horizons_dir.mkdir(parents=True, exist_ok=True) safe_name = body_name.replace(" ", "_").replace("/", "_") output_path = horizons_dir / ( - f"{safe_name}_{context}_{datetime.utcnow().strftime('%Y%m%dT%H%M%S%f')}.ecsv" + f"{safe_name}_{context}_{datetime.now(UTC).strftime('%Y%m%dT%H%M%S%f')}.ecsv" ) eph.write(output_path, format="ascii.ecsv", overwrite=True) logger.debug( @@ -369,6 +369,8 @@ def get_body_coordinates( body_name (str): Name of the body (e.g., 'mars', 'jupiter', 'M31', 'Vega'). obs_time (Time): Observation time (used for solar system bodies). obs_location (EarthLocation): Observer's geographic location (used for solar system bodies). + tle (str, optional): Two-line element set for satellites. Required if body_name is 'TLE'. + near (bool, optional): If True, use JPL Horizons for near-Earth objects or TLEs. Returns: SkyCoord: Position of the body in the sky. From edf258a92ce3dd0c009ce84504b357037fec3a15 Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sun, 24 May 2026 00:31:43 +0200 Subject: [PATCH 09/10] fix: update nonsidereal tracking interval handling and improve RA rate assertion accuracy --- pyproject.toml | 2 +- src/astra/nonsidereal.py | 10 +- tests/test_observatory_running_schedule.py | 177 ++++++++++++++------- tests/test_utils.py | 48 ++++-- 4 files changed, 158 insertions(+), 79 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 62c67fbe..47f0b89e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,7 +55,7 @@ dependencies = [ "pyyaml", "ruamel.yaml", "cabaret==0.5.1", - "astroquery>=0.4.11", + "astroquery>=0.4.11" ] description = "Automated Survey observaTory Robotised with Alpaca" license = {text = "GPL-3.0"} diff --git a/src/astra/nonsidereal.py b/src/astra/nonsidereal.py index c66764c4..c9bbb8d7 100644 --- a/src/astra/nonsidereal.py +++ b/src/astra/nonsidereal.py @@ -1,9 +1,9 @@ """Non-sidereal (solar system body) tracking for observatory imaging sequences. This module provides the machinery for tracking solar system objects (comets, -asteroids, planets, Earth-orbiting objects) whose celestial coordinates change -significantly over short timescales. It uses high-precision cubic interpolation -of pre-computed ephemerides to provide smooth, differential tracking rates to +asteroids, planets, Earth-orbiting objects) whose celestial coordinates change +significantly over short timescales. It uses high-precision cubic interpolation +of pre-computed ephemerides to provide smooth, differential tracking rates to the telescope mount via ASCOM. Key Capabilities: @@ -102,7 +102,9 @@ def apply_rates(self, telescope: AlpacaDevice) -> None: return self._apply_rates(telescope, self._state) - def prepoint_coordinates(self, lead_time_seconds: float = 60.0) -> tuple[float, float] | None: + def prepoint_coordinates( + self, lead_time_seconds: float = 60.0 + ) -> tuple[float, float] | None: """Return RA/Dec for an initial lead-pointing slew. Args: diff --git a/tests/test_observatory_running_schedule.py b/tests/test_observatory_running_schedule.py index ae014406..539bfb65 100644 --- a/tests/test_observatory_running_schedule.py +++ b/tests/test_observatory_running_schedule.py @@ -5,8 +5,8 @@ import json import logging -import time import threading +import time from contextlib import contextmanager from datetime import UTC, datetime, timedelta @@ -18,7 +18,9 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.propagate = False -if not any(getattr(handler, "_astra_test_handler", False) for handler in logger.handlers): +if not any( + getattr(handler, "_astra_test_handler", False) for handler in logger.handlers +): test_handler = logging.StreamHandler() test_handler.setLevel(logging.INFO) test_handler.setFormatter(logging.Formatter("%(message)s")) @@ -27,7 +29,10 @@ tracking_logger = logging.getLogger(f"{__name__}.tracking") tracking_logger.setLevel(logging.WARNING) tracking_logger.propagate = False -if not any(getattr(handler, "_astra_tracking_handler", False) for handler in tracking_logger.handlers): +if not any( + getattr(handler, "_astra_tracking_handler", False) + for handler in tracking_logger.handlers +): tracking_handler = logging.StreamHandler() tracking_handler.setLevel(logging.WARNING) tracking_handler.setFormatter(logging.Formatter("%(message)s")) @@ -144,7 +149,7 @@ def create_schedule_data( "guiding": False, "pointing": False, "nonsidereal_recenter_interval": 10, - "nonsidereal_start_lead_time_seconds": 15 + "nonsidereal_start_lead_time_seconds": 15, }, "duration": 1, }, @@ -400,26 +405,38 @@ def wait_for_schedule_completion( return final_completed > 0, final_completed, error_free_maintained -def set_location_for_body_visibility(server_url, body_name: str, latitude: float = 0.0, tle: str = None): +def set_location_for_body_visibility( + server_url, body_name: str, latitude: float = 0.0, tle: str = None +): """Set telescope simulator location so that body is near the meridian and above the horizon.""" - from astropy.coordinates import AltAz, EarthLocation, get_body, solar_system_ephemeris, SkyCoord + import astropy.units as u + from astropy.coordinates import ( + AltAz, + EarthLocation, + SkyCoord, + get_body, + solar_system_ephemeris, + ) from astropy.time import Time from astroquery.jplhorizons import Horizons - import astropy.units as u t = Time.now() if body_name.lower() in solar_system_ephemeris.bodies: body = get_body(body_name, t) - + elif tle is None: - obj = Horizons(id=body_name, location='500', epochs=t.jd) + obj = Horizons(id=body_name, location="500", epochs=t.jd) eph = obj.ephemerides() - body = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t).transform_to('gcrs') + body = SkyCoord( + ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t + ).transform_to("gcrs") latitude = body.dec.deg else: - obj = Horizons(id='TLE', location='500', epochs=t.jd) + obj = Horizons(id="TLE", location="500", epochs=t.jd) eph = obj.ephemerides(optional_settings={"TLE": tle}) - body = SkyCoord(ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t).transform_to('gcrs') + body = SkyCoord( + ra=eph["RA"].data * u.deg, dec=eph["DEC"].data * u.deg, obstime=t + ).transform_to("gcrs") latitude = body.dec.deg gmst = t.sidereal_time("mean", "greenwich").deg transit_lon = ((body.ra.deg - gmst + 180) % 360) - 180 @@ -457,7 +474,9 @@ def set_safety_monitor_safe(server_url): assert False, "Failed to set safety monitor to safe." -def _wait_for_schedule_running(observatory: Observatory, timeout_s: float = 60.0) -> None: +def _wait_for_schedule_running( + observatory: Observatory, timeout_s: float = 60.0 +) -> None: """Wait until scheduler thread reports a running state.""" t0 = time.time() while time.time() - t0 < timeout_s: @@ -566,10 +585,13 @@ def _precompute_tracking_path( recenter_interval_s = float( schedule_data.get("action_value", {}).get("nonsidereal_recenter_interval", 0) ) - if recenter_interval_s > 0: - interval_minutes = recenter_interval_s / 60.0 - else: - interval_minutes = max(sample_interval_s / 60.0, 0.01) + # Mirror the schedule-side setup: action_configs.py hardcodes interval_minutes=1.0 + # for non-sidereal ephemeris computation regardless of the recenter interval. + # Using the recenter interval here (e.g. 100 s → 1.667 min) produces different + # cubic-spline nodes and therefore different interpolated positions/rates for + # fast-moving objects like the ISS. + _ = recenter_interval_s # retained for clarity; not used for interval selection + interval_minutes = 1.0 ephem_start = Time(schedule_start) logger.info( @@ -697,7 +719,9 @@ def _sample_tracking_rate_alignment( ) actual_ra_rate = float( - requests.get(f"{server_url}/api/v1/telescope/0/rightascensionrate", timeout=5) + requests.get( + f"{server_url}/api/v1/telescope/0/rightascensionrate", timeout=5 + ) .json() .get("Value", 0.0) ) @@ -725,6 +749,7 @@ def _sample_tracking_rate_alignment( return samples + def prepare_flats(server_url, sunset=True): """Prepare flats by setting sunlight conditions and placing observatory where the sun is setting or rising.""" @@ -1139,6 +1164,7 @@ def test_jpl_horizons_object_action( assert dec_rate == 0.0, ( f"DeclinationRate was not reset after sequence: {dec_rate}" ) + def test_tle_object_action( self, observatory, schedule_manager, server_url, temp_config ): @@ -1148,7 +1174,11 @@ def test_tle_object_action( takes at least one image, and that tracking rates are reset on teardown. """ set_safety_monitor_safe(server_url) - set_location_for_body_visibility(server_url, "tle", tle="1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565") + set_location_for_body_visibility( + server_url, + "tle", + tle="1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565", + ) schedule_data = create_schedule_data("tle", temp_config) with schedule_manager(schedule_data): @@ -1188,9 +1218,7 @@ def test_tle_object_action_with_weather_alert( self, observatory, schedule_manager, server_url, temp_config ): """TLE non-sidereal tracking must stop safely during weather alert.""" - tle = ( - "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" - ) + tle = "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "tle", tle=tle) schedule_data = create_schedule_data( @@ -1243,19 +1271,19 @@ def test_precompute_tracking_path_matches_schedule_interpolators( self, observatory, schedule_manager, server_url, temp_config ): """Helper precomputed path should closely match schedule-generated ephemeris.""" - tle = ( - "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" - ) + tle = "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) obs_location = _get_site_location(server_url) schedule_data = create_schedule_data("tle", temp_config) start_time = datetime.fromisoformat(schedule_data["start_time"]) - schedule_data["end_time"] = (start_time + timedelta(minutes=10)).isoformat() - schedule_data["_duration"] = 10 + schedule_data["end_time"] = (start_time + timedelta(minutes=2)).isoformat() + schedule_data["_duration"] = 2 lead_time_s = float( - schedule_data["action_value"].get("nonsidereal_start_lead_time_seconds", 60.0) + schedule_data["action_value"].get( + "nonsidereal_start_lead_time_seconds", 60.0 + ) ) recenter_interval_s = float( schedule_data["action_value"].get("nonsidereal_recenter_interval", 60.0) @@ -1267,7 +1295,9 @@ def test_precompute_tracking_path_matches_schedule_interpolators( with schedule_manager(schedule_data): schedule = observatory.schedule_manager.read() - assert schedule is not None and len(schedule) > 0, "Failed to load schedule." + assert schedule is not None and len(schedule) > 0, ( + "Failed to load schedule." + ) action = schedule[0] schedule_ra_interp = action.action_value.get("_ra_interp") @@ -1277,10 +1307,20 @@ def test_precompute_tracking_path_matches_schedule_interpolators( assert schedule_ra_interp is not None, "Schedule RA interpolator missing." assert schedule_dec_interp is not None, "Schedule Dec interpolator missing." - assert schedule_ra_rate_interp is not None, "Schedule RA rate interpolator missing." - assert schedule_dec_rate_interp is not None, "Schedule Dec rate interpolator missing." + assert schedule_ra_rate_interp is not None, ( + "Schedule RA rate interpolator missing." + ) + assert schedule_dec_rate_interp is not None, ( + "Schedule Dec rate interpolator missing." + ) - _, helper_ra_interp, helper_dec_interp, helper_ra_rate_interp, helper_dec_rate_interp = _precompute_tracking_path( + ( + _, + helper_ra_interp, + helper_dec_interp, + helper_ra_rate_interp, + helper_dec_rate_interp, + ) = _precompute_tracking_path( body_name="TLE", schedule_data=schedule_data, obs_location=obs_location, @@ -1301,7 +1341,6 @@ def angular_sep_deg(a_deg: float, b_deg: float) -> float: return abs((a_deg - b_deg + 180.0) % 360.0 - 180.0) for t_s in sample_times_s: - helper_ra = float(helper_ra_interp(t_s)) % 360.0 helper_dec = float(helper_dec_interp(t_s)) helper_ra_rate = float(helper_ra_rate_interp(t_s)) @@ -1310,7 +1349,7 @@ def angular_sep_deg(a_deg: float, b_deg: float) -> float: schedule_ra = float(schedule_ra_interp(t_s)) % 360.0 schedule_dec = float(schedule_dec_interp(t_s)) schedule_ra_rate = float(schedule_ra_rate_interp(t_s)) - schedule_dec_rate = float(schedule_dec_rate_interp( t_s)) + schedule_dec_rate = float(schedule_dec_rate_interp(t_s)) assert angular_sep_deg(helper_ra, schedule_ra) < 0.2, ( f"RA interpolator mismatch at t={t_s:.1f}s: " @@ -1344,11 +1383,13 @@ def test_jpl_horizons_rates_keep_pointing_on_target_over_time( end_time = end_time.replace(tzinfo=UTC) with schedule_manager(schedule_data): - ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = _precompute_tracking_path( - body_name=body_name, - schedule_data=schedule_data, - obs_location=obs_location, - sample_interval_s=sample_interval_s, + ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = ( + _precompute_tracking_path( + body_name=body_name, + schedule_data=schedule_data, + obs_location=obs_location, + sample_interval_s=sample_interval_s, + ) ) observatory.start_schedule() _wait_for_schedule_running(observatory) @@ -1385,9 +1426,7 @@ def test_tle_rates_keep_pointing_on_target_over_time( self, observatory, schedule_manager, server_url, temp_config ): """Mount pointing should remain close to TLE target at multiple times.""" - tle = ( - "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" - ) + tle = "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) obs_location = _get_site_location(server_url) @@ -1398,12 +1437,14 @@ def test_tle_rates_keep_pointing_on_target_over_time( end_time = end_time.replace(tzinfo=UTC) with schedule_manager(schedule_data): - ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = _precompute_tracking_path( - body_name="TLE", - schedule_data=schedule_data, - obs_location=obs_location, - sample_interval_s=sample_interval_s, - tle=tle, + ephem_start, ra_interp, dec_interp, ra_rate_interp, dec_rate_interp = ( + _precompute_tracking_path( + body_name="TLE", + schedule_data=schedule_data, + obs_location=obs_location, + sample_interval_s=sample_interval_s, + tle=tle, + ) ) observatory.start_schedule() _wait_for_schedule_running(observatory) @@ -1431,21 +1472,33 @@ def test_tle_rates_keep_pointing_on_target_over_time( ) assert samples, "No tracking-rate samples were collected." - assert max(sample["separation_deg"] for sample in samples) < 0.5, ( - "TLE non-sidereal tracking drifted too far from expected target " - f"coordinates. Separations (deg): {[sample['separation_deg'] for sample in samples]}" + # The ASCOM Alpaca simulator stores RightAscensionRate / DeclinationRate but + # does NOT integrate them into the reported RightAscension / Declination. + # Positional agreement cannot be verified against a non-integrating simulator, + # so this test verifies that the rates being applied to the mount match the + # expected TLE ephemeris rates within 10 % relative tolerance. + ra_rate_errors = [ + abs(s["actual_ra_rate"] - s["expected_ra_rate"]) + / abs(s["expected_ra_rate"]) + for s in samples + if abs(s["expected_ra_rate"]) > 1.0 + ] + assert ra_rate_errors, ( + "No samples with a non-negligible expected RA rate were collected." + ) + assert max(ra_rate_errors) < 0.1, ( + "RA tracking rate diverged more than 10 % from expected TLE ephemeris rate. " + f"Relative errors: {[f'{e:.3f}' for e in ra_rate_errors]}" ) def test_tle_prepointed_before_tracking_rates_apply( self, observatory, schedule_manager, server_url, temp_config, monkeypatch ): """The telescope should be on the pre-point target before rates turn on.""" - tle = ( - "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" - ) + tle = "1 25544U 98067A 26141.16510469 .00005835 00000-0 11282-3 0 9994\n2 25544 51.6328 73.8715 0007529 81.3651 278.8190 15.49291753567565" set_safety_monitor_safe(server_url) set_location_for_body_visibility(server_url, "TLE", tle=tle) - obs_location = _get_site_location(server_url) + # obs_location = _get_site_location(server_url) schedule_data = create_schedule_data("tle", temp_config) from astra.nonsidereal import NonSiderealManager @@ -1457,7 +1510,9 @@ def test_tle_prepointed_before_tracking_rates_apply( original_prepoint_coordinates = NonSiderealManager.prepoint_coordinates def wrapped_prepoint_coordinates(self, lead_time_seconds: float = 60.0): - result = original_prepoint_coordinates(self, lead_time_seconds=lead_time_seconds) + result = original_prepoint_coordinates( + self, lead_time_seconds=lead_time_seconds + ) if result is not None: captured_prepoint["ra_deg"] = float(result[0]) captured_prepoint["dec_deg"] = float(result[1]) @@ -1465,9 +1520,9 @@ def wrapped_prepoint_coordinates(self, lead_time_seconds: float = 60.0): def wrapped_apply_rates(self, telescope): try: - assert "ra_deg" in captured_prepoint and "dec_deg" in captured_prepoint, ( - "Pre-point coordinates were not captured before rates were applied." - ) + assert ( + "ra_deg" in captured_prepoint and "dec_deg" in captured_prepoint + ), "Pre-point coordinates were not captured before rates were applied." _assert_telescope_position_matches_expected( server_url, @@ -1512,7 +1567,9 @@ def wrapped_apply_rates(self, telescope): finally: apply_rates_called.set() - monkeypatch.setattr(NonSiderealManager, "prepoint_coordinates", wrapped_prepoint_coordinates) + monkeypatch.setattr( + NonSiderealManager, "prepoint_coordinates", wrapped_prepoint_coordinates + ) monkeypatch.setattr(NonSiderealManager, "apply_rates", wrapped_apply_rates) with schedule_manager(schedule_data): diff --git a/tests/test_utils.py b/tests/test_utils.py index d09c9efc..abfc61ef 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -396,7 +396,10 @@ def ephemerides(self, optional_settings=None): assert callable(dec_interp) assert callable(ra_rate_interp) assert callable(dec_rate_interp) - assert float(ra_rate_interp(0.0)) == pytest.approx(0.0997, abs=1e-3) + # Expected value: 5400 arcsec/hr ÷ (15 × 3600 × cos(20°)) ÷ SOLAR_TO_SIDEREAL ≈ 0.1061. + # Horizons RA_rate is dRA*cos(Dec)/dt; dividing by cos(Dec) recovers the + # RA coordinate rate before conversion to ASCOM s/sidereal-s units. + assert float(ra_rate_interp(0.0)) == pytest.approx(0.1061, abs=1e-3) assert float(dec_rate_interp(0.0)) == pytest.approx(1.9945, abs=1e-3) @@ -468,13 +471,15 @@ def test_precompute_ephemeris_minor_body(location): assert abs(float(ra_interp(1)) - float(ra_interp(0))) < 0.01 assert abs(float(dec_interp(1)) - float(dec_interp(0))) < 0.01 + @pytest.mark.network def test_precompute_ephemeris_tle(location): """precompute_ephemeris should resolve a TLE via astroquery.jplhorizons.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" ra_interp, dec_interp = precompute_ephemeris( - "TLE", obs_time, duration_hours=1.0, obs_location=location, tle_data=tle_data) + "TLE", obs_time, duration_hours=1.0, obs_location=location, tle_data=tle_data + ) assert callable(ra_interp) assert callable(dec_interp) @@ -485,20 +490,22 @@ def test_precompute_ephemeris_tle(location): assert -360.0 <= ra <= 360.0, f"RA out of range at t={t}: {ra}" assert -90.0 <= dec <= 90.0, f"Dec out of range at t={t}: {dec}" + @pytest.mark.network def test_precompute_ephemeris_tle_missing_data(location): """Should raise ValueError if body_name is 'TLE' but tle_data is None.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") - + with pytest.raises(ValueError, match="tle_data parameter is required"): precompute_ephemeris("TLE", obs_time, 1.0, location) + @pytest.mark.network def test_precompute_ephemeris_tle_case_insensitive(location): """Should accept 'TLE', 'tle', 'Tle' etc.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" - + for name_variant in ["TLE", "tle", "Tle"]: ra_interp, dec_interp = precompute_ephemeris( name_variant, obs_time, 1.0, location, tle_data=tle_data @@ -506,72 +513,85 @@ def test_precompute_ephemeris_tle_case_insensitive(location): assert callable(ra_interp) assert callable(dec_interp) + @pytest.mark.network def test_precompute_ephemeris_tle_invalid_data(location): """Should raise NotMovingBodyError for malformed TLE data.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") invalid_tle = "this is not valid tle data" - + with pytest.raises(NotMovingBodyError): precompute_ephemeris("TLE", obs_time, 1.0, location, tle_data=invalid_tle) + @pytest.mark.network def test_precompute_ephemeris_tle_rates_plausible(location): """ISS (TLE) should have appreciable tracking rates.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") tle_data = "1 25544U 98067A 26084.45430866 .00012951 00000-0 24673-3 0 9999\n2 25544 51.6344 354.4276 0006215 231.1671 128.8763 15.48531543558777" - + ra_interp, dec_interp = precompute_ephemeris( "TLE", obs_time, 1.0, location, tle_data=tle_data ) ra_rate, dec_rate = compute_nonsidereal_rates_from_interp(ra_interp, dec_interp, 0) - + # ISS moves much faster than planets assert abs(ra_rate) > 0.01, "ISS should have significant RA rate" + @pytest.mark.network def test_precompute_ephemeris_minor_body_fallback(location): """Should fallback to id_type='smallbody' for objects like comets.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") ra_interp, dec_interp = precompute_ephemeris( - "C/2023 A3", obs_time, 1.0, location # Comet example + "C/2023 A3", + obs_time, + 1.0, + location, # Comet example ) assert callable(ra_interp) assert callable(dec_interp) + @pytest.mark.network def test_precompute_ephemeris_unresolvable_body(location): """Should raise NotMovingBodyError with descriptive message.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") - + with pytest.raises(NotMovingBodyError, match="could not be resolved"): precompute_ephemeris("12345INVALID", obs_time, 1.0, location) + @pytest.mark.network def test_precompute_ephemeris_short_interval(location): """Should handle very short observation windows with fine resolution.""" obs_time = Time("2026-04-01T00:00:00", format="isot", scale="utc") ra_interp, dec_interp = precompute_ephemeris( - "mars", obs_time, duration_hours=0.1, interval_minutes=0.5, obs_location=location + "mars", + obs_time, + duration_hours=0.1, + interval_minutes=0.5, + obs_location=location, ) assert callable(ra_interp) assert callable(dec_interp) - + # Should produce consistent values t1 = float(ra_interp(0)) t2 = float(ra_interp(60)) assert abs(t2 - t1) < 0.1 # Should change smoothly + @pytest.mark.network def test_precompute_ephemeris_different_tle_objects(location): """Should work with different satellite TLEs.""" obs_time = Time("2010-01-01T00:00:00", format="isot", scale="utc") - + # Example: Hubble Space Telescope TLE tle_hst = "1 20580U 90037B 10001.00000000 .00001524 00000-0 75821-5 0 8017\n2 20580 28.4698 279.0467 0002853 247.4627 112.6160 15.09299652680691" - + ra_interp, dec_interp = precompute_ephemeris( "TLE", obs_time, 1.0, location, tle_data=tle_hst ) assert callable(ra_interp) - assert callable(dec_interp) \ No newline at end of file + assert callable(dec_interp) From 9d0527b448920d7726d70ecb30f02e623062f7bc Mon Sep 17 00:00:00 2001 From: Peter Pihlmann Pedersen Date: Sun, 24 May 2026 09:31:43 +0200 Subject: [PATCH 10/10] enhance: optimize non-sidereal rate application to reduce unnecessary updates --- src/astra/nonsidereal.py | 52 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/src/astra/nonsidereal.py b/src/astra/nonsidereal.py index c9bbb8d7..489cfb33 100644 --- a/src/astra/nonsidereal.py +++ b/src/astra/nonsidereal.py @@ -38,6 +38,13 @@ __all__ = ["NonSiderealManager"] +# Skip pushing new rates to the mount when both axes change by less than this +# fraction of the previously applied rate. Reduces ASCOM chatter on near-stable +# tracks. The absolute floor below catches the near-zero-rate case where a +# percentage threshold is meaningless. +_RATE_CHANGE_THRESHOLD_FRACTION = 0.001 # 0.1% +_RATE_ABSOLUTE_FLOOR = 1e-6 # arcsec/s — below this, treat as "no meaningful change" + @dataclass class _NonSiderealState: @@ -61,6 +68,8 @@ class _NonSiderealState: sequence_start_time: Time recenter_interval: int last_recenter_time: float = field(default_factory=time.time) + last_applied_ra_rate: float | None = None + last_applied_dec_rate: float | None = None class NonSiderealManager: @@ -169,6 +178,10 @@ def recenter( ) time.sleep(1) wait_for_slew_fn(paired_devices) + # Force the post-slew rate push regardless of delta — the mount has + # just moved and we want a known-good rate applied immediately. + state.last_applied_ra_rate = None + state.last_applied_dec_rate = None self._apply_rates(telescope, state) state.last_recenter_time = time.time() return True @@ -188,6 +201,8 @@ def reset_rates(self, telescope: AlpacaDevice) -> None: try: telescope.set("RightAscensionRate", 0.0) telescope.set("DeclinationRate", 0.0) + self._state.last_applied_ra_rate = None + self._state.last_applied_dec_rate = None self.logger.info("Non-sidereal tracking rates reset to zero") except Exception as e: self.logger.warning(f"Could not reset non-sidereal tracking rates: {e}") @@ -241,7 +256,13 @@ def _setup(self, action: Action) -> _NonSiderealState | None: ) def _apply_rates(self, telescope: AlpacaDevice, state: _NonSiderealState) -> None: - """Set ASCOM RightAscensionRate / DeclinationRate from the interpolated ephemeris.""" + """Set ASCOM RightAscensionRate / DeclinationRate from the interpolated ephemeris. + + Skips the telescope set calls when both axes change by less than + ``_RATE_CHANGE_THRESHOLD_FRACTION`` (and the absolute delta is below + ``_RATE_ABSOLUTE_FLOOR``) relative to the previously applied rates, to + avoid spamming the mount with negligible adjustments. + """ try: t_seconds = (Time.now() - state.sequence_start_time).to(u.s).value if state.ra_rate_interp is not None and state.dec_rate_interp is not None: @@ -251,8 +272,20 @@ def _apply_rates(self, telescope: AlpacaDevice, state: _NonSiderealState) -> Non ra_rate, dec_rate = astra.utils.compute_nonsidereal_rates_from_interp( state.ra_interp, state.dec_interp, t_seconds ) + + if self._rate_change_negligible( + state.last_applied_ra_rate, ra_rate + ) and self._rate_change_negligible(state.last_applied_dec_rate, dec_rate): + self.logger.debug( + f"Non-sidereal rate change below threshold; skipping mount update " + f"(dRA={ra_rate:.6f} s/s, dDec={dec_rate:.6f} as/s)" + ) + return + telescope.set("RightAscensionRate", ra_rate) telescope.set("DeclinationRate", dec_rate) + state.last_applied_ra_rate = ra_rate + state.last_applied_dec_rate = dec_rate self.logger.info( f"Non-sidereal tracking rates: dRA={ra_rate:.6f} s/s, dDec={dec_rate:.6f} as/s" ) @@ -265,3 +298,20 @@ def _apply_rates(self, telescope: AlpacaDevice, state: _NonSiderealState) -> Non ) else: self.logger.warning(f"Could not set non-sidereal tracking rates: {e}") + + @staticmethod + def _rate_change_negligible(prev: float | None, new: float) -> bool: + """Return True if ``new`` is close enough to ``prev`` to skip pushing it. + + First call (prev is None) always returns False so the initial rate is + applied. The absolute floor guards against the case where ``prev`` is + near zero and the percentage delta blows up. + """ + if prev is None: + return False + abs_delta = abs(new - prev) + if abs_delta < _RATE_ABSOLUTE_FLOOR: + return True + if prev == 0: + return False + return abs_delta / abs(prev) < _RATE_CHANGE_THRESHOLD_FRACTION