Skip to content

Commit 3603161

Browse files
authored
#391: add crossmatch information to /records handler (#407)
* #391: add crossmatch information to /records handler
* add candidates to the output
* remove indexes and add table id filter
1 parent defdd17 commit 3603161

6 files changed

Lines changed: 152 additions & 20 deletions

File tree

app/data/model/table.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ class TableRecord:
1919
id: str
2020
original_data: dict[str, Any]
2121
pgc: int | None
22+
triage_status: str
23+
crossmatch_candidates: list[int]
2224

2325

2426
@dataclass

app/data/repositories/layer0/repository.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@ def fetch_records(
6565
order_direction: str = "asc",
6666
has_pgc: bool | None = None,
6767
pgc_value: int | None = None,
68+
triage_status: str | None = None,
6869
) -> list[model.TableRecord]:
69-
return self.table_repo.fetch_records(table_name, limit, row_offset, order_direction, has_pgc, pgc_value)
70+
return self.table_repo.fetch_records(
71+
table_name, limit, row_offset, order_direction, has_pgc, pgc_value, triage_status
72+
)
7073

7174
def fetch_metadata(self, table_name: str) -> model.Layer0TableMeta:
7275
return self.table_repo.fetch_metadata(table_name)

app/data/repositories/layer0/tables.py

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ def _row_to_serializable_dict(row: Any, drop: list[str]) -> dict[str, Any]:
3333
return out
3434

3535

