Skip to content

Commit 0fbd1e7

Browse files
authored
#29: make other layer2 uploading tasks similar to nature (#398)
* #29: add icrs layer2 uploading similar to nature catalog * add redshift task * move designation to a new model * remove old layer 2 import task
1 parent 9ed6b51 commit 0fbd1e7

17 files changed

Lines changed: 570 additions & 188 deletions

app/data/model/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
CIResultObjectCollision,
1616
CIResultObjectExisting,
1717
CIResultObjectNew,
18+
DesignationRecord,
19+
ICRSRecord,
1820
NatureRecord,
1921
Record,
2022
RecordCrossmatch,
2123
RecordWithPGC,
24+
RedshiftRecord,
2225
)
2326
from app.data.model.redshift import RedshiftCatalogObject
2427
from app.data.model.table import (
@@ -46,7 +49,10 @@
4649
"CIResultObjectCollision",
4750
"CIResultObjectExisting",
4851
"CIResultObjectNew",
52+
"DesignationRecord",
53+
"ICRSRecord",
4954
"NatureRecord",
55+
"RedshiftRecord",
5056
"Record",
5157
"RecordWithPGC",
5258
"Layer2CatalogObject",

app/data/model/interface.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class RawCatalog(enum.Enum):
3030
aggregated data on layer 2.
3131
"""
3232

33-
ALL = "all"
3433
ICRS = "icrs"
3534
DESIGNATION = "designation"
3635
REDSHIFT = "redshift"

app/data/model/records.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,28 @@ class NatureRecord:
5353
pgc: int
5454
record_id: str
5555
type_name: str
56+
57+
58+
@dataclass
class ICRSRecord:
    """Layer-1 ICRS astrometry for one layer-0 record, tied to a PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # right ascension and its uncertainty (units not visible here — presumably degrees; TODO confirm)
    ra: float
    e_ra: float
    # declination and its uncertainty
    dec: float
    e_dec: float
66+
67+
68+
@dataclass
class RedshiftRecord:
    """Layer-1 redshift measurement for one layer-0 record, tied to a PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # cz value and its uncertainty (presumably velocity c·z — confirm against the cz.data schema)
    cz: float
    e_cz: float
74+
75+
76+
@dataclass
class DesignationRecord:
    """Layer-1 designation (object name) for one layer-0 record, tied to a PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # designation string as stored in designation.data
    design: str

app/data/repositories/layer1.py

Lines changed: 68 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -76,70 +76,96 @@ def save_data(self, records: list[model.Record]) -> None:
7676
object_count=len(table_items),
7777
)
7878

79-
def get_new_observations(
80-
self, dt: datetime.datetime, limit: int, offset: int, catalog: model.RawCatalog
81-
) -> list[model.RecordWithPGC]:
82-
"""
83-
Returns all objects that were modified since `dt`.
84-
`limit` is the number of PGC numbers to select, not the final number of objects.
85-
As such, this function will return around
86-
`limit * (average_number_of_observations_per_PGC)` objects, not `limit`.
87-
88-
`offset` is the first PGC number from which to start selecting.
89-
90-
This makes the function safe for aggregation - for each returned PGC all of its objects will be returned.
91-
"""
92-
object_cls = model.get_catalog_object_type(catalog)
93-
94-
query = f"""SELECT *
95-
FROM {object_cls.layer1_table()} AS l1
79+
def get_new_nature_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.NatureRecord]:
    """Return nature-catalog layer-1 records for PGC numbers modified after `dt`.

    `limit` bounds the number of distinct PGC numbers selected (not the number of
    rows), and `offset` is the PGC number after which selection starts, so every
    record of a selected PGC is returned together — safe for aggregation.
    """
    sql = """SELECT o.pgc, l1.record_id, l1.type_name
        FROM nature.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM nature.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.NatureRecord] = []
    for row in self._storage.query(sql, params=[dt, offset, limit]):
        records.append(
            model.NatureRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                type_name=row["type_name"],
            )
        )
    return records
10894

