Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/data/model/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class RawCatalog(enum.Enum):

ICRS = "icrs"
DESIGNATION = "designation"
ADDITIONAL_DESIGNATIONS = "additional_designations"
REDSHIFT = "redshift"
NATURE = "nature"

Expand Down
69 changes: 63 additions & 6 deletions app/data/model/layer2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@

@dataclass
class Layer2CatalogObject:
pgc: int
catalog_object: interface.CatalogObject


@dataclass
class Layer2Object:
pgc: int
data: list[interface.CatalogObject]

Expand All @@ -20,3 +14,66 @@ def get[T](self, t: type[T]) -> T | None:
return obj

return None


@dataclass
class DesignationCatalog:
    """
    Designation of an object as stored on layer 2.
    """

    # Designation string of the object.
    name: str


@dataclass
class ICRSCatalog:
    """
    ICRS position of an object as stored on layer 2.

    Units are not defined here.
    """

    # Presumably right ascension and its uncertainty -- confirm against the table metadata.
    ra: float
    e_ra: float
    # Presumably declination and its uncertainty -- confirm against the table metadata.
    dec: float
    e_dec: float


@dataclass
class RedshiftCatalog:
    """
    Redshift data of an object as stored on layer 2.

    Units are not defined here.
    """

    # Presumably the redshift expressed as a velocity (c * z) -- confirm against the table metadata.
    cz: float
    # Presumably the uncertainty of `cz` -- confirm against the table metadata.
    e_cz: float


@dataclass
class NatureCatalog:
    """
    Nature (object type) of an object as stored on layer 2.
    """

    # Name of the object type.
    type_name: str


@dataclass
class Source:
    """
    Bibliographic source that a piece of layer 2 data is attributed to.
    """

    # Bibliographic code of the publication.
    bibcode: str
    # Title of the publication.
    title: str
    # Authors of the publication.
    authors: list[str]
    # Publication year.
    year: int


@dataclass
class AdditionalDesignation:
    """
    A single designation of an object together with the source it comes from.
    """

    # Designation string.
    name: str
    # Bibliographic source of this designation.
    source: Source


@dataclass
class AdditionalDesignationsCatalog:
    """
    All additional designations known for one object on layer 2.
    """

    # Designations of the object, each with its source.
    names: list[AdditionalDesignation]


@dataclass
class Catalogs:
    """
    Description of catalogs as they are stored on layer 2. To properly analyze them one probably needs
    to read units from metadata of these tables.

    Every field defaults to None, so a catalog that is not filled in is simply left as None.
    """

    designation: DesignationCatalog | None = None
    additional_designations: AdditionalDesignationsCatalog | None = None
    icrs: ICRSCatalog | None = None
    redshift: RedshiftCatalog | None = None
    nature: NatureCatalog | None = None


@dataclass
class Layer2Object:
    """
    Layer 2 data of a single object, identified by its PGC number.
    """

    # PGC number of the object.
    pgc: int
    # Per-catalog data of the object; catalogs without data are None.
    catalogs: Catalogs
191 changes: 181 additions & 10 deletions app/data/repositories/layer2/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from psycopg import rows

from app.data import model
from app.data.model import Layer2CatalogObject, Layer2Object
from app.data.model import layer2 as layer2_model
from app.data.repositories.layer2 import filters as repofilters
from app.data.repositories.layer2 import params
from app.lib import containers
from app.lib import concurrency, containers
from app.lib.storage import postgres

catalogs = [
Expand Down Expand Up @@ -156,21 +158,21 @@ def _construct_batch_query(
conditions=" OR ".join(condition_statements),
), params

def query_batch(
def query_catalogs_batch(
self,
catalogs: list[model.RawCatalog],
search_types: Mapping[str, repofilters.Filter],
search_params: Mapping[str, params.SearchParams],
limit: int,
offset: int,
) -> dict[str, list[model.Layer2Object]]:
) -> dict[str, list[model.Layer2CatalogObject]]:
query, params = self._construct_batch_query(catalogs, search_types, search_params, limit, offset)

records = self._storage.query(query, params=params)

records_by_id = containers.group_by(records, key_func=lambda obj: str(obj["record_id"]))

result: dict[str, list[model.Layer2Object]] = {}
result: dict[str, list[model.Layer2CatalogObject]] = {}

for record_id, records in records_by_id.items():
if record_id not in result:
Expand All @@ -180,12 +182,12 @@ def query_batch(

return result

def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2Object]:
def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2CatalogObject]:
objects_by_pgc = containers.group_by(objects, key_func=lambda obj: int(obj["pgc"]))
result = []

