From b25738215e13dddaa4d4d63f9f6f29a49b4ee93a Mon Sep 17 00:00:00 2001 From: Katherine Bargar Date: Tue, 10 Mar 2026 18:49:48 +0000 Subject: [PATCH 1/3] Transform all rows even with missing objects --- src/ldlite/database/_expansion/nodes.py | 1 - tests/test_expansion.py | 60 +++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/src/ldlite/database/_expansion/nodes.py b/src/ldlite/database/_expansion/nodes.py index 8d9e092..4f91027 100644 --- a/src/ldlite/database/_expansion/nodes.py +++ b/src/ldlite/database/_expansion/nodes.py @@ -277,7 +277,6 @@ def unnest( SELECT {cols} FROM ld_source -WHERE NOT ldlite_system.jis_null({json_col}) """, ) .format( diff --git a/tests/test_expansion.py b/tests/test_expansion.py index b24a844..1602b20 100644 --- a/tests/test_expansion.py +++ b/tests/test_expansion.py @@ -91,6 +91,66 @@ def case_typed_columns() -> ExpansionTC: ) +def case_jagged_json() -> ExpansionTC: + return ExpansionTC( + records=[ + b""" +{ + "id": "id1", + "both": { "id": "both_id1" }, + "first_only": { "id": "first_id1" } +} +""", + b""" +{ + "id": "id2", + "both": { "id": "both_id2" }, + "second_only": { "id": "second_id2" } +} +""", + b""" +{ + "id": "id3", + "both": { "id": "both_id3" }, + "first_only": { "id": "first_id3" } +} +""", + ], + assertions=[ + Assertion( + """ +SELECT COUNT(*) +FROM INFORMATION_SCHEMA.COLUMNS +WHERE TABLE_NAME = 'prefix__t' +""", + expect=5, + ), + Assertion( + """ +SELECT COUNT(*) +FROM tests.prefix__t +""", + expect=3, + ), + Assertion( + """ +SELECT COLUMN_NAME +FROM INFORMATION_SCHEMA.COLUMNS +WHERE TABLE_NAME = 'prefix__t' +ORDER BY COLUMN_NAME COLLATE "C"; +""", + expect=[ + ("__id",), + ("both__id",), + ("first_only__id",), + ("id",), + ("second_only__id",), + ], + ), + ], + ) + + @parametrize( "assertion", [ From 9ace59bb7755b53c968eddf0bca86d2d4d4387be Mon Sep 17 00:00:00 2001 From: Katherine Bargar Date: Tue, 10 Mar 2026 19:55:51 +0000 Subject: [PATCH 2/3] Record history stepwise in transactions --- src/ldlite/__init__.py | 33 ++----- src/ldlite/database/__init__.py | 21 +--- src/ldlite/database/_duckdb.py | 7 +- src/ldlite/database/_postgres.py | 5 + src/ldlite/database/_typed_database.py | 130 ++++++++++++++++++++----- 5 files changed, 127 insertions(+), 69 deletions(-) diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py index 3d9d3ef..d84ccf4 100644 --- a/src/ldlite/__init__.py +++ b/src/ldlite/__init__.py @@ -35,7 +35,6 @@ """ import sys -from datetime import datetime, timezone from typing import TYPE_CHECKING, NoReturn, cast import duckdb @@ -53,7 +52,7 @@ autocommit, sqlid, ) -from .database import Database, LoadHistory +from .database import Database if TYPE_CHECKING: from collections.abc import Iterator @@ -308,7 +307,12 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 if self.db is None or self._database is None: self._check_db() return [] - start = datetime.now(timezone.utc) + + self._database.prepare_history( + table, + path, + str(query), + ) if not self._quiet: print("ldlite: querying: " + path, file=sys.stderr) @@ -328,7 +332,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 file=sys.stderr, ) - download_started = datetime.now(timezone.utc) processed = self._database.ingest_records( table, cast( @@ -346,10 +349,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 ), ), ) - download = datetime.now(timezone.utc) - download_elapsed = datetime.now(timezone.utc) - download_started - transform_started = datetime.now(timezone.utc) if not use_legacy_transform: no_iters_format = ( "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]" @@ -377,7 +377,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 ) if keep_raw: newtables = [table, *newtables] - transform_elapsed = datetime.now(timezone.utc) - transform_started with tqdm( desc="indexing", @@ -385,7 +384,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 disable=self._quiet, bar_format=no_iters_format, ) as progress: - index_started = datetime.now(timezone.utc) self._database.index_prefix(table, progress) else: @@ -424,10 +422,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 finally: autocommit(self.db, self.dbtype, True) - transform_elapsed = datetime.now(timezone.utc) - transform_started - # Create indexes on id columns (for postgres) - index_started = datetime.now(timezone.utc) if self.dbtype == DBType.POSTGRES: class PbarNoop: @@ -464,20 +459,6 @@ def close(self) -> None: ... pbar.update(1) pbar.close() - index_elapsed = datetime.now(timezone.utc) - index_started - self._database.record_history( - LoadHistory( - table, - path, - query if query and isinstance(query, str) else None, - processed, - download, - start, - download_elapsed, - transform_elapsed, - index_elapsed, - ), - ) # Return table names if not self._quiet: print("ldlite: created tables: " + ", ".join(newtables), file=sys.stderr) diff --git a/src/ldlite/database/__init__.py b/src/ldlite/database/__init__.py index e1df517..074a2c5 100644 --- a/src/ldlite/database/__init__.py +++ b/src/ldlite/database/__init__.py @@ -3,31 +3,14 @@ from __future__ import annotations from abc import ABC, abstractmethod -from dataclasses import dataclass from typing import TYPE_CHECKING, NoReturn if TYPE_CHECKING: - import datetime from collections.abc import Iterator from tqdm import tqdm -@dataclass(frozen=True) -class LoadHistory: - """Represents the statistics and history of a single ldlite operation.""" - - table_name: str - path: str - query: str | None - total: int - download_time: datetime.datetime - start_time: datetime.datetime - download_interval: datetime.timedelta - transform_interval: datetime.timedelta - index_interval: datetime.timedelta - - class Database(ABC): """The required interface for LDLite to utilite a database.""" @@ -69,5 +52,5 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N """Finds and indexes all tables at the given prefix.""" @abstractmethod - def record_history(self, history: LoadHistory) -> None: - """Records the statistics and history of a single ldlite operation.""" + def prepare_history(self, prefix: str, path: str, query: str) -> None: + """Creates an entry with the current parameters in the history table.""" diff --git a/src/ldlite/database/_duckdb.py b/src/ldlite/database/_duckdb.py index 241b85b..7ab8cc8 100644 --- a/src/ldlite/database/_duckdb.py +++ b/src/ldlite/database/_duckdb.py @@ -1,4 +1,5 @@ from collections.abc import Iterator +from datetime import datetime, timezone from itertools import count from typing import TYPE_CHECKING, Any, cast @@ -89,6 +90,7 @@ def ingest_records( records: Iterator[bytes], ) -> int: pfx = Prefix(prefix) + download_started = datetime.now(timezone.utc) pkey = count(1) with self._conn_factory() as conn: self._prepare_raw_table(conn, pfx) @@ -102,9 +104,12 @@ def ingest_records( with conn.begin() as tx, tx.cursor() as cur: for r in records: cur.execute(insert_sql, (next(pkey), r.decode())) + + total = next(pkey) - 1 + self._download_complete(conn, pfx, total, download_started) tx.commit() - return next(pkey) - 1 + return total def source_table_cte_stmt(self, keep_source: bool) -> str: # noqa: ARG002 return "WITH ld_source AS (SELECT * FROM {source_table})" diff --git a/src/ldlite/database/_postgres.py b/src/ldlite/database/_postgres.py index 3084784..9c7dd2a 100644 --- a/src/ldlite/database/_postgres.py +++ b/src/ldlite/database/_postgres.py @@ -1,4 +1,5 @@ from collections.abc import Iterator +from datetime import datetime, timezone from itertools import count import psycopg @@ -120,6 +121,7 @@ def ingest_records( records: Iterator[bytes], ) -> int: pfx = Prefix(prefix) + download_started = datetime.now(timezone.utc) pkey = count(1) with self._conn_factory() as conn: self._prepare_raw_table(conn, pfx) @@ -141,7 +143,10 @@ def ingest_records( rb.extend(r) copy.write_row((next(pkey).to_bytes(4, "big"), rb)) + total = next(pkey) - 1 + self._download_complete(conn, pfx, total, download_started) conn.commit() + return next(pkey) - 1 def preprocess_source_table( diff --git a/src/ldlite/database/_typed_database.py b/src/ldlite/database/_typed_database.py index 1303931..d4a00c8 100644 --- a/src/ldlite/database/_typed_database.py +++ b/src/ldlite/database/_typed_database.py @@ -3,7 +3,7 @@ from abc import abstractmethod from contextlib import closing -from datetime import timezone +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Generic, NoReturn, TypeVar, cast from uuid import uuid4 @@ -11,7 +11,7 @@ from psycopg import sql from tqdm import tqdm -from . import Database, LoadHistory +from . import Database from ._expansion import expand_nonmarc from ._expansion.context import ExpandContext from ._prefix import Prefix @@ -37,9 +37,14 @@ def __init__(self, conn_factory: Callable[[], DB]): "table_name" TEXT UNIQUE ,"path" TEXT ,"query" TEXT - ,"row_count" INTEGER - ,"download_complete_utc" TIMESTAMP - ,"start_utc" TIMESTAMP + + ,"rowcount" INTEGER + ,"download_complete" TIMESTAMPTZ + + ,"final_rowcount" INTEGER + ,"transform_complete" TIMESTAMPTZ + ,"data_refreshed" TIMESTAMPTZ + ,"download_time" INTERVAL ,"transform_time" INTERVAL ,"index_time" INTERVAL @@ -195,6 +200,7 @@ def expand_prefix( transform_progress: tqdm[NoReturn] | None = None, ) -> list[str]: pfx = Prefix(prefix) + transform_started = datetime.now(timezone.utc) with closing(self._conn_factory()) as conn: self._drop_extracted_tables(conn, pfx) if json_depth < 1: @@ -262,12 +268,22 @@ def expand_prefix( [(pfx.catalog_table_row(t),) for t in created_tables], ) + with conn.cursor() as cur: + cur.execute( + sql.SQL("SELECT COUNT(*) FROM {table}") + .format(table=pfx.output_table("").id) + .as_string(), + ) + total = cast("tuple[int]", cur.fetchone())[0] + + self._transform_complete(conn, pfx, total, transform_started) conn.commit() return created_tables def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> None: pfx = Prefix(prefix) + index_started = datetime.now(timezone.utc) with closing(self._conn_factory()) as conn: with closing(conn.cursor()) as cur: cur.execute( @@ -321,33 +337,101 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N if progress is not None: progress.update(1) + self._index_complete(conn, pfx, index_started) conn.commit() - def record_history(self, history: LoadHistory) -> None: - with closing(self._conn_factory()) as conn, conn.cursor() as cur: + def prepare_history( + self, + prefix: str, + path: str, + query: str, + ) -> None: + with closing(self._conn_factory()) as conn, closing(conn.cursor()) as cur: cur.execute( """ -INSERT INTO "ldlite_system"."load_history_v1" VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9) +INSERT INTO "ldlite_system"."load_history_v1" +( + "table_name" + ,"path" + ,"query" +) +VALUES($1,$2,$3) ON CONFLICT ("table_name") DO UPDATE SET "path" = EXCLUDED."path" ,"query" = EXCLUDED."query" - ,"row_count" = EXCLUDED."row_count" - ,"download_complete_utc" = EXCLUDED."download_complete_utc" - ,"start_utc" = EXCLUDED."start_utc" - ,"download_time" = EXCLUDED."download_time" - ,"transform_time" = EXCLUDED."transform_time" - ,"index_time" = EXCLUDED."index_time" """, ( - Prefix(history.table_name).load_history_key, - history.path, - history.query, - history.total, - history.download_time.astimezone(timezone.utc), - history.start_time.astimezone(timezone.utc), - history.download_interval, - history.transform_interval, - history.index_interval, + Prefix(prefix).load_history_key, + path, + query, ), ) conn.commit() + + def _download_complete( + self, + conn: DB, + pfx: Prefix, + rowcount: int, + download_start: datetime, + ) -> None: + with conn.cursor() as cur: + cur.execute( + """ +UPDATE "ldlite_system"."load_history_v1" SET + "rowcount" = $2 + ,"download_complete" = $3 + ,"download_time" = $4 +WHERE "table_name" = $1; +""", + ( + pfx.load_history_key, + rowcount, + datetime.now(timezone.utc), + download_start - datetime.now(timezone.utc), + ), + ) + + def _transform_complete( + self, + conn: DB, + pfx: Prefix, + final_rowcount: int, + transform_start: datetime, + ) -> None: + with conn.cursor() as cur: + cur.execute( + """ +UPDATE "ldlite_system"."load_history_v1" SET + "final_rowcount" = $2 + ,"transform_complete" = $3 + ,"data_refreshed" = "download_complete" + ,"transform_time" = $4 +WHERE "table_name" = $1 +""", + ( + pfx.load_history_key, + final_rowcount, + datetime.now(timezone.utc), + datetime.now(timezone.utc) - transform_start, + ), + ) + + def _index_complete( + self, + conn: DB, + pfx: Prefix, + index_start: datetime, + ) -> None: + with conn.cursor() as cur: + cur.execute( + """ +UPDATE "ldlite_system"."load_history_v1" SET + "index_time" = $2 +WHERE "table_name" = $1 +""", + ( + pfx.load_history_key, + datetime.now(timezone.utc) - index_start, + ), + ) From 2b36fb1dee3c1740f6e9587bb89c17dd6bff18ed Mon Sep 17 00:00:00 2001 From: Katherine Bargar Date: Tue, 10 Mar 2026 20:02:53 +0000 Subject: [PATCH 3/3] Update tests to new table structure --- src/ldlite/__init__.py | 2 +- src/ldlite/database/__init__.py | 2 +- src/ldlite/database/_typed_database.py | 4 ++-- tests/test_load_history.py | 14 +++++++++----- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py index d84ccf4..e6cd1f8 100644 --- a/src/ldlite/__init__.py +++ b/src/ldlite/__init__.py @@ -311,7 +311,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 self._database.prepare_history( table, path, - str(query), + query if query and isinstance(query, str) else None, ) if not self._quiet: print("ldlite: querying: " + path, file=sys.stderr) diff --git a/src/ldlite/database/__init__.py b/src/ldlite/database/__init__.py index 074a2c5..b7fb915 100644 --- a/src/ldlite/database/__init__.py +++ b/src/ldlite/database/__init__.py @@ -52,5 +52,5 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N """Finds and indexes all tables at the given prefix.""" @abstractmethod - def prepare_history(self, prefix: str, path: str, query: str) -> None: + def prepare_history(self, prefix: str, path: str, query: str | None) -> None: """Creates an entry with the current parameters in the history table.""" diff --git a/src/ldlite/database/_typed_database.py b/src/ldlite/database/_typed_database.py index d4a00c8..9a93899 100644 --- a/src/ldlite/database/_typed_database.py +++ b/src/ldlite/database/_typed_database.py @@ -344,7 +344,7 @@ def prepare_history( self, prefix: str, path: str, - query: str, + query: str | None, ) -> None: with closing(self._conn_factory()) as conn, closing(conn.cursor()) as cur: cur.execute( @@ -388,7 +388,7 @@ def _download_complete( pfx.load_history_key, rowcount, datetime.now(timezone.utc), - download_start - datetime.now(timezone.utc), + datetime.now(timezone.utc) - download_start, ), ) diff --git a/tests/test_load_history.py b/tests/test_load_history.py index d26120e..8452898 100644 --- a/tests/test_load_history.py +++ b/tests/test_load_history.py @@ -140,13 +140,17 @@ def _assert( assert d[1] == "/patched" assert d[2] == q assert d[3] == t - assert d[4] > d[5] - assert d[6] > timedelta(microseconds=0) - assert d[6] < timedelta(seconds=1) - assert d[7] > timedelta(microseconds=0) - assert d[7] < timedelta(seconds=1) + + assert d[6] > d[4] + assert d[7] == d[4] + assert d[5] == t + assert d[8] > timedelta(microseconds=0) assert d[8] < timedelta(seconds=1) + assert d[9] > timedelta(microseconds=0) + assert d[9] < timedelta(seconds=1) + assert d[10] > timedelta(microseconds=0) + assert d[10] < timedelta(seconds=1) @mock.patch("httpx_folio.auth.httpx.post")