From 8d202c7f70403deb6da4a918d0e9f382513dc466 Mon Sep 17 00:00:00 2001 From: Yashvi-Sharma Date: Thu, 11 Sep 2025 11:49:22 -0700 Subject: [PATCH] Initial commit of magma's mascgen algorithm in Python --- lris2-mascgen/LICENSE | 21 + lris2-mascgen/README.md | 5 + lris2-mascgen/config.yaml | 11 + lris2-mascgen/example.ipynb | 105 +++ .../lris2_mascgen/magma_algorithm.py | 600 ++++++++++++++++++ lris2-mascgen/lris2_mascgen/slit.py | 361 +++++++++++ lris2-mascgen/lris2_mascgen/utils.py | 77 +++ lris2-mascgen/pyproject.toml | 53 ++ 8 files changed, 1233 insertions(+) create mode 100755 lris2-mascgen/LICENSE create mode 100755 lris2-mascgen/README.md create mode 100755 lris2-mascgen/config.yaml create mode 100755 lris2-mascgen/example.ipynb create mode 100755 lris2-mascgen/lris2_mascgen/magma_algorithm.py create mode 100755 lris2-mascgen/lris2_mascgen/slit.py create mode 100644 lris2-mascgen/lris2_mascgen/utils.py create mode 100755 lris2-mascgen/pyproject.toml diff --git a/lris2-mascgen/LICENSE b/lris2-mascgen/LICENSE new file mode 100755 index 0000000..63b4b68 --- /dev/null +++ b/lris2-mascgen/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) [year] [fullname] + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/lris2-mascgen/README.md b/lris2-mascgen/README.md new file mode 100755 index 0000000..e28554f --- /dev/null +++ b/lris2-mascgen/README.md @@ -0,0 +1,5 @@ +# lris2-mascgen + +Mask generation algorithm for LRIS2 CSU. + + diff --git a/lris2-mascgen/config.yaml b/lris2-mascgen/config.yaml new file mode 100755 index 0000000..f75cbec --- /dev/null +++ b/lris2-mascgen/config.yaml @@ -0,0 +1,11 @@ +## Configuration file for CSU parameters for LRIS2 (currently same as MOSFIRE) + +instrument: LRIS2 +num_bar_pairs: 46 +arcsec_per_mm: 1.38028 +single_slit_height_mm: 5.082 +overlap_mm: 0.698 +csu_fp_radius: 207.0 +csu_slit_tilt_angle: 4.0 +csu_zero_pt: 137.4 +star_edge_distance: 20.0 \ No newline at end of file diff --git a/lris2-mascgen/example.ipynb b/lris2-mascgen/example.ipynb new file mode 100755 index 0000000..d7a1e1c --- /dev/null +++ b/lris2-mascgen/example.ipynb @@ -0,0 +1,105 @@ +{ + "cells": [ + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2025-09-11T18:04:17.445154Z", + "start_time": "2025-09-11T18:04:16.883236Z" + } + }, + "source": "%run lris2_csulava/magma_algorithm.py --xrange 3 --xcenter 0 --slitwidth 0.7 --ditherspace 2.5 --centerpa 0 --xsteps 1 --xstepsize 1.0 --ysteps 1 --ystepsize 1.0 --pasteps 1 --pastepsize 1.0 --min_alignment_stars 0 --star-edge-buffer 2.0 --runcop ../Mascgen\\ Folder-\\ MAGMA\\ Development\\ history/q1700.coords", + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "INFO:__main__:Center coordinates: RA=255.226286 Dec=64.205267 degrees\n", + "INFO:__main__:Total iterations: 27, xsteps: 3, ysteps: 3, pasteps: 3\n", + "INFO:__main__:*** STARTING OPTIMIZATION ***\n", + "INFO:__main__:New optimal found at run 1, best score so far is 5700.0\n", + "INFO:__main__:New optimal found at run 8, best score so far is 6150.0\n", + "INFO:__main__:New optimal found at run 17, best score so far is 6150.0\n", + "INFO:__main__:New optimal found at run 26, best score so far is 6150.0\n", + "INFO:__main__:*** OPTIMIZATION COMPLETE ***\n", + "INFO:__main__:Optimal solution found at run 26 with total priority 6150.0.\n", + "INFO:__main__:Final Center: RA=255.229485, Dec=64.205545, PA=1.00\n", + "INFO:__main__:Number of legal alignment stars: 0\n", + "INFO:__main__:Number of targets in optimal solution: 28\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Function 'optimize' executed in 0.4102 seconds.\n", + "Function 'optimize' executed in 0.0055 seconds.\n", + "Function 'optimize' executed in 0.0045 seconds.\n", + "Function 'optimize' executed in 0.0044 seconds.\n", + "Function 'optimize' executed in 0.0044 seconds.\n", + "Function 'optimize' executed in 0.0043 seconds.\n", + "Function 'optimize' executed in 0.0043 seconds.\n", + "Function 'optimize' executed in 0.0044 seconds.\n", + "Function 'optimize' executed in 0.0049 seconds.\n", + "Function 'optimize' executed in 0.0042 seconds.\n", + "Function 'optimize' executed in 0.0043 seconds.\n", + "Function 'optimize' executed in 0.0042 seconds.\n", + "Function 'optimize' executed in 0.0042 seconds.\n", + "Function 'optimize' executed in 0.0042 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0044 seconds.\n", + "Function 'optimize' executed in 0.0048 seconds.\n", + "Function 'optimize' executed in 0.0050 seconds.\n", + "Function 'optimize' executed in 0.0055 seconds.\n", + "Function 'optimize' executed in 0.0046 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0040 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0041 seconds.\n", + "Function 'optimize' executed in 0.0049 seconds.\n", + "Script executed in 0.55 seconds.\n" + ] + } + ], + "execution_count": null + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2025-09-11T17:56:37.942927Z", + "start_time": "2025-09-11T17:56:37.940699Z" + } + }, + "cell_type": "code", + "source": "", + "id": "a793e10dbfecd011", + "outputs": [], + "execution_count": null + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/lris2-mascgen/lris2_mascgen/magma_algorithm.py b/lris2-mascgen/lris2_mascgen/magma_algorithm.py new file mode 100755 index 0000000..24f0e76 --- /dev/null +++ b/lris2-mascgen/lris2_mascgen/magma_algorithm.py @@ -0,0 +1,600 @@ +import os.path + +import numpy as np +import pandas as pd +from astropy.coordinates import SkyCoord +import astropy.units as u +import math, sys, argparse +import yaml, logging + +from utils import transform_coordinates, inCircle, fieldCenter + +''' +This script is the Python implementation of MascgenCore.java +''' + +from functools import wraps +import time + +def timeit(func): + @wraps(func) + def wrapper(*args, **kwargs): + start_time = time.perf_counter() + result = func(*args, **kwargs) + end_time = time.perf_counter() + print(f"Function '{func.__name__}' executed in {end_time - start_time:.4f} seconds.") + return result + return wrapper + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class TargetList: + def __init__(self, df, type='targets'): + self.data = df + self.type = type + + def target_radec_to_xy(self, field_center): + """ + Convert RA/Dec coordinates of targets to x/y coordinates relative to the field center. + :param targets: pd.DataFrame containing targets with 'ra' and 'dec' columns. + :param field_center: XYCoord object representing the field center in x/y coordinates. + :return: pd.DataFrame with updated 'x' and 'y' columns. + """ + self.data['y'] = self.data['dec'] * 3600 + self.data['x'] = math.cos(field_center.y * np.pi / 180. / 3600.) * self.data['ra'] * 3600 + + def set_row(self, buffer, csuparams): + """ + Calculate the row number for each target based on its y coordinate and a buffer. + :param targets: pd.DataFrame containing targets with 'y' column. + :param buffer: float representing the buffer in arcseconds. + :return: pd.DataFrame with an additional 'row' column. + """ + fac1 = (csuparams.single_slit_height - 2.0 * buffer) / csuparams.csu_row_height + fac2 = (buffer + csuparams.overlap / 2.0) / csuparams.csu_row_height + try: + ydivrow = self.data['y_transformed'] / csuparams.csu_row_height + except KeyError: + raise KeyError("Error: y_transformed column not found in targets DataFrame. Make sure to call transform_coordinates first.") + + ydivrow_floor = np.floor(ydivrow) + sel = (ydivrow >= 0) & (ydivrow_floor < csuparams.num_bar_pairs) & (ydivrow > ydivrow_floor + fac2) & (ydivrow < ydivrow_floor + fac1 + fac2) + self.data.loc[sel, 'obj_rr'] = ydivrow_floor[sel].astype(int) + + # def set_overlap_row(self, buffer, csuparams): + # """ + # Calculate the overlap region number for each target based on its y coordinate and a buffer. + # :param targets: pd.DataFrame containing targets with 'y' column. + # :param buffer: float representing the buffer in arcseconds. + # :return: pd.DataFrame with an additional 'overlap_region' column. + # """ + # fac1 = (csuparams.single_slit_height - 2.0 * buffer) / csuparams.csu_row_height + # fac2 = (buffer + csuparams.overlap / 2.0) / csuparams.csu_row_height + # try: + # ydivrow = self.data['y_transformed'] / csuparams.csu_row_height - fac1 - fac2 + # except KeyError: + # raise KeyError("Error: y_transformed column not found in targets DataFrame. Make sure to call transform_coordinates first.") + # ydivrow_floor = np.floor(ydivrow) + # sel = (ydivrow >= 0) & (ydivrow_floor < csuparams.num_bar_pairs - 1) & (ydivrow < ydivrow_floor + 2.0 * fac2) + # self.data.loc[sel, 'obj_or'] = ydivrow_floor[sel].astype(int) + 1 + # + # def update_dither_rows(self, factor, csuparams): + # y1 = self.data['obj_y'] + factor * np.cos(csuparams.csu_slit_tilt_angle_rad) + # y2 = self.data['obj_y'] - factor * np.cos(csuparams.csu_slit_tilt_angle_rad) + # self.data['min_row'] = np.floor((y2 + csuparams.csu_height / 2 - csuparams.overlap) / csuparams.csu_row_height).astype(int) + # self.data['max_row'] = np.floor((y1 + csuparams.csu_height / 2 + csuparams.overlap) / csuparams.csu_row_height).astype(int) + +# Create Node class similar to Java +class Node: + def __init__(self, obj=None): + # obj is expected to be a dict or pandas Series representing a DataFrame row + self.next_node = None + self.obj = obj if obj is not None else {} + self.score = self.obj['priority'] if self.obj else 0.0 + + def set_next_node(self, next_node): + self.next_node = next_node + + def get_next_node(self): + return self.next_node + + def set_obj(self, obj): + self.obj = obj + + def get_obj(self): + return self.obj + + def set_score(self, score): + self.score = score + + def get_score(self): + return self.score + + def total_score(self): + # Recursively sum scores along the linked list + if self.next_node: + return self.score + self.next_node.total_score() + return self.score + + def create_object_dataframe_from_top_node(self): + data = [] + local_node = self + while local_node is not None: + if local_node.obj: + local_node.obj['inValidSlit'] = True + data.append(local_node.obj) + local_node = local_node.next_node + return pd.DataFrame(data) + + def __str__(self): + if not self.next_node: + return str(self.obj) + return f"{self.obj} -> {self.next_node.obj}" + +def get_next_factor(current): + if current > 0: + return -1 * current + else: + return -1 * current + 1 + +class MascgenCore: + def __init__(self, args): + # Validate command line arguments and load CSU parameters + self.csuparams = CSUParameters(args.configfile) + self.validate_args(args) + self.args = args + + # Load target list + self.science_targets, self.alignment_stars, self.ra_coord_wrap = self.load_target_list(args.targetlist) + + # if center coordinates are not provided in args or COP is true run COP estimation + self.field_center = self.get_center_position(args.centerra, args.centerdec, args.runcop) + self.bestNodes = [None] * self.csuparams.num_bar_pairs + self.allNodes = [[] for _ in range(self.csuparams.num_bar_pairs)] + self.optimal_run_num = 0 + self.final_pa = 0 + self.final_field_center = fieldCenter(0, 0) + self.total_priority = 0 + self.final_alignment_stars = None + self.optimum_target_list = None + + def validate_args(self, args): + try: + if args.xsteps < 0 or args.ysteps < 0 or args.pasteps < 0: + raise ValueError("xsteps, ysteps, and pasteps must be positive integers.") + if not (0 < args.xrange <= self.csuparams.csu_width / 60.0): + raise ValueError( + f"Error: x range must be > 0 and <= CSU width ({self.csuparams.csu_width:.3f} arc min).") # Substituting default x range = {config['default_xrange']}.") + if not (-self.csuparams.csu_width / 120.0 < args.xcenter < self.csuparams.csu_width / 120.0): + raise ValueError( + f"Error: x center must be between {-self.csuparams.csu_width / 120.0:.3f} and {self.csuparams.csu_width / 120.0:.3f} arc min.") + if not (0 < args.slitwidth <= self.csuparams.csu_width): + raise ValueError(f"Error: slit width must be > 0 and <= CSU width ({self.csuparams.csu_width:.3f} arc sec).") + if not (0 <= args.ditherspace <= self.csuparams.single_slit_height / 2): + raise ValueError( + f"Error: dither space must be >= 0 and <= {self.csuparams.single_slit_height / 2:.3f} arc sec.") + except ValueError as e: + logger.error(f"Error: {e} Exiting ...") + sys.exit(1) + + self.csuparams.min_legal_x = 60. * (args.xcenter - args.xrange / 2.) + self.csuparams.max_legal_x = 60. * (args.xcenter + args.xrange / 2.) + self.csuparams.star_edge_buffer = args.star_edge_buffer + self.csuparams.min_alignment_stars = args.min_alignment_stars + self.csuparams.dither_space = args.ditherspace + self.csuparams.xcenter = args.xcenter + + def load_target_list(self, input_file): + """ + Load target list from a space separated text file + Args: + input_file (str): Path to the input file containing target data. + Returns: + pd.DataFrame: DataFrame containing the target list. + """ + # Read the input file into a DataFrame (subject to changes in the file format) + try: + targets = pd.read_csv(input_file, sep='\s+', header=None, + names=['target', 'priority', 'magnitude', 'ra_h', 'ra_m', 'ra_s', 'dec_d', 'dec_m', + 'dec_s', 'epoch1', 'epoch2', 'unk1', 'unk2'], + dtype={'target': str, 'priority': float, 'magnitude': float, 'ra_h': int, 'ra_m': int, + 'ra_s': float, 'dec_d': str, 'dec_m': int, + 'dec_s': float}) + except Exception as e: + raise ValueError(f"Error reading target list file (make sure it is in the required format): {e}") + + # Validate target data + high_obj_ra_hour = targets['ra_h'].max() + low_obj_ra_hour = targets['ra_h'].min() + high_obj_dec_deg = float(targets['dec_d'].max()) + low_obj_dec_deg = float(targets['dec_d'].min()) + + if (high_obj_ra_hour - low_obj_ra_hour > 1) and (high_obj_ra_hour != 23 or low_obj_ra_hour != 0): + raise ValueError( + "Error: object list spans more than one hour in Right Ascension. Shorten the list. Exiting ...") + + if high_obj_dec_deg - low_obj_dec_deg > 1: + raise ValueError( + "Error: object list spans more than one degree in Declination. Shorten the list. Exiting ...") + + # Check if RA coordinates wrap around 0h + ra_coord_wrap = high_obj_ra_hour == 23 and low_obj_ra_hour == 0 + if ra_coord_wrap: + targets.loc[targets['ra_h'] == 0, 'ra_h'] = 12 + targets.loc[targets['ra_h'] == 23, 'ra_h'] = 11 + + # Convert RA and Dec to arcseconds (not using SkyCoord because it is slower for large lists) + targets['ra'] = (targets['ra_h'] + targets['ra_m'] / 60.0 + targets['ra_s'] / 3600.0) * 15. + targets['dec_sign'] = [-1 if d.startswith('-') else 1 for d in targets['dec_d']] + targets['dec'] = (np.abs(targets['dec_d'].astype(float)) + targets['dec_m'] / 60.0 + targets[ + 'dec_s'] / 3600.0) * targets['dec_sign'] + + targets['obj_rr'] = -1 # row number for single slits + targets['obj_or'] = -1 # overlap region number for double slits + + # Drop unnecessary columns + targets = targets.drop(columns=['ra_h', 'ra_m', 'ra_s', 'dec_d', 'dec_m', 'dec_s', + 'epoch1', 'epoch2', 'unk1', 'unk2']) + # Split by negative and positive priority and put into TargetList class + science_targets = TargetList(targets[targets['priority'] > 0].reset_index(drop=True)) + alignment_stars = TargetList(targets[targets['priority'] < 0].reset_index(drop=True)) + return science_targets, alignment_stars, ra_coord_wrap + + def get_center_position(self, centerra, centerdec, runcop): + """ + Calculate the center position based on the target list and arguments. + If center coordinates are not provided in args or runcop is true, it runs COP estimation. + :param targets: DataFrame containing target data. + :param args: Parsed command line arguments. + :param ra_coord_wrap: Boolean indicating if RA coordinates wrap around. + :return: Modified args in place + """ + targets = self.science_targets.data + if centerra is None or centerdec is None or runcop: + centerdec = (targets['dec'] * targets['priority']).sum() / targets['priority'].sum() + centerra = (targets['ra'] * targets['priority']).sum() / targets['priority'].sum() + else: + center_coords = SkyCoord(ra=centerra, dec=centerdec, unit=(u.hourangle, u.deg)) + centerra = center_coords.ra.deg # in degrees + centerdec = center_coords.dec.deg # in degrees + + # print the center coordinates + logger.info(f"Center coordinates: RA={centerra:.6f} Dec={centerdec:.6f} degrees") + field_center = fieldCenter(centerra, centerdec) + if self.ra_coord_wrap: + field_center.do_ra_coord_wrap() + return field_center + + @timeit + def optimize(self, field_center, pa): + # reset nodes + self.bestNodes = [None] * self.csuparams.num_bar_pairs + self.allNodes = [[] for _ in range(self.csuparams.num_bar_pairs)] + + # Deep copy target list + objects = self.science_targets.data.copy() + + # Transform coordinates similar to Java version + obj_x, obj_y = transform_coordinates(objects['x'].to_numpy(), objects['y'].to_numpy(), field_center.x, + field_center.y, pa) + objects['obj_x'] = obj_x + objects['obj_y'] = obj_y + obj_x_lower = obj_x - self.csuparams.dither_space * np.sin(self.csuparams.csu_slit_tilt_angle_rad) + obj_y_lower = obj_y - self.csuparams.dither_space * np.cos(self.csuparams.csu_slit_tilt_angle_rad) + obj_x_upper = obj_x + self.csuparams.dither_space * np.sin(self.csuparams.csu_slit_tilt_angle_rad) + obj_y_upper = obj_y + self.csuparams.dither_space * np.cos(self.csuparams.csu_slit_tilt_angle_rad) + + # Check what is within focal plane + in_circle = (inCircle(obj_x_lower, obj_y_lower, 0, 0, self.csuparams.csu_fp_radius) & + inCircle(obj_x_upper, obj_y_upper, 0, 0, self.csuparams.csu_fp_radius)) + # Check legal x bounds + inLegalx = ((obj_x_lower >= self.csuparams.min_legal_x) & (obj_x_upper <= self.csuparams.max_legal_x) + & (obj_x_lower > -self.csuparams.csu_width / 2.) & (obj_x_upper < self.csuparams.csu_width / 2.) + & (obj_y_lower > -self.csuparams.csu_height / 2.) & (obj_y_upper < self.csuparams.csu_height / 2.)) + + objects['min_row'] = np.floor( + (obj_y_lower + self.csuparams.csu_height / 2. - self.csuparams.overlap) / self.csuparams.csu_row_height).astype(int) + objects['max_row'] = np.floor( + (obj_y_upper + self.csuparams.csu_height / 2. + self.csuparams.overlap) / self.csuparams.csu_row_height).astype(int) + + # filter valid objects + mask_valid = in_circle & inLegalx & (objects.min_row >= 0) & (objects.max_row < self.csuparams.num_bar_pairs) + valid_objects = objects[mask_valid].reset_index(drop=True) + + if valid_objects.empty: + return pd.DataFrame() + + for i, n in enumerate(valid_objects.max_row): + self.allNodes[n].append(Node(valid_objects.loc[i].to_dict())) + + best_score = 0 + best_node = Node() + + for i in range(self.csuparams.num_bar_pairs): + current_best = Node() + if i > 0: + current_best.next_node = self.bestNodes[i - 1] + + # Try each object that can fit in this row + for node in self.allNodes[i]: + obj_data = node.obj + + if node.score == 0: + node.score = obj_data['priority'] + # Find valid previous row connection + span = obj_data['max_row'] - obj_data['min_row'] + prev_row = i - span - 1 + + if prev_row >= 0: + node.next_node = self.bestNodes[prev_row] + + # Compare total scores + node_total = node.total_score() + current_total = current_best.total_score() + + if node_total > current_total: + current_best = node + elif node_total == current_total: + # Tie-breaker: prefer object closer to x_center + obj_x = obj_data['obj_x'] + n = i + tempnode = current_best + while n >= obj_data['min_row']: + localobj = tempnode.obj + if localobj != {} and abs(obj_x - self.csuparams.xcenter * 60.0) < abs( + localobj['obj_x'] - self.csuparams.xcenter * 60.0): + current_best = node + break + tempnode = tempnode.get_next_node() + if tempnode is None or tempnode.obj == {}: + break + n = tempnode.obj['max_row'] + + # Check for conflicts with multi-row slits + current_obj = current_best.obj + if current_obj and 'min_row' in current_obj: + span = current_obj['max_row'] - current_obj['min_row'] + for j in range(i - 1, i - span - 1, -1): + if j < 0: + break + # If there is a best node in the conflicting row, compare scores + prev_total = self.bestNodes[j].total_score() + curr_total = current_best.total_score() + if prev_total > curr_total: + current_best = Node() + current_best.next_node = self.bestNodes[j] + + self.bestNodes[i] = current_best + + # Track overall best solution + total_score = current_best.total_score() + if total_score > best_score: + best_score = total_score + best_node = current_best + + return best_node + + def find_legal_stars(self, field_center, pa): + + new_x, new_y = transform_coordinates(self.alignment_stars.data['x'].to_numpy(), self.alignment_stars.data['y'].to_numpy(), + field_center.x, field_center.y, pa, + self.csuparams.csu_width / 2., self.csuparams.csu_height / 2.) + self.alignment_stars.data['x_transformed'] = new_x + self.alignment_stars.data['y_transformed'] = new_y + validstars = ( + inCircle(new_x, new_y, self.csuparams.csu_width / 2, self.csuparams.csu_height / 2, self.csuparams.csu_fp_radius) & + (new_x > self.csuparams.star_edge_distance) & (new_x < self.csuparams.csu_width - self.csuparams.star_edge_distance) & + (new_y > self.csuparams.star_edge_distance) & (new_y < self.csuparams.csu_height - self.csuparams.star_edge_distance)) + self.alignment_stars.data = self.alignment_stars.data[validstars].reset_index(drop=True) + self.alignment_stars.set_row(self.csuparams.star_edge_buffer) + return self.alignment_stars.data[self.alignment_stars.data['obj_rr'] != -1].reset_index(drop=True) + + def run(self): + # Run the three-level loop over position angle, field center y-coordinate, and field center x-coordinate. + # Initialize variables to track the best solution + run_num = 0 + + # print starting message with number of iterations + total_runs = (self.args.xsteps * 2 + 1) * (self.args.ysteps * 2 + 1) * (self.args.pasteps * 2 + 1) + logger.info( + f"Total iterations: {total_runs}, xsteps: {self.args.xsteps * 2 + 1}, ysteps: {self.args.ysteps * 2 + 1}, pasteps: {self.args.pasteps * 2 + 1}") + + # Initialising xfactor, yfactor, pafactor, because instead of linearly iterating, we want to iterate closest to center first. + # The get_next_factor function helps with that. + xfactor, yfactor, pafactor = 0, 0, 0 + + # Initialize other variables + temp_field_center = fieldCenter(0, 0) + + # Start of the three-level loop + logger.info("*** STARTING OPTIMIZATION ***") + success = False + for _ in range(-self.args.xsteps, self.args.xsteps + 1): + temp_field_center.x = self.field_center.x - xfactor * self.args.xstepsize + xfactor = get_next_factor(xfactor) + yfactor = 0 + + for _ in range(-self.args.ysteps, self.args.ysteps + 1): + temp_field_center.y = self.field_center.y - yfactor * self.args.ystepsize + yfactor = get_next_factor(yfactor) + pafactor = 0 + # Once we have the perturbated field center, we need to convert the target list to x,y coordinates relative to that field center. + self.science_targets.target_radec_to_xy(temp_field_center) + self.alignment_stars.target_radec_to_xy(temp_field_center) + + for _ in range(-self.args.pasteps, self.args.pasteps + 1): + temp_pa = self.args.centerpa + pafactor * self.args.pastepsize + pafactor = get_next_factor(pafactor) + + # Check if alignment stars are required, if yes, find valid stars + if self.args.min_alignment_stars > 0: + starlist = self.find_legal_stars(temp_field_center, temp_pa) + starsfound = len(starlist['obj_rr'].unique()) + else: + starlist = pd.DataFrame() + starsfound = 0 + logger.debug( + f"Alignment stars required: {self.args.min_alignment_stars}, Legal stars found: {starsfound}") + run_num += 1 + if starsfound >= self.args.min_alignment_stars: + # Run the optimization + best_node = self.optimize(temp_field_center, temp_pa) + best_score = best_node.total_score() + if best_score >= self.total_priority: + success = True + self.total_priority = best_score + self.optimal_run_num = run_num + logger.info( + f"New optimal found at run {self.optimal_run_num}, best score so far is {best_score}") + # Store the best solution details + temp_field_center.to_radec() + if self.ra_coord_wrap: + temp_field_center.do_ra_coord_wrap() + self.final_field_center = fieldCenter(temp_field_center.ra, temp_field_center.dec) + self.final_pa = temp_pa + self.final_alignment_stars = starlist + self.optimum_target_list = best_node.create_object_dataframe_from_top_node() + elif best_score == self.total_priority: + logger.info( + f"Configuration with same score {best_score} found at run {run_num}. Not updating optimal solution.") + if len(self.optimum_target_list) == 0: + temp_field_center.to_radec() + if self.ra_coord_wrap: + temp_field_center.do_ra_coord_wrap() + self.final_field_center = fieldCenter(temp_field_center.ra, temp_field_center.dec) + self.final_pa = temp_pa + self.final_alignment_stars = starlist + elif len(self.optimum_target_list) == 0 and starsfound > self.args.min_alignment_stars: + temp_field_center.to_radec() + if self.ra_coord_wrap: + temp_field_center.do_ra_coord_wrap() + self.final_field_center = fieldCenter(temp_field_center.ra, temp_field_center.dec) + self.final_pa = temp_pa + self.final_alignment_stars = starlist + logger.info(f"*** OPTIMIZATION COMPLETE ***") + if success: + logger.info(f"Optimal solution found at run {self.optimal_run_num} with total priority {self.total_priority}.") + logger.info( + f"Final Center: RA={self.final_field_center.ra:.6f}, Dec={self.final_field_center.dec:.6f}, PA={self.final_pa:.2f}") + logger.info(f"Number of legal alignment stars: {len(self.final_alignment_stars)}") + logger.info(f"Number of targets in optimal solution: {len(self.optimum_target_list)}") + +class CSUParameters: + def __init__(self, config_path="config.yaml"): + self.config = self.load_csu_config(config_path) + for key, value in self.config.items(): + setattr(self, key, value) + self.single_slit_height = self.single_slit_height_mm * self.arcsec_per_mm + self.overlap = self.overlap_mm * self.arcsec_per_mm + self.csu_row_height = self.single_slit_height + self.overlap + self.csu_row_height_mm = self.single_slit_height_mm + self.overlap_mm + self.csu_height = self.num_bar_pairs * self.csu_row_height + self.csu_height_mm = self.num_bar_pairs * self.csu_row_height_mm + self.csu_width = self.csu_height + self.csu_width_mm = self.csu_height_mm + self.csu_fp_radius_mm = self.csu_fp_radius / self.arcsec_per_mm + self.csu_slit_tilt_angle_rad = np.radians(self.csu_slit_tilt_angle) + self.min_legal_x = None # To be set from validate_args + self.max_legal_x = None # To be set from validate_args + self.star_edge_buffer = None + self.min_alignment_stars = None + self.dither_space = None + self.xcenter = None + + def load_csu_config(self, config_path="config.yaml"): + """ + Load the CSU configuration from a YAML file. + Returns: + dict: Configuration dictionary. + """ + with open(config_path, 'r') as file: + config = yaml.safe_load(file) + return config + +def get_args(): + """ + Parse command line arguments. + Returns: + argparse.Namespace: Parsed arguments. + """ + parser = argparse.ArgumentParser(description="CSU Optimization Script") + parser.add_argument('targetlist', type=str, help='Path to the input target list file') + parser.add_argument('--configfile', type=str, default=f'{os.path.dirname(os.path.abspath(__file__))}/../config.yaml', help='Path to the configuration YAML file') + parser.add_argument("--xrange", type=float, default=3.0, help="Width of legal x coordinate range (arcmin)") + parser.add_argument("--xcenter", type=float, default=0.0, help="Center of legal x coordinate range (arcmin)") + parser.add_argument("--slitwidth", type=float, default=0.7, help="Global slit width (arcsec)") + parser.add_argument("--ditherspace", type=float, default=2.5, help="Minimum distance from slit edge (arcsec)") + parser.add_argument("--centerra", type=str, help="Center RA in HH:MM:SS format") + parser.add_argument("--centerdec", type=str, help="Center Dec in DD:MM:SS format") + parser.add_argument("--centerpa", type=float, default=0, help="Center position angle (deg)") + parser.add_argument("--xsteps", type=int, default=0, help="Number of x iterations (int, must be positive)") + parser.add_argument("--xstepsize", type=float, default=1, help="Size of each x step (arcsec)") + parser.add_argument("--ysteps", type=int, default=0, help="Number of y iterations (int)") + parser.add_argument("--ystepsize", type=float, default=1, help="Size of each y step (arcsec)") + parser.add_argument("--pasteps", type=int, default=0, help="Number of PA iterations (int)") + parser.add_argument("--pastepsize", type=float, default=1, help="Size of each PA step (deg)") + parser.add_argument("--min_alignment_stars", type=int, default=3, help="Minimum number of alignment stars") + parser.add_argument("--star-edge-buffer", type=float, default=2.0, help="Minimum distance ???") + parser.add_argument("--slitlist", type=str, default='slitsolution.txt', help="Output file for slit list") + parser.add_argument("--slitregionfile", type=str, default='slitsolution.reg', help="Output file for region file") + parser.add_argument("--barpositionlist", type=str, default='barposition.txt', help="Output file for bar positions") + parser.add_argument("--runcop", action='store_true', + help="Run COP estimation for priority weighted center position", default=False) + parser.add_argument("--debug", action='store_true', help="Enable debug mode", default=False) + + return parser.parse_args() + + + + # # Generate slit configurations + # slit_configurations = slit_configuration_generator(optimum_astro_obj_array, config, csu_params, final_field_center, final_pa, args.barpositionlist) + # slit_configurations['coord'] = [SkyCoord(ra=ra, dec=dec, unit=(u.deg, u.deg)) for ra, dec in zip(slit_configurations['ra'], slit_configurations['dec'])] + # slit_configurations['ra_hms'] = [coord.ra.to_string(unit=u.hour, sep=':') for coord in slit_configurations['coord']] + # slit_configurations['dec_dms'] = [coord.dec.to_string(unit=u.deg, sep=':') for coord in slit_configurations['coord']] + # + # slit_configurations['coord_obj'] = [SkyCoord(ra=ra, dec=dec, unit=(u.deg, u.deg)) for ra, dec in zip(slit_configurations['obj_ra'], slit_configurations['obj_dec'])] + # slit_configurations['obj_ra_hms'] = [coord.ra.to_string(unit=u.hour, sep=':') for coord in slit_configurations['coord_obj']] + # slit_configurations['obj_dec_dms'] = [coord.dec.to_string(unit=u.deg, sep=':') for coord in slit_configurations['coord_obj']] + + + # if ra_coord_wrap: + # slit_configurations.loc[[int(ra_hms.split(':')[0])==12 for ra_hms in slit_configurations['ra_hms']], 'ra_hms'] = [f"00:{ra_hms.split(':')[1]}:{ra_hms.split(':')[2]}" for ra_hms in slit_configurations['ra_hms']] + # slit_configurations.loc[[int(ra_hms.split(':')[0])==11 for ra_hms in slit_configurations['ra_hms']], 'ra_hms'] = [f"23:{ra_hms.split(':')[1]}:{ra_hms.split(':')[2]}" for ra_hms in slit_configurations['ra_hms']] + # # same for final field center ra dec + # if finalcenter_coords.ra.hour == 12: + # finalcenterra = finalcenterra.replace('12', '00') + # elif finalcenter_coords.ra.hour == 11: + # finalcenterra = finalcenterra.replace('11', '23') + + # # The final step is to print the slit configuration. + # print(f"The optimized slit configuration has been found after {run_num} runs." + # f"\n\tTotal Priority = {total_priority}" + # f"\n\tNumber of Slits = {len(slit_configurations)}" + # f"\n\tCSU Center Position = RA: {finalcenterra} Dec: {finalcenterdec}" + # f"\n\t Position Angle = {final_pa:.1f}°\n") + # + # # Write the slit list out. + # write_out_slit_list(args.slitlist, slit_configurations, csu_params['slitwidth']) + # print(f"The slit list file is: {args.slitlist}") + # + # # Write out the region file. + # print_regions_from_slit_array(slit_configurations, config, csu_params, final_field_center, finalcenterra, finalcenterdec, final_pa, args.slitregionfile) + # print(f"The corresponding bar position list is: {args.barpositionlist}") + +## Run mascgen, track the time it takes to run the script +if __name__ == "__main__": + start_time = time.time() + + args = get_args() + if args.debug: + logger.setLevel(logging.DEBUG) + mascgen = MascgenCore(args) + mascgen.run() + + end_time = time.time() + print(f"Script executed in {end_time - start_time:.2f} seconds.") + + + diff --git a/lris2-mascgen/lris2_mascgen/slit.py b/lris2-mascgen/lris2_mascgen/slit.py new file mode 100755 index 0000000..5c45969 --- /dev/null +++ b/lris2-mascgen/lris2_mascgen/slit.py @@ -0,0 +1,361 @@ +import pandas as pd +import numpy as np +import math + +class Slit: + def __init__(self, slit_number=-1, slit_width=-1, slit_length=-1, slit_obj_name="blank", + slit_obj_priority=0, slit_obj_x=-1, slit_obj_y=-1): + self.slit_number = slit_number + self.slit_width = slit_width + self.slit_length = slit_length + self.slit_x = -1 + self.slit_y = -1 + self.slit_obj_name = slit_obj_name + self.slit_obj_priority = slit_obj_priority + self.obj_mag = None + self.ra = None # degrees + self.dec = None # degrees + self.epoch = None + self.equinox = None + self.wcs_x = None + self.wcs_y = None + self.slit_obj_x = slit_obj_x + self.slit_obj_y = slit_obj_y + self.slit_obj_wcs_x = None + self.slit_obj_wcs_y = None + self.obj_ra = None + self.obj_dec = None + self.slit_mul = -1 + self.target_location = None + + def print_slit(self): + print(f"{self.slit_number}\t{self.ra_hour}\t{self.ra_min}\t{self.ra_sec:.2f}\t" + f"{self.dec_deg}\t{self.dec_min}\t{self.dec_sec:.2f}\t{self.slit_width:.2f}\t" + f"{self.slit_length:.2f}\t{self.slit_obj_name}\t{self.slit_obj_priority:.2f}\t" + f"{self.target_location:.2f}\t{self.obj_ra_hour}\t{self.obj_ra_min}\t" + f"{self.obj_ra_sec:.2f}\t{self.obj_dec_deg}\t{self.obj_dec_min}\t{self.obj_dec_sec:.2f}") + + def print_slit_short(self): + print(f"{self.slit_obj_x} {self.slit_y} {self.slit_width} {self.slit_length}") + + def is_not_blank(self): + return self.slit_obj_name != "blank" + + def is_blank(self): + return self.slit_obj_name == "blank" + +# Clean up all the Slit Multiple Length numbers and Priorities. +def clean_up_slit_mul_pri(array): + for i in range(len(array)): + if array.loc[i,'slit_obj_priority'] > 0: + for j in range(len(array)): + if array.loc[j,'slit_obj_name'] == array.loc[i,'slit_obj_name']: + array.loc[j, 'slit_mul'] = array.loc[i, 'slit_mul'] + array.loc[j, 'slit_obj_priority'] = array.loc[i, 'slit_obj_priority'] + return array +# def clean_up_slit_mul_pri(array): +# """ +# Propagate 'slit_mul' and 'slit_obj_priority' values across rows with the same 'slit_obj_name'. +# Args: +# array (pd.DataFrame): DataFrame containing 'slit_obj_name', 'slit_mul', and 'slit_obj_priority' columns. +# Returns: +# pd.DataFrame: Updated DataFrame with consistent 'slit_mul' and 'slit_obj_priority' values. +# """ +# # Group by 'slit_obj_name' and propagate the maximum 'slit_mul' and 'slit_obj_priority' within each group +# array[['slit_mul', 'slit_obj_priority']] = array.groupby('slit_obj_name')[['slit_mul', 'slit_obj_priority']].transform('max') +# return array + +# Expand slit1 into slit2 and copy everything over. +def expand_slit(slit_array, s1, s2, slit_width): + if slit_array.loc[s1,'slit_obj_name'] != "blank": + slit_array.loc[s2,'slit_obj_name'] = slit_array.loc[s1,'slit_obj_name'] + slit_array.loc[s2,'slit_width'] = slit_width + slit_array.loc[s2,'slit_obj_x'] = slit_array.loc[s1,'slit_obj_x'] + slit_array.loc[s2,'slit_obj_y'] = slit_array.loc[s1,'slit_obj_y'] + slit_array.loc[s1,'slit_mul'] = slit_array.loc[s1,'slit_mul'] + 1 + return slit_array + +def slit_xy_to_radec(slit_array, field_center): + # Convert the wcs x and y coordinates of the input slit dataframe into Ra/Dec coordinates. + slit_array['dec'] = slit_array['wcs_y'] / 3600 # Convert arcseconds to degrees + dec_rad = field_center.y * math.pi / 180 / 3600 # Convert degrees to radians + slit_array['ra'] = slit_array['wcs_x'] / (3600 * math.cos(dec_rad)) # Convert arcseconds to degrees (15 degrees per hour) + return slit_array + +def slit_objxy_to_radec(slit_array, field_center): + # Convert the obj x and y coordinates of the input slit dataframe into Ra/Dec coordinates. + slit_array['obj_dec'] = slit_array['slit_obj_wcs_y'] / 3600 # Convert arcseconds to degrees + dec_rad = field_center.y * math.pi / 180 / 3600 # Convert degrees to radians + slit_array['obj_ra'] = slit_array['slit_obj_wcs_x'] / (3600 * math.cos(dec_rad)) # Convert arcseconds to degrees (15 degrees per hour) + return slit_array + +def slit_configuration_generator(optimum_astro_obj_array, config, csu_params, final_field_center, final_pa, bar_position_list): + """ + Generate the slit configuration from a given input array of AstroObjs. + Args: + optimum_astro_obj_array (list): Optimized AstroObj array. + config (dict): Configuration parameters for the CSU + csu_params (dict): CSU parameters including slit width and overlap. + final_field_center (object): Final field center coordinates. + final_pa (float): Final position angle. + bar_position_list (str): Path to the output file for bar positions. + Returns: + list: Array of Slit objects. + """ + # /** The rest of the program simply takes the data from hPArray2Short and + # * maps it to a slit configuration. The empty bars are filled + # * by expanding original "singles" into "doubles" or "triples" when + # * possible and by expanding original "doubles" (which were created to + # * include a high-priority object in an overlap region) into "triples" + # * when possible. After this first pass expansion is complete, any + # * remaining empty slits are filled by expanding the nearest slit with + # * the higher-priority object. The slit expansion is conducted so as to + # * give more vertical space to the objects that are more likely to need + # * it and then to objects with higher priority. **/ + + # Initialize the slit array with empty Slit objects + slit_array = pd.DataFrame([Slit(slit_number=i+1).__dict__ for i in range(config['num_rows'])]) + # Copy the Overlap region objects into their proper slit assignment + # pair. Object in OverlapRegion# is given slits numbered # and # - 1. + # Also copy the Row region objects into their corresponding slit + # assignments. + for i in range(len(optimum_astro_obj_array)): + OR = int(optimum_astro_obj_array.iloc[i]['obj_or']) + if OR>=0: + slit_array.loc[OR-1,['slit_obj_name','slit_obj_priority','slit_obj_x','slit_obj_y']] \ + = optimum_astro_obj_array.iloc[i][['target', 'priority', 'obj_x', 'obj_y']].tolist() + slit_array.loc[OR-1, 'slit_width'] = csu_params['slitwidth'] + slit_array.loc[OR-1, 'slit_mul'] = 2 + slit_array.loc[OR,['slit_obj_name','slit_obj_x','slit_obj_y']] \ + = optimum_astro_obj_array.iloc[i][['target', 'obj_x', 'obj_y']].tolist() + slit_array.loc[OR, 'slit_width'] = csu_params['slitwidth'] + RR = int(optimum_astro_obj_array.iloc[i]['obj_rr']) + if RR>=0: + slit_array.loc[RR,['slit_obj_name','slit_obj_priority','slit_obj_x','slit_obj_y']] \ + = optimum_astro_obj_array.iloc[i][['target', 'priority', 'obj_x', 'obj_y']].tolist() + slit_array.loc[RR, 'slit_width'] = csu_params['slitwidth'] + slit_array.loc[RR, 'slit_mul'] = 1 + # Clean up slit_mul and slit_obj_priority + slit_array = clean_up_slit_mul_pri(slit_array) + + # print(slit_array[['slit_number', 'slit_obj_name', 'slit_obj_priority', 'slit_mul']]) + + # Extend slits in length to expand singles into double, doubles into + # triples, etc. + # First expand slit 1 into slit 2 and slit 44 into slit 43 if the + # first slits are originally singles and if 44 and 43 are unoccupied + # and if slit 1's object has higher priority than slit 3's (and if + # slit 44's object has higher priority than slit 42's). + # Then, lengthen occupied slits 2 and 45 into 1 and 46, respectively, + # if slits 1 and 46 are unoccupied. There is no need to compare + # priorities since there can be no objects in slits 0 or 47 (there are + # no such slits). + if (slit_array.loc[1,'slit_mul'] == 1 and slit_array.loc[2,'slit_mul'] == -1 and slit_array.loc[1,'slit_obj_priority'] > slit_array.loc[3,'slit_obj_priority']): + slit_array = expand_slit(slit_array, 1, 2, csu_params['slitwidth']) + + if (slit_array.loc[44,'slit_mul'] == 1 and slit_array.loc[43,'slit_mul'] == -1 and slit_array.loc[44,'slit_obj_priority'] > slit_array.loc[42,'slit_obj_priority']): + slit_array = expand_slit(slit_array, 44, 43, csu_params['slitwidth']) + + if (slit_array.loc[1,'slit_mul'] == 1 and slit_array.loc[0,'slit_mul'] == -1): + slit_array.loc[0,'slit_obj_name'] = slit_array.loc[1,'slit_obj_name'] + slit_array = expand_slit(slit_array, 1, 0, csu_params['slitwidth']) + + if (slit_array.loc[44,'slit_mul'] == 1 and slit_array.loc[45,'slit_mul'] == -1): + slit_array.loc[45,'slit_obj_name'] = slit_array.loc[44,'slit_obj_name'] + slit_array = expand_slit(slit_array, 44, 45, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + # Then extend all "singles" into "doubles" (or "triples") when possible + # and so that, if there is a conflict, the slit which is extended is + # the one that contains the higher-priority object. Note that we at + # first ignore the first set of doubles that were created in order to + # surround objects in overlap regions. This makes sense since those + # original doubles are guaranteed to have their target objects near + # their vertical slit center (within the middle overlap region). + for i in range(len(slit_array)): + if slit_array.loc[i,'slit_mul'] == 1: + if i < 44: + if slit_array.loc[i + 1,'slit_mul'] == -1: + if (slit_array.loc[i,'slit_obj_priority'] > slit_array.loc[i + 2,'slit_obj_priority']) or (slit_array.loc[i + 2,'slit_mul'] == 2): + slit_array = expand_slit(slit_array, i, i + 1, csu_params['slitwidth']) + slit_array.loc[i + 1,'slit_mul'] = slit_array.loc[i,'slit_mul'] + if i > 1: + if slit_array.loc[i - 1,'slit_mul'] == -1: + if (slit_array.loc[i,'slit_obj_priority'] > slit_array.loc[i - 2,'slit_obj_priority']) or (slit_array.loc[i - 2,'slit_mul'] == 2): + slit_array = expand_slit(slit_array, i, i - 1, csu_params['slitwidth']) + slit_array.loc[i - 1,'slit_mul'] = slit_array.loc[i,'slit_mul'] + + # Clean up all the Slit Multiple Length numbers and Priorities + slit_array = clean_up_slit_mul_pri(slit_array) + + # Extend all slits to envelope blanks. After this, every row should be occupied by a slit. + for j in range(config['num_rows']): + for i in range(1, len(slit_array) - 1): + if (slit_array.loc[i,'slit_mul'] == -1 and slit_array.loc[i - 1,'slit_mul'] > 0 and + slit_array.loc[i - 1,'slit_obj_priority'] > slit_array.loc[i + 1,'slit_obj_priority']): + slit_array = expand_slit(slit_array, i-1, i, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + + if (slit_array.loc[i,'slit_mul'] == -1 and slit_array.loc[i + 1,'slit_mul'] > 0 and + slit_array.loc[i + 1,'slit_obj_priority'] > slit_array.loc[i - 1,'slit_obj_priority']): + slit_array = expand_slit(slit_array, i + 1, i, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + + if slit_array.loc[i, 'slit_mul'] == -1 and slit_array.loc[i + 1, 'slit_obj_priority'] == slit_array.loc[i - 1, 'slit_obj_priority']: + if (slit_array.loc[i + 1, 'slit_obj_y'] - csu_params['dead_space'] - (i + 1) * config['single_slit_height'] - 2 * i * csu_params['dead_space']) < 0: + slit_array = expand_slit(slit_array, i + 1, i, csu_params['slitwidth']) + else: + slit_array = expand_slit(slit_array, i - 1, i, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + + slit_array = clean_up_slit_mul_pri(slit_array) + + if slit_array.loc[45,'slit_mul'] == -1 and slit_array.loc[44,'slit_mul'] > 0: + slit_array = expand_slit(slit_array, 44, 45, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + if slit_array.loc[0,'slit_mul'] == -1 and slit_array.loc[1,'slit_mul'] > 0: + slit_array = expand_slit(slit_array, 1, 0, csu_params['slitwidth']) + slit_array = clean_up_slit_mul_pri(slit_array) + + # correct the slitnum value for each slit + for i in range(len(slit_array)): + multiple = 0 + for j in range(len(slit_array)): + if slit_array.loc[j,'slit_obj_name'] == slit_array.loc[i,'slit_obj_name']: + multiple += 1 + slit_array.loc[i, 'slit_mul'] = multiple + slit_array = clean_up_slit_mul_pri(slit_array) + + # Calculate the length, y-coordinate, and x-coordinate of each slit. + for i in range(len(slit_array)): + if slit_array.loc[i, 'slit_mul'] > 0: + slit_array.loc[i, 'slit_length'] = slit_array.loc[i, 'slit_mul'] * config['single_slit_height'] + (slit_array.loc[i, 'slit_mul'] - 1) * config['overlap_as'] + slit_array.loc[i, 'slit_y'] = (2*csu_params['dead_space'] + csu_params['row_region_height'])*(i + 1/2) - config['csu_height']/2 + slit_array.loc[i, 'slit_x'] = slit_array.loc[i, 'slit_obj_x'] - final_field_center.x ## TODO: Check this line later + + # Fill in the barPositionArray with the correct values and write it out to the user-specified file. + bar_position_array = [0]*config['num_rows']*2 + for i in range(len(slit_array)): + bar_position_array[2*i] = -slit_array.loc[config['num_rows']-1-i,'slit_obj_x'] - math.tan(config['bar_tilt']*math.pi/180)*(slit_array.loc[i,'slit_obj_y'] - slit_array.loc[i,'slit_y']) - csu_params['slitwidth']/2 + bar_position_array[2*i+1] = -slit_array.loc[config['num_rows']-1-i,'slit_obj_x'] - math.tan(config['bar_tilt']*math.pi/180)*(slit_array.loc[i,'slit_obj_y'] - slit_array.loc[i,'slit_y']) + csu_params['slitwidth']/2 + # Write the bar position list to a file + with open(bar_position_list, 'w') as file: + for i in range(len(bar_position_array)): + file.write(f"{bar_position_array[i]}\n") + + # Create a new slit_array2, which is a copy of slit_array, but with no empty slits. Then renumber the slits + slit_array2 = slit_array[slit_array['slit_mul'] > 0].reset_index(drop=True) # Filter out empty slits + slit_array2['slit_number'] = -1 + renumber = 0 + newslitnum = 1 + while renumber < len(slit_array2): + slit_array2.loc[renumber, 'slit_number'] = newslitnum + renumber = renumber + slit_array2.loc[renumber, 'slit_mul'] + newslitnum += 1 + + # Make slit_array3, which is simply the slit information. Each slit + # is only listed once, not in multiples. The slit length reflects + # if the slit is a single, double, triple, etc. + slit_array3 = slit_array2[slit_array2['slit_number'] > 0].reset_index(drop=True) + + # Give slitArray3 correct slitY values and "close" slitX values. Later, + # the slitX will be modified to account for bar tilt so that the object + # is placed in the horizontal center of the slit, regardless of its + # vertical displacement from center. + slit_array3['slit_y'] += (slit_array3['slit_mul'] - 1) * (csu_params['dead_space']+csu_params['row_region_height'] / 2) + slit_array3['slit_x'] = slit_array3['slit_obj_x'] + + # Rotate the slits in the CSU plane backwards by the Position Angle. + theta = -final_pa * math.pi / 180 + x_old = slit_array3['slit_x'] + y_old = slit_array3['slit_y'] + slit_array3['slit_x'] = x_old * math.cos(theta) - y_old * math.sin(theta) + slit_array3['slit_y'] = x_old * math.sin(theta) + y_old * math.cos(theta) + + x_obj_old = slit_array3['slit_obj_x'] + y_obj_old = slit_array3['slit_obj_y'] + slit_array3['slit_obj_x'] = x_obj_old * math.cos(theta) - y_obj_old * math.sin(theta) + slit_array3['slit_obj_y'] = x_obj_old * math.sin(theta) + y_obj_old * math.cos(theta) + + slit_array3['wcs_x'] = slit_array3['slit_x'] + final_field_center.x - math.tan(config['bar_tilt'] * math.pi / 180)*(slit_array3['slit_obj_y'] - slit_array3['slit_y']) + slit_array3['wcs_y'] = slit_array3['slit_y'] + final_field_center.y + + slit_array3 = slit_xy_to_radec(slit_array3, final_field_center) + slit_array3['target_location'] = slit_array3['slit_obj_y'] - slit_array3['slit_y'] + slit_array3['slit_obj_wcs_x'] = slit_array3['slit_obj_x'] + final_field_center.x + slit_array3['slit_obj_wcs_y'] = slit_array3['slit_obj_y'] + final_field_center.y + slit_array3 = slit_objxy_to_radec(slit_array3, final_field_center) + slit_array3['slit_number'] = len(slit_array3) - slit_array3['slit_number'] + 1 # Reverse the numbering to match the original slit numbering + + return slit_array3 + + +def write_out_slit_list(output_slit_file, slit_array, slit_width): + """ + Write the slit list to a file. + """ + with open(output_slit_file, 'w') as file: + for i in range(len(slit_array)): + slit = slit_array.loc[len(slit_array) - i - 1] + ra_hms = slit.ra_hms.split(':') + dec_hms = slit.dec_dms.split(':') + obj_ra_hms = slit.obj_ra_hms.split(':') + obj_dec_hms = slit.obj_dec_dms.split(':') + file.write(f"{slit.slit_number}\t{ra_hms[0]}\t{ra_hms[1]}\t{float(ra_hms[2]):.2f}\t{dec_hms[0]}\t{dec_hms[1]}\t{float(dec_hms[2]):.2f}\t" + f"{slit_width:.2f}\t{slit.slit_length:.2f}\t{slit.slit_obj_name}\t{slit.slit_obj_priority:.2f}\t{slit.target_location:.2f}\t" + f"{obj_ra_hms[0]}\t{obj_ra_hms[1]}\t{float(obj_ra_hms[2]):.2f}\t{obj_dec_hms[0]}\t{obj_dec_hms[1]}\t{float(obj_dec_hms[2]):.2f}\n") + + +def print_regions_from_slit_array(slit_array, config, csu_params, field_center, fieldcenterra, fieldcenterdec, position_angle, output_reg_file): + from decimal import Decimal + + old_red_box_x = field_center.x + csu_params['xcenter'] + old_red_box_y = field_center.y + red_box_csu_x = old_red_box_x - field_center.x + red_box_csu_y = old_red_box_y - field_center.y + theta = -position_angle * math.pi / 180 + red_box_csu_x_rotated = red_box_csu_x * math.cos(theta) - red_box_csu_y * math.sin(theta) + red_box_csu_y_rotated = red_box_csu_x * math.sin(theta) + red_box_csu_y * math.cos(theta) + red_box_wcs_x_rotated = red_box_csu_x_rotated + field_center.x + red_box_wcs_y_rotated = red_box_csu_y_rotated + field_center.y + + try: + with open(output_reg_file, 'w') as file: + file.write("global color=green font=\"helvetica 10 normal\" " + "select=1 highlite=0 edit=0 move=0 delete=1 " + "include=1 fixed=0 source \nfk5\n") + file.write(f"circle({fieldcenterra},{fieldcenterdec},{config['csu_fp_radius']}\")\t" + "# color=yellow font=\"helvetica 18 normal\" text={Keck Focal Plane}\n") + file.write(f"box({(field_center.x / math.cos(field_center.y * math.pi / 180 / 3600) / 3600):.5f}," + f"{(field_center.y / 3600):.5f}," + f"{(config['csu_width']):.5f}\"," + f"{(config['csu_height']):.5f}\"," + f"{(position_angle):.3f})\t" + "# color=magenta font=\"helvetica 18 normal\" text={CSU Plane}\n") + file.write(f"box({(red_box_wcs_x_rotated / math.cos(field_center.y * math.pi / 180 / 3600) / 3600):.5f}," + f"{(red_box_wcs_y_rotated / 3600):.5f}," + f"{(csu_params['xrange']):.5f}\"," + f"{(config['csu_height']):.5f}\"," + f"{(position_angle):.3f})\t# color=red\n") + + for i in range(len(slit_array)): + slit = slit_array.loc[i] + file.write(f"circle({(slit['slit_obj_wcs_x'] / math.cos(field_center.y * math.pi / 180 / 3600) / 3600):.5f}," + f"{(slit['slit_obj_wcs_y'] / 3600):.5f},0.5\")\t" + f"# text={{ {slit['slit_obj_name']} }}\n") + file.write(f"box({(slit['wcs_x'] / math.cos(field_center.y * math.pi / 180 / 3600) / 3600):.5f}," + f"{(slit['wcs_y'] / 3600):.5f}," + f"{(csu_params['slitwidth']):.5f}\"," + f"{(slit['slit_length']):.5f}\"," + f"{(config['bar_tilt'] + position_angle):.3f})\t" + f"# text={{Slit# {slit['slit_number']} }}\n") + except Exception as error: + print("Error writing to file:", error) + + print(f"The corresponding SAOImage Ds9 region file is: {output_reg_file}") + + + + + + + diff --git a/lris2-mascgen/lris2_mascgen/utils.py b/lris2-mascgen/lris2_mascgen/utils.py new file mode 100644 index 0000000..ec12e91 --- /dev/null +++ b/lris2-mascgen/lris2_mascgen/utils.py @@ -0,0 +1,77 @@ +import numpy as np +from astropy.coordinates import SkyCoord +import astropy.units as u +from numba import jit + +@jit +def transform_coordinates(x, y, center_x, center_y, pa, extra_x=0, extra_y=0): + """ + Transform coordinates (x, y) by rotating around the center (center_x, center_y) by position angle pa. + :param x: x-coordinate of the point. + :param y: y-coordinate of the point. + :param center_x: x-coordinate of the center. + :param center_y: y-coordinate of the center. + :param pa: position angle in degrees. + :return: transformed x and y coordinates. + """ + theta = np.radians(pa) # Convert PA from degrees to radians + cos_pa = np.cos(theta) + sin_pa = np.sin(theta) + x_old = x - center_x + y_old = y - center_y + new_x = x_old * cos_pa - y_old * sin_pa + extra_x + new_y = x_old * sin_pa + y_old * cos_pa + extra_y + return new_x, new_y + +@jit +def inCircle(x, y, center_x, center_y, radius): + """ + Check if a point (x, y) is inside a circle defined by center (center_x, center_y) and radius. + :param x: x-coordinate of the point. + :param y: y-coordinate of the point. + :param center_x: x-coordinate of the circle center. + :param center_y: y-coordinate of the circle center. + :param radius: radius of the circle. + :return: True if the point is inside the circle, False otherwise. + """ + return np.sqrt(np.power(np.abs(x - center_x),2) + np.power(np.abs(y - center_y),2)) < radius + +class Coordinate: + def __init__(self, ra, dec): + if isinstance(ra, str) and isinstance(dec, str) and ':' in ra and ':' in dec: + self.coord = SkyCoord(ra=ra, dec=dec, unit=(u.hourangle, u.deg)) + self.ra = self.coord.ra.deg + self.dec = self.coord.dec.deg + else: + self.ra = ra + self.dec = dec + self.coord = SkyCoord(ra=ra, dec=dec, unit=(u.deg, u.deg)) + self.to_xy(reference_dec=None) + + def to_radec(self): + # Convert the x and y coordinates in arcseconds back to RA/Dec coordinates. + self.dec = self.y / 3600. # Convert arcseconds to degrees + dec_rad = np.radians(self.dec) # Convert degrees to radians + self.ra = self.x / (3600. * np.cos(dec_rad)) # Convert arcseconds to degrees (15 degrees per hour) + + def to_xy(self, reference_dec=None): + self.y = self.dec * 3600. # Convert declination to arcseconds + if reference_dec is None: + reference_dec = self.dec + dec_rad = np.radians(reference_dec) # Convert declination to radians + self.x = np.cos(dec_rad) * (self.ra * 3600.) + + def do_ra_coord_wrap(self): + hms = self.coord.ra.hms + if hms.h == 0: + hms.h = 12 + elif hms.h == 23: + hms.h = 11 + self.coord = SkyCoord(ra=f"{hms.h}:{hms.m}:{hms.s}", dec=self.coord.dec.deg, unit=(u.hourangle, u.deg)) + self.ra = self.coord.ra.deg + self.dec = self.coord.dec.deg + self.to_xy(reference_dec=None) + +class fieldCenter(Coordinate): + def __init__(self, ra, dec): + super().__init__(ra, dec) \ No newline at end of file diff --git a/lris2-mascgen/pyproject.toml b/lris2-mascgen/pyproject.toml new file mode 100755 index 0000000..f4a38b4 --- /dev/null +++ b/lris2-mascgen/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +# Specifies the build system to use. +requires = ["setuptools>=44"] +build-backend = "setuptools.build_meta" + +[project] +# Basic information about your project. +name = "lris2-mascgen" +version = "0.1.0" +dependencies = [ + "astropy", + "numpy", + "matplotlib", + "scipy", + "pandas", + "notebook", + "numba", + "multiprocess" +] +requires-python = ">=3.9" +authors = [ + { name = "Yashvi Sharma", email = "yssharma@caltech.edu" } +] +maintainers = [ + { name = "Yashvi Sharma", email = "yssharma@caltech.edu" } +] +description = "LRIS2 CSU mask design software" +readme = "README.md" +license = { text = "MIT" } +keywords = ["example", "package", "keywords"] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Intended Audience :: Science/Research", + "Topic :: Scientific/Engineering :: Astronomy", +] + +[project.urls] +# Various URLs related to your project. These links are displayed on PyPI. +Repository = "https://github.com/CaltechOpticalObservatories/lris2/lris2-mascgen" +"Bug Tracker" = "https://github.com/CaltechOpticalObservatories/lris2/issues" + +[project.optional-dependencies] +# Optional dependencies that can be installed with extra tags, like "dev". +dev = [ + "pytest", + "black", + "flake8" +] + +[tool.setuptools] +packages = { find = { include = ["lris2_mascgen"] } } \ No newline at end of file