36+
def _crossmatch_metadata_to_candidates(metadata: dict[str, Any] | None) -> list[int]:
37+
if metadata is None:
38+
return []
39+
candidates: list[int] = []
40+
if "pgc" in metadata and metadata["pgc"] is not None:
41+
candidates.append(int(metadata["pgc"]))
42+
if "possible_matches" in metadata and metadata["possible_matches"] is not None:
43+
candidates.extend(int(p) for p in metadata["possible_matches"])
44+
return candidates
45+
46+
3647
@dataclass
3748
class QuantityMock:
3849
values: pandas.Series
@@ -289,6 +300,7 @@ def fetch_records(
289300
order_direction: str = "asc",
290301
has_pgc: bool | None = None,
291302
pgc_value: int | None = None,
303+
triage_status: str | None = None,
292304
) -> list[model.TableRecord]:
293305
where_parts: list[str] = []
294306
if has_pgc is True:
@@ -301,39 +313,91 @@ def fetch_records(
301313
params: list[Any] = []
302314
if pgc_value is not None:
303315
params.append(pgc_value)
304-
params.append(limit)
305-
params.append(row_offset)
306316

307317
id_col = sql.Identifier(INTERNAL_ID_COLUMN_NAME)
308-
parts: list[sql.Composable] = [
309-
sql.SQL("SELECT r.*, o.pgc FROM {}.{} AS r JOIN layer0.records AS o ON r.{} = o.id").format(
310-
sql.Identifier(RAWDATA_SCHEMA),
311-
sql.Identifier(table_name),
312-
id_col,
313-
),
314-
]
318+
direction = sql.SQL(order_direction if order_direction in ("asc", "desc") else "asc")
319+
320+
if triage_status == "unprocessed":
321+
where_parts.append("NOT EXISTS (SELECT 1 FROM layer0.crossmatch c WHERE c.record_id = o.id)")
322+
params.append(limit)
323+
params.append(row_offset)
324+
parts: list[sql.Composable] = [
325+
sql.SQL(
326+
"SELECT r.*, o.pgc "
327+
"FROM {}.{} AS r "
328+
"JOIN layer0.records AS o ON r.{} = o.id "
329+
"AND o.table_id = (SELECT id FROM layer0.tables WHERE table_name = %s)"
330+
).format(
331+
sql.Identifier(RAWDATA_SCHEMA),
332+
sql.Identifier(table_name),
333+
id_col,
334+
),
335+
]
336+
params.insert(0, table_name)
337+
elif triage_status in ("pending", "resolved"):
338+
where_parts.append("c.triage_status = %s")
339+
params.append(triage_status)
340+
params.append(limit)
341+
params.append(row_offset)
342+
parts = [
343+
sql.SQL(
344+
"SELECT r.*, o.pgc, c.triage_status, c.metadata AS crossmatch_metadata "
345+
"FROM {}.{} AS r "
346+
"JOIN layer0.records AS o ON r.{} = o.id "
347+
"AND o.table_id = (SELECT id FROM layer0.tables WHERE table_name = %s) "
348+
"JOIN layer0.crossmatch AS c ON o.id = c.record_id"
349+
).format(
350+
sql.Identifier(RAWDATA_SCHEMA),
351+
sql.Identifier(table_name),
352+
id_col,
353+
),
354+
]
355+
params.insert(0, table_name)
356+
else:
357+
params.append(limit)
358+
params.append(row_offset)
359+
parts = [
360+
sql.SQL(
361+
"SELECT r.*, o.pgc, c.triage_status, c.metadata AS crossmatch_metadata "
362+
"FROM {}.{} AS r "
363+
"JOIN layer0.records AS o ON r.{} = o.id "
364+
"AND o.table_id = (SELECT id FROM layer0.tables WHERE table_name = %s) "
365+
"LEFT JOIN layer0.crossmatch AS c ON o.id = c.record_id"
366+
).format(
367+
sql.Identifier(RAWDATA_SCHEMA),
368+
sql.Identifier(table_name),
369+
id_col,
370+
),
371+
]
372+
params.insert(0, table_name)
373+
315374
if where_parts:
316375
parts.append(sql.SQL(" WHERE "))
317376
parts.append(sql.SQL(" AND ").join([sql.SQL(w) for w in where_parts]))
318377
parts.append(sql.SQL(" ORDER BY r.{} ").format(id_col))
319-
parts.append(sql.SQL(order_direction if order_direction in ("asc", "desc") else "asc"))
378+
parts.append(direction)
320379
parts.append(sql.SQL(" LIMIT %s OFFSET %s"))
321380

322381
rows = self._storage.query(sql.Composed(parts), params=params)
323382
id_col_name = INTERNAL_ID_COLUMN_NAME
324-
drop_labels = [id_col_name, "pgc"]
383+
drop_labels = [id_col_name, "pgc", "triage_status", "crossmatch_metadata"]
325384
result: list[model.TableRecord] = []
326385
for row in rows:
327386
record_id = str(row[id_col_name])
328387
original_data = _row_to_serializable_dict(row, drop=drop_labels)
329388
pgc_val = row.get("pgc")
330389
if pgc_val is not None and (pandas.isna(pgc_val) or (isinstance(pgc_val, float) and np.isnan(pgc_val))):
331390
pgc_val = None
391+
raw_triage = row.get("triage_status")
392+
triage_val = raw_triage if raw_triage is not None else "unprocessed"
393+
candidates = _crossmatch_metadata_to_candidates(row.get("crossmatch_metadata"))
332394
result.append(
333395
model.TableRecord(
334396
id=record_id,
335397
original_data=original_data,
336398
pgc=int(pgc_val) if pgc_val is not None else None,
399+
triage_status=triage_val,
400+
crossmatch_candidates=candidates,
337401
)
338402
)
339403
return result

app/domain/adminapi/table_upload.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ def get_records(self, r: adminapi.GetRecordsRequest) -> adminapi.GetRecordsRespo
216216
elif r.upload_status == adminapi.UploadStatus.PENDING:
217217
has_pgc = False
218218

219+
triage_filter = r.triage_status.value if r.triage_status is not None else None
219220
errgr = concurrency.ErrorGroup()
220221
records_task = errgr.run(
221222
self.layer0_repo.fetch_records,
@@ -225,6 +226,7 @@ def get_records(self, r: adminapi.GetRecordsRequest) -> adminapi.GetRecordsRespo
225226
order_direction="asc",
226227
has_pgc=has_pgc,
227228
pgc_value=r.pgc,
229+
triage_status=triage_filter,
228230
)
229231
schema_task = errgr.run(
230232
self.common_repo.get_schema,
@@ -241,6 +243,10 @@ def get_records(self, r: adminapi.GetRecordsRequest) -> adminapi.GetRecordsRespo
241243
id=rec.id,
242244
original_data=rec.original_data,
243245
pgc=rec.pgc,
246+
crossmatch=adminapi.RecordCrossmatchInfo(
247+
triage_status=adminapi.CrossmatchTriageStatus(rec.triage_status),
248+
candidates=[adminapi.RecordCrossmatchCandidate(pgc=p) for p in rec.crossmatch_candidates],
249+
),
244250
)
245251
for rec in raw_records
246252
]

app/presentation/adminapi/interface.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,24 +212,48 @@ class UploadStatus(enum.Enum):
212212
PENDING = "pending"
213213

214214

215+
class CrossmatchTriageStatus(enum.Enum):
    """Crossmatch triage state of a record.

    Used both as the optional ``triage_status`` filter on ``GetRecordsRequest``
    and as the status reported in ``RecordCrossmatchInfo``.
    """

    UNPROCESSED = "unprocessed"
    PENDING = "pending"
    RESOLVED = "resolved"
219+
220+
215221
class GetRecordsRequest(pydantic.BaseModel):
216222
table_name: str
217223
page: int = 0
218224
page_size: int = 25
219-
upload_status: UploadStatus | None = None
220225
pgc: int | None = None
226+
upload_status: UploadStatus | None = None
227+
triage_status: CrossmatchTriageStatus | None = None
221228

222229
@pydantic.model_validator(mode="after")
223-
def check_pending_and_pgc_exclusive(self) -> "GetRecordsRequest":
224-
if self.upload_status == UploadStatus.PENDING and self.pgc is not None:
225-
raise ValueError("upload_status pending and pgc filter cannot be specified at the same time")
230+
def check_exclusive_pgc_filter(self) -> "GetRecordsRequest":
231+
if self.pgc is not None:
232+
if any([self.upload_status is not None, self.triage_status is not None]):
233+
raise ValueError("When pgc filter is specified, no other filters are allowed.")
226234
return self
227235

