Skip to content
Open
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ dependencies = [
"fastapi[standard]",
"pyyaml",
"ruamel.yaml",
"cabaret",
"cabaret==0.5.1",
"astroquery>=0.4.11"
]
description = "Automated Survey observaTory Robotised with Alpaca"
Expand Down
78 changes: 66 additions & 12 deletions src/astra/action_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,12 @@ class ObjectActionConfig(BaseActionConfig):
``nonsidereal_recenter_interval``. The interval controls how often the
telescope re-slews to the updated ephemeris position (in seconds).
Autoguiding is incompatible with non-sidereal tracking and will be disabled.
If using TLE's for Earth-orbiting objects, provide the ``tle`` and use
"TLE" as the ``lookup_name``.

Currently supports astropy built-in bodies (planets, Moon, Sun).
Support for comets and asteroids (via JPL Horizons) is planned.
Currently supports astropy built-in bodies (planets, Moon, Sun) and small
bodies (asteroids, comets) that have ephemerides in JPL Horizons, and
Earth-orbiting objects through TLEs.

**Schedule example for tracking Saturn**::

Expand All @@ -509,6 +512,7 @@ class ObjectActionConfig(BaseActionConfig):
"exptime": 30,
"filter": "Clear",
"nonsidereal_recenter_interval": 300,
"nonsidereal_start_lead_time_seconds": 60,
},
"start_time":"2025-01-01 00:00:00.000",
"end_time":"2025-01-01 01:00:00.000",
Expand All @@ -523,6 +527,7 @@ class ObjectActionConfig(BaseActionConfig):
alt: Optional[float] = None
az: Optional[float] = None
lookup_name: Optional[str] = None
tle: Optional[str] = None
filter: Optional[str] = None
focus_shift: Optional[float] = None
focus_position: Optional[float] = None
Expand All @@ -539,9 +544,12 @@ class ObjectActionConfig(BaseActionConfig):
subframe_center_x: float = 0.5
subframe_center_y: float = 0.5
nonsidereal_recenter_interval: int = 0
nonsidereal_start_lead_time_seconds: float = 60.0
_nonsidereal: bool = field(default=False, init=False, repr=False)
_ra_interp: Any = field(default=None, init=False, repr=False)
_dec_interp: Any = field(default=None, init=False, repr=False)
_ra_rate_interp: Any = field(default=None, init=False, repr=False)
_dec_rate_interp: Any = field(default=None, init=False, repr=False)