for pgc, pgc_objects in objects_by_pgc.items():
layer2_obj = model.Layer2Object(pgc, [])
layer2_obj = model.Layer2CatalogObject(pgc, [])

# TODO: what if for each pgc there are multiple rows? For example, if
# the catalog does not have a UNIQUE constraint on pgc.
Expand Down Expand Up @@ -220,13 +222,180 @@ def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2Object]

return result

def _query_designations(self, pgcs: list[int]) -> dict[int, layer2_model.DesignationCatalog]:
    """
    Load designations for the given PGC numbers from ``layer2.designation``.

    Returns a mapping from PGC number to its designation. PGC numbers without a row are absent
    from the mapping. An empty ``pgcs`` list yields an empty mapping without querying the storage.
    """
    if not pgcs:
        return {}

    # Named `records` so the local does not shadow the `rows` module imported from psycopg.
    records = self._storage.query(
        "SELECT pgc, design FROM layer2.designation WHERE pgc = ANY(%s) ORDER BY pgc",
        params=[pgcs],
    )

    # If several rows share a pgc, the last one in query order wins.
    return {
        int(record["pgc"]): layer2_model.DesignationCatalog(name=str(record["design"]))
        for record in records
    }

def _query_icrs(self, pgcs: list[int]) -> dict[int, layer2_model.ICRSCatalog]:
    """
    Load ICRS positions for the given PGC numbers from ``layer2.icrs``.

    Returns a mapping from PGC number to its position. Rows where any of ``ra``, ``e_ra``, ``dec``
    or ``e_dec`` is NULL are skipped, so such PGC numbers are absent from the mapping. An empty
    ``pgcs`` list yields an empty mapping without querying the storage.
    """
    if not pgcs:
        return {}

    # Named `records` so the local does not shadow the `rows` module imported from psycopg.
    records = self._storage.query(
        "SELECT pgc, ra, e_ra, dec, e_dec FROM layer2.icrs WHERE pgc = ANY(%s) ORDER BY pgc",
        params=[pgcs],
    )

    required_columns = ("ra", "e_ra", "dec", "e_dec")
    result: dict[int, layer2_model.ICRSCatalog] = {}
    for record in records:
        # A position is only usable when every component is present.
        if any(record.get(column) is None for column in required_columns):
            continue

        result[int(record["pgc"])] = layer2_model.ICRSCatalog(
            ra=float(record["ra"]),
            e_ra=float(record["e_ra"]),
            dec=float(record["dec"]),
            e_dec=float(record["e_dec"]),
        )

    return result

def _query_redshift(self, pgcs: list[int]) -> dict[int, layer2_model.RedshiftCatalog]:
    """
    Load redshift data for the given PGC numbers from ``layer2.cz``.

    Returns a mapping from PGC number to its redshift data. Rows where ``cz`` or ``e_cz`` is NULL
    are skipped, so such PGC numbers are absent from the mapping. An empty ``pgcs`` list yields an
    empty mapping without querying the storage.
    """
    if not pgcs:
        return {}

    # Named `records` so the local does not shadow the `rows` module imported from psycopg.
    records = self._storage.query(
        "SELECT pgc, cz, e_cz FROM layer2.cz WHERE pgc = ANY(%s) ORDER BY pgc",
        params=[pgcs],
    )

    return {
        int(record["pgc"]): layer2_model.RedshiftCatalog(cz=float(record["cz"]), e_cz=float(record["e_cz"]))
        for record in records
        if record.get("cz") is not None and record.get("e_cz") is not None
    }

def _query_nature(self, pgcs: list[int]) -> dict[int, layer2_model.NatureCatalog]:
    """
    Load object types for the given PGC numbers from ``layer2.nature``.

    Returns a mapping from PGC number to its nature. Rows where ``type_name`` is NULL are skipped,
    so such PGC numbers are absent from the mapping. An empty ``pgcs`` list yields an empty mapping
    without querying the storage.
    """
    if not pgcs:
        return {}

    # Named `records` so the local does not shadow the `rows` module imported from psycopg.
    records = self._storage.query(
        "SELECT pgc, type_name FROM layer2.nature WHERE pgc = ANY(%s) ORDER BY pgc",
        params=[pgcs],
    )

    return {
        int(record["pgc"]): layer2_model.NatureCatalog(type_name=str(record["type_name"]))
        for record in records
        if record.get("type_name") is not None
    }

