diff --git a/app/data/model/interface.py b/app/data/model/interface.py index f8df8c24..010a4222 100644 --- a/app/data/model/interface.py +++ b/app/data/model/interface.py @@ -32,6 +32,7 @@ class RawCatalog(enum.Enum): ICRS = "icrs" DESIGNATION = "designation" + ADDITIONAL_DESIGNATIONS = "additional_designations" REDSHIFT = "redshift" NATURE = "nature" diff --git a/app/data/model/layer2.py b/app/data/model/layer2.py index afd90d3d..bcf4473f 100644 --- a/app/data/model/layer2.py +++ b/app/data/model/layer2.py @@ -5,12 +5,6 @@ @dataclass class Layer2CatalogObject: - pgc: int - catalog_object: interface.CatalogObject - - -@dataclass -class Layer2Object: pgc: int data: list[interface.CatalogObject] @@ -20,3 +14,66 @@ def get[T](self, t: type[T]) -> T | None: return obj return None + + +@dataclass +class DesignationCatalog: + name: str + + +@dataclass +class ICRSCatalog: + ra: float + e_ra: float + dec: float + e_dec: float + + +@dataclass +class RedshiftCatalog: + cz: float + e_cz: float + + +@dataclass +class NatureCatalog: + type_name: str + + +@dataclass +class Source: + bibcode: str + title: str + authors: list[str] + year: int + + +@dataclass +class AdditionalDesignation: + name: str + source: Source + + +@dataclass +class AdditionalDesignationsCatalog: + names: list[AdditionalDesignation] + + +@dataclass +class Catalogs: + """ + Description of catalogs as they are stored on layer 2. To properly analyze them one probably needs + to read units from metadata of these tables. 
+ """ + + designation: DesignationCatalog | None = None + additional_designations: AdditionalDesignationsCatalog | None = None + icrs: ICRSCatalog | None = None + redshift: RedshiftCatalog | None = None + nature: NatureCatalog | None = None + + +@dataclass +class Layer2Object: + pgc: int + catalogs: Catalogs diff --git a/app/data/repositories/layer2/repository.py b/app/data/repositories/layer2/repository.py index 3b6d0571..0e0b4030 100644 --- a/app/data/repositories/layer2/repository.py +++ b/app/data/repositories/layer2/repository.py @@ -7,9 +7,11 @@ from psycopg import rows from app.data import model +from app.data.model import Layer2CatalogObject, Layer2Object +from app.data.model import layer2 as layer2_model from app.data.repositories.layer2 import filters as repofilters from app.data.repositories.layer2 import params -from app.lib import containers +from app.lib import concurrency, containers from app.lib.storage import postgres catalogs = [ @@ -156,21 +158,21 @@ def _construct_batch_query( conditions=" OR ".join(condition_statements), ), params - def query_batch( + def query_catalogs_batch( self, catalogs: list[model.RawCatalog], search_types: Mapping[str, repofilters.Filter], search_params: Mapping[str, params.SearchParams], limit: int, offset: int, - ) -> dict[str, list[model.Layer2Object]]: + ) -> dict[str, list[model.Layer2CatalogObject]]: query, params = self._construct_batch_query(catalogs, search_types, search_params, limit, offset) records = self._storage.query(query, params=params) records_by_id = containers.group_by(records, key_func=lambda obj: str(obj["record_id"])) - result: dict[str, list[model.Layer2Object]] = {} + result: dict[str, list[model.Layer2CatalogObject]] = {} for record_id, records in records_by_id.items(): if record_id not in result: @@ -180,12 +182,12 @@ def query_batch( return result - def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2Object]: + def _group_by_pgc(self, objects: list[rows.DictRow]) -> 
list[model.Layer2CatalogObject]: objects_by_pgc = containers.group_by(objects, key_func=lambda obj: int(obj["pgc"])) result = [] for pgc, pgc_objects in objects_by_pgc.items(): - layer2_obj = model.Layer2Object(pgc, []) + layer2_obj = model.Layer2CatalogObject(pgc, []) # TODO: what if for each pgc there are multiple rows? For example, if # the catalog does not have a UNIQUE constraint on pgc. @@ -220,13 +222,180 @@ def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2Object] return result + def _query_designations(self, pgcs: list[int]) -> dict[int, layer2_model.DesignationCatalog]: + if not pgcs: + return {} + rows = self._storage.query( + "SELECT pgc, design FROM layer2.designation WHERE pgc = ANY(%s) ORDER BY pgc", + params=[pgcs], + ) + return {int(row["pgc"]): layer2_model.DesignationCatalog(name=str(row["design"])) for row in rows} + + def _query_icrs(self, pgcs: list[int]) -> dict[int, layer2_model.ICRSCatalog]: + if not pgcs: + return {} + rows = self._storage.query( + "SELECT pgc, ra, e_ra, dec, e_dec FROM layer2.icrs WHERE pgc = ANY(%s) ORDER BY pgc", + params=[pgcs], + ) + result: dict[int, layer2_model.ICRSCatalog] = {} + for row in rows: + if all(row.get(k) is not None for k in ("ra", "e_ra", "dec", "e_dec")): + result[int(row["pgc"])] = layer2_model.ICRSCatalog( + ra=float(row["ra"]), + e_ra=float(row["e_ra"]), + dec=float(row["dec"]), + e_dec=float(row["e_dec"]), + ) + return result + + def _query_redshift(self, pgcs: list[int]) -> dict[int, layer2_model.RedshiftCatalog]: + if not pgcs: + return {} + rows = self._storage.query( + "SELECT pgc, cz, e_cz FROM layer2.cz WHERE pgc = ANY(%s) ORDER BY pgc", + params=[pgcs], + ) + return { + int(row["pgc"]): layer2_model.RedshiftCatalog(cz=float(row["cz"]), e_cz=float(row["e_cz"])) + for row in rows + if row.get("cz") is not None and row.get("e_cz") is not None + } + + def _query_nature(self, pgcs: list[int]) -> dict[int, layer2_model.NatureCatalog]: + if not pgcs: + return {} + rows = 
self._storage.query( + "SELECT pgc, type_name FROM layer2.nature WHERE pgc = ANY(%s) ORDER BY pgc", + params=[pgcs], + ) + return { + int(row["pgc"]): layer2_model.NatureCatalog(type_name=str(row["type_name"])) + for row in rows + if row.get("type_name") is not None + } + + def _query_additional_designations(self, pgcs: list[int]) -> dict[int, layer2_model.AdditionalDesignationsCatalog]: + if not pgcs: + return {} + rows = self._storage.query( + "SELECT pgc, design, code, year, author, title FROM layer2.designations " + "WHERE pgc = ANY(%s) ORDER BY pgc, design", + params=[pgcs], + ) + result: dict[int, list[layer2_model.AdditionalDesignation]] = {} + for row in rows: + pgc = int(row["pgc"]) + author_val = row.get("author") + authors = ( + author_val if isinstance(author_val, list) else [str(author_val)] if author_val is not None else [] + ) + source = layer2_model.Source( + bibcode=str(row["code"]) if row.get("code") is not None else "", + title=str(row["title"]) if row.get("title") is not None else "", + authors=authors, + year=int(row["year"]) if row.get("year") is not None else 0, + ) + ad = layer2_model.AdditionalDesignation( + name=str(row["design"]) if row.get("design") is not None else "", + source=source, + ) + result.setdefault(pgc, []).append(ad) + return {pgc: layer2_model.AdditionalDesignationsCatalog(names=names) for pgc, names in result.items()} + def query_pgc( self, catalogs: list[model.RawCatalog], pgc_numbers: list[int], limit: int, offset: int = 0, - ): + ) -> list[Layer2Object]: + if not catalogs or not pgc_numbers: + return [] + + pgcs_page = sorted(pgc_numbers)[offset : offset + limit] + if not pgcs_page: + return [] + + errgr = concurrency.ErrorGroup() + designation_task: concurrency.TaskResult[dict[int, layer2_model.DesignationCatalog]] | None = None + additional_designations_task: ( + concurrency.TaskResult[dict[int, layer2_model.AdditionalDesignationsCatalog]] | None + ) = None + icrs_task: concurrency.TaskResult[dict[int, 
layer2_model.ICRSCatalog]] | None = None + redshift_task: concurrency.TaskResult[dict[int, layer2_model.RedshiftCatalog]] | None = None + nature_task: concurrency.TaskResult[dict[int, layer2_model.NatureCatalog]] | None = None + + if model.RawCatalog.DESIGNATION in catalogs: + designation_task = errgr.run(self._query_designations, pgcs_page) + if model.RawCatalog.ADDITIONAL_DESIGNATIONS in catalogs: + additional_designations_task = errgr.run(self._query_additional_designations, pgcs_page) + if model.RawCatalog.ICRS in catalogs: + icrs_task = errgr.run(self._query_icrs, pgcs_page) + if model.RawCatalog.REDSHIFT in catalogs: + redshift_task = errgr.run(self._query_redshift, pgcs_page) + if model.RawCatalog.NATURE in catalogs: + nature_task = errgr.run(self._query_nature, pgcs_page) + + errgr.wait() + + designation_map = designation_task.result() if designation_task is not None else {} + additional_designations_map = ( + additional_designations_task.result() if additional_designations_task is not None else {} + ) + icrs_map = icrs_task.result() if icrs_task is not None else {} + redshift_map = redshift_task.result() if redshift_task is not None else {} + nature_map = nature_task.result() if nature_task is not None else {} + + return [ + self._layer2_object_from_maps( + pgc, + catalogs, + designation_map, + additional_designations_map, + icrs_map, + redshift_map, + nature_map, + ) + for pgc in pgcs_page + ] + + def _layer2_object_from_maps( + self, + pgc: int, + catalogs: list[model.RawCatalog], + designation_map: dict[int, layer2_model.DesignationCatalog], + additional_designations_map: dict[int, layer2_model.AdditionalDesignationsCatalog], + icrs_map: dict[int, layer2_model.ICRSCatalog], + redshift_map: dict[int, layer2_model.RedshiftCatalog], + nature_map: dict[int, layer2_model.NatureCatalog], + ) -> Layer2Object: + designation = designation_map.get(pgc) if model.RawCatalog.DESIGNATION in catalogs else None + additional_designations = ( + 
additional_designations_map.get(pgc) if model.RawCatalog.ADDITIONAL_DESIGNATIONS in catalogs else None + ) + icrs = icrs_map.get(pgc) if model.RawCatalog.ICRS in catalogs else None + redshift = redshift_map.get(pgc) if model.RawCatalog.REDSHIFT in catalogs else None + nature = nature_map.get(pgc) if model.RawCatalog.NATURE in catalogs else None + + return Layer2Object( + pgc=pgc, + catalogs=layer2_model.Catalogs( + designation=designation, + additional_designations=additional_designations, + icrs=icrs, + redshift=redshift, + nature=nature, + ), + ) + + def query_catalogs_pgc( + self, + catalogs: list[model.RawCatalog], + pgc_numbers: list[int], + limit: int, + offset: int = 0, + ) -> list[Layer2CatalogObject]: if not catalogs: return [] @@ -275,15 +444,17 @@ def query_pgc( return self._group_by_pgc(objects) - def query( + def query_catalogs( self, catalogs: list[model.RawCatalog], filters: repofilters.Filter, search_params: params.SearchParams, limit: int, offset: int, - ) -> list[model.Layer2Object]: - res = self.query_batch(catalogs, {search_params.name(): filters}, {"obj": search_params}, limit, offset) + ) -> list[model.Layer2CatalogObject]: + res = self.query_catalogs_batch( + catalogs, {search_params.name(): filters}, {"obj": search_params}, limit, offset + ) if "obj" not in res: return [] diff --git a/app/domain/adminapi/crossmatch.py b/app/domain/adminapi/crossmatch.py index 82afa79d..8d96e46b 100644 --- a/app/domain/adminapi/crossmatch.py +++ b/app/domain/adminapi/crossmatch.py @@ -210,7 +210,7 @@ def get_record_crossmatch(self, r: adminapi.GetRecordCrossmatchRequest) -> admin if len(candidate_pgcs) == 0: return response - layer2_objects = self.layer2_repo.query_pgc( + layer2_objects = self.layer2_repo.query_catalogs_pgc( catalogs=[model.RawCatalog.ICRS, model.RawCatalog.DESIGNATION, model.RawCatalog.REDSHIFT], pgc_numbers=list(candidate_pgcs), limit=len(candidate_pgcs), diff --git a/app/domain/dataapi/actions.py b/app/domain/dataapi/actions.py index 
68f6bb34..c6250a46 100644 --- a/app/domain/dataapi/actions.py +++ b/app/domain/dataapi/actions.py @@ -28,7 +28,7 @@ def __init__( def query(self, query: dataapi.QueryRequest) -> dataapi.QueryResponse: filters, search_params = search_parsers.query_to_filters(query.q, search_parsers.DEFAULT_PARSERS) - objects = self.layer2_repo.query( + objects = self.layer2_repo.query_catalogs( ENABLED_CATALOGS, filters, search_params, @@ -36,7 +36,7 @@ def query(self, query: dataapi.QueryRequest) -> dataapi.QueryResponse: query.page, ) responder = responders.StructuredResponder(self.catalog_cfg) - pgc_objects = responder.build_response(objects).objects + pgc_objects = responder.build_response_from_catalog(objects).objects return dataapi.QueryResponse(objects=pgc_objects) def query_fits(self, query: dataapi.FITSRequest) -> bytes: diff --git a/app/domain/dataapi/parameterized_query.py b/app/domain/dataapi/parameterized_query.py index 8867bc0a..08f87969 100644 --- a/app/domain/dataapi/parameterized_query.py +++ b/app/domain/dataapi/parameterized_query.py @@ -3,6 +3,14 @@ from app.domain import responders from app.presentation import dataapi +CATALOGS_FOR_PGC_QUERY = [ + model.RawCatalog.DESIGNATION, + model.RawCatalog.ADDITIONAL_DESIGNATIONS, + model.RawCatalog.ICRS, + model.RawCatalog.REDSHIFT, + model.RawCatalog.NATURE, +] + class ParameterizedQueryManager: def __init__( @@ -40,7 +48,7 @@ def _build_filters_and_params( def query_fits(self, query: dataapi.FITSRequest) -> bytes: filters, search_params = self._build_filters_and_params(query) - objects = self.layer2_repo.query( + objects = self.layer2_repo.query_catalogs( self.enabled_catalogs, filters, search_params, @@ -49,26 +57,26 @@ def query_fits(self, query: dataapi.FITSRequest) -> bytes: ) responder = responders.FITSResponder() - return responder.build_response(objects) + return responder.build_response_from_catalog(objects) def query_simple(self, query: dataapi.QuerySimpleRequest) -> dataapi.QuerySimpleResponse: - filters, 
search_params = self._build_filters_and_params(query) - - if not query.pgcs: - objects = self.layer2_repo.query( - self.enabled_catalogs, - filters, - search_params, - query.page_size, - query.page, - ) - else: + responder = responders.StructuredResponder(self.catalog_config) + if query.pgcs: objects = self.layer2_repo.query_pgc( - self.enabled_catalogs, + CATALOGS_FOR_PGC_QUERY, query.pgcs, query.page_size, query.page, ) + return responder.build_response(objects) - responder = responders.StructuredResponder(self.catalog_config) - return responder.build_response(objects) + filters, search_params = self._build_filters_and_params(query) + + objects = self.layer2_repo.query_catalogs( + self.enabled_catalogs, + filters, + search_params, + query.page_size, + query.page, + ) + return responder.build_response_from_catalog(objects) diff --git a/app/domain/responders/fits_responder.py b/app/domain/responders/fits_responder.py index 80737d51..abc3d015 100644 --- a/app/domain/responders/fits_responder.py +++ b/app/domain/responders/fits_responder.py @@ -7,7 +7,7 @@ from app.domain.responders import interface -def extract_object_data(objects: list[model.Layer2Object]) -> dict[str, np.ndarray]: +def extract_object_data(objects: list[model.Layer2CatalogObject]) -> dict[str, np.ndarray]: data_dict = {} for obj in objects: @@ -61,7 +61,7 @@ def extract_object_data(objects: list[model.Layer2Object]) -> dict[str, np.ndarr return data_dict -def create_fits_hdul(objects: list[model.Layer2Object]) -> fits.HDUList: +def create_fits_hdul(objects: list[model.Layer2CatalogObject]) -> fits.HDUList: data_dict = extract_object_data(objects) columns = [] @@ -87,7 +87,7 @@ def create_fits_hdul(objects: list[model.Layer2Object]) -> fits.HDUList: class FITSResponder(interface.ObjectResponder): - def build_response(self, objects: list[model.Layer2Object]) -> bytes: + def build_response_from_catalog(self, objects: list[model.Layer2CatalogObject]) -> bytes: hdul = create_fits_hdul(objects) with 
io.BytesIO() as f: diff --git a/app/domain/responders/interface.py b/app/domain/responders/interface.py index 7bebc46b..faf01356 100644 --- a/app/domain/responders/interface.py +++ b/app/domain/responders/interface.py @@ -10,5 +10,5 @@ class ObjectResponder(ABC): """ @abstractmethod - def build_response(self, objects: list[model.Layer2Object]) -> Any: + def build_response_from_catalog(self, objects: list[model.Layer2CatalogObject]) -> Any: pass diff --git a/app/domain/responders/json_responder.py b/app/domain/responders/json_responder.py index 0aeeb26c..033e969a 100644 --- a/app/domain/responders/json_responder.py +++ b/app/domain/responders/json_responder.py @@ -5,7 +5,7 @@ from app.presentation import dataapi -def objects_to_response(objects: list[model.Layer2Object]) -> list[dataapi.PGCObject]: +def objects_to_response(objects: list[model.Layer2CatalogObject]) -> list[dataapi.PGCObject]: response_objects = [] for obj in objects: catalog_data = {o.catalog().value: o.layer2_data() for o in obj.data} @@ -15,5 +15,5 @@ def objects_to_response(objects: list[model.Layer2Object]) -> list[dataapi.PGCOb class JSONResponder(interface.ObjectResponder): - def build_response(self, objects: list[model.Layer2Object]) -> Any: + def build_response(self, objects: list[model.Layer2CatalogObject]) -> Any: return objects_to_response(objects) diff --git a/app/domain/responders/structured_responder.py b/app/domain/responders/structured_responder.py index 9fd9ed23..1fb0c2a5 100644 --- a/app/domain/responders/structured_responder.py +++ b/app/domain/responders/structured_responder.py @@ -88,7 +88,7 @@ def _equatorial_to_galactic( return lon, lat, e_lon, e_lat - def build_response(self, objects: list[layer2.Layer2Object]) -> Any: + def build_response_from_catalog(self, objects: list[layer2.Layer2CatalogObject]) -> Any: catalog_schema = DATA_SCHEMA pgc_objects = [] @@ -149,3 +149,80 @@ def build_response(self, objects: list[layer2.Layer2Object]) -> Any: pgc_objects.append(pgc_object) 
return dataapi.QuerySimpleResponse(objects=pgc_objects, schema=catalog_schema) + + def build_response(self, objects: list[layer2.Layer2Object]) -> Any: + catalog_schema = DATA_SCHEMA + pgc_objects = [] + + for obj in objects: + catalogs = dataapi.Catalogs() + + if obj.catalogs.designation is not None: + catalogs.designation = dataapi.Designation(name=obj.catalogs.designation.name) + + if obj.catalogs.additional_designations is not None: + catalogs.additional_designations = [ + dataapi.AdditionalDesignation( + name=ad.name, + source=dataapi.Source( + bibcode=ad.source.bibcode, + title=ad.source.title, + authors=ad.source.authors, + year=ad.source.year, + ), + ) + for ad in obj.catalogs.additional_designations.names + ] + + icrs = obj.catalogs.icrs + if icrs is not None: + ra, dec, e_ra, e_dec = self._equatorial(icrs.ra, icrs.dec, icrs.e_ra, icrs.e_dec) + lon, lat, e_lon, e_lat = self._equatorial_to_galactic(icrs.ra, icrs.dec, icrs.e_ra, icrs.e_dec) + + catalogs.coordinates = dataapi.Coordinates( + equatorial=dataapi.EquatorialCoordinates(ra=ra, dec=dec, e_ra=e_ra, e_dec=e_dec), + galactic=dataapi.GalacticCoordinates(lon=lon, lat=lat, e_lon=e_lon, e_lat=e_lat), + ) + + if obj.catalogs.redshift is not None: + redshift = obj.catalogs.redshift + catalogs.redshift = dataapi.Redshift( + z=self._heliocentric_to_redshift(redshift.cz), + e_z=self._heliocentric_to_redshift(redshift.e_cz), + ) + + if obj.catalogs.nature is not None: + catalogs.nature = dataapi.Nature(type_name=obj.catalogs.nature.type_name) + + if icrs is not None and obj.catalogs.redshift is not None: + redshift = obj.catalogs.redshift + catalogs.velocity = {} + + for key, apex in self.config.velocity.apexes.items(): + vel_wr_apex, vel_wr_apex_err = astronomy.velocity_wr_apex( + vel=redshift.cz * u.Unit("m/s"), + lon=lon * u.Unit("deg"), + lat=lat * u.Unit("deg"), + vel_apex=apex.vel.value * u.Unit("km/s"), + lon_apex=apex.lon.value * u.Unit("deg"), + lat_apex=apex.lat.value * u.Unit("deg"), + 
vel_err=redshift.e_cz * u.Unit("m/s"), + lon_err=e_lon * u.Unit("arcsec"), + lat_err=e_lat * u.Unit("arcsec"), + vel_apex_err=apex.vel.error * u.Unit("km/s"), + lon_apex_err=apex.lon.error * u.Unit("arcsec"), + lat_apex_err=apex.lat.error * u.Unit("arcsec"), + ) + + schema = VELOCITY_SCHEMA + + catalog_schema.units.velocity[key] = schema + catalogs.velocity[key] = dataapi.AbsoluteVelocity( + v=vel_wr_apex.to(u.Unit(schema.v)).value, + e_v=vel_wr_apex_err.to(u.Unit(schema.e_v)).value, + ) + + pgc_object = dataapi.PGCObject(pgc=obj.pgc, catalogs=catalogs) + pgc_objects.append(pgc_object) + + return dataapi.QuerySimpleResponse(objects=pgc_objects, schema=catalog_schema) diff --git a/app/lib/storage/postgres/postgres_storage.py b/app/lib/storage/postgres/postgres_storage.py index 75ee93b8..4058705c 100644 --- a/app/lib/storage/postgres/postgres_storage.py +++ b/app/lib/storage/postgres/postgres_storage.py @@ -68,8 +68,8 @@ def connect(self) -> None: self._logger.debug("connecting to Postgres", endpoint=self._config.endpoint, port=self._config.port) self._pool = ConnectionPool( self._config.get_dsn(), - min_size=2, - max_size=10, + min_size=10, + max_size=30, kwargs={"row_factory": rows.dict_row, "autocommit": True}, configure=self._configure_connection, ) diff --git a/app/presentation/dataapi/interface.py b/app/presentation/dataapi/interface.py index 222f9e7f..8bc09ed9 100644 --- a/app/presentation/dataapi/interface.py +++ b/app/presentation/dataapi/interface.py @@ -36,12 +36,25 @@ class Designation(pydantic.BaseModel): name: str +class Source(pydantic.BaseModel): + bibcode: str + title: str + authors: list[str] + year: int + + +class AdditionalDesignation(pydantic.BaseModel): + name: str + source: Source + + class Nature(pydantic.BaseModel): type_name: str class Catalogs(pydantic.BaseModel): designation: Designation | None = None + additional_designations: list[AdditionalDesignation] | None = None coordinates: Coordinates | None = None velocity: dict[str, 
AbsoluteVelocity] | None = None redshift: Redshift | None = None @@ -89,7 +102,7 @@ class Schema(pydantic.BaseModel): class QuerySimpleRequest(pydantic.BaseModel): pgcs: list[int] | None = pydantic.Field( default=None, - description="List of PGC numbers. If specified, all other filters will be ignored", + description="List of PGC numbers. If specified, no other filters are allowed", ) ra: float | None = pydantic.Field( default=None, @@ -124,6 +137,14 @@ class QuerySimpleRequest(pydantic.BaseModel): description="Page number", ) + @pydantic.model_validator(mode="after") + def _pgcs_exclusive_with_filters(self) -> "QuerySimpleRequest": + if self.pgcs: + filters = [self.ra, self.dec, self.radius, self.name, self.cz, self.cz_err_percent] + if any(f is not None for f in filters): + raise ValueError("When pgcs is specified, no other filters are allowed") + return self + class QuerySimpleResponse(pydantic.BaseModel): objects: list[PGCObject] diff --git a/postgres/migrations/V030__additional_designations.sql b/postgres/migrations/V030__additional_designations.sql new file mode 100644 index 00000000..09ef79f2 --- /dev/null +++ b/postgres/migrations/V030__additional_designations.sql @@ -0,0 +1,23 @@ +CREATE VIEW layer2.designations AS +SELECT + r.pgc +, d.design +, t.bib +, b.code +, b.year +, b.author +, b.title +FROM + designation.data AS d + LEFT JOIN layer0.records AS r ON (d.record_id = r.id) + LEFT JOIN layer0.tables AS t ON (r.table_id = t.id) + LEFT JOIN common.bib AS b ON (t.bib = b.id) +WHERE r.pgc IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM layer2.designation AS ld + WHERE ld.pgc = r.pgc AND ld.design = d.design + ); + +CREATE INDEX IF NOT EXISTS layer0_records_id_pgc_not_null +ON layer0.records (id) +WHERE pgc IS NOT NULL; diff --git a/tests/integration/layer2_import_test.py b/tests/integration/layer2_import_test.py index 8943d3cc..05ca20c3 100644 --- a/tests/integration/layer2_import_test.py +++ b/tests/integration/layer2_import_test.py @@ -51,14 +51,14 @@ def 
test_import_two_catalogs(self): self.task.run() - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.ICRS, model.RawCatalog.DESIGNATION], layer2.PGCOneOfFilter([1234]), layer2.CombinedSearchParams([]), 10, 0, ) - expected = model.Layer2Object( + expected = model.Layer2CatalogObject( 1234, [model.ICRSCatalogObject(ra=12, e_ra=0.2, dec=13, e_dec=0.2), model.DesignationCatalogObject("test1")] ) @@ -82,7 +82,7 @@ def test_updated_objects(self): new_last_update_dt = self.layer2_repo.get_last_update_time(model.RawCatalog.DESIGNATION) self.assertGreater(new_last_update_dt, last_update_dt) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.DESIGNATION], layer2.PGCOneOfFilter([1234]), layer2.CombinedSearchParams([]), diff --git a/tests/integration/layer2_repository_test.py b/tests/integration/layer2_repository_test.py index 3629737d..877d338a 100644 --- a/tests/integration/layer2_repository_test.py +++ b/tests/integration/layer2_repository_test.py @@ -22,18 +22,19 @@ def tearDown(self): self.pg_storage.clear() def _save_layer2_data(self, objects: list[model.Layer2CatalogObject]) -> None: - by_table: dict[str, list[model.Layer2CatalogObject]] = {} + by_table: dict[str, list[tuple[int, model.CatalogObject]]] = {} for obj in objects: - table = obj.catalog_object.layer2_table() - if table not in by_table: - by_table[table] = [] - by_table[table].append(obj) - for table, table_objects in by_table.items(): - if not table_objects: + for catalog_obj in obj.data: + table = catalog_obj.layer2_table() + if table not in by_table: + by_table[table] = [] + by_table[table].append((obj.pgc, catalog_obj)) + for table, table_entries in by_table.items(): + if not table_entries: continue - columns = table_objects[0].catalog_object.layer2_keys() - pgcs = [obj.pgc for obj in table_objects] - data = [[obj.catalog_object.layer2_data()[c] for c in columns] for obj in table_objects] + columns = 
table_entries[0][1].layer2_keys() + pgcs = [pgc for pgc, _ in table_entries] + data = [[catalog_obj.layer2_data()[c] for c in columns] for _, catalog_obj in table_entries] self.layer2_repo.save(table, columns, pgcs, data) def _get_table(self, table_name: str) -> int: @@ -43,34 +44,34 @@ def _get_table(self, table_name: str) -> int: def test_one_object(self): objects: list[model.Layer2CatalogObject] = [ - model.Layer2CatalogObject(1, model.DesignationCatalogObject(design="test")), - model.Layer2CatalogObject(2, model.DesignationCatalogObject(design="test2")), + model.Layer2CatalogObject(1, [model.DesignationCatalogObject(design="test")]), + model.Layer2CatalogObject(2, [model.DesignationCatalogObject(design="test2")]), ] self.common_repo.register_pgcs([1, 2]) self._save_layer2_data(objects) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.DESIGNATION], layer2.DesignationEqualsFilter("test"), layer2.CombinedSearchParams([]), 10, 0, ) - expected = [model.Layer2Object(1, [model.DesignationCatalogObject(design="test")])] + expected = [model.Layer2CatalogObject(1, [model.DesignationCatalogObject(design="test")])] self.assertEqual(actual, expected) def test_several_objects(self): objects: list[model.Layer2CatalogObject] = [ - model.Layer2CatalogObject(1, model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)), + model.Layer2CatalogObject(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(2, [model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)]), ] self.common_repo.register_pgcs([1, 2]) self._save_layer2_data(objects) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.ICRS], layer2.ICRSCoordinatesInRadiusFilter(10), layer2.ICRSSearchParams(12, 12), @@ -78,23 +79,28 @@ def test_several_objects(self): 0, ) expected = [ - 
model.Layer2Object(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), - model.Layer2Object(2, [model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(2, [model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)]), ] self.assertEqual(actual, expected) def test_several_catalogs(self): objects = [ - model.Layer2CatalogObject(1, model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.DesignationCatalogObject(design="test2")), + model.Layer2CatalogObject(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject( + 2, + [ + model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1), + model.DesignationCatalogObject(design="test2"), + ], + ), ] self.common_repo.register_pgcs([1, 2]) self._save_layer2_data(objects) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.ICRS, model.RawCatalog.DESIGNATION], layer2.DesignationEqualsFilter("test2"), layer2.CombinedSearchParams([]), @@ -102,7 +108,7 @@ def test_several_catalogs(self): 0, ) expected = [ - model.Layer2Object( + model.Layer2CatalogObject( 2, [ model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1), @@ -115,16 +121,26 @@ def test_several_catalogs(self): def test_several_filters(self): objects = [ - model.Layer2CatalogObject(1, model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.DesignationCatalogObject(design="test2")), - model.Layer2CatalogObject(1, model.DesignationCatalogObject(design="test")), + model.Layer2CatalogObject( + 1, + [ + model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1), + 
model.DesignationCatalogObject(design="test"), + ], + ), + model.Layer2CatalogObject( + 2, + [ + model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1), + model.DesignationCatalogObject(design="test2"), + ], + ), ] self.common_repo.register_pgcs([1, 2]) self._save_layer2_data(objects) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.ICRS, model.RawCatalog.DESIGNATION], layer2.AndFilter( [ @@ -142,7 +158,7 @@ def test_several_filters(self): ) expected = [ - model.Layer2Object( + model.Layer2CatalogObject( 2, [ model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1), @@ -155,17 +171,17 @@ def test_several_filters(self): def test_pagination(self): objects: list[model.Layer2CatalogObject] = [ - model.Layer2CatalogObject(1, model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(3, model.ICRSCatalogObject(ra=12, dec=12, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(4, model.ICRSCatalogObject(ra=13, dec=13, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(5, model.ICRSCatalogObject(ra=14, dec=14, e_ra=0.1, e_dec=0.1)), + model.Layer2CatalogObject(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(2, [model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(3, [model.ICRSCatalogObject(ra=12, dec=12, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(4, [model.ICRSCatalogObject(ra=13, dec=13, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(5, [model.ICRSCatalogObject(ra=14, dec=14, e_ra=0.1, e_dec=0.1)]), ] self.common_repo.register_pgcs([1, 2, 3, 4, 5]) self._save_layer2_data(objects) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.ICRS], layer2.ICRSCoordinatesInRadiusFilter(10), layer2.ICRSSearchParams(12, 12), @@ -177,17 +193,17 @@ def 
test_pagination(self): def test_batch_query(self): objects: list[model.Layer2CatalogObject] = [ - model.Layer2CatalogObject(1, model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(2, model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(3, model.ICRSCatalogObject(ra=12, dec=12, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(4, model.ICRSCatalogObject(ra=13, dec=13, e_ra=0.1, e_dec=0.1)), - model.Layer2CatalogObject(5, model.ICRSCatalogObject(ra=14, dec=14, e_ra=0.1, e_dec=0.1)), + model.Layer2CatalogObject(1, [model.ICRSCatalogObject(ra=10, dec=10, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(2, [model.ICRSCatalogObject(ra=11, dec=11, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(3, [model.ICRSCatalogObject(ra=12, dec=12, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(4, [model.ICRSCatalogObject(ra=13, dec=13, e_ra=0.1, e_dec=0.1)]), + model.Layer2CatalogObject(5, [model.ICRSCatalogObject(ra=14, dec=14, e_ra=0.1, e_dec=0.1)]), ] self.common_repo.register_pgcs([1, 2, 3, 4, 5]) self._save_layer2_data(objects) - actual = self.layer2_repo.query_batch( + actual = self.layer2_repo.query_catalogs_batch( [model.RawCatalog.ICRS], {"icrs": layer2.ICRSCoordinatesInRadiusFilter(10)}, { @@ -227,8 +243,8 @@ def test_get_orphaned_pgcs_returns_pgcs_without_layer1_data(self) -> None: self.common_repo.register_pgcs([1, 2]) self._save_layer2_data( [ - model.Layer2CatalogObject(1, model.DesignationCatalogObject(design="a")), - model.Layer2CatalogObject(2, model.DesignationCatalogObject(design="b")), + model.Layer2CatalogObject(1, [model.DesignationCatalogObject(design="a")]), + model.Layer2CatalogObject(2, [model.DesignationCatalogObject(design="b")]), ] ) @@ -243,7 +259,7 @@ def test_get_orphaned_pgcs_returns_empty_when_layer1_present(self) -> None: self.common_repo.register_pgcs([100]) self.layer0_repo.upsert_pgc({"r1": 100}) self.layer1_repo.save_structured_data("designation.data", 
["design"], ["r1"], [["x"]]) - self._save_layer2_data([model.Layer2CatalogObject(100, model.DesignationCatalogObject(design="x"))]) + self._save_layer2_data([model.Layer2CatalogObject(100, [model.DesignationCatalogObject(design="x")])]) orphaned = self.layer2_repo.get_orphaned_pgcs([model.RawCatalog.DESIGNATION]) @@ -257,8 +273,8 @@ def test_get_orphaned_pgcs_returns_only_pgcs_without_layer1_data(self) -> None: self.layer1_repo.save_structured_data("designation.data", ["design"], ["r1"], [["linked"]]) self._save_layer2_data( [ - model.Layer2CatalogObject(100, model.DesignationCatalogObject(design="linked")), - model.Layer2CatalogObject(200, model.DesignationCatalogObject(design="orphan")), + model.Layer2CatalogObject(100, [model.DesignationCatalogObject(design="linked")]), + model.Layer2CatalogObject(200, [model.DesignationCatalogObject(design="orphan")]), ] ) @@ -271,14 +287,14 @@ def test_remove_pgcs_removes_specified_pgcs(self) -> None: self.common_repo.register_pgcs([1, 2]) self._save_layer2_data( [ - model.Layer2CatalogObject(1, model.DesignationCatalogObject(design="d1")), - model.Layer2CatalogObject(2, model.DesignationCatalogObject(design="d2")), + model.Layer2CatalogObject(1, [model.DesignationCatalogObject(design="d1")]), + model.Layer2CatalogObject(2, [model.DesignationCatalogObject(design="d2")]), ] ) self.layer2_repo.remove_pgcs([model.RawCatalog.DESIGNATION], [1]) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.DESIGNATION], layer2.DesignationEqualsFilter("d1"), layer2.CombinedSearchParams([]), @@ -286,11 +302,11 @@ def test_remove_pgcs_removes_specified_pgcs(self) -> None: 0, ) self.assertEqual(actual, []) - actual = self.layer2_repo.query( + actual = self.layer2_repo.query_catalogs( [model.RawCatalog.DESIGNATION], layer2.DesignationEqualsFilter("d2"), layer2.CombinedSearchParams([]), 10, 0, ) - self.assertEqual(actual, [model.Layer2Object(2, [model.DesignationCatalogObject(design="d2")])]) + 
self.assertEqual(actual, [model.Layer2CatalogObject(2, [model.DesignationCatalogObject(design="d2")])]) diff --git a/tests/lib/postgres.py b/tests/lib/postgres.py index 4b5fefe1..4e9ea222 100644 --- a/tests/lib/postgres.py +++ b/tests/lib/postgres.py @@ -135,7 +135,12 @@ def clear(self): "SELECT table_schema, table_name FROM information_schema.tables " "WHERE table_schema = 'layer2' OR table_schema = 'layer0'" ): - self.storage.exec(f"TRUNCATE {table['table_schema']}.{table['table_name']} CASCADE") + try: + self.storage.exec(f"TRUNCATE {table['table_schema']}.{table['table_name']} CASCADE") + except psycopg.Error as e: + logger.warning( + "truncate skipped", schema=table["table_schema"], table=table["table_name"], error=str(e) + ) self.storage.exec("INSERT INTO layer2.last_update (dt, catalog) VALUES (to_timestamp(0), 'designation')") self.storage.exec("INSERT INTO layer2.last_update (dt, catalog) VALUES (to_timestamp(0), 'icrs')") diff --git a/tests/unit/domain/fits_responder_test.py b/tests/unit/domain/fits_responder_test.py index cdc3c8b5..8a09236a 100644 --- a/tests/unit/domain/fits_responder_test.py +++ b/tests/unit/domain/fits_responder_test.py @@ -10,7 +10,7 @@ class ExtractObjectDataTest(unittest.TestCase): def setUp(self): self.objects = [ - model.Layer2Object( + model.Layer2CatalogObject( pgc=1234, data=[ model.DesignationCatalogObject(design="Galaxy1"), @@ -18,7 +18,7 @@ def setUp(self): model.RedshiftCatalogObject(cz=11.8, e_cz=0.2), ], ), - model.Layer2Object( + model.Layer2CatalogObject( pgc=5678, data=[ model.DesignationCatalogObject(design="Galaxy2"), @@ -68,7 +68,7 @@ def test_data_values(self): class CreateFitsHdulTest(unittest.TestCase): def setUp(self): self.objects = [ - model.Layer2Object( + model.Layer2CatalogObject( pgc=1234, data=[ model.DesignationCatalogObject(design="Galaxy1"), @@ -100,7 +100,7 @@ def test_table_columns(self): class FitsResponderTest(unittest.TestCase): def setUp(self): self.objects = [ - model.Layer2Object( + 
model.Layer2CatalogObject( pgc=1234, data=[ model.DesignationCatalogObject(design="Galaxy1"), @@ -111,7 +111,7 @@ def setUp(self): self.responder = fits_responder.FITSResponder() def test_build_response(self): - fits_data = self.responder.build_response(self.objects) + fits_data = self.responder.build_response_from_catalog(self.objects) self.assertIsInstance(fits_data, bytes) self.assertGreater(len(fits_data), 0)