FIELD_DESCRIPTIONS: ClassVar[dict[str, str]] = {
"object": "Target name.",
Expand All @@ -551,12 +559,14 @@ class ObjectActionConfig(BaseActionConfig):
"alt": "Altitude coordinate when issuing Alt/Az pointings.",
"az": "Azimuth coordinate when issuing Alt/Az pointings.",
"lookup_name": "Instead of specifying ra/dec or alt/az, use SIMBAD/Astropy to look up coordinates for celestial body to observe (e.g., 'mars', 'M31').",
"tle": "TLE data for Earth-orbiting objects",
"filter": "Filter name to load before imaging.",
"focus_shift": "Focus offset relative to the stored best focus.",
"focus_position": "Absolute focus position override.",
"n": "Number of exposures in the sequence. If not specified, defaults to infinite exposures until end_time.",
"guiding": "Start autoguiding with Donuts before imaging. Should be False for solar system objects using non-sidereal tracking, as the star field drifts relative to the guide reference.",
"nonsidereal_recenter_interval": "For solar system objects (resolved via lookup_name): re-slew to the updated ephemeris position and refresh tracking rates every N seconds. Set to 0 to disable. Ignored for non-solar-system targets.",
"nonsidereal_start_lead_time_seconds": "For non-sidereal targets: initial lead time in seconds used to pre-point before sequence start. The telescope slews to the predicted target position at this offset, waits until the offset time is reached, then starts imaging and applies tracking rates. Set to 0 to start immediately.",
"pointing": "Perform pointing correction with twirl before imaging.",
"bin": "Camera binning factor.",
"dir": "Base directory path for saving images.",
Expand Down Expand Up @@ -625,6 +635,12 @@ def validate(self):
# Subframe validation
self.validate_subframe()

if self.nonsidereal_start_lead_time_seconds < 0:
raise ValueError(
"nonsidereal_start_lead_time_seconds must be >= 0, "
f"got {self.nonsidereal_start_lead_time_seconds}"
)
Comment on lines +638 to +642

def _resolve_lookup_name(
self,
start_time: Time,
Expand All @@ -643,14 +659,31 @@ def _resolve_lookup_name(
"""
duration_hours = (end_time - start_time).to_value("hr") + 0.5
try:
self._ra_interp, self._dec_interp = precompute_ephemeris(
self.lookup_name, start_time, duration_hours, observatory_location
(
self._ra_interp,
self._dec_interp,
self._ra_rate_interp,
self._dec_rate_interp,
) = precompute_ephemeris(
self.lookup_name,
start_time,
duration_hours,
observatory_location,
# Always use a fine sampling interval so that rate interpolation
# is accurate throughout the observation. nonsidereal_recenter_interval
# controls only when the telescope physically re-slews, and must not
# be conflated with the ephemeris resolution.
1.0,
self.tle,
Comment on lines +676 to +677
return_rates=True,
)
Comment on lines 660 to 679
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

self._nonsidereal = True
ra = float(self._ra_interp(0.0)) % 360.0
dec = float(self._dec_interp(0.0))
except NotMovingBodyError:
self._nonsidereal = False
self._ra_rate_interp = None
self._dec_rate_interp = None

target_coord = get_body_coordinates(
body_name=self.lookup_name,
Expand Down Expand Up @@ -714,13 +747,6 @@ def validate_visibility(
if ra is None or dec is None:
return

# Create target coordinate
target = SkyCoord(
ra=u.Quantity(ra, "deg"),
dec=u.Quantity(dec, "deg"),
frame="icrs",
)

# Check times: start, middle, end
mid_time = Time(
(start_time.unix + end_time.unix) / 2,
Expand All @@ -735,6 +761,29 @@ def validate_visibility(
visibility_issues = []

for label, check_time in check_times:
# For non-sidereal targets, query the ephemeris interpolators at each
# check time so that the object's actual position is used rather than
# its start-time coordinates.
if (
self._nonsidereal
and self._ra_interp is not None
and self._dec_interp is not None
):
elapsed_s = check_time.unix - start_time.unix
check_ra = float(self._ra_interp(elapsed_s)) % 360.0
check_dec = float(self._dec_interp(elapsed_s))
target = SkyCoord(
ra=u.Quantity(check_ra, "deg"),
dec=u.Quantity(check_dec, "deg"),
frame="icrs",
)
else:
target = SkyCoord(
ra=u.Quantity(ra, "deg"),
dec=u.Quantity(dec, "deg"),
frame="icrs",
)

# Transform to horizontal coordinates
altaz_frame = AltAz(obstime=check_time, location=observatory_location)
target_altaz = target.transform_to(altaz_frame)
Expand All @@ -747,8 +796,13 @@ def validate_visibility(
)

if visibility_issues:
coord_str = (
f"(non-sidereal, lookup_name='{self.lookup_name}')"
if self._nonsidereal
else f"at RA={ra:.2f}°, Dec={dec:.2f}°"
)
raise ValueError(
f"Target '{self.object}' at RA={ra:.2f}°, Dec={dec:.2f}° "
f"Target '{self.object}' {coord_str} "
f"is not visible during observation window:\n "
+ "\n ".join(visibility_issues)
)
Expand Down
36 changes: 22 additions & 14 deletions src/astra/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,21 +624,21 @@ async def schedule():
list: Schedule items with start/end times formatted as HH:MM:SS,
or empty list if no schedule exists.
"""
obs = OBSERVATORY
if (
obs is None
or not hasattr(obs, "schedule_manager")
or obs.schedule_manager is None
):
logger.warning(
"Schedule request but OBSERVATORY not initialized or has no schedule_manager"
)
return []
try:
obs = OBSERVATORY
if (
obs is None
or not hasattr(obs, "schedule_manager")
or obs.schedule_manager is None
):
logger.warning(
"Schedule request but OBSERVATORY not initialized or has no schedule_manager"
)
return []
Comment on lines +627 to +637

if getattr(obs.schedule_manager, "schedule_mtime", 0) == 0:
return []
if getattr(obs.schedule_manager, "schedule_mtime", 0) == 0:
return []

try:
schedule_obj = obs.schedule_manager.get_schedule()
schedule = schedule_obj.to_dataframe()

Expand All @@ -649,8 +649,16 @@ async def schedule():
schedule["end_HHMMSS"] = pd.to_datetime(
schedule["end_time"], errors="coerce"
).apply(lambda x: x.strftime("%H:%M:%S") if pd.notna(x) else "")

# remove private keys from action_value
schedule["action_value"] = schedule["action_value"].apply(
lambda x: {k: v for k, v in x.items() if not k.startswith("_")}
if isinstance(x, dict)
else x
)

obs.logger.debug("Schedule read for frontend")
result = schedule.to_dict(orient="records")
result = json.loads(schedule.to_json(orient="records", date_format="iso"))

return result

Expand Down
Loading