109-
record_data: dict[tuple[int, str], list[model.CatalogObject]] = {}
110-
111-
for row in rows:
112-
record_id = row.pop("record_id")
113-
pgc = int(row.pop("pgc"))
114-
catalog_object = object_cls.from_layer1(row)
115-
116-
key = (pgc, record_id)
117-
if key not in record_data:
118-
record_data[key] = []
119-
record_data[key].append(catalog_object)
120-
121-
records: list[model.RecordWithPGC] = []
122-
for (pgc, record_id), catalog_objects in record_data.items():
123-
record_info = model.Record(id=record_id, data=catalog_objects)
124-
records.append(model.RecordWithPGC(pgc, record_info))
125-
126-
return records
95+
def get_new_icrs_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.ICRSRecord]:
    """Return ICRS-catalog layer-1 records for PGC numbers modified after `dt`.

    `limit` bounds the number of distinct PGC numbers selected (not the number of
    rows), and `offset` is the PGC number after which selection starts, so every
    record of a selected PGC is returned together — safe for aggregation.
    """
    sql = """SELECT o.pgc, l1.record_id, l1.ra, l1.e_ra, l1.dec, l1.e_dec
        FROM icrs.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM icrs.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.ICRSRecord] = []
    for row in self._storage.query(sql, params=[dt, offset, limit]):
        records.append(
            model.ICRSRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                ra=float(row["ra"]),
                e_ra=float(row["e_ra"]),
                dec=float(row["dec"]),
                e_dec=float(row["e_dec"]),
            )
        )
    return records
127120

128-
def get_new_nature_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.NatureRecord]:
129-
query = """SELECT o.pgc, l1.record_id, l1.type_name
130-
FROM nature.data AS l1
121+
def get_new_redshift_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.RedshiftRecord]:
    """Return redshift-catalog layer-1 records for PGC numbers modified after `dt`.

    `limit` bounds the number of distinct PGC numbers selected (not the number of
    rows), and `offset` is the PGC number after which selection starts, so every
    record of a selected PGC is returned together — safe for aggregation.
    """
    sql = """SELECT o.pgc, l1.record_id, l1.cz, l1.e_cz
        FROM cz.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM cz.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.RedshiftRecord] = []
    for row in self._storage.query(sql, params=[dt, offset, limit]):
        records.append(
            model.RedshiftRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                cz=float(row["cz"]),
                e_cz=float(row["e_cz"]),
            )
        )
    return records
144+
145+
def get_new_designation_records(
    self, dt: datetime.datetime, limit: int, offset: int
) -> list[model.DesignationRecord]:
    """Return designation-catalog layer-1 records for PGC numbers modified after `dt`.

    `limit` bounds the number of distinct PGC numbers selected (not the number of
    rows), and `offset` is the PGC number after which selection starts, so every
    record of a selected PGC is returned together — safe for aggregation.
    """
    sql = """SELECT o.pgc, l1.record_id, l1.design
        FROM designation.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM designation.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.DesignationRecord] = []
    for row in self._storage.query(sql, params=[dt, offset, limit]):
        records.append(
            model.DesignationRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                design=row["design"],
            )
        )
    return records
143169

