diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py index 42d3cf0..1cf80ac 100644 --- a/src/ldlite/__init__.py +++ b/src/ldlite/__init__.py @@ -250,6 +250,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 limit: int | None = None, transform: bool | None = None, keep_raw: bool = True, + use_legacy_transform: bool = False, ) -> list[str]: """Submits a query to a FOLIO module, and transforms and stores the result. @@ -279,6 +280,9 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 If *keep_raw* is set to False, then the raw table of __id, json will be dropped saving an estimated 20% disk space. + *use_legacy_transform* will use the pre 4.0 transformation logic. + This parameter is deprecated and will not function in a future release. + The *transform* parameter is no longer supported and will be removed in the future. Instead, specify *json_depth* as 0 to disable JSON transformation. @@ -307,70 +311,88 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 start = datetime.now(timezone.utc) if not self._quiet: print("ldlite: querying: " + path, file=sys.stderr) - try: - (total_records, records) = self._folio.iterate_records( - path, - self._okapi_timeout, - self._okapi_max_retries, - self.page_size, - query=cast("QueryType", query), - ) - if limit is not None: - total_records = min(total_records, limit) - records = (x for _, x in zip(range(limit), records, strict=False)) - if self._verbose: - print( - "ldlite: estimated row count: " + str(total_records), - file=sys.stderr, - ) - download_started = datetime.now(timezone.utc) - processed = self._database.ingest_records( - table, - cast( - "Iterator[bytes]", - tqdm( - records, - desc="downloading", - total=total_records, - leave=False, - mininterval=5, - disable=self._quiet, - unit=table.split(".")[-1], - unit_scale=True, - delay=5, - ), - ), + (total_records, records) = self._folio.iterate_records( + path, + self._okapi_timeout, + self._okapi_max_retries, + self.page_size, + query=cast("QueryType", query), + ) + if limit is not None: + total_records = min(total_records, limit) + records = (x for _, x in zip(range(limit), records, strict=False)) + if self._verbose: + print( + "ldlite: estimated row count: " + str(total_records), + file=sys.stderr, ) - download = datetime.now(timezone.utc) - download_elapsed = datetime.now(timezone.utc) - download_started - - transform_started = datetime.now(timezone.utc) - self._database.drop_extracted_tables(table) - newtables = [table] - newattrs = {} - if json_depth > 0: - autocommit(self.db, self.dbtype, False) - (jsontables, jsonattrs) = transform_json( - self.db, - self.dbtype, - table, - processed, - self._quiet, - json_depth, - ) - newtables += jsontables - newattrs = jsonattrs - for t in newattrs: - newattrs[t]["__id"] = Attr("__id", "bigint") - newattrs[table] = {"__id": Attr("__id", "bigint")} - - if not keep_raw: - self._database.drop_raw_table(table) - transform_elapsed = datetime.now(timezone.utc) - transform_started - finally: - autocommit(self.db, self.dbtype, True) + download_started = datetime.now(timezone.utc) + processed = self._database.ingest_records( + table, + cast( + "Iterator[bytes]", + tqdm( + records, + desc="downloading", + total=total_records, + leave=False, + mininterval=5, + disable=self._quiet, + unit=table.split(".")[-1], + unit_scale=True, + delay=5, + ), + ), + ) + download = datetime.now(timezone.utc) + download_elapsed = datetime.now(timezone.utc) - download_started + + transform_started = datetime.now(timezone.utc) + if not use_legacy_transform: + newtables = self._database.expand_prefix(table, json_depth, keep_raw) + if keep_raw: + newtables = [table, *newtables] + indexable_attrs = [] + + else: + try: + self._database.drop_extracted_tables(table) + newtables = [table] + newattrs = {} + if json_depth > 0: + autocommit(self.db, self.dbtype, False) + (jsontables, jsonattrs) = transform_json( + self.db, + self.dbtype, + table, + processed, + self._quiet, + json_depth, + ) + newtables += jsontables + newattrs = jsonattrs + for t in newattrs: + newattrs[t]["__id"] = Attr("__id", "bigint") + newattrs[table] = {"__id": Attr("__id", "bigint")} + + if not keep_raw: + self._database.drop_raw_table(table) + + indexable_attrs = [ + (t, a) + for t, attrs in newattrs.items() + for n, a in attrs.items() + if n in ["__id", "id"] + or n.endswith(("_id", "__o")) + or a.datatype == "uuid" + ] + + finally: + autocommit(self.db, self.dbtype, True) + + transform_elapsed = datetime.now(timezone.utc) - transform_started # Create indexes on id columns (for postgres) index_started = datetime.now(timezone.utc) if self.dbtype == DBType.POSTGRES: @@ -381,14 +403,6 @@ def close(self) -> None: ... pbar: tqdm | PbarNoop = PbarNoop() # type:ignore[type-arg] - indexable_attrs = [ - (t, a) - for t, attrs in newattrs.items() - for n, a in attrs.items() - if n in ["__id", "id"] - or n.endswith(("_id", "__o")) - or a.datatype == "uuid" - ] index_total = len(indexable_attrs) if not self._quiet: pbar = tqdm( diff --git a/src/ldlite/database/__init__.py b/src/ldlite/database/__init__.py index 1e67fb7..719656c 100644 --- a/src/ldlite/database/__init__.py +++ b/src/ldlite/database/__init__.py @@ -47,7 +47,7 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int: """Ingests a stream of records dowloaded from FOLIO to the raw table.""" @abstractmethod - def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None: + def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]: """Unnests and explodes the raw data at the given prefix.""" @abstractmethod diff --git a/src/ldlite/database/_duckdb.py b/src/ldlite/database/_duckdb.py index c0a913a..98ac5bb 100644 --- a/src/ldlite/database/_duckdb.py +++ b/src/ldlite/database/_duckdb.py @@ -94,7 +94,7 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None: ; CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE ( - SELECT value FROM main.json_each(j) + SELECT value as ld_value FROM main.json_each(j) ); """, # noqa: E501 @@ -120,7 +120,7 @@ def ingest_records( insert_sql = ( sql.SQL("INSERT INTO {table} VALUES(?, ?);") - .format(table=pfx.schemafy(pfx.raw_table)) + .format(table=pfx.raw_table.id) .as_string() ) # duckdb has better performance bulk inserting in a transaction diff --git a/src/ldlite/database/_expansion/__init__.py b/src/ldlite/database/_expansion/__init__.py index 0afc37e..d6f8bd5 100644 --- a/src/ldlite/database/_expansion/__init__.py +++ b/src/ldlite/database/_expansion/__init__.py @@ -21,7 +21,7 @@ class ExpandContext: source_table: sql.Identifier json_depth: int get_transform_table: Callable[[int], sql.Identifier] - get_output_table: Callable[[str], sql.Identifier] + get_output_table: Callable[[str], tuple[str, sql.Identifier]] # This is necessary for Analyzing the table in pg before querying it # I don't love how this is implemented preprocess: Callable[ @@ -55,19 +55,20 @@ def expand_nonmarc( root_name: str, root_values: list[str], ctx: ExpandContext, -) -> None: - _expand_nonmarc( +) -> list[str]: + (_, created_tables) = _expand_nonmarc( ObjectNode(root_name, "", None, root_values), 0, ctx, ) + return created_tables def _expand_nonmarc( root: ObjectNode, count: int, ctx: ExpandContext, -) -> int: +) -> tuple[int, list[str]]: initial_count = count ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier]) root.unnest( @@ -95,6 +96,8 @@ def _expand_nonmarc( expand_children_of.append(c) count += 1 + created_tables = [] + new_source_table = ctx.get_transform_table(count) arrays = root.descendents_oftype(ArrayNode) ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays]) @@ -110,7 +113,7 @@ def _expand_nonmarc( count += 1 if an.meta.is_object: - count += _expand_nonmarc( + (sub_index, array_tables) = _expand_nonmarc( ObjectNode( an.name, an.name, @@ -123,8 +126,12 @@ def _expand_nonmarc( ctx.json_depth - len(an.parents), ), ) + count += sub_index + created_tables.extend(array_tables) else: with ctx.conn.cursor() as cur: + (tname, tid) = ctx.get_output_table(an.name) + created_tables.append(tname) cur.execute( sql.SQL( """ @@ -136,7 +143,7 @@ def _expand_nonmarc( """, ) .format( - dest_table=ctx.get_output_table(an.name), + dest_table=tid, source_table=ctx.get_transform_table(count), cols=sql.SQL("\n ,").join( [sql.Identifier(v) for v in [*values, an.name]], @@ -146,12 +153,12 @@ def _expand_nonmarc( ) stamped_values = [ - sql.Identifier(v) - for n in set(root.descendents).difference(arrays) - for v in n.values + sql.Identifier(v) for n in root.descendents if n not in arrays for v in n.values ] with ctx.conn.cursor() as cur: + (tname, tid) = ctx.get_output_table(root.path) + created_tables.append(tname) cur.execute( sql.SQL( """ @@ -163,11 +170,11 @@ def _expand_nonmarc( """, ) .format( - dest_table=ctx.get_output_table(root.path), + dest_table=tid, source_table=new_source_table, cols=sql.SQL("\n ,").join(stamped_values), ) .as_string(), ) - return count + 1 - initial_count + return (count + 1 - initial_count, created_tables) diff --git a/src/ldlite/database/_expansion/metadata.py b/src/ldlite/database/_expansion/metadata.py index 755cd78..15f75f7 100644 --- a/src/ldlite/database/_expansion/metadata.py +++ b/src/ldlite/database/_expansion/metadata.py @@ -31,7 +31,9 @@ def is_object(self) -> bool: @property def snake(self) -> str: - return "".join("_" + c.lower() if c.isupper() else c for c in self.prop) + return "".join("_" + c.lower() if c.isupper() else c for c in self.prop).lstrip( + "_", + ) def select_column( self, @@ -47,6 +49,10 @@ def select_column( "(ldlite_system.jextract_string({json_col}, {prop}))" "::numeric AS {alias}", ) + elif self.json_type == "boolean": + stmt = sql.SQL( + "(ldlite_system.jextract_string({json_col}, {prop}))::bool AS {alias}", + ) elif self.json_type == "string" and self.is_uuid: stmt = sql.SQL( "(ldlite_system.jextract_string({json_col}, {prop}))::uuid AS {alias}", diff --git a/src/ldlite/database/_expansion/nodes.py b/src/ldlite/database/_expansion/nodes.py index 4b5ab5a..7883f51 100644 --- a/src/ldlite/database/_expansion/nodes.py +++ b/src/ldlite/database/_expansion/nodes.py @@ -142,30 +142,30 @@ def unnest( values AS ( SELECT prop - ,ldlite_system.jextract({json_col}, prop) as value + ,ldlite_system.jextract({json_col}, prop) as ld_value FROM {table}, props ), value_and_types AS ( SELECT prop - ,ldlite_system.jtype_of(value) AS json_type - ,value + ,ldlite_system.jtype_of(ld_value) AS json_type + ,ld_value FROM values - WHERE NOT ldlite_system.jis_null(value) + WHERE NOT ldlite_system.jis_null(ld_value) ), array_values AS ( SELECT v.prop - ,ldlite_system.jtype_of(a.value) AS json_type - ,v.value - FROM value_and_types v, ldlite_system.jexplode(v.value) a + ,ldlite_system.jtype_of(a.ld_value) AS json_type + ,v.ld_value + FROM value_and_types v, ldlite_system.jexplode(v.ld_value) a WHERE v.json_type = 'array' ), all_values AS ( SELECT prop ,json_type - ,value + ,ld_value ,FALSE AS is_array FROM value_and_types WHERE json_type <> 'array' @@ -173,18 +173,18 @@ def unnest( SELECT prop ,json_type - ,value + ,ld_value ,TRUE AS is_array FROM array_values - WHERE NOT ldlite_system.jis_null(value) + WHERE NOT ldlite_system.jis_null(ld_value) ) SELECT prop ,STRING_AGG(DISTINCT json_type, '|') AS json_type ,bool_and(is_array) AS is_array - ,bool_and(ldlite_system.jis_uuid(value)) AS is_uuid - ,bool_and(ldlite_system.jis_datetime(value)) AS is_datetime - ,bool_and(ldlite_system.jis_float(value)) AS is_float + ,bool_and(ldlite_system.jis_uuid(ld_value)) AS is_uuid + ,bool_and(ldlite_system.jis_datetime(ld_value)) AS is_datetime + ,bool_and(ldlite_system.jis_float(ld_value)) AS is_float FROM all_values GROUP BY prop """, @@ -261,15 +261,19 @@ def explode( source_cte: str, ) -> list[str]: with conn.cursor() as cur: - o_col = self.name + "_o" + o_col = self.name + "__o" create_columns: list[sql.Composable] = [ - sql.SQL("ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS __id"), + sql.SQL( + "(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)))::integer AS __id" + ), *[sql.Identifier(v) for v in self.carryover], - sql.SQL("ROW_NUMBER() OVER (PARTITION BY s.__id) AS {id_alias}").format( + sql.SQL( + "(ROW_NUMBER() OVER (PARTITION BY s.__id))::smallint AS {id_alias}", + ).format( id_alias=sql.Identifier(o_col), ), self.meta.select_column( - sql.Identifier("a", "value"), + sql.Identifier("a", "ld_value"), self.name, ), ] diff --git a/src/ldlite/database/_postgres.py b/src/ldlite/database/_postgres.py index 987810c..a9b4b81 100644 --- a/src/ldlite/database/_postgres.py +++ b/src/ldlite/database/_postgres.py @@ -140,7 +140,7 @@ def _setup_jfuncs(conn: psycopg.Connection) -> None: IMMUTABLE PARALLEL SAFE; -CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j JSONB) RETURNS TABLE (value JSONB) AS $$ +CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j JSONB) RETURNS TABLE (ld_value JSONB) AS $$ SELECT * FROM jsonb_array_elements(j); $$ LANGUAGE sql @@ -174,7 +174,7 @@ def ingest_records( cur.copy( sql.SQL( "COPY {table} (__id, jsonb) FROM STDIN (FORMAT BINARY)", - ).format(table=pfx.schemafy(pfx.raw_table)), + ).format(table=pfx.raw_table.id), ) as copy, ): # postgres jsonb is always version 1 diff --git a/src/ldlite/database/_prefix.py b/src/ldlite/database/_prefix.py index d4ef74d..a93dd81 100644 --- a/src/ldlite/database/_prefix.py +++ b/src/ldlite/database/_prefix.py @@ -1,6 +1,13 @@ +from typing import NamedTuple + from psycopg import sql +class PrefixedTable(NamedTuple): + name: str + id: sql.Identifier + + class Prefix: def __init__(self, prefix: str): self.schema: str | None = None @@ -14,32 +21,34 @@ def __init__(self, prefix: str): else: (self.schema, self._prefix) = sandt - def schemafy(self, table: str) -> sql.Identifier: + def _prefixed_table(self, name: str) -> PrefixedTable: if self.schema is None: - return sql.Identifier(table) - return sql.Identifier(self.schema, table) + return PrefixedTable(name, sql.Identifier(name)) + return PrefixedTable(name, sql.Identifier(self.schema, name)) @property - def raw_table(self) -> str: - return self._prefix + def raw_table(self) -> PrefixedTable: + return self._prefixed_table(self._prefix) @property def _output_table(self) -> str: return self._prefix + "__t" - def output_table(self, prefix: str) -> sql.Identifier: - if len(prefix) == 0: - return self.schemafy(self._output_table) - - return self.schemafy(self._output_table + "__" + prefix) + def output_table(self, prefix: str) -> PrefixedTable: + return self._prefixed_table( + self._output_table + ("" if len(prefix) == 0 else "__" + prefix), + ) @property - def catalog_table(self) -> str: - return f"{self._prefix}__tcatalog" + def catalog_table(self) -> PrefixedTable: + return self._prefixed_table(self._prefix + "__tcatalog") + + def catalog_table_row(self, created_table: str) -> str: + return ((self.schema + ".") if self.schema is not None else "") + created_table @property - def legacy_jtable(self) -> str: - return f"{self._prefix}_jtable" + def legacy_jtable(self) -> PrefixedTable: + return self._prefixed_table(self._prefix + "_jtable") @property def load_history_key(self) -> str: diff --git a/src/ldlite/database/_typed_database.py b/src/ldlite/database/_typed_database.py index 6ef07c7..b67d367 100644 --- a/src/ldlite/database/_typed_database.py +++ b/src/ldlite/database/_typed_database.py @@ -26,7 +26,7 @@ def __init__(self, conn_factory: Callable[[], DB]): with conn.cursor() as cur: cur.execute('CREATE SCHEMA IF NOT EXISTS "ldlite_system";') cur.execute(""" -CREATE TABLE IF NOT EXISTS "ldlite_system"."load_history" ( +CREATE TABLE IF NOT EXISTS "ldlite_system"."load_history_v1" ( "table_name" TEXT UNIQUE ,"path" TEXT ,"query" TEXT @@ -58,7 +58,7 @@ def drop_prefix( self._drop_extracted_tables(conn, pfx) self._drop_raw_table(conn, pfx) conn.execute( - 'DELETE FROM "ldlite_system"."load_history" WHERE "table_name" = $1', + 'DELETE FROM "ldlite_system"."load_history_v1" WHERE "table_name" = $1', (pfx.load_history_key,), ) conn.commit() @@ -79,7 +79,7 @@ def _drop_raw_table( with closing(conn.cursor()) as cur: cur.execute( sql.SQL("DROP TABLE IF EXISTS {table};") - .format(table=prefix.schemafy(prefix.raw_table)) + .format(table=prefix.raw_table.id) .as_string(), ) @@ -104,23 +104,23 @@ def _drop_extracted_tables( WHERE table_schema = $1 and table_name IN ($2, $3);""", ( prefix.schema or self._default_schema, - prefix.catalog_table, - prefix.legacy_jtable, + prefix.catalog_table.name, + prefix.legacy_jtable.name, ), ) for (tname,) in cur.fetchall(): - if tname == prefix.catalog_table: + if tname == prefix.catalog_table.name: cur.execute( sql.SQL("SELECT table_name FROM {catalog};") - .format(catalog=prefix.schemafy(prefix.catalog_table)) + .format(catalog=prefix.catalog_table.id) .as_string(), ) tables.extend(cur.fetchall()) - if tname == prefix.legacy_jtable: + if tname == prefix.legacy_jtable.name: cur.execute( sql.SQL("SELECT table_name FROM {catalog};") - .format(catalog=prefix.schemafy(prefix.legacy_jtable)) + .format(catalog=prefix.legacy_jtable.id) .as_string(), ) tables.extend(cur.fetchall()) @@ -134,12 +134,12 @@ def _drop_extracted_tables( ) cur.execute( sql.SQL("DROP TABLE IF EXISTS {catalog};") - .format(catalog=prefix.schemafy(prefix.catalog_table)) + .format(catalog=prefix.catalog_table.id) .as_string(), ) cur.execute( sql.SQL("DROP TABLE IF EXISTS {catalog};") - .format(catalog=prefix.schemafy(prefix.legacy_jtable)) + .format(catalog=prefix.legacy_jtable.id) .as_string(), ) @@ -162,7 +162,7 @@ def _prepare_raw_table( with closing(conn.cursor()) as cur: cur.execute( self._create_raw_table_sql.format( - table=prefix.schemafy(prefix.raw_table), + table=prefix.raw_table.id, ).as_string(), ) @@ -179,9 +179,14 @@ def preprocess_source_table( @abstractmethod def source_table_cte_stmt(self, keep_source: bool) -> str: ... - def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None: + def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]: pfx = Prefix(prefix) with closing(self._conn_factory()) as conn: + self._drop_extracted_tables(conn, pfx) + if json_depth < 1: + conn.commit() + return [] + with conn.cursor() as cur: cur.execute( sql.SQL( @@ -195,7 +200,7 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None: ) .format( dest_table=pfx.origin_table, - source_table=pfx.schemafy(pfx.raw_table), + source_table=pfx.raw_table.id, ) .as_string(), ) @@ -203,7 +208,7 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None: if not keep_raw: self._drop_raw_table(conn, pfx) - expand_nonmarc( + created_tables = expand_nonmarc( "jsonb", ["__id"], ExpandContext( @@ -217,13 +222,36 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None: ), ) + with conn.cursor() as cur: + cur.execute( + sql.SQL( + """ +CREATE TABLE {catalog_table} ( + table_name text +) +""", + ) + .format(catalog_table=pfx.catalog_table.id) + .as_string(), + ) + cur.executemany( + sql.SQL("INSERT INTO {catalog_table} VALUES ($1)") + .format( + catalog_table=pfx.catalog_table.id, + ) + .as_string(), + [(pfx.catalog_table_row(t),) for t in created_tables], + ) + conn.commit() + return created_tables + def record_history(self, history: LoadHistory) -> None: with closing(self._conn_factory()) as conn, conn.cursor() as cur: cur.execute( """ -INSERT INTO "ldlite_system"."load_history" VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9) +INSERT INTO "ldlite_system"."load_history_v1" VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9) ON CONFLICT ("table_name") DO UPDATE SET "path" = EXCLUDED."path" ,"query" = EXCLUDED."query" diff --git a/tests/test___init__.py b/tests/test___init__.py index e363ed4..e19eb19 100644 --- a/tests/test___init__.py +++ b/tests/test___init__.py @@ -54,6 +54,20 @@ def test_ok_legacy(self, folio_params: tuple[bool, FolioParams]) -> None: ld.query(table="g", path="/groups", query="cql.allRecords=1 sortby id") ld.select(table="g__t") + def test_ok_legacy_transform(self, folio_params: tuple[bool, FolioParams]) -> None: + from ldlite import LDLite as uut + + ld = uut() + ld.connect_folio(*astuple(folio_params[1])) + ld.connect_db() + ld.query( + table="g", + path="/groups", + query="cql.allRecords=1 sortby id", + use_legacy_transform=True, + ) + ld.select(table="g__t") + def test_ok_limit(self, folio_params: tuple[bool, FolioParams]) -> None: from ldlite import LDLite as uut diff --git a/tests/test_drop_tables.py b/tests/test_drop_tables.py index a0d1d42..1050cd3 100644 --- a/tests/test_drop_tables.py +++ b/tests/test_drop_tables.py @@ -28,11 +28,11 @@ class DropTablesTC(MockedResponseTestCase): def case_one_table(keep_raw: bool) -> DropTablesTC: return DropTablesTC( Call( - "prefix", + "sch.prefix", returns={"purchaseOrders": [{"id": "1"}]}, keep_raw=keep_raw, ), - drop="prefix", + drop="sch.prefix", expected_tables=[], ) @@ -41,7 +41,7 @@ def case_one_table(keep_raw: bool) -> DropTablesTC: def case_two_tables(keep_raw: bool) -> DropTablesTC: return DropTablesTC( Call( - "prefix", + "sch.prefix", returns={ "purchaseOrders": [ { @@ -52,7 +52,7 @@ def case_two_tables(keep_raw: bool) -> DropTablesTC: }, keep_raw=keep_raw, ), - drop="prefix", + drop="sch.prefix", expected_tables=[], ) @@ -69,7 +69,7 @@ def case_separate_table(keep_raw: bool) -> DropTablesTC: return DropTablesTC( [ Call( - "prefix", + "sch.prefix", returns={"purchaseOrders": [{"id": "1"}]}, keep_raw=keep_raw, ), @@ -79,7 +79,7 @@ def case_separate_table(keep_raw: bool) -> DropTablesTC: keep_raw=keep_raw, ), ], - drop="prefix", + drop="sch.prefix", expected_tables=expected_tables, ) @@ -120,11 +120,11 @@ def _assert( ) assert sorted([r[0] for r in cur.fetchall()]) == sorted(tc.expected_tables) - cur.execute('SELECT COUNT(*) FROM "ldlite_system"."load_history"') + cur.execute('SELECT COUNT(*) FROM "ldlite_system"."load_history_v1"') assert (ud := cur.fetchone()) is not None assert ud[0] == len(tc.calls_list) - 1 cur.execute( - 'SELECT COUNT(*) FROM "ldlite_system"."load_history" ' + 'SELECT COUNT(*) FROM "ldlite_system"."load_history_v1" ' 'WHERE "table_name" = $1', (tc.drop,), ) diff --git a/tests/test_expansion.py b/tests/test_expansion.py index bcb62b7..8967736 100644 --- a/tests/test_expansion.py +++ b/tests/test_expansion.py @@ -51,6 +51,7 @@ def case_typed_columns() -> ExpansionTC: "id": "id1", "numeric": 1, "text": "value", + "boolean": false, "uuid": "88888888-8888-1888-8888-888888888888" } """, @@ -59,6 +60,7 @@ def case_typed_columns() -> ExpansionTC: "id": "id2", "numeric": 2, "text": "00000000-0000-1000-A000-000000000000", + "boolean": false, "uuid": "11111111-1111-1111-8111-111111111111" } """, @@ -77,6 +79,7 @@ def case_typed_columns() -> ExpansionTC: ("numeric", "DECIMAL(18,3)"), ("text", "VARCHAR"), ("uuid", "UUID"), + ("boolean", "BOOLEAN"), ] ], ) @@ -88,6 +91,7 @@ def case_typed_columns() -> ExpansionTC: ("all_null", None, None), ("nullable_numeric", "numeric", "DECIMAL(18,3)"), ("nullable_uuid", "uuid", "UUID"), + ("nullable_bool", "boolean", "BOOLEAN"), ("nullable_object__id", "numeric", "DECIMAL(18,3)"), ("nullable_array", "numeric", "DECIMAL(18,3)"), ("sortof_nullable_array__id", "numeric", "DECIMAL(18,3)"), @@ -101,6 +105,7 @@ def case_null(assertion: tuple[str, str | None, str | None]) -> ExpansionTC: { "all_null": null, "nullable_numeric": null, + "nullable_bool": null, "nullable_uuid": null, "nullable_object": null, "nullable_array": [], @@ -112,6 +117,7 @@ def case_null(assertion: tuple[str, str | None, str | None]) -> ExpansionTC: "all_null": null, "nullable_numeric": 5, "nullable_uuid": null, + "nullable_bool": false, "nullable_object": { "id": 5 }, "nullable_array": null, "sortof_nullable_array": [{}, {}] @@ -121,6 +127,7 @@ def case_null(assertion: tuple[str, str | None, str | None]) -> ExpansionTC: { "all_null": null, "nullable_numeric": null, + "nullable_bool": true, "nullable_uuid": "0b03c888-102b-18e9-afb7-85e22229ca4d", "nullable_object": { "id": null}, "nullable_array": [null, 5, null], @@ -216,6 +223,14 @@ def case_basic_array() -> ExpansionTC: assertions=[ Assertion("""SELECT COUNT(*) FROM tests.prefix__t__list1""", expect=6), Assertion("""SELECT COUNT(*) FROM tests.prefix__t__list2""", expect=2), + Assertion( + """SELECT * FROM tests.prefix__tcatalog ORDER BY table_name""", + expect=[ + ("tests.prefix__t",), + ("tests.prefix__t__list1",), + ("tests.prefix__t__list2",), + ], + ), Assertion( """ SELECT id, list1 @@ -233,9 +248,9 @@ def case_basic_array() -> ExpansionTC: ), Assertion( """ -SELECT id, list1_o +SELECT id, list1__o FROM tests.prefix__t__list1 -ORDER BY id, list1_o +ORDER BY id, list1__o """, expect=[ ("id1", 1), @@ -255,15 +270,15 @@ def case_basic_array() -> ExpansionTC: ORDER BY ORDINAL_POSITION """, exp_duck=[ - ("__id", "BIGINT"), + ("__id", "INTEGER"), ("id", "VARCHAR"), - (f"{a[0]}_o", "BIGINT"), + (f"{a[0]}__o", "SMALLINT"), (f"{a[0]}", a[1]), ], exp_pg=[ - ("__id", "bigint"), + ("__id", "integer"), ("id", "text"), - (f"{a[0]}_o", "bigint"), + (f"{a[0]}__o", "smallint"), (f"{a[0]}", a[2]), ], ) @@ -294,6 +309,13 @@ def case_nested_arrays() -> ExpansionTC: ], assertions=[ Assertion("""SELECT COUNT(*) FROM tests.prefix__t__sub""", expect=4), + Assertion( + """SELECT * FROM tests.prefix__tcatalog ORDER BY table_name""", + expect=[ + ("tests.prefix__t",), + ("tests.prefix__t__sub",), + ], + ), Assertion( """ SELECT id, sub__id @@ -309,9 +331,9 @@ def case_nested_arrays() -> ExpansionTC: ), Assertion( """ -SELECT id, sub_o +SELECT id, sub__o FROM tests.prefix__t__sub -ORDER BY id, sub_o +ORDER BY id, sub__o """, expect=[ ("id1", 1), @@ -328,15 +350,15 @@ def case_nested_arrays() -> ExpansionTC: ORDER BY ORDINAL_POSITION """, exp_duck=[ - ("__id", "BIGINT"), + ("__id", "INTEGER"), ("id", "VARCHAR"), - ("sub_o", "BIGINT"), + ("sub__o", "SMALLINT"), ("sub__id", "VARCHAR"), ], exp_pg=[ - ("__id", "bigint"), + ("__id", "integer"), ("id", "text"), - ("sub_o", "bigint"), + ("sub__o", "smallint"), ("sub__id", "text"), ], ), @@ -352,6 +374,12 @@ def case_basic_object() -> ExpansionTC: ], assertions=[ Assertion("SELECT COUNT(*) FROM tests.prefix__t;", 2), + Assertion( + """SELECT * FROM tests.prefix__tcatalog ORDER BY table_name""", + expect=[ + ("tests.prefix__t",), + ], + ), Assertion("SELECT id FROM tests.prefix__t WHERE __id = 1", "id1"), Assertion( "SELECT camel_value FROM tests.prefix__t WHERE __id = 1", @@ -410,6 +438,12 @@ def case_nested_objects() -> ExpansionTC: ], assertions=[ Assertion("SELECT COUNT(*) FROM tests.prefix__t;", 2), + Assertion( + """SELECT * FROM tests.prefix__tcatalog ORDER BY table_name""", + expect=[ + ("tests.prefix__t",), + ], + ), Assertion("SELECT id FROM tests.prefix__t WHERE __id = 1", "id1"), Assertion("SELECT sub__id FROM tests.prefix__t WHERE __id = 1", "sub_id1"), Assertion( @@ -483,6 +517,15 @@ def case_json_depth() -> ExpansionTC: ], assertions=[ Assertion("""SELECT "depth2_obj4__id" FROM tests.prefix__t""", "id2"), + Assertion( + """SELECT * FROM tests.prefix__tcatalog ORDER BY table_name""", + expect=[ + ("tests.prefix__t",), + ("tests.prefix__t__depth2_arr",), + ("tests.prefix__t__depth2_arr__depth3_arr",), + ("tests.prefix__t__depth2_obj__depth3_arr",), + ], + ), Assertion( """ SELECT "depth2_obj__depth3_obj__id" diff --git a/tests/test_json_operators.py b/tests/test_json_operators.py index 29aaf3d..d89cf44 100644 --- a/tests/test_json_operators.py +++ b/tests/test_json_operators.py @@ -159,11 +159,11 @@ def case_jexplode(p: tuple[Any, ...]) -> JsonTC: """ {assertion} ( - SELECT a.value FROM j, ldlite_system.jexplode(j.jc->$1) a + SELECT a.ld_value FROM j, ldlite_system.jexplode(j.jc->$1) a EXCEPT SELECT value::{jtype} FROM unnest($2::text[]) AS expect(value) UNION ALL SELECT value::{jtype} FROM unnest($2::text[]) AS expect(value) - EXCEPT SELECT a.value FROM j, ldlite_system.jexplode(j.jc->$1) a + EXCEPT SELECT a.ld_value FROM j, ldlite_system.jexplode(j.jc->$1) a ) """, p, diff --git a/tests/test_load_history.py b/tests/test_load_history.py index b6afa24..d26120e 100644 --- a/tests/test_load_history.py +++ b/tests/test_load_history.py @@ -126,13 +126,14 @@ def _assert( tc: LoadHistoryTC, ) -> None: with closing(conn.cursor()) as cur: - cur.execute('SELECT COUNT(*) FROM "ldlite_system"."load_history"') + cur.execute('SELECT COUNT(*) FROM "ldlite_system"."load_history_v1"') assert (ud := cur.fetchone()) is not None assert ud[0] == len(tc.expected_loads) for tn, (q, t) in tc.expected_loads.items(): cur.execute( - 'SELECT * FROM "ldlite_system"."load_history" WHERE "table_name" = $1', + 'SELECT * FROM "ldlite_system"."load_history_v1" ' + 'WHERE "table_name" = $1', (tn,), ) assert (d := cur.fetchone()) is not None diff --git a/tests/test_query.py b/tests/test_query.py index f75a369..96ce1e6 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,4 +1,3 @@ -import json from collections.abc import Callable from contextlib import closing from dataclasses import dataclass @@ -419,13 +418,8 @@ def case_nested_object_underexpansion() -> QueryTC: ( "b096504a-3d54-4664-9bf5-1b872466fd66", "value", - json.dumps( - { - "id": "2b94c631-fca9-4892-a730-03ee529ffe2a", - "value": "sub-value", - }, - indent=4, - ), + '{"id":"2b94c631-fca9-4892-a730-03ee529ffe2a",' + '"value":"sub-value"}', ), ], ), @@ -587,29 +581,27 @@ def case_id_generation() -> QueryTC: ], expected_values={ "prefix__t__sub_objects": ( - ["__id", "id", "sub_objects__o", "sub_objects__id"], + ["__id", "id", "sub_objects__o"], [ ( "1", "b096504a-3d54-4664-9bf5-1b872466fd66", "1", - "2b94c631-fca9-4892-a730-03ee529ffe2a", ), ( "2", "b096504a-3d54-4664-9bf5-1b872466fd66", "2", - "b5d8cdc4-9441-487c-90cf-0c7ec97728eb", ), ], ), "prefix__t__sub_objects__sub_sub_objects": ( - ["__id", "sub_objects__o", "sub_objects__sub_sub_objects__o"], + ["sub_objects__o", "sub_objects__sub_sub_objects__o"], [ - ("1", "1", "1"), - ("2", "1", "2"), - ("3", "2", "1"), - ("4", "2", "2"), + ("1", "1"), + ("1", "2"), + ("2", "1"), + ("2", "2"), ], ), }, @@ -783,10 +775,15 @@ def _assert( for table, (cols, values) in tc.expected_values.items(): cur.execute( - sql.SQL("SELECT {cols}::text FROM {table};") + sql.SQL('SELECT {cols} FROM {table} ORDER BY {cols} COLLATE "C";') .format( - cols=sql.SQL("::text, ").join( - [sql.Identifier(c) for c in cols], + cols=sql.SQL(", ").join( + [ + sql.SQL( + "REGEXP_REPLACE({col}::text, '\\s+', '', 'g')", + ).format(col=sql.Identifier(c)) + for c in cols + ], ), table=sql.Identifier(table), )