236+
@pydantic.model_validator(mode="after")
237+
def check_upload_status_and_triage_status(self) -> "GetRecordsRequest":
238+
if self.upload_status == UploadStatus.UPLOADED and self.triage_status is not None:
239+
raise ValueError("When upload_status is UPLOADED, triage_status is not allowed.")
240+
return self
241+
242+
243+
class RecordCrossmatchCandidate(pydantic.BaseModel):
    """A single crossmatch candidate for a record, identified by its PGC number."""

    pgc: int
245+
246+
247+
class RecordCrossmatchInfo(pydantic.BaseModel):
    """Crossmatch information attached to a record in the records response."""

    # Triage state of the record's crossmatch.
    triage_status: CrossmatchTriageStatus
    # Candidate objects for the record; may be empty.
    candidates: list[RecordCrossmatchCandidate]
250+
228251

229252
class Record(pydantic.BaseModel):
230253
id: str
231254
original_data: dict[str, Any]
232255
pgc: int | None
256+
crossmatch: RecordCrossmatchInfo
233257

234258

235259
class DescriptionSchema(pydantic.BaseModel):

tests/unit/domain/table_upload_test.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,16 @@ def setUp(self) -> None:
266266

267267
def test_get_records_returns_records_with_pgc(self) -> None:
268268
self.manager.layer0_repo.fetch_records.return_value = [
269-
model.TableRecord(id="rec1", original_data={"name": "A"}, pgc=1001),
270-
model.TableRecord(id="rec2", original_data={"name": "B"}, pgc=1002),
269+
model.TableRecord(
270+
id="rec1", original_data={"name": "A"}, pgc=1001, triage_status="resolved", crossmatch_candidates=[1001]
271+
),
272+
model.TableRecord(
273+
id="rec2",
274+
original_data={"name": "B"},
275+
pgc=1002,
276+
triage_status="pending",
277+
crossmatch_candidates=[],
278+
),
271279
]
272280

273281
request = presentation.GetRecordsRequest(table_name="t", page=0, page_size=25)
@@ -277,9 +285,16 @@ def test_get_records_returns_records_with_pgc(self) -> None:
277285
self.assertEqual(response.records[0].id, "rec1")
278286
self.assertEqual(response.records[0].original_data, {"name": "A"})
279287
self.assertEqual(response.records[0].pgc, 1001)
288+
self.assertEqual(response.records[0].crossmatch.triage_status, presentation.CrossmatchTriageStatus.RESOLVED)
289+
self.assertEqual(
290+
response.records[0].crossmatch.candidates,
291+
[presentation.RecordCrossmatchCandidate(pgc=1001)],
292+
)
280293
self.assertEqual(response.records[1].id, "rec2")
281294
self.assertEqual(response.records[1].original_data, {"name": "B"})
282295
self.assertEqual(response.records[1].pgc, 1002)
296+
self.assertEqual(response.records[1].crossmatch.triage_status, presentation.CrossmatchTriageStatus.PENDING)
297+
self.assertEqual(response.records[1].crossmatch.candidates, [])
283298

284299
def test_get_records_passes_filters_to_fetch_records(self) -> None:
285300
self.manager.layer0_repo.fetch_records.return_value = []
@@ -302,6 +317,12 @@ def test_get_records_passes_filters_to_fetch_records(self) -> None:
302317
self.assertIsNone(call_kw["has_pgc"])
303318
self.assertEqual(call_kw["pgc_value"], 42)
304319

320+
self.manager.get_records(
321+
presentation.GetRecordsRequest(table_name="t", triage_status=presentation.CrossmatchTriageStatus.PENDING)
322+
)
323+
call_kw = self.manager.layer0_repo.fetch_records.call_args[1]
324+
self.assertEqual(call_kw["triage_status"], "pending")
325+
305326
def test_get_records_pagination(self) -> None:
306327
self.manager.layer0_repo.fetch_records.return_value = []
307328

@@ -312,8 +333,20 @@ def test_get_records_pagination(self) -> None:
312333

313334
def test_get_records_pgc_none_when_missing_or_nan(self) -> None:
314335
self.manager.layer0_repo.fetch_records.return_value = [
315-
model.TableRecord(id="rec1", original_data={"name": "A"}, pgc=1001),
316-
model.TableRecord(id="rec2", original_data={"name": "B"}, pgc=None),
336+
model.TableRecord(
337+
id="rec1",
338+
original_data={"name": "A"},
339+
pgc=1001,
340+
triage_status="resolved",
341+
crossmatch_candidates=[],
342+
),
343+
model.TableRecord(
344+
id="rec2",
345+
original_data={"name": "B"},
346+
pgc=None,
347+
triage_status="unprocessed",
348+
crossmatch_candidates=[],
349+
),
317350
]
318351

319352
response = self.manager.get_records(presentation.GetRecordsRequest(table_name="t", page=0, page_size=25))

0 commit comments

Comments
 (0)