144170
def query_records(
145171
self,

app/data/repositories/layer2/repository.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ def update_last_update_time(self, dt: datetime.datetime, catalog: model.RawCatal
3535
params=[dt, catalog.value],
3636
)
3737

38+
def get_column_units(self, schema: str, table: str) -> dict[str, str]:
    """Map column name -> unit for columns of `schema.table` that declare a unit in meta.column_info."""
    rows = self._storage.query(
        "SELECT column_name, param->>'unit' as unit FROM meta.column_info "
        "WHERE schema_name = %s AND table_name = %s AND param->>'unit' IS NOT NULL",
        params=[schema, table],
    )
    units: dict[str, str] = {}
    for row in rows:
        units[row["column_name"]] = row["unit"]
    return units
45+
3846
def get_orphaned_pgcs(self, catalogs: list[model.RawCatalog]) -> dict[str, list[int]]:
3947
result: dict[str, list[int]] = {}
4048
for catalog in catalogs:
@@ -64,40 +72,6 @@ def remove_pgcs(self, catalogs: list[model.RawCatalog], pgcs: list[int]) -> None
6472
query = f"DELETE FROM {layer2_table} WHERE pgc = ANY(%s)"
6573
self._storage.exec(query, params=[pgcs])
6674

67-
def save_data(self, objects: list[model.Layer2CatalogObject]):
68-
objects_by_table: dict[str, list[model.Layer2CatalogObject]] = {}
69-
for obj in objects:
70-
table = obj.catalog_object.layer2_table()
71-
if table not in objects_by_table:
72-
objects_by_table[table] = []
73-
objects_by_table[table].append(obj)
74-
75-
for table, table_objects in objects_by_table.items():
76-
if len(table_objects) == 0:
77-
continue
78-
79-
columns = table_objects[0].catalog_object.layer2_keys() + ["pgc"]
80-
81-
params = []
82-
for obj in table_objects:
83-
data = obj.catalog_object.layer2_data()
84-
data["pgc"] = obj.pgc
85-
86-
params.extend([data.get(column, None) for column in columns])
87-
88-
placeholders = f"({','.join(['%s'] * len(columns))})"
89-
90-
value_groups = ",".join([placeholders] * len(table_objects))
91-
on_conflict_statement = ", ".join([f"{column} = EXCLUDED.{column}" for column in columns])
92-
93-
query = f"""
94-
INSERT INTO {table} ({", ".join(columns)})
95-
VALUES {value_groups}
96-
ON CONFLICT (pgc) DO UPDATE SET {on_conflict_statement}
97-
"""
98-
99-
self._storage.exec(query, params=params)
100-
10175
def save(self, table: str, columns: list[str], pgcs: list[int], data: list[list[Any]]) -> None:
10276
if not pgcs:
10377
return

app/lib/logging/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from app.lib.logging.table import print_table
2+
3+
__all__ = ["print_table"]

app/lib/logging/table.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from collections.abc import Sequence
2+
3+
4+
def print_table(
5+
headers: Sequence[str],
6+
rows: Sequence[Sequence[str | int]],
7+
sections: Sequence[tuple[str, Sequence[Sequence[str | int]]]] | None = None,
8+
min_column_widths: Sequence[int] | None = None,
9+
) -> None:
10+
ncols = len(headers)
11+
min_widths = list(min_column_widths) if min_column_widths else [0] * ncols
12+
while len(min_widths) < ncols:
13+
min_widths.append(0)
14+
15+
def cell_str(c: str | int) -> str:
16+
return str(c)
17+
18+
def col_width(col_index: int, extra_rows: Sequence[Sequence[str | int]]) -> int:
19+
candidates = [len(headers[col_index]), min_widths[col_index]]
20+
for row in rows:
21+
candidates.append(len(cell_str(row[col_index])))
22+
if sections:
23+
for title, section_rows in sections:
24+
if col_index == 0:
25+
candidates.append(len(title))
26+
for row in section_rows:
27+
candidates.append(len(cell_str(row[col_index])))
28+
for row in extra_rows:
29+
candidates.append(len(cell_str(row[col_index])))
30+
return max(candidates)
31+
32+
widths = [col_width(i, ()) for i in range(ncols)]
33+
alignments = ["<"] + [">"] * (ncols - 1)
34+
35+
def sep_line() -> str:
36+
return "+" + "+".join("-" * (w + 2) for w in widths) + "+"
37+
38+
def data_line(cells: Sequence[str | int], align: Sequence[str] | None = None) -> str:
39+
al = align if align else alignments
40+
return "| " + " | ".join(f"{cell_str(c):{al[i]}{widths[i]}}" for i, c in enumerate(cells)) + " |"
41+
42+
lines = [sep_line(), data_line(headers), sep_line()]
43+
for row in rows:
44+
lines.append(data_line(row))
45+
lines.append(sep_line())
46+
47+
if sections:
48+
for title, section_rows in sections:
49+
lines.append(data_line([title] + [""] * (ncols - 1)))
50+
for row in section_rows:
51+
lines.append(data_line(row))
52+
lines.append(sep_line())
53+
54+
for line in lines:
55+
print(line)

0 commit comments

Comments
 (0)