def _query_additional_designations(self, pgcs: list[int]) -> dict[int, layer2_model.AdditionalDesignationsCatalog]:
    """
    Load all additional designations for the given PGC numbers from ``layer2.designations``.

    Returns a mapping from PGC number to the catalog of its designations, ordered by designation
    within each PGC (the query orders by ``pgc, design``). PGC numbers without rows are absent
    from the mapping. An empty ``pgcs`` list yields an empty mapping without querying the storage.

    NULL columns are replaced with placeholder values rather than dropping the row: an empty string
    for ``design``, ``code`` and ``title``, ``0`` for ``year`` and an empty list for ``author``.
    """
    if not pgcs:
        return {}

    # Named `records` so the local does not shadow the `rows` module imported from psycopg.
    records = self._storage.query(
        "SELECT pgc, design, code, year, author, title FROM layer2.designations "
        "WHERE pgc = ANY(%s) ORDER BY pgc, design",
        params=[pgcs],
    )

    names_by_pgc: dict[int, list[layer2_model.AdditionalDesignation]] = {}
    for record in records:
        pgc = int(record["pgc"])

        # `author` may arrive either as a list or as a single value; normalise it to a list.
        author_val = record.get("author")
        if isinstance(author_val, list):
            authors = author_val
        elif author_val is not None:
            authors = [str(author_val)]
        else:
            authors = []

        source = layer2_model.Source(
            bibcode=str(record["code"]) if record.get("code") is not None else "",
            title=str(record["title"]) if record.get("title") is not None else "",
            authors=authors,
            year=int(record["year"]) if record.get("year") is not None else 0,
        )
        designation = layer2_model.AdditionalDesignation(
            name=str(record["design"]) if record.get("design") is not None else "",
            source=source,
        )
        names_by_pgc.setdefault(pgc, []).append(designation)

    return {
        pgc: layer2_model.AdditionalDesignationsCatalog(names=names) for pgc, names in names_by_pgc.items()
    }

def query_pgc(
    self,
    catalogs: list[model.RawCatalog],
    pgc_numbers: list[int],
    limit: int,
    offset: int = 0,
) -> list[Layer2Object]:
    """
    Fetch typed layer 2 data for the given PGC numbers.

    The PGC numbers are de-duplicated and sorted in ascending order, and the page
    ``[offset, offset + limit)`` of that ordering is selected. Every requested catalog is then
    loaded for the whole page with its own query, scheduled through ``concurrency.ErrorGroup``.

    Returns one object per PGC number on the page, in ascending PGC order. A PGC number that has
    no data in a requested catalog still gets an object; the corresponding field is None.
    Catalogs that were not requested are always None. If ``catalogs`` or ``pgc_numbers`` is empty,
    or the page is empty, an empty list is returned without querying the storage.
    """
    if not catalogs or not pgc_numbers:
        return []

    # De-duplicate before paging so that a repeated PGC number neither yields a duplicate object
    # nor consumes a slot of the page.
    pgcs_page = sorted(set(pgc_numbers))[offset : offset + limit]
    if not pgcs_page:
        return []

    errgr = concurrency.ErrorGroup()
    designation_task: concurrency.TaskResult[dict[int, layer2_model.DesignationCatalog]] | None = None
    additional_designations_task: (
        concurrency.TaskResult[dict[int, layer2_model.AdditionalDesignationsCatalog]] | None
    ) = None
    icrs_task: concurrency.TaskResult[dict[int, layer2_model.ICRSCatalog]] | None = None
    redshift_task: concurrency.TaskResult[dict[int, layer2_model.RedshiftCatalog]] | None = None
    nature_task: concurrency.TaskResult[dict[int, layer2_model.NatureCatalog]] | None = None

    # Only schedule queries for the catalogs that were actually requested.
    if model.RawCatalog.DESIGNATION in catalogs:
        designation_task = errgr.run(self._query_designations, pgcs_page)
    if model.RawCatalog.ADDITIONAL_DESIGNATIONS in catalogs:
        additional_designations_task = errgr.run(self._query_additional_designations, pgcs_page)
    if model.RawCatalog.ICRS in catalogs:
        icrs_task = errgr.run(self._query_icrs, pgcs_page)
    if model.RawCatalog.REDSHIFT in catalogs:
        redshift_task = errgr.run(self._query_redshift, pgcs_page)
    if model.RawCatalog.NATURE in catalogs:
        nature_task = errgr.run(self._query_nature, pgcs_page)

    # NOTE(review): presumably blocks until every scheduled task has finished and surfaces
    # their errors -- confirm against `concurrency.ErrorGroup`.
    errgr.wait()

    # A catalog that was not requested has no task and contributes an empty mapping.
    designation_map = designation_task.result() if designation_task is not None else {}
    additional_designations_map = (
        additional_designations_task.result() if additional_designations_task is not None else {}
    )
    icrs_map = icrs_task.result() if icrs_task is not None else {}
    redshift_map = redshift_task.result() if redshift_task is not None else {}
    nature_map = nature_task.result() if nature_task is not None else {}

    return [
        self._layer2_object_from_maps(
            pgc,
            catalogs,
            designation_map,
            additional_designations_map,
            icrs_map,
            redshift_map,
            nature_map,
        )
        for pgc in pgcs_page
    ]

