diff --git a/README.md b/README.md index 0d6ece0..24a708c 100644 --- a/README.md +++ b/README.md @@ -59,12 +59,49 @@ Alternatively, you can install the package using pip: The actor is typically started as a service in the PFS ICS environment using the `sdss-actorcore` tools. +## Exposure Pipeline + +The AG actor has two operational modes that share a common exposure pipeline (`exposure.py`): + +### Field Acquisition (one-shot) + +Performed via the `acquire_field` command (`AgCmd.acquire_field`). This is a one-off procedure used to initially find the field. It: + +- Fetches its own guide catalog internally (with `is_guide=False`) for this single frame. +- Does **not** apply correction range checking (`max_correction=None`) because the initial offset from the expected field center can be large. +- Whether corrections are actually sent to the telescope is controlled by the user's `guide` keyword (`send_offsets` parameter). + +### Autoguiding (continuous loop) + +Performed via `AgThread.run` in the AG controller. This is the continuous guiding mode that takes consecutive exposures and computes incremental offsets. It: + +- Pre-loads a `GuideCatalog` once (with `is_guide=True`) before the guide loop starts, and reuses it across all iterations. +- Each iteration only re-fetches telescope status and detected objects for the current frame. +- Applies correction range checking (`max_correction`, default 10″) — corrections that exceed this threshold are rejected and an alert is raised. +- Always sends corrections to the telescope when the offset is within the allowed range. + +### Shared Pipeline Steps + +Both modes delegate to `run_exposure_pipeline()` in `exposure.py`, which executes the following steps in order: + +1. **Take AGCC exposure** — build and queue the AG camera command. +2. **Gather telescope state** — at mid-exposure, collect status from mlp1/gen2/opDB as configured. +3. **Wait for exposure** — block until the AGCC exposure completes. +4. **Compute `taken_at` timestamp** — calculate the effective observation time. +5. **Compute guide offsets** — either via `field_acquisition.acquire_field` (acquisition) or `autoguide.get_exposure_offsets` (guiding). +6. **Report results** — emit `cmd.inform` messages with offset values and save NumPy files. +7. **Range check & send corrections** — validate offsets against `max_correction` (if set) and send to the telescope. +8. **Compute focus** — calculate focus offset and tilt from matched detected objects. +9. **Write to opDB** — persist guide offsets and match results to the operational database. + ## Project Structure - `python/agActor/`: Main source code - `main.py`: Actor entry point and core functionality - - `autoguide.py`: Auto-guiding implementation - - `field_acquisition.py`: Field acquisition functionality + - `exposure.py`: Shared AG exposure pipeline (see [Exposure Pipeline](#exposure-pipeline)) + - `config.py`: Shared configuration parsed from `actorConfig` + - `autoguide.py`: Auto-guiding offset computation (uses pre-loaded guide catalog) + - `field_acquisition.py`: Field acquisition offset computation (fetches its own guide catalog) - `Commands/`: Command handlers for actor commands - `Controllers/`: Hardware controllers - `catalog/`: Star catalog integration diff --git a/python/agActor/Commands/AgCmd.py b/python/agActor/Commands/AgCmd.py index e518c5e..871e34f 100644 --- a/python/agActor/Commands/AgCmd.py +++ b/python/agActor/Commands/AgCmd.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import time import numpy as np import opscore.protocols.keys as keys @@ -8,13 +7,11 @@ from pfs.utils.database.opdb import OpDB from pfs.utils.database.gaia import GaiaDB -from agActor import field_acquisition +from agActor.config import AgConfig from agActor.Controllers.ag import ag -from agActor.catalog import pfs_design -from agActor.utils import actorCalls, data as data_utils +from agActor.exposure import run_exposure_pipeline +from agActor.utils import data as data_utils from agActor.utils.focus import focus -from agActor.utils.actorCalls import send_guide_offsets -from agActor.utils.telescope_center import telCenter as tel_center class AgCmd: @@ -171,20 +168,82 @@ def __init__(self, actor): OpDB.set_default_connection(**db_params.get("opdb", {})) GaiaDB.set_default_connection(**db_params.get("gaia", {})) - self.with_opdb_agc_guide_offset = actor.actorConfig.get( - "agc_guide_offset", False + # Parse shared configuration once and store on the actor for AgThread to use. + self.cfg = AgConfig.from_actor_config(actor.actorConfig) + actor.ag_config = self.cfg + self.actor.logger.info(f"AgCmd: ag_config={self.cfg}") + + def _parse_design(self, cmd): + """Parse design_id/design_path keywords into a design tuple or None.""" + design_id = None + if "design_id" in cmd.cmd.keywords: + design_id = int(cmd.cmd.keywords["design_id"].values[0], 0) + design_path = self.cfg.with_design_path if design_id is not None else None + if "design_path" in cmd.cmd.keywords: + design_path = str(cmd.cmd.keywords["design_path"].values[0]) + design = ( + (design_id, design_path) + if any(x is not None for x in (design_id, design_path)) + else None ) - self.with_opdb_agc_match = actor.actorConfig.get("agc_match", False) - self.with_agcc_timestamp = actor.actorConfig.get("agcc_timestamp", False) + return design_id, design_path, design - tel_status = [ - x.strip() - for x in actor.actorConfig.get("tel_status", ("agc_exposure",)).split(",") - ] - self.with_gen2_status = "gen2" in tel_status - self.with_mlp1_status = "mlp1" in tel_status - self.with_opdb_tel_status = "tel_status" in tel_status - self.with_design_path = actor.actorConfig.get("design_path", "").strip() or None + def _parse_visit(self, cmd): + """Parse visit_id/visit keywords.""" + visit_id = None + if "visit_id" in cmd.cmd.keywords: + visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) + elif "visit" in cmd.cmd.keywords: + visit_id = int(cmd.cmd.keywords["visit"].values[0]) + return visit_id + + def _parse_exposure_time(self, cmd, default=2000): + """Parse exposure_time keyword with a minimum of 100 ms.""" + exposure_time = default + if "exposure_time" in cmd.cmd.keywords: + exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) + if exposure_time < 100: + exposure_time = 100 + return exposure_time + + def _parse_cadence(self, cmd, default=0): + """Parse cadence keyword with a minimum of 0 ms.""" + cadence = default + if "cadence" in cmd.cmd.keywords: + cadence = int(cmd.cmd.keywords["cadence"].values[0]) + if cadence < 0: + cadence = 0 + return cadence + + # Mapping of keyword name -> (type_converter, default_value_or_None). + # Keywords with a default of None are only added to the options dict when present. + _OPTION_KEYWORDS = { + "magnitude": (float, None), + "dry_run": (bool, None), + "fit_dinr": (bool, None), + "fit_dscale": (bool, None), + "max_ellipticity": (float, None), + "max_size": (float, None), + "min_size": (float, None), + "max_residual": (float, None), + "max_correction": (float, None), + "exposure_delay": (int, None), + "tec_off": (bool, None), + "filter_bad_shape": (bool, None), + } + + def _parse_options(self, cmd): + """Parse optional keywords that map into the options/kwargs dict. + + Only keywords that are present in the command are included in the + returned dict, so callers can distinguish "not provided" from an + explicit value. + """ + options = {} + for key, (converter, _) in self._OPTION_KEYWORDS.items(): + if key in cmd.cmd.keywords: + options[key] = converter(cmd.cmd.keywords[key].values[0]) + return options def ping(self, cmd): """Return a product name.""" @@ -219,32 +278,15 @@ def acquire_field(self, cmd): cmd.fail(f'text="AgCmd.acquire_field: mode={mode}"') return - design_id = None - if "design_id" in cmd.cmd.keywords: - design_id = int(cmd.cmd.keywords["design_id"].values[0], 0) - design_path = self.with_design_path if design_id is not None else None - if "design_path" in cmd.cmd.keywords: - design_path = str(cmd.cmd.keywords["design_path"].values[0]) - design = ( - (design_id, design_path) - if any(x is not None for x in (design_id, design_path)) - else None - ) - visit_id = None - if "visit_id" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) - elif "visit" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit"].values[0]) + design_id, design_path, design = self._parse_design(cmd) + visit_id = self._parse_visit(cmd) visit0 = None if "visit0" in cmd.cmd.keywords: visit0 = int(cmd.cmd.keywords["visit0"].values[0]) - exposure_time = 2000 # ms - if "exposure_time" in cmd.cmd.keywords: - exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) - if exposure_time < 100: - exposure_time = 100 + exposure_time = self._parse_exposure_time(cmd) + guide = True if "guide" in cmd.cmd.keywords: guide = bool(cmd.cmd.keywords["guide"].values[0]) @@ -258,255 +300,44 @@ def acquire_field(self, cmd): if "dinr" in cmd.cmd.keywords: dinr = float(cmd.cmd.keywords["dinr"].values[0]) - kwargs = {} - if "magnitude" in cmd.cmd.keywords: - magnitude = float(cmd.cmd.keywords["magnitude"].values[0]) - kwargs["magnitude"] = magnitude - dry_run = ag.DRY_RUN - if "dry_run" in cmd.cmd.keywords: - dry_run = bool(cmd.cmd.keywords["dry_run"].values[0]) - if "fit_dinr" in cmd.cmd.keywords: - fit_dinr = bool(cmd.cmd.keywords["fit_dinr"].values[0]) - kwargs["fit_dinr"] = fit_dinr - if "fit_dscale" in cmd.cmd.keywords: - fit_dscale = bool(cmd.cmd.keywords["fit_dscale"].values[0]) - kwargs["fit_dscale"] = fit_dscale - - max_ellipticity = ag.MAX_ELLIPTICITY - max_size = ag.MAX_SIZE - min_size = ag.MIN_SIZE - max_residual = ag.MAX_RESIDUAL - - if "max_ellipticity" in cmd.cmd.keywords: - max_ellipticity = float(cmd.cmd.keywords["max_ellipticity"].values[0]) - kwargs["max_ellipticity"] = max_ellipticity - if "max_size" in cmd.cmd.keywords: - max_size = float(cmd.cmd.keywords["max_size"].values[0]) - kwargs["max_size"] = max_size - if "min_size" in cmd.cmd.keywords: - min_size = float(cmd.cmd.keywords["min_size"].values[0]) - kwargs["min_size"] = min_size - if "max_residual" in cmd.cmd.keywords: - max_residual = float(cmd.cmd.keywords["max_residual"].values[0]) - kwargs["max_residual"] = max_residual - - exposure_delay = ag.EXPOSURE_DELAY - if "exposure_delay" in cmd.cmd.keywords: - exposure_delay = int(cmd.cmd.keywords["exposure_delay"].values[0]) - tec_off = ag.TEC_OFF - if "tec_off" in cmd.cmd.keywords: - tec_off = bool(cmd.cmd.keywords["tec_off"].values[0]) - - kwargs["filter_bad_shape"] = ag.FILTER_BAD_SHAPE - if "filter_bad_shape" in cmd.cmd.keywords: - kwargs["filter_bad_shape"] = bool(cmd.cmd.keywords["filter_bad_shape"].values[0]) + kwargs = self._parse_options(cmd) + + # acquire_field-specific defaults for options not provided by the user. + dry_run = kwargs.pop("dry_run", ag.DRY_RUN) + max_ellipticity = kwargs.get("max_ellipticity", ag.MAX_ELLIPTICITY) + max_size = kwargs.get("max_size", ag.MAX_SIZE) + min_size = kwargs.get("min_size", ag.MIN_SIZE) + exposure_delay = kwargs.pop("exposure_delay", ag.EXPOSURE_DELAY) + tec_off = kwargs.pop("tec_off", ag.TEC_OFF) + kwargs.setdefault("filter_bad_shape", ag.FILTER_BAD_SHAPE) self.actor.logger.info(f"AgCmd.acquire_field: kwargs={kwargs}") try: - cmd.inform(f"exposureTime={exposure_time}") - # start an exposure - cmdStr = f"expose object exptime={exposure_time / 1000} centroid=1" - if visit_id is not None: - cmdStr += f" visit={visit_id}" - if exposure_delay > 0: - cmdStr += f" threadDelay={exposure_delay}" - if tec_off: - cmdStr += " tecOFF" - - self.actor.logger.info(f"AgCmd.acquire_field: Sending agcc cmdStr={cmdStr}") - agcc_exposure_result = self.actor.queueCommand( - actor="agcc", - cmdStr=cmdStr, - timeLim=((exposure_time + 6 * exposure_delay) // 1000 + 15), - ) - # This synchronous sleep is to defer the request for telescope info to roughly - # the middle of the exposure. - time.sleep((exposure_time + 7 * exposure_delay) / 1000 / 2) - telescope_state = None - - if self.with_mlp1_status: - telescope_state = self.actor.mlp1.telescopeState - self.actor.logger.info( - f"AgCmd.acquire_field: telescopeState={telescope_state}" - ) - kwargs["inr"] = telescope_state["rotator_real_angle"] - - if self.with_gen2_status or self.with_opdb_tel_status: - if self.with_gen2_status: - # update gen2 status values - tel_status = actorCalls.updateTelStatus( - self.actor, self.actor.logger, visit_id - ) - self.actor.logger.info( - f"AgCmd.acquire_field: tel_status={tel_status}" - ) - kwargs["tel_status"] = tel_status - _tel_center = tel_center( - actor=self.actor, - center=center, - design=design, - tel_status=tel_status, - ) - - if all(x is None for x in (center, design)): - center, _offset = ( - _tel_center.dither - ) # dithered center and guide offset correction (insrot only) - self.actor.logger.info(f"AgCmd.acquire_field: center={center}") - else: - _offset = ( - _tel_center.offset - ) # dithering and guide offset correction - - if offset is None: - offset = _offset - self.actor.logger.info(f"AgCmd.acquire_field: offset={offset}") - - if self.with_opdb_tel_status: - status_update = self.actor.gen2.statusUpdate - status_id = (status_update["visit"], status_update["sequenceNum"]) - self.actor.logger.info( - f"AgCmd.acquire_field: status_id={status_id}" - ) - kwargs["status_id"] = status_id - - # wait for an exposure to complete - agcc_exposure_result.get() - frame_id = self.actor.agcc.frameId - self.actor.logger.info(f"AgCmd.acquire_field: frameId={frame_id}") - data_time = self.actor.agcc.dataTime - self.actor.logger.info(f"AgCmd.acquire_field: dataTime={data_time}") - taken_at = data_time + (exposure_time + 7 * exposure_delay) / 1000 / 2 - self.actor.logger.info(f"AgCmd.acquire_field: taken_at={taken_at}") - if self.with_agcc_timestamp: - kwargs["taken_at"] = ( - taken_at # unix timestamp, not timezone-aware datetime - ) - if self.with_mlp1_status: - # possibly override timestamp from agcc - taken_at = self.actor.mlp1.setUnixDay( - telescope_state["az_el_detect_time"], taken_at - ) - kwargs["taken_at"] = taken_at - if center is not None: - kwargs["center"] = center - if offset is not None: - kwargs["offset"] = offset - if dinr is not None: - kwargs["dinr"] = dinr - - # retrieve field center coordinates from opdb - # retrieve exposure information from opdb - # retrieve guide star coordinates from opdb - # retrieve metrics of detected objects from opdb - # compute offsets, scale, transparency, and seeing - cmd.inform("detectionState=1") - - self.actor.logger.info( - "AgCmd.acquire_field: Calling field_acquisition.acquire_field for guiding" - ) - guide_offsets = field_acquisition.acquire_field( + run_exposure_pipeline( + actor=self.actor, + cmd=cmd, + cfg=self.cfg, design_id=design_id, + design_path=design_path, + design=design, + visit_id=visit_id, visit0=visit0, - frame_id=frame_id, - **kwargs, - ) - - ra = guide_offsets.ra - dec = guide_offsets.dec - inst_pa = guide_offsets.inst_pa - dra = guide_offsets.ra_offset - ddec = guide_offsets.dec_offset - dinr = guide_offsets.inr_offset - dscale = guide_offsets.scale_offset - dalt = guide_offsets.dalt - daz = guide_offsets.daz - - cmd.inform( - f'text="{ra=},{dec=},{inst_pa=},{dra=},{ddec=},{dinr=},{dscale=},{dalt=},{daz=}"' - ) - - filenames = guide_offsets.save_numpy_files() - - cmd.inform( - 'data={},{},{},"{}","{}","{}"'.format(ra, dec, inst_pa, *filenames) - ) - cmd.inform("detectionState=0") - - if guide: - dx, dy, size, peak, flux = ( - guide_offsets.dx, - guide_offsets.dy, - guide_offsets.size, - guide_offsets.peak, - guide_offsets.flux, - ) - send_guide_offsets( - actor=self.actor, - taken_at=taken_at, - daz=daz, - dalt=dalt, - dx=dx, - dy=dy, - size=size, - peak=peak, - flux=flux, - dry_run=dry_run, - logger=self.actor.logger, - ) - - # always compute focus offset and tilt - self.actor.logger.info("AgCmd.acquire_field: Calling focus") - dz, dzs = focus( - detected_objects=guide_offsets.detected_objects, + exposure_time=exposure_time, + exposure_delay=exposure_delay, + tec_off=tec_off, + center=center, + offset=offset, + dinr=dinr, + guide_catalog=None, + send_offsets=guide, + dry_run=dry_run, + max_correction=None, # no range checking for acquire_field max_ellipticity=max_ellipticity, max_size=max_size, min_size=min_size, + **kwargs, ) - # send corrections to gen2 (or iic) - guide_status = "OK" - if dalt is None: - dalt = np.nan - if daz is None: - daz = np.nan - - cmd.inform( - "guideErrors={},{},{},{},{},{},{},{},{}".format( - frame_id, dra, ddec, dinr, daz, dalt, dz, dscale, guide_status - ) - ) - cmd.inform("focusErrors={},{},{},{},{},{},{}".format(frame_id, *dzs)) - # store results in opdb - if self.with_opdb_agc_guide_offset: - data_utils.write_agc_guide_offset( - frame_id=frame_id, - ra=ra, - dec=dec, - pa=inst_pa, - delta_ra=dra, - delta_dec=ddec, - delta_insrot=dinr, - delta_scale=dscale, - delta_az=daz, - delta_el=dalt, - delta_z=dz, - delta_zs=dzs, - ) - if self.with_opdb_agc_match: - data_utils.write_agc_match( - design_id=( - design_id - if design_id is not None - else pfs_design.pfsDesign.to_design_id(design_path) - if design_path is not None - else 0 - ), - frame_id=frame_id, - guide_objects=guide_offsets.guide_objects, - detected_objects=guide_offsets.detected_objects, - identified_objects=guide_offsets.identified_objects, - ) except Exception as e: self.actor.logger.exception("AgCmd.acquire_field:") cmd.fail(f'text="AgCmd.acquire_field: {e}"') @@ -521,34 +352,15 @@ def focus(self, cmd): cmd.fail(f'text="AgCmd.focus: mode={mode}"') return - visit_id = None - if "visit_id" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) - elif "visit" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit"].values[0]) - exposure_time = 2000 # ms - if "exposure_time" in cmd.cmd.keywords: - exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) - if exposure_time < 100: - exposure_time = 100 - - max_ellipticity = ag.MAX_ELLIPTICITY - max_size = ag.MAX_SIZE - min_size = ag.MIN_SIZE + visit_id = self._parse_visit(cmd) + exposure_time = self._parse_exposure_time(cmd) - if "max_ellipticity" in cmd.cmd.keywords: - max_ellipticity = float(cmd.cmd.keywords["max_ellipticity"].values[0]) - if "max_size" in cmd.cmd.keywords: - max_size = float(cmd.cmd.keywords["max_size"].values[0]) - if "min_size" in cmd.cmd.keywords: - min_size = float(cmd.cmd.keywords["min_size"].values[0]) - - exposure_delay = ag.EXPOSURE_DELAY - if "exposure_delay" in cmd.cmd.keywords: - exposure_delay = int(cmd.cmd.keywords["exposure_delay"].values[0]) - tec_off = ag.TEC_OFF - if "tec_off" in cmd.cmd.keywords: - tec_off = bool(cmd.cmd.keywords["tec_off"].values[0]) + options = self._parse_options(cmd) + max_ellipticity = options.get("max_ellipticity", ag.MAX_ELLIPTICITY) + max_size = options.get("max_size", ag.MAX_SIZE) + min_size = options.get("min_size", ag.MIN_SIZE) + exposure_delay = options.get("exposure_delay", ag.EXPOSURE_DELAY) + tec_off = options.get("tec_off", ag.TEC_OFF) try: cmd.inform(f"exposureTime={exposure_time}") @@ -598,7 +410,7 @@ def focus(self, cmd): ) cmd.inform("focusErrors={},{},{},{},{},{},{}".format(frame_id, *dzs)) # store results in opdb - if self.with_opdb_agc_guide_offset: + if self.cfg.with_opdb_agc_guide_offset: self.actor.logger.info( f"AgCmd.focus: Writing opdb_agc_guide_offset: {dz=} {dzs=}" ) @@ -615,78 +427,23 @@ def start_autoguide(self, cmd): self.actor.logger.info(f"AgCmd.start_autoguide: {cmd.cmd.keywords}") controller = self.actor.controllers["ag"] - design_id = None - if "design_id" in cmd.cmd.keywords: - design_id = int(cmd.cmd.keywords["design_id"].values[0], 0) - design_path = self.with_design_path if design_id is not None else None - if "design_path" in cmd.cmd.keywords: - design_path = str(cmd.cmd.keywords["design_path"].values[0]) - design = ( - (design_id, design_path) - if any(x is not None for x in (design_id, design_path)) - else None - ) - visit_id = None - if "visit_id" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) - elif "visit" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit"].values[0]) + design_id, design_path, design = self._parse_design(cmd) + visit_id = self._parse_visit(cmd) visit0 = None if "visit0" in cmd.cmd.keywords: visit0 = int(cmd.cmd.keywords["visit0"].values[0]) - from_sky = None if "from_sky" in cmd.cmd.keywords: from_sky = bool(cmd.cmd.keywords["from_sky"].values[0]) - exposure_time = 2000 # ms - if "exposure_time" in cmd.cmd.keywords: - exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) - if exposure_time < 100: - exposure_time = 100 - cadence = 0 # ms - if "cadence" in cmd.cmd.keywords: - cadence = int(cmd.cmd.keywords["cadence"].values[0]) - if cadence < 0: - cadence = 0 + exposure_time = self._parse_exposure_time(cmd) + cadence = self._parse_cadence(cmd) center = None if "center" in cmd.cmd.keywords: center = tuple([float(x) for x in cmd.cmd.keywords["center"].values]) - kwargs = {} - if "magnitude" in cmd.cmd.keywords: - magnitude = float(cmd.cmd.keywords["magnitude"].values[0]) - kwargs["magnitude"] = magnitude - if "dry_run" in cmd.cmd.keywords: - dry_run = bool(cmd.cmd.keywords["dry_run"].values[0]) - kwargs["dry_run"] = dry_run - if "fit_dinr" in cmd.cmd.keywords: - fit_dinr = bool(cmd.cmd.keywords["fit_dinr"].values[0]) - kwargs["fit_dinr"] = fit_dinr - if "fit_dscale" in cmd.cmd.keywords: - fit_dscale = bool(cmd.cmd.keywords["fit_dscale"].values[0]) - kwargs["fit_dscale"] = fit_dscale - if "max_ellipticity" in cmd.cmd.keywords: - max_ellipticity = float(cmd.cmd.keywords["max_ellipticity"].values[0]) - kwargs["max_ellipticity"] = max_ellipticity - if "max_size" in cmd.cmd.keywords: - max_size = float(cmd.cmd.keywords["max_size"].values[0]) - kwargs["max_size"] = max_size - if "min_size" in cmd.cmd.keywords: - min_size = float(cmd.cmd.keywords["min_size"].values[0]) - kwargs["min_size"] = min_size - if "max_residual" in cmd.cmd.keywords: - max_residual = float(cmd.cmd.keywords["max_residual"].values[0]) - kwargs["max_residual"] = max_residual - if "exposure_delay" in cmd.cmd.keywords: - exposure_delay = int(cmd.cmd.keywords["exposure_delay"].values[0]) - kwargs["exposure_delay"] = exposure_delay - if "tec_off" in cmd.cmd.keywords: - tec_off = bool(cmd.cmd.keywords["tec_off"].values[0]) - kwargs["tec_off"] = tec_off - if "max_correction" in cmd.cmd.keywords: - max_correction = float(cmd.cmd.keywords["max_correction"].values[0]) - kwargs["max_correction"] = max_correction + + kwargs = self._parse_options(cmd) try: self.actor.logger.info(f"AgCmd.start_autoguide: kwargs={kwargs}") @@ -711,72 +468,19 @@ def initialize_autoguide(self, cmd): self.actor.logger.info(f"AgCmd.initialize_autoguide: {cmd.cmd.keywords}") controller = self.actor.controllers["ag"] - design_id = None - if "design_id" in cmd.cmd.keywords: - design_id = int(cmd.cmd.keywords["design_id"].values[0], 0) - design_path = self.with_design_path if design_id is not None else None - if "design_path" in cmd.cmd.keywords: - design_path = str(cmd.cmd.keywords["design_path"].values[0]) - design = ( - (design_id, design_path) - if any(x is not None for x in (design_id, design_path)) - else None - ) - visit_id = None - if "visit_id" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) - elif "visit" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit"].values[0]) + design_id, design_path, design = self._parse_design(cmd) + visit_id = self._parse_visit(cmd) + from_sky = None if "from_sky" in cmd.cmd.keywords: from_sky = bool(cmd.cmd.keywords["from_sky"].values[0]) - exposure_time = 2000 # ms - if "exposure_time" in cmd.cmd.keywords: - exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) - if exposure_time < 100: - exposure_time = 100 - cadence = 0 # ms - if "cadence" in cmd.cmd.keywords: - cadence = int(cmd.cmd.keywords["cadence"].values[0]) - if cadence < 0: - cadence = 0 + exposure_time = self._parse_exposure_time(cmd) + cadence = self._parse_cadence(cmd) center = None if "center" in cmd.cmd.keywords: center = tuple([float(x) for x in cmd.cmd.keywords["center"].values]) - kwargs = {} - if "magnitude" in cmd.cmd.keywords: - magnitude = float(cmd.cmd.keywords["magnitude"].values[0]) - kwargs["magnitude"] = magnitude - if "dry_run" in cmd.cmd.keywords: - dry_run = bool(cmd.cmd.keywords["dry_run"].values[0]) - kwargs["dry_run"] = dry_run - if "fit_dinr" in cmd.cmd.keywords: - fit_dinr = bool(cmd.cmd.keywords["fit_dinr"].values[0]) - kwargs["fit_dinr"] = fit_dinr - if "fit_dscale" in cmd.cmd.keywords: - fit_dscale = bool(cmd.cmd.keywords["fit_dscale"].values[0]) - kwargs["fit_dscale"] = fit_dscale - if "max_ellipticity" in cmd.cmd.keywords: - max_ellipticity = float(cmd.cmd.keywords["max_ellipticity"].values[0]) - kwargs["max_ellipticity"] = max_ellipticity - if "max_size" in cmd.cmd.keywords: - max_size = float(cmd.cmd.keywords["max_size"].values[0]) - kwargs["max_size"] = max_size - if "min_size" in cmd.cmd.keywords: - min_size = float(cmd.cmd.keywords["min_size"].values[0]) - kwargs["min_size"] = min_size - if "max_residual" in cmd.cmd.keywords: - max_residual = float(cmd.cmd.keywords["max_residual"].values[0]) - kwargs["max_residual"] = max_residual - if "exposure_delay" in cmd.cmd.keywords: - exposure_delay = int(cmd.cmd.keywords["exposure_delay"].values[0]) - kwargs["exposure_delay"] = exposure_delay - if "tec_off" in cmd.cmd.keywords: - tec_off = bool(cmd.cmd.keywords["tec_off"].values[0]) - kwargs["tec_off"] = tec_off - if "max_correction" in cmd.cmd.keywords: - max_correction = float(cmd.cmd.keywords["max_correction"].values[0]) - kwargs["max_correction"] = max_correction + + kwargs = self._parse_options(cmd) try: self.actor.logger.info(f"AgCmd.initialize_autoguide: kwargs={kwargs}") @@ -825,55 +529,15 @@ def reconfigure_autoguide(self, cmd): controller = self.actor.controllers["ag"] kwargs = {} - if "visit_id" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit_id"].values[0]) - kwargs["visit_id"] = visit_id - elif "visit" in cmd.cmd.keywords: - visit_id = int(cmd.cmd.keywords["visit"].values[0]) + visit_id = self._parse_visit(cmd) + if visit_id is not None: kwargs["visit_id"] = visit_id if "exposure_time" in cmd.cmd.keywords: - exposure_time = int(cmd.cmd.keywords["exposure_time"].values[0]) - if exposure_time < 100: - exposure_time = 100 - kwargs["exposure_time"] = exposure_time + kwargs["exposure_time"] = self._parse_exposure_time(cmd) if "cadence" in cmd.cmd.keywords: - cadence = int(cmd.cmd.keywords["cadence"].values[0]) - if cadence < 0: - cadence = 0 - kwargs["cadence"] = cadence - if "dry_run" in cmd.cmd.keywords: - dry_run = bool(cmd.cmd.keywords["dry_run"].values[0]) - kwargs["dry_run"] = dry_run - if "fit_dinr" in cmd.cmd.keywords: - fit_dinr = bool(cmd.cmd.keywords["fit_dinr"].values[0]) - kwargs["fit_dinr"] = fit_dinr - if "fit_dscale" in cmd.cmd.keywords: - fit_dscale = bool(cmd.cmd.keywords["fit_dscale"].values[0]) - kwargs["fit_dscale"] = fit_dscale - if "max_ellipticity" in cmd.cmd.keywords: - max_ellipticity = float(cmd.cmd.keywords["max_ellipticity"].values[0]) - kwargs["max_ellipticity"] = max_ellipticity - if "max_size" in cmd.cmd.keywords: - max_size = float(cmd.cmd.keywords["max_size"].values[0]) - kwargs["max_size"] = max_size - if "min_size" in cmd.cmd.keywords: - min_size = float(cmd.cmd.keywords["min_size"].values[0]) - kwargs["min_size"] = min_size - if "max_residual" in cmd.cmd.keywords: - max_residual = float(cmd.cmd.keywords["max_residual"].values[0]) - kwargs["max_residual"] = max_residual - if "exposure_delay" in cmd.cmd.keywords: - exposure_delay = int(cmd.cmd.keywords["exposure_delay"].values[0]) - kwargs["exposure_delay"] = exposure_delay - if "tec_off" in cmd.cmd.keywords: - tec_off = bool(cmd.cmd.keywords["tec_off"].values[0]) - kwargs["tec_off"] = tec_off - if "max_correction" in cmd.cmd.keywords: - max_correction = float(cmd.cmd.keywords["max_correction"].values[0]) - kwargs["max_correction"] = max_correction - - if "filter_bad_shape" in cmd.cmd.keywords: - kwargs["filter_bad_shape"] = bool(cmd.cmd.keywords['filter_bad_shape'].values[0]) + kwargs["cadence"] = self._parse_cadence(cmd) + + kwargs.update(self._parse_options(cmd)) try: controller.reconfigure_autoguide(cmd=cmd, **kwargs) diff --git a/python/agActor/Controllers/ag.py b/python/agActor/Controllers/ag.py index c7c08f7..b9edc94 100644 --- a/python/agActor/Controllers/ag.py +++ b/python/agActor/Controllers/ag.py @@ -3,17 +3,11 @@ import threading import time -import numpy as np - from opscore.utility.qstr import qstr -from agActor import autoguide -from agActor.utils import actorCalls -from agActor.utils import data as data_utils -from agActor.utils.focus import focus -from agActor.utils.actorCalls import sendAlert, send_guide_offsets -from agActor.utils.data import GuideOffsetFlag, get_guide_objects -from agActor.utils.telescope_center import telCenter as tel_center +from agActor.exposure import run_exposure_pipeline +from agActor.utils.actorCalls import sendAlert +from agActor.utils.data import get_guide_objects class ag: @@ -198,19 +192,6 @@ def __init__(self, actor=None, logger=None): self.__abort = threading.Event() self.__stop = threading.Event() - self.with_opdb_agc_guide_offset = actor.actorConfig.get( - "agc_guide_offset", False - ) - self.with_opdb_agc_match = actor.actorConfig.get("agc_match", False) - self.with_agcc_timestamp = actor.actorConfig.get("agcc_timestamp", False) - tel_status = [ - x.strip() - for x in actor.actorConfig.get("tel_status", ("agc_exposure",)).split(",") - ] - self.with_gen2_status = "gen2" in tel_status - self.with_mlp1_status = "mlp1" in tel_status - self.with_opdb_tel_status = "tel_status" in tel_status - def __del__(self): self.logger.info("AgThread.__del__:") @@ -278,7 +259,6 @@ def run(self): ) design_id, design_path = design if design is not None else (None, None) - dither, offset = None, None guide_catalog = None except Exception as e: @@ -308,261 +288,41 @@ def run(self): # Do the actual AG exposure. exposure_delay = options.get("exposure_delay", ag.EXPOSURE_DELAY) tec_off = options.get("tec_off", ag.TEC_OFF) - - cmd.inform(f"exposureTime={exposure_time}") - - cmd_str = f"expose object exptime={exposure_time / 1000} centroid=1" - if visit_id is not None: - cmd_str += f" visit={visit_id}" - if exposure_delay > 0: - cmd_str += f" threadDelay={exposure_delay}" - if tec_off: - cmd_str += " tecOFF" - - self.logger.info(f"Taking AG exposure with: {cmd_str=}") - agcc_exposure_result = self.actor.queueCommand( - actor="agcc", - cmdStr=cmd_str, - timeLim=((exposure_time + 6 * exposure_delay) // 1000 + 15), - ) - time.sleep((exposure_time + 7 * exposure_delay) / 1000 / 2) - - kwargs = {} - telescope_state = None - if self.with_mlp1_status: - telescope_state = self.actor.mlp1.telescopeState - self.logger.info( - f"AgThread.run: telescopeState={telescope_state}" - ) - kwargs["inr"] = telescope_state["rotator_real_angle"] - - # update gen2 status values. - if self.with_gen2_status or self.with_opdb_tel_status: - self.logger.info("AgThread.run: getting gen2 status") - if self.with_gen2_status: - try: - tel_status = actorCalls.updateTelStatus( - self.actor, self.logger, visit_id - ) - except Exception as e: - # Raising a RuntimeError as this call is usually not fatal. - raise RuntimeError( - f"AgThread.updateTelStatus error: {e}" - ) - - self.logger.info(f"AgThread.run: {tel_status=}") - kwargs["tel_status"] = tel_status - _tel_center = tel_center( - actor=self.actor, - center=center, - design=design, - tel_status=tel_status, - ) - if all(x is None for x in (center, design)): - # dithered center and guide offset correction (insrot only) - center, offset = _tel_center.dither - self.logger.info(f"AgThread.run: {center=}") - else: - # dithering and guide offset correction - offset = _tel_center.offset - self.logger.info(f"AgThread.run: {offset=}") - - if self.with_opdb_tel_status: - status_update = self.actor.gen2.statusUpdate - status_id = ( - status_update["visit"], - status_update["sequenceNum"], - ) - self.logger.info(f"AgThread.run: status_id={status_id}") - kwargs["status_id"] = status_id - # wait for the exposure to complete. - agcc_exposure_result.get() - - data_time = self.actor.agcc.dataTime - self.logger.info(f"AgThread.run: dataTime={data_time}") - taken_at = ( - data_time + (exposure_time + 7 * exposure_delay) / 1000 / 2 - ) - self.logger.info(f"AgThread.run: taken_at={taken_at}") - if self.with_agcc_timestamp: - # unix timestamp, not timezone-aware datetime - kwargs["taken_at"] = taken_at - if self.with_mlp1_status: - # possibly override timestamp from agcc - taken_at = self.actor.mlp1.setUnixDay( - telescope_state["az_el_detect_time"], taken_at - ) - kwargs["taken_at"] = taken_at - - if center is not None: - kwargs["center"] = center - if offset is not None: - kwargs["offset"] = offset - max_correction = options.get("max_correction", ag.MAX_CORRECTION) max_ellipticity = options.get("max_ellipticity", ag.MAX_ELLIPTICITY) max_size = options.get("max_size", ag.MAX_SIZE) min_size = options.get("min_size", ag.MIN_SIZE) - max_residual = options.get("max_residual", ag.MAX_RESIDUAL) + dry_run = options.get("dry_run", ag.DRY_RUN) + pipeline_kwargs = {} + if "max_residual" in options: + pipeline_kwargs["max_residual"] = options["max_residual"] if "filter_bad_shape" in options: - kwargs["filter_bad_shape"] = options.get("filter_bad_shape") - - # Compute guide errors for exposure. - cmd.inform("detectionState=1") - frame_id = self.actor.agcc.frameId - self.logger.info( - f"AgThread.run: autoguide.autoguide for {frame_id=}" - ) - guide_offsets = autoguide.get_exposure_offsets( - frame_id=frame_id, + pipeline_kwargs["filter_bad_shape"] = options["filter_bad_shape"] + + run_exposure_pipeline( + actor=self.actor, + cmd=cmd, + cfg=self.actor.ag_config, + design_id=design_id, + design_path=design_path, + design=design, + visit_id=visit_id, + visit0=visit0, + exposure_time=exposure_time, + exposure_delay=exposure_delay, + tec_off=tec_off, + center=center, guide_catalog=guide_catalog, + send_offsets=True, + dry_run=dry_run, + max_correction=max_correction, max_ellipticity=max_ellipticity, max_size=max_size, min_size=min_size, - max_residual=max_residual, - **kwargs - ) - - # Extract values from the AutoguideResult dataclass - ra = guide_offsets.ra - dec = guide_offsets.dec - inst_pa = guide_offsets.inst_pa - dra = guide_offsets.ra_offset - ddec = guide_offsets.dec_offset - dinr = guide_offsets.inr_offset - dscale = guide_offsets.scale_offset - dalt = guide_offsets.dalt - daz = guide_offsets.daz - cmd.inform( - f'text="{ra=},{dec=},{inst_pa=},{dra=},{ddec=},{dinr=},{dscale=},{dalt=},{daz=}"' + **pipeline_kwargs, ) - filenames = guide_offsets.save_numpy_files() - cmd.inform( - 'data={},{},{},"{}","{}","{}"'.format( - ra, dec, inst_pa, *filenames - ) - ) - cmd.inform("detectionState=0") - - dx = guide_offsets.dx - dy = guide_offsets.dy - size = guide_offsets.size - peak = guide_offsets.peak - flux = guide_offsets.flux - self.logger.info( - f"AgThread.run: Sending mlp1 command {dx=},{dy=},{size=},{peak=},{flux=}" - ) - - offset_in_range = ( - abs(dra) < max_correction and abs(ddec) < max_correction - ) - - if offset_in_range: - offset_flags = GuideOffsetFlag.OK - guide_status = "OK" - else: - offset_flags = GuideOffsetFlag.INVALID_OFFSET - guide_status = f"INVALID_OFFSET" - - if offset_flags == GuideOffsetFlag.OK: - # send corrections to mlp1 and gen2 (or iic). - dry_run = options.get("dry_run", ag.DRY_RUN) - send_guide_offsets( - actor=self.actor, - taken_at=taken_at, - daz=daz, - dalt=dalt, - dx=dx, - dy=dy, - size=size, - peak=peak, - flux=flux, - dry_run=dry_run, - logger=self.actor.logger, - ) - else: - cmd.inform( - f'text="Calculated offset not in allowed range, skipping: {dra=} {ddec=} {max_correction=}"' - ) - sendAlert( - actor=self.actor, - alert_id="AG.OFFSET_OUT_OF_RANGE", - alert_name="Autoguide Offset Out of Range", - alert_description="The calculated autoguide offset is out of the allowed range, no corrections have been sent to the telescope.", - alert_detail=f"Calculated offsets: {frame_id=} {visit_id=} {dra=}, {ddec=}, {max_correction=}", - alert_severity="warning", - logger=self.actor.logger, - ) - - # always compute focus offset and tilt. - self.logger.info( - f"AgThread.run: focus.focus for frame_id={frame_id}" - ) - - _detected_objects = guide_offsets.detected_objects.loc[guide_offsets.identified_objects.query('matched == 1').detected_object_id.values] - - dz, dzs = focus( - detected_objects=_detected_objects, - max_ellipticity=max_ellipticity, - max_size=max_size, - min_size=min_size, - ) - - # send corrections to gen2 (or iic). - if dalt is None: - dalt = np.nan - if daz is None: - daz = np.nan - cmd.inform( - "guideErrors={},{},{},{},{},{},{},{},{}".format( - frame_id, - dra, - ddec, - dinr, - daz, - dalt, - dz, - dscale, - guide_status, - ) - ) - cmd.inform( - "focusErrors={},{},{},{},{},{},{}".format(frame_id, *dzs) - ) - - if self.with_opdb_agc_guide_offset: - self.logger.info( - f"AgThread.run: write_agc_guide_offset for {frame_id=}" - ) - data_utils.write_agc_guide_offset( - frame_id=frame_id, - ra=ra, - dec=dec, - pa=inst_pa, - delta_ra=dra, - delta_dec=ddec, - delta_insrot=dinr, - delta_scale=dscale, - delta_az=daz, - delta_el=dalt, - delta_z=dz, - delta_zs=dzs, - offset_flags=offset_flags, - ) - if self.with_opdb_agc_match: - self.logger.info( - f"AgThread.run: write_agc_match for {frame_id=}" - ) - data_utils.write_agc_match( - design_id=design_id, - frame_id=frame_id, - guide_objects=guide_offsets.guide_objects, - detected_objects=guide_offsets.detected_objects, - identified_objects=guide_offsets.identified_objects, - ) - if mode & ag.Mode.ONCE: self.logger.info("AgThread.run: ONCE") self._set_params(mode=ag.Mode.OFF) diff --git a/python/agActor/config.py b/python/agActor/config.py new file mode 100644 index 0000000..e9241be --- /dev/null +++ b/python/agActor/config.py @@ -0,0 +1,40 @@ +"""Shared configuration for the AG actor. + +Parsed once from ``actorConfig`` and stored on the actor instance so that +both ``AgCmd`` and ``AgThread`` can access the same values without +duplicating the parsing logic. +""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class AgConfig: + """Immutable snapshot of AG-relevant actor configuration flags.""" + + with_opdb_agc_guide_offset: bool = False + with_opdb_agc_match: bool = False + with_agcc_timestamp: bool = False + with_gen2_status: bool = False + with_mlp1_status: bool = False + with_opdb_tel_status: bool = False + with_design_path: str | None = None + + @classmethod + def from_actor_config(cls, actor_config: dict) -> "AgConfig": + """Build an ``AgConfig`` from the raw ``actorConfig`` dict.""" + tel_status_raw = [ + x.strip() + for x in actor_config.get("tel_status", "agc_exposure").split(",") + ] + design_path = actor_config.get("design_path", "").strip() or None + + return cls( + with_opdb_agc_guide_offset=actor_config.get("agc_guide_offset", False), + with_opdb_agc_match=actor_config.get("agc_match", False), + with_agcc_timestamp=actor_config.get("agcc_timestamp", False), + with_gen2_status="gen2" in tel_status_raw, + with_mlp1_status="mlp1" in tel_status_raw, + with_opdb_tel_status="tel_status" in tel_status_raw, + with_design_path=design_path, + ) diff --git a/python/agActor/exposure.py b/python/agActor/exposure.py new file mode 100644 index 0000000..7589bfe --- /dev/null +++ b/python/agActor/exposure.py @@ -0,0 +1,447 @@ +"""Shared AG exposure pipeline. + +Encapsulates the common logic for taking an AG camera exposure, gathering +telescope state, computing guide offsets, computing focus, reporting results, +and writing to opDB. + +Used by both field acquisition (one-shot) and autoguiding (continuous loop). +See the "Exposure Pipeline" section in README.md for full documentation of the +two operational modes and the pipeline steps. +""" + +import logging +import time + +import numpy as np + +from agActor import autoguide, field_acquisition +from agActor.catalog import pfs_design +from agActor.config import AgConfig +from agActor.utils import actorCalls, data as data_utils +from agActor.utils.actorCalls import sendAlert, send_guide_offsets +from agActor.utils.data import GuideOffsetFlag +from agActor.utils.focus import focus +from agActor.utils.telescope_center import telCenter as tel_center + +logger = logging.getLogger(__name__) + + +def _build_agcc_command( + exposure_time: int, + visit_id: int | None, + exposure_delay: int, + tec_off: bool, +) -> str: + """Build the AGCC exposure command string.""" + cmd_str = f"expose object exptime={exposure_time / 1000} centroid=1" + if visit_id is not None: + cmd_str += f" visit={visit_id}" + if exposure_delay > 0: + cmd_str += f" threadDelay={exposure_delay}" + if tec_off: + cmd_str += " tecOFF" + return cmd_str + + +def _gather_telescope_state( + actor, + cfg: AgConfig, + visit_id: int | None, + center, + design, + offset, + kwargs: dict, +): + """Gather telescope state from mlp1/gen2/opdb during mid-exposure. + + Mutates *kwargs* in-place with telescope state information and returns + the (possibly updated) *center*, *offset*, and *telescope_state*. + """ + telescope_state = None + + if cfg.with_mlp1_status: + telescope_state = actor.mlp1.telescopeState + logger.info(f"telescopeState={telescope_state}") + kwargs["inr"] = telescope_state["rotator_real_angle"] + + if cfg.with_gen2_status or cfg.with_opdb_tel_status: + if cfg.with_gen2_status: + try: + tel_status = actorCalls.updateTelStatus( + actor, actor.logger, visit_id + ) + except Exception as e: + raise RuntimeError(f"updateTelStatus error: {e}") + + logger.info(f"{tel_status=}") + kwargs["tel_status"] = tel_status + _tel_center = tel_center( + actor=actor, + center=center, + design=design, + tel_status=tel_status, + ) + if all(x is None for x in (center, design)): + center, _offset = _tel_center.dither + logger.info(f"{center=}") + else: + _offset = _tel_center.offset + if offset is None: + offset = _offset + logger.info(f"{offset=}") + + if cfg.with_opdb_tel_status: + status_update = actor.gen2.statusUpdate + status_id = (status_update["visit"], status_update["sequenceNum"]) + logger.info(f"status_id={status_id}") + kwargs["status_id"] = status_id + + return center, offset, telescope_state + + +def _compute_taken_at( + actor, + cfg: AgConfig, + exposure_time: int, + exposure_delay: int, + telescope_state, + kwargs: dict, +) -> float: + """Compute the ``taken_at`` timestamp after exposure completes. + + Mutates *kwargs* with the ``taken_at`` key when appropriate and returns + the final ``taken_at`` value. + """ + data_time = actor.agcc.dataTime + logger.info(f"dataTime={data_time}") + taken_at = data_time + (exposure_time + 7 * exposure_delay) / 1000 / 2 + logger.info(f"{taken_at=}") + + if cfg.with_agcc_timestamp: + kwargs["taken_at"] = taken_at # unix timestamp, not timezone-aware datetime + if cfg.with_mlp1_status: + taken_at = actor.mlp1.setUnixDay( + telescope_state["az_el_detect_time"], taken_at + ) + kwargs["taken_at"] = taken_at + + return taken_at + + +def _report_guide_offsets(cmd, guide_offsets): + """Emit the standard cmd.inform messages for guide offsets.""" + ra = guide_offsets.ra + dec = guide_offsets.dec + inst_pa = guide_offsets.inst_pa + dra = guide_offsets.ra_offset + ddec = guide_offsets.dec_offset + dinr = guide_offsets.inr_offset + dscale = guide_offsets.scale_offset + dalt = guide_offsets.dalt + daz = guide_offsets.daz + + cmd.inform( + f'text="{ra=},{dec=},{inst_pa=},{dra=},{ddec=},{dinr=},{dscale=},{dalt=},{daz=}"' + ) + + filenames = guide_offsets.save_numpy_files() + cmd.inform( + 'data={},{},{},"{}","{}","{}"'.format(ra, dec, inst_pa, *filenames) + ) + + +def _compute_focus(guide_offsets, max_ellipticity, max_size, min_size): + """Compute focus offset and tilt from matched detected objects. + + Uses only matched detected objects (those with ``matched == 1``). + """ + matched_ids = ( + guide_offsets.identified_objects + .query("matched == 1") + .detected_object_id + .values + ) + # Use only matched detected objects for focus computation. + detected = guide_offsets.detected_objects + if len(matched_ids) > 0: + detected = detected.loc[matched_ids] + + return focus( + detected_objects=detected, + max_ellipticity=max_ellipticity, + max_size=max_size, + min_size=min_size, + ) + + +def run_exposure_pipeline( + *, + actor, + cmd, + cfg: AgConfig, + design_id: int | None, + design_path: str | None, + design, + visit_id: int | None, + visit0: int | None, + exposure_time: int, + exposure_delay: int, + tec_off: bool, + center=None, + offset=None, + dinr=None, + guide_catalog=None, + send_offsets: bool = True, + dry_run: bool = False, + max_correction: float | None = None, + max_ellipticity: float = 2.0, + max_size: float = 1.0e12, + min_size: float = -1.0, + **kwargs, +): + """Run the full AG exposure-to-correction pipeline. + + This is the single entry point for the shared logic between + ``AgCmd.acquire_field`` (one-shot field acquisition) and ``AgThread.run`` + (continuous autoguiding loop). + + The two modes are distinguished primarily by the ``guide_catalog`` parameter: + + * **Field acquisition** (``guide_catalog=None``): The pipeline calls + ``field_acquisition.acquire_field`` which fetches its own guide catalog + (with ``is_guide=False``) and detected objects for this single frame. + Typically called with ``max_correction=None`` (no range checking) since + the initial acquisition offset can be large. + + * **Autoguiding** (``guide_catalog=``): The pipeline calls + ``autoguide.get_exposure_offsets`` using a pre-loaded guide catalog + (fetched once with ``is_guide=True`` before the guide loop begins). + Only telescope status and detected objects are re-fetched per frame. + Called with a ``max_correction`` value to reject unreasonably large + corrections. + + Parameters + ---------- + actor : AgActor + The actor instance for queuing commands and accessing models. + cmd + The command object for sending ``cmd.inform`` messages. + cfg : AgConfig + Shared AG configuration flags. + design_id : int or None + Design ID for guide star lookup. + design_path : str or None + Path to the pfsDesign file. + design : tuple or None + ``(design_id, design_path)`` tuple, or None. + visit_id : int or None + Visit ID for the exposure. + visit0 : int or None + The visit0 for guide star lookup from pfs_config_agc. + exposure_time : int + Exposure time in milliseconds. + exposure_delay : int + Exposure delay in milliseconds. + tec_off : bool + Whether to turn off TEC. + center : tuple or None + Field center as (ra, dec[, pa]). + offset : tuple or None + Field offset as (dra, ddec[, dpa[, dinr]]). + dinr : float or None + Instrument rotator offset. + guide_catalog : GuideCatalog or None + Pre-loaded guide catalog for autoguiding (fetched with ``is_guide=True``). + If provided, uses ``autoguide.get_exposure_offsets`` which reuses this + catalog across frames. If None, uses ``field_acquisition.acquire_field`` + which fetches its own catalog (with ``is_guide=False``) for one-shot + field acquisition. + send_offsets : bool + Whether to send guide offsets to the telescope. Defaults to True. + dry_run : bool + If True, don't actually send corrections. + max_correction : float or None + Maximum allowed correction in arcsec. If None, no range checking. + max_ellipticity : float + Maximum ellipticity for source filtering. + max_size : float + Maximum size for source filtering. + min_size : float + Minimum size for source filtering. + **kwargs + Additional keyword arguments passed through to the offset computation + (e.g. ``magnitude``, ``fit_dinr``, ``fit_dscale``, ``max_residual``, + ``filter_bad_shape``). + + Returns + ------- + GuideOffsets + The computed guide offsets. + """ + # --- 1. Take the AGCC exposure --- + cmd.inform(f"exposureTime={exposure_time}") + cmd_str = _build_agcc_command(exposure_time, visit_id, exposure_delay, tec_off) + logger.info(f"Sending agcc {cmd_str=}") + + agcc_exposure_result = actor.queueCommand( + actor="agcc", + cmdStr=cmd_str, + timeLim=((exposure_time + 6 * exposure_delay) // 1000 + 15), + ) + + # Sleep until roughly mid-exposure to gather telescope state. + time.sleep((exposure_time + 7 * exposure_delay) / 1000 / 2) + + # --- 2. Gather telescope state mid-exposure --- + center, offset, telescope_state = _gather_telescope_state( + actor, cfg, visit_id, center, design, offset, kwargs + ) + + # --- 3. Wait for exposure to complete --- + agcc_exposure_result.get() + frame_id = actor.agcc.frameId + logger.info(f"{frame_id=}") + + # --- 4. Compute taken_at --- + taken_at = _compute_taken_at( + actor, cfg, exposure_time, exposure_delay, telescope_state, kwargs + ) + + # Pack remaining positional params into kwargs for offset computation. + if center is not None: + kwargs["center"] = center + if offset is not None: + kwargs["offset"] = offset + if dinr is not None: + kwargs["dinr"] = dinr + + # --- 5. Compute guide offsets --- + cmd.inform("detectionState=1") + + if guide_catalog is not None: + # Autoguide path: use the pre-loaded guide catalog (is_guide=True). + # Only telescope status and detected objects are re-fetched per frame. + max_residual = kwargs.pop("max_residual", 0.5) + logger.info(f"Computing offsets via autoguide for {frame_id=}") + guide_offsets = autoguide.get_exposure_offsets( + frame_id=frame_id, + guide_catalog=guide_catalog, + max_ellipticity=max_ellipticity, + max_size=max_size, + min_size=min_size, + max_residual=max_residual, + **kwargs, + ) + else: + # Acquire-field path: fetch guide catalog internally (is_guide=False) + # for this single frame. The initial offset can be large. + logger.info(f"Computing offsets via field_acquisition for {frame_id=}") + guide_offsets = field_acquisition.acquire_field( + design_id=design_id, + visit0=visit0, + frame_id=frame_id, + **kwargs, + ) + + # --- 6. Report results --- + _report_guide_offsets(cmd, guide_offsets) + cmd.inform("detectionState=0") + + ra = guide_offsets.ra + dec = guide_offsets.dec + inst_pa = guide_offsets.inst_pa + dra = guide_offsets.ra_offset + ddec = guide_offsets.dec_offset + dinr_offset = guide_offsets.inr_offset + dscale = guide_offsets.scale_offset + dalt = guide_offsets.dalt + daz = guide_offsets.daz + + # --- 7. Range checking and sending guide offsets --- + offset_flags = GuideOffsetFlag.OK + guide_status = "OK" + + if max_correction is not None: + offset_in_range = abs(dra) < max_correction and abs(ddec) < max_correction + if not offset_in_range: + offset_flags = GuideOffsetFlag.INVALID_OFFSET + guide_status = "INVALID_OFFSET" + + if send_offsets and offset_flags == GuideOffsetFlag.OK: + send_guide_offsets( + actor=actor, + taken_at=taken_at, + daz=daz, + dalt=dalt, + dx=guide_offsets.dx, + dy=guide_offsets.dy, + size=guide_offsets.size, + peak=guide_offsets.peak, + flux=guide_offsets.flux, + dry_run=dry_run, + logger=actor.logger, + ) + elif max_correction is not None and offset_flags != GuideOffsetFlag.OK: + cmd.inform( + f'text="Calculated offset not in allowed range, skipping: {dra=} {ddec=} {max_correction=}"' + ) + sendAlert( + actor=actor, + alert_id="AG.OFFSET_OUT_OF_RANGE", + alert_name="Autoguide Offset Out of Range", + alert_description="The calculated autoguide offset is out of the allowed range, no corrections have been sent to the telescope.", + alert_detail=f"Calculated offsets: {frame_id=} {visit_id=} {dra=}, {ddec=}, {max_correction=}", + alert_severity="warning", + logger=actor.logger, + ) + + # --- 8. Focus --- + logger.info(f"Computing focus for {frame_id=}") + dz, dzs = _compute_focus(guide_offsets, max_ellipticity, max_size, min_size) + + if dalt is None: + dalt = np.nan + if daz is None: + daz = np.nan + + cmd.inform( + "guideErrors={},{},{},{},{},{},{},{},{}".format( + frame_id, dra, ddec, dinr_offset, daz, dalt, dz, dscale, guide_status + ) + ) + cmd.inform("focusErrors={},{},{},{},{},{},{}".format(frame_id, *dzs)) + + # --- 9. Write to opDB --- + if cfg.with_opdb_agc_guide_offset: + logger.info(f"Writing agc_guide_offset for {frame_id=}") + data_utils.write_agc_guide_offset( + frame_id=frame_id, + ra=ra, + dec=dec, + pa=inst_pa, + delta_ra=dra, + delta_dec=ddec, + delta_insrot=dinr_offset, + delta_scale=dscale, + delta_az=daz, + delta_el=dalt, + delta_z=dz, + delta_zs=dzs, + offset_flags=offset_flags, + ) + if cfg.with_opdb_agc_match: + logger.info(f"Writing agc_match for {frame_id=}") + _design_id = design_id + if _design_id is None and design_path is not None: + _design_id = pfs_design.pfsDesign.to_design_id(design_path) + if _design_id is None: + _design_id = 0 + data_utils.write_agc_match( + design_id=_design_id, + frame_id=frame_id, + guide_objects=guide_offsets.guide_objects, + detected_objects=guide_offsets.detected_objects, + identified_objects=guide_offsets.identified_objects, + ) + + return guide_offsets