def _layer2_object_from_maps(
    self,
    pgc: int,
    catalogs: list[model.RawCatalog],
    designation_map: dict[int, layer2_model.DesignationCatalog],
    additional_designations_map: dict[int, layer2_model.AdditionalDesignationsCatalog],
    icrs_map: dict[int, layer2_model.ICRSCatalog],
    redshift_map: dict[int, layer2_model.RedshiftCatalog],
    nature_map: dict[int, layer2_model.NatureCatalog],
) -> Layer2Object:
    """
    Assemble the layer 2 object of a single PGC number from per-catalog lookup maps.

    Each map is keyed by PGC number. A catalog field is filled only when that catalog is listed in
    ``catalogs``; otherwise it is None even if the map has an entry for ``pgc``. A requested
    catalog whose map has no entry for ``pgc`` is None as well.
    """
    designation = designation_map.get(pgc) if model.RawCatalog.DESIGNATION in catalogs else None
    additional_designations = (
        additional_designations_map.get(pgc) if model.RawCatalog.ADDITIONAL_DESIGNATIONS in catalogs else None
    )
    icrs = icrs_map.get(pgc) if model.RawCatalog.ICRS in catalogs else None
    redshift = redshift_map.get(pgc) if model.RawCatalog.REDSHIFT in catalogs else None
    nature = nature_map.get(pgc) if model.RawCatalog.NATURE in catalogs else None

    return Layer2Object(
        pgc=pgc,
        catalogs=layer2_model.Catalogs(
            designation=designation,
            additional_designations=additional_designations,
            icrs=icrs,
            redshift=redshift,
            nature=nature,
        ),
    )

def query_catalogs_pgc(
self,
catalogs: list[model.RawCatalog],
pgc_numbers: list[int],
limit: int,
offset: int = 0,
) -> list[Layer2CatalogObject]:
if not catalogs:
return []

Expand Down Expand Up @@ -275,15 +444,17 @@ def query_pgc(

return self._group_by_pgc(objects)

def query(
def query_catalogs(
self,
catalogs: list[model.RawCatalog],
filters: repofilters.Filter,
search_params: params.SearchParams,
limit: int,
offset: int,
) -> list[model.Layer2Object]:
res = self.query_batch(catalogs, {search_params.name(): filters}, {"obj": search_params}, limit, offset)
) -> list[model.Layer2CatalogObject]:
res = self.query_catalogs_batch(
catalogs, {search_params.name(): filters}, {"obj": search_params}, limit, offset
)

if "obj" not in res:
return []
Expand Down
2 changes: 1 addition & 1 deletion app/domain/adminapi/crossmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def get_record_crossmatch(self, r: adminapi.GetRecordCrossmatchRequest) -> admin
if len(candidate_pgcs) == 0:
return response

layer2_objects = self.layer2_repo.query_pgc(
layer2_objects = self.layer2_repo.query_catalogs_pgc(
catalogs=[model.RawCatalog.ICRS, model.RawCatalog.DESIGNATION, model.RawCatalog.REDSHIFT],
pgc_numbers=list(candidate_pgcs),
limit=len(candidate_pgcs),
Expand Down
4 changes: 2 additions & 2 deletions app/domain/dataapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def __init__(

def query(self, query: dataapi.QueryRequest) -> dataapi.QueryResponse:
    """
    Run a data API search and return the matching objects.

    The query string ``query.q`` is parsed into repository filters and search parameters, the
    enabled catalogs are queried on layer 2, and the result is converted into the structured
    response format.
    """
    filters, search_params = search_parsers.query_to_filters(query.q, search_parsers.DEFAULT_PARSERS)
    # NOTE(review): `query.page` is passed in the position the repository names `offset`.
    # Confirm whether a page number or a row offset is expected there.
    objects = self.layer2_repo.query_catalogs(
        ENABLED_CATALOGS,
        filters,
        search_params,
        query.page_size,
        query.page,
    )
    responder = responders.StructuredResponder(self.catalog_cfg)
    pgc_objects = responder.build_response_from_catalog(objects).objects
    return dataapi.QueryResponse(objects=pgc_objects)

def query_fits(self, query: dataapi.FITSRequest) -> bytes:
Expand Down
Loading
Loading