diff --git a/environment.yaml b/environment.yaml index 22c24ca..ea467a4 100644 --- a/environment.yaml +++ b/environment.yaml @@ -7,4 +7,4 @@ dependencies: - python>=3.10,<3.11 - pdm==2.26.6 - precious==0.10.2 - - libpq>=13.0 + - libpq>=14.0 diff --git a/src/ldlite/__init__.py b/src/ldlite/__init__.py index 1c118ae..3d9d3ef 100644 --- a/src/ldlite/__init__.py +++ b/src/ldlite/__init__.py @@ -336,6 +336,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 tqdm( records, desc="downloading", + leave=False, total=total_records, mininterval=5, disable=self._quiet, @@ -350,12 +351,40 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915 transform_started = datetime.now(timezone.utc) if not use_legacy_transform: - newtables = self._database.expand_prefix(table, json_depth, keep_raw) + no_iters_format = ( + "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]" + ) + with ( + tqdm( + desc="scanning", + leave=False, + disable=self._quiet, + bar_format=no_iters_format, + ) as scan_progress, + tqdm( + desc="transforming", + leave=False, + disable=self._quiet, + bar_format=no_iters_format, + ) as transform_progress, + ): + newtables = self._database.expand_prefix( + table, + json_depth, + keep_raw, + scan_progress, + transform_progress, + ) if keep_raw: newtables = [table, *newtables] transform_elapsed = datetime.now(timezone.utc) - transform_started - with tqdm(desc="indexing", disable=self._quiet) as progress: + with tqdm( + desc="indexing", + leave=False, + disable=self._quiet, + bar_format=no_iters_format, + ) as progress: index_started = datetime.now(timezone.utc) self._database.index_prefix(table, progress) diff --git a/src/ldlite/database/__init__.py b/src/ldlite/database/__init__.py index 9acd7b8..e1df517 100644 --- a/src/ldlite/database/__init__.py +++ b/src/ldlite/database/__init__.py @@ -54,7 +54,14 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int: """Ingests a stream of records dowloaded from FOLIO to the raw table.""" @abstractmethod - def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]: + def expand_prefix( + self, + prefix: str, + json_depth: int, + keep_raw: bool, + scan_progress: tqdm[NoReturn] | None = None, + transform_progress: tqdm[NoReturn] | None = None, + ) -> list[str]: """Unnests and explodes the raw data at the given prefix.""" @abstractmethod diff --git a/src/ldlite/database/_duckdb.py b/src/ldlite/database/_duckdb.py index 98ac5bb..1ca8508 100644 --- a/src/ldlite/database/_duckdb.py +++ b/src/ldlite/database/_duckdb.py @@ -41,62 +41,33 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None: END ; -CREATE OR REPLACE FUNCTION ldlite_system.jextract(j, p) AS - CASE ldlite_system.jtype_of(main.json_extract(j, p)) - WHEN 'string' THEN - CASE - WHEN lower(main.json_extract_string(j, p)) = 'null' THEN 'null'::JSON - WHEN length(main.json_extract_string(j, p)) = 0 THEN 'null'::JSON - ELSE main.json_extract(j, p) - END - WHEN 'object' THEN - CASE - WHEN main.json_extract_string(j, p) = '{}' THEN 'null'::JSON - ELSE main.json_extract(j, p) - END - WHEN 'array' THEN - CASE - WHEN length(list_filter((main.json_extract(j, p))::JSON[], lambda x : x != 'null'::JSON)) = 0 THEN 'null'::JSON - ELSE list_filter((main.json_extract(j, p))::JSON[], lambda x : x != 'null'::JSON) - END - ELSE coalesce(main.json_extract(j, p), 'null'::JSON) - END -; - -CREATE OR REPLACE FUNCTION ldlite_system.jextract_string(j, p) AS - main.json_extract_string(ldlite_system.jextract(j, p), '$') -; - -CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j) AS - unnest(main.json_keys(j)) +CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j) AS TABLE + SELECT je.key as ld_key FROM json_each(j) je ORDER BY je.id ; CREATE OR REPLACE FUNCTION ldlite_system.jis_uuid(j) AS - CASE ldlite_system.jtype_of(j) - WHEN 'string' THEN regexp_full_match(main.json_extract_string(j, '$'), '^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[1-5][a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$') - ELSE FALSE - END + regexp_full_match(main.json_extract_string(j, '$'), '(?i)^[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89abAB][a-f0-9]{3}-[a-f0-9]{12}$') ; CREATE OR REPLACE FUNCTION ldlite_system.jis_datetime(j) AS - CASE ldlite_system.jtype_of(j) - WHEN 'string' THEN regexp_full_match(main.json_extract_string(j, '$'), '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$') - ELSE FALSE - END + regexp_full_match(main.json_extract_string(j, '$'), '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$') ; CREATE OR REPLACE FUNCTION ldlite_system.jis_float(j) AS - coalesce(main.json_type(j), 'NULL') == 'DOUBLE' + main.json_type(j) == 'DOUBLE' ; CREATE OR REPLACE FUNCTION ldlite_system.jis_null(j) AS - j is NULL or j == 'null'::JSON + j IS NULL OR j == 'null'::JSON OR main.json_extract_string(j, '$') IN ('NULL', 'null', '', '{}', '[]') ; CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE ( SELECT value as ld_value FROM main.json_each(j) ); +CREATE OR REPLACE FUNCTION ldlite_system.jself_string(j) AS + main.json_extract_string(j, '$') +; """, # noqa: E501 ) diff --git a/src/ldlite/database/_expansion/__init__.py b/src/ldlite/database/_expansion/__init__.py index d6f8bd5..ae78c48 100644 --- a/src/ldlite/database/_expansion/__init__.py +++ b/src/ldlite/database/_expansion/__init__.py @@ -1,7 +1,6 @@ from __future__ import annotations from collections import deque -from dataclasses import dataclass from typing import TYPE_CHECKING from psycopg import sql @@ -9,46 +8,7 @@ from .nodes import ArrayNode, ObjectNode if TYPE_CHECKING: - from collections.abc import Callable - - import duckdb - import psycopg - - -@dataclass -class ExpandContext: - conn: duckdb.DuckDBPyConnection | psycopg.Connection - source_table: sql.Identifier - json_depth: int - get_transform_table: Callable[[int], sql.Identifier] - get_output_table: Callable[[str], tuple[str, sql.Identifier]] - # This is necessary for Analyzing the table in pg before querying it - # I don't love how this is implemented - preprocess: Callable[ - [ - duckdb.DuckDBPyConnection | psycopg.Connection, - sql.Identifier, - list[sql.Identifier], - ], - None, - ] - # source_cte will go away when DuckDB implements CTAS RETURNING - source_cte: Callable[[bool], str] - - def array_context( - self, - new_source_table: sql.Identifier, - new_json_depth: int, - ) -> ExpandContext: - return ExpandContext( - self.conn, - new_source_table, - new_json_depth, - self.get_transform_table, - self.get_output_table, - self.preprocess, - self.source_cte, - ) + from .context import ExpandContext def expand_nonmarc( @@ -69,18 +29,28 @@ def _expand_nonmarc( count: int, ctx: ExpandContext, ) -> tuple[int, list[str]]: + ctx.scan_progress.total = (ctx.scan_progress.total or 0) + 1 + ctx.scan_progress.refresh() + ctx.transform_progress.total = (ctx.transform_progress.total or 0) + 1 + ctx.transform_progress.refresh() initial_count = count ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier]) - root.unnest( - ctx.conn, + has_rows = root.unnest( + ctx, ctx.source_table, ctx.get_transform_table(count), ctx.source_cte(False), ) + ctx.transform_progress.update(1) + if not has_rows: + return (0, []) expand_children_of = deque([root]) while expand_children_of: on = expand_children_of.popleft() + if ctx.transform_progress: + ctx.transform_progress.total += len(on.object_children) + ctx.transform_progress.refresh() for c in on.object_children: if len(c.parents) >= ctx.json_depth: if c.parent is not None: @@ -88,18 +58,21 @@ def _expand_nonmarc( continue ctx.preprocess(ctx.conn, ctx.get_transform_table(count), [c.identifier]) c.unnest( - ctx.conn, + ctx, ctx.get_transform_table(count), ctx.get_transform_table(count + 1), ctx.source_cte(False), ) expand_children_of.append(c) count += 1 + ctx.transform_progress.update(1) created_tables = [] new_source_table = ctx.get_transform_table(count) arrays = root.descendents_oftype(ArrayNode) + ctx.transform_progress.total += len(arrays) + ctx.transform_progress.refresh() ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays]) for an in arrays: if len(an.parents) >= ctx.json_depth: @@ -111,6 +84,7 @@ def _expand_nonmarc( ctx.source_cte(True), ) count += 1 + ctx.transform_progress.update(1) if an.meta.is_object: (sub_index, array_tables) = _expand_nonmarc( diff --git a/src/ldlite/database/_expansion/context.py b/src/ldlite/database/_expansion/context.py new file mode 100644 index 0000000..7b9e29a --- /dev/null +++ b/src/ldlite/database/_expansion/context.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, NoReturn + +if TYPE_CHECKING: + from collections.abc import Callable + + import duckdb + import psycopg + from psycopg import sql + from tqdm import tqdm + + +@dataclass +class ExpandContext: + conn: duckdb.DuckDBPyConnection | psycopg.Connection + source_table: sql.Identifier + json_depth: int + get_transform_table: Callable[[int], sql.Identifier] + get_output_table: Callable[[str], tuple[str, sql.Identifier]] + # This is necessary for Analyzing the table in pg before querying it + # I don't love how this is implemented + preprocess: Callable[ + [ + duckdb.DuckDBPyConnection | psycopg.Connection, + sql.Identifier, + list[sql.Identifier], + ], + None, + ] + # source_cte will go away when DuckDB implements CTAS RETURNING + source_cte: Callable[[bool], str] + scan_progress: tqdm[NoReturn] + transform_progress: tqdm[NoReturn] + + def array_context( + self, + new_source_table: sql.Identifier, + new_json_depth: int, + ) -> ExpandContext: + return ExpandContext( + self.conn, + new_source_table, + new_json_depth, + self.get_transform_table, + self.get_output_table, + self.preprocess, + self.source_cte, + self.scan_progress, + self.transform_progress, + ) diff --git a/src/ldlite/database/_expansion/metadata.py b/src/ldlite/database/_expansion/metadata.py index 15f75f7..e18b854 100644 --- a/src/ldlite/database/_expansion/metadata.py +++ b/src/ldlite/database/_expansion/metadata.py @@ -40,26 +40,44 @@ def select_column( json_col: sql.Identifier, alias: str, ) -> sql.Composed: + # '$' is a special character that means the root of the json + # I couldn't figure out how to make the array expansion work + # without it if self.is_array or self.is_object: stmt = sql.SQL( - "(ldlite_system.jextract({json_col}, {prop})) AS {alias}", + "{json_col}->{prop} AS {alias}" + if self.prop != "$" + else "{json_col} AS {alias}", ) elif self.json_type == "number": stmt = sql.SQL( - "(ldlite_system.jextract_string({json_col}, {prop}))" - "::numeric AS {alias}", + "({json_col}->>{prop})::numeric AS {alias}" + if self.prop != "$" + else "ldlite_system.jself_string({json_col})::numeric AS {alias}", ) elif self.json_type == "boolean": stmt = sql.SQL( - "(ldlite_system.jextract_string({json_col}, {prop}))::bool AS {alias}", + "NULLIF(NULLIF({json_col}->>{prop}, ''), 'null')::bool AS {alias}" + if self.prop != "$" + else "NULLIF(NULLIF(" + "ldlite_system.jself_string({json_col})" + ", ''), 'null')::bool AS {alias}", ) elif self.json_type == "string" and self.is_uuid: stmt = sql.SQL( - "(ldlite_system.jextract_string({json_col}, {prop}))::uuid AS {alias}", + "NULLIF(NULLIF({json_col}->>{prop}, ''), 'null')::uuid AS {alias}" + if self.prop != "$" + else "NULLIF(NULLIF(" + "ldlite_system.jself_string({json_col})" + ", ''), 'null')::uuid AS {alias}", ) else: stmt = sql.SQL( - "(ldlite_system.jextract_string({json_col}, {prop})) AS {alias}", + "NULLIF(NULLIF({json_col}->>{prop}, ''), 'null') AS {alias}" + if self.prop != "$" + else "NULLIF(NULLIF(" + "ldlite_system.jself_string({json_col})" + ", ''), 'null') AS {alias}", ) return stmt.format( diff --git a/src/ldlite/database/_expansion/nodes.py b/src/ldlite/database/_expansion/nodes.py index 7883f51..75ea842 100644 --- a/src/ldlite/database/_expansion/nodes.py +++ b/src/ldlite/database/_expansion/nodes.py @@ -1,7 +1,8 @@ from __future__ import annotations from collections import deque -from typing import TYPE_CHECKING, TypeVar +from math import floor +from typing import TYPE_CHECKING, TypeVar, cast from psycopg import sql @@ -11,6 +12,8 @@ import duckdb import psycopg + from .context import ExpandContext + from .metadata import Metadata TNode = TypeVar("TNode", bound="ExpansionNode") @@ -117,90 +120,148 @@ def object_children(self) -> list[ObjectNode]: def unnest( self, - conn: duckdb.DuckDBPyConnection | psycopg.Connection, + ctx: ExpandContext, source_table: sql.Identifier, dest_table: sql.Identifier, source_cte: str, - ) -> None: + ) -> bool: self.unnested = True create_columns: list[sql.Composable] = [ sql.Identifier(v) for v in self.carryover ] - with conn.cursor() as cur: + with ctx.conn.cursor() as cur: + cur.execute( + sql.SQL("SELECT COUNT(*) FROM {table}") + .format(table=source_table) + .as_string(), + ) + total = cast("tuple[int]", cur.fetchone())[0] + if total == 0: + return False + + with ctx.conn.cursor() as cur: cur.execute( sql.SQL( """ WITH - one_object AS ( - SELECT {json_col} AS json - FROM {table} - WHERE NOT ldlite_system.jis_null({json_col}) - LIMIT 1 + keys AS ( + SELECT + keys.ld_key AS k + ,ROW_NUMBER() OVER (PARTITION BY t.__id) AS idx + FROM {source_table} t, ldlite_system.jobject_keys(t.{json_col}) keys + WHERE {json_col} IS NOT NULL AND ldlite_system.jtype_of(t.{json_col}) = 'object' ), - props AS (SELECT ldlite_system.jobject_keys(json) AS prop FROM one_object), - values AS ( + ordered_keys AS ( SELECT - prop - ,ldlite_system.jextract({json_col}, prop) as ld_value - FROM {table}, props + k + ,MAX(idx) idx + ,COUNT(idx) freq + FROM keys + GROUP BY k + ) +SELECT k +FROM ordered_keys +ORDER BY idx, freq DESC +""", + ) + .format(source_table=source_table, json_col=self.identifier) + .as_string(), + ) + props = [prop[0] for prop in cur.fetchall()] + + prop_count = len(props) + ctx.scan_progress.total += prop_count + ctx.scan_progress.refresh() + ctx.scan_progress.update(1) + + for prop in props: + with ctx.conn.cursor() as cur: + cur.execute( + sql.SQL( + """ +WITH + values AS ( + SELECT {json_col}->$1 as ld_value + FROM {table} ), value_and_types AS ( SELECT - prop - ,ldlite_system.jtype_of(ld_value) AS json_type + ldlite_system.jtype_of(ld_value) AS json_type ,ld_value FROM values WHERE NOT ldlite_system.jis_null(ld_value) ), array_values AS ( SELECT - v.prop - ,ldlite_system.jtype_of(a.ld_value) AS json_type - ,v.ld_value - FROM value_and_types v, ldlite_system.jexplode(v.ld_value) a + ldlite_system.jtype_of(a.ld_value) AS json_type + ,a.ld_value + FROM value_and_types v + CROSS JOIN LATERAL ( + SELECT s.ld_value + FROM ldlite_system.jexplode(v.ld_value) AS s + WHERE NOT ldlite_system.jis_null(s.ld_value) + LIMIT 3 + ) a WHERE v.json_type = 'array' ), all_values AS ( SELECT - prop - ,json_type + json_type ,ld_value ,FALSE AS is_array FROM value_and_types WHERE json_type <> 'array' UNION SELECT - prop - ,json_type + json_type ,ld_value ,TRUE AS is_array FROM array_values WHERE NOT ldlite_system.jis_null(ld_value) ) SELECT - prop - ,STRING_AGG(DISTINCT json_type, '|') AS json_type - ,bool_and(is_array) AS is_array - ,bool_and(ldlite_system.jis_uuid(ld_value)) AS is_uuid - ,bool_and(ldlite_system.jis_datetime(ld_value)) AS is_datetime - ,bool_and(ldlite_system.jis_float(ld_value)) AS is_float -FROM all_values -GROUP BY prop + (SELECT STRING_AGG(DISTINCT json_type, '|') FROM all_values) AS json_type + ,(SELECT BOOL_AND(is_array) FROM all_values) AS is_array + ,NOT EXISTS + ( + SELECT 1 FROM all_values + WHERE json_type = 'string' AND NOT ldlite_system.jis_uuid(ld_value) + ) AS is_uuid + ,NOT EXISTS + ( + SELECT 1 FROM all_values + WHERE json_type = 'string' AND NOT ldlite_system.jis_datetime(ld_value) + ) AS is_datetime + ,NOT EXISTS + ( + SELECT 1 FROM all_values + WHERE json_type = 'number' AND NOT ldlite_system.jis_float(ld_value) + ) AS is_float """, + ) + .format( + table=source_table, + json_col=self.identifier, + sample=sql.Literal(min(100, floor((100000 / total) * 100))), + ) + .as_string(), + (prop,), ) - .format(table=source_table, json_col=self.identifier) - .as_string(), - ) - - create_columns.extend( - [ - row.select_column(self.identifier, self.add(row)) - for row in [Metadata(*r) for r in cur.fetchall()] - ], - ) - - with conn.cursor() as cur: + if (row := cur.fetchone()) is not None and all( + c is not None for c in row + ): + meta = Metadata(prop, *row) + create_columns.append( + meta.select_column(self.identifier, self.add(meta)), + ) + if meta.is_object: + ctx.scan_progress.total += 1 + ctx.scan_progress.refresh() + + ctx.scan_progress.update(1) + + with ctx.conn.cursor() as cur: cur.execute( sql.SQL( """ @@ -210,22 +271,26 @@ def unnest( + """ SELECT {cols} -FROM ld_source; +FROM ld_source +WHERE NOT ldlite_system.jis_null({json_col}) """, ) .format( source_table=source_table, dest_table=dest_table, + json_col=self.identifier, cols=sql.SQL("\n ,").join(create_columns), ) .as_string(), ) + return True + def _carryover(self) -> Iterator[str]: for n in self.root.descendents: - if isinstance(n, ObjectNode) and not n.unnested: + if isinstance(n, ObjectNode) and not n.unnested and n.name != "jsonb": yield n.name - else: + if isinstance(n, ArrayNode): yield n.name yield from n.values @@ -264,7 +329,7 @@ def explode( o_col = self.name + "__o" create_columns: list[sql.Composable] = [ sql.SQL( - "(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)))::integer AS __id" + "(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)))::integer AS __id", ), *[sql.Identifier(v) for v in self.carryover], sql.SQL( @@ -306,7 +371,7 @@ def explode( def _carryover(self) -> Iterator[str]: for n in reversed(self.parents): - yield from [v for v in n.values if v != "__id"] + yield from [v for v in n.values if v not in ("__id", "jsonb")] @property def carryover(self) -> list[str]: diff --git a/src/ldlite/database/_postgres.py b/src/ldlite/database/_postgres.py index a9b4b81..2b51f9c 100644 --- a/src/ldlite/database/_postgres.py +++ b/src/ldlite/database/_postgres.py @@ -41,59 +41,7 @@ def _setup_jfuncs(conn: psycopg.Connection) -> None: IMMUTABLE PARALLEL SAFE; -CREATE OR REPLACE FUNCTION ldlite_system.jextract(j JSONB, p TEXT) RETURNS JSONB AS $$ -WITH jp AS ( - -- This is somewhat of a hack. - -- There isn't a really good way to get the element unchanged - -- which works for duckdb and postgres AND CRUCIALLY - -- has a similar syntax to everything else so that we don't - -- have to have special cases for exploding the array and it - -- can share all the same type checking / statement generation code. - -- We're pretending that postgres supports -> '$' style syntax like duckdb. - SELECT - CASE - WHEN p = '$' THEN j #> '{}' - ELSE j->p - END AS val - ,CASE - WHEN p = '$' THEN j #>> '{}' - ELSE j->>p - END AS str -) -SELECT - CASE - WHEN ldlite_system.jtype_of(jp.val) = 'string' THEN - CASE - WHEN lower(jp.str) = 'null' THEN 'null'::JSONB - WHEN length(jp.str) = 0 THEN 'null'::JSONB - ELSE jp.val - END - WHEN ldlite_system.jtype_of(jp.val) = 'array' THEN - CASE - WHEN jsonb_array_length(jsonb_path_query_array(jp.val, '$[*] ? (@ != null)')) = 0 THEN 'null'::JSONB - ELSE jsonb_path_query_array(jp.val, '$[*] ? (@ != null)') - END - WHEN ldlite_system.jtype_of(jp.val) = 'object' THEN - CASE - WHEN jp.str = '{}' THEN 'null'::JSONB - ELSE jp.val - END - ELSE jp.val - END -FROM jp; -$$ -LANGUAGE sql -IMMUTABLE -PARALLEL SAFE; - -CREATE OR REPLACE FUNCTION ldlite_system.jextract_string(j JSONB, p TEXT) RETURNS TEXT AS $$ -SELECT ldlite_system.jextract(j, p) #>> '{}' -$$ -LANGUAGE sql -IMMUTABLE -PARALLEL SAFE; - -CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j JSONB) RETURNS SETOF TEXT AS $$ +CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j JSONB) RETURNS TABLE (ld_key TEXT) AS $$ SELECT jsonb_object_keys(j); $$ LANGUAGE sql @@ -101,40 +49,31 @@ def _setup_jfuncs(conn: psycopg.Connection) -> None: PARALLEL SAFE; CREATE OR REPLACE FUNCTION ldlite_system.jis_uuid(j JSONB) RETURNS BOOLEAN AS $$ -SELECT - CASE - WHEN ldlite_system.jtype_of(j) = 'string' THEN j->>0 ~ '^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[1-5][a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$' - ELSE FALSE - END; +SELECT j::text ~* '^"[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}"$'; $$ LANGUAGE sql IMMUTABLE -PARALLEL SAFE; +PARALLEL SAFE +STRICT; CREATE OR REPLACE FUNCTION ldlite_system.jis_datetime(j JSONB) RETURNS BOOLEAN AS $$ -SELECT - CASE - WHEN ldlite_system.jtype_of(j) = 'string' THEN j->>0 ~ '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$' - ELSE FALSE - END; +SELECT j::text ~ '^"\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?"$' $$ LANGUAGE sql IMMUTABLE -PARALLEL SAFE; +PARALLEL SAFE +STRICT; CREATE OR REPLACE FUNCTION ldlite_system.jis_float(j JSONB) RETURNS BOOLEAN AS $$ -SELECT - CASE - WHEN ldlite_system.jtype_of(j) = 'number' THEN j->>0 LIKE '%.%' - ELSE FALSE - END; +SELECT SCALE((j)::numeric) > 0 $$ LANGUAGE sql IMMUTABLE -PARALLEL SAFE; +PARALLEL SAFE +STRICT; CREATE OR REPLACE FUNCTION ldlite_system.jis_null(j JSONB) RETURNS BOOLEAN AS $$ -SELECT j IS NULL OR j = 'null'::JSONB; +SELECT j IS NULL OR j = 'null'::jsonb OR j #>> '{}' IN ('NULL', 'null', '', '{}', '[]') $$ LANGUAGE sql IMMUTABLE @@ -146,6 +85,14 @@ def _setup_jfuncs(conn: psycopg.Connection) -> None: LANGUAGE sql IMMUTABLE PARALLEL SAFE; + + +CREATE OR REPLACE FUNCTION ldlite_system.jself_string(j JSONB) RETURNS TEXT AS $$ +SELECT j #>> '{}' +$$ +LANGUAGE sql +IMMUTABLE +PARALLEL SAFE; """, # noqa: E501 ) diff --git a/src/ldlite/database/_typed_database.py b/src/ldlite/database/_typed_database.py index eb4a1f3..1303931 100644 --- a/src/ldlite/database/_typed_database.py +++ b/src/ldlite/database/_typed_database.py @@ -9,16 +9,18 @@ import psycopg from psycopg import sql +from tqdm import tqdm from . import Database, LoadHistory -from ._expansion import ExpandContext, expand_nonmarc +from ._expansion import expand_nonmarc +from ._expansion.context import ExpandContext from ._prefix import Prefix if TYPE_CHECKING: from collections.abc import Callable, Sequence + from typing import NoReturn import duckdb - from tqdm import tqdm DB = TypeVar("DB", bound="duckdb.DuckDBPyConnection | psycopg.Connection") @@ -134,7 +136,7 @@ def _drop_extracted_tables( for (et,) in tables: cur.execute( sql.SQL("DROP TABLE IF EXISTS {table};") - .format(table=sql.Identifier(cast("str", et))) + .format(table=sql.Identifier(*cast("str", et).split("."))) .as_string(), ) cur.execute( @@ -184,7 +186,14 @@ def preprocess_source_table( @abstractmethod def source_table_cte_stmt(self, keep_source: bool) -> str: ... - def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]: + def expand_prefix( + self, + prefix: str, + json_depth: int, + keep_raw: bool, + scan_progress: tqdm[NoReturn] | None = None, + transform_progress: tqdm[NoReturn] | None = None, + ) -> list[str]: pfx = Prefix(prefix) with closing(self._conn_factory()) as conn: self._drop_extracted_tables(conn, pfx) @@ -224,6 +233,10 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[st pfx.output_table, self.preprocess_source_table, # type: ignore [arg-type] self.source_table_cte_stmt, + scan_progress if scan_progress is not None else tqdm(disable=True), + transform_progress + if transform_progress is not None + else tqdm(disable=True), ), ) @@ -239,14 +252,15 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[st .format(catalog_table=pfx.catalog_table.id) .as_string(), ) - cur.executemany( - sql.SQL("INSERT INTO {catalog_table} VALUES ($1)") - .format( - catalog_table=pfx.catalog_table.id, + if len(created_tables) > 0: + cur.executemany( + sql.SQL("INSERT INTO {catalog_table} VALUES ($1)") + .format( + catalog_table=pfx.catalog_table.id, + ) + .as_string(), + [(pfx.catalog_table_row(t),) for t in created_tables], ) - .as_string(), - [(pfx.catalog_table_row(t),) for t in created_tables], - ) conn.commit() @@ -272,10 +286,10 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N cur.execute( sql.SQL( r""" -SELECT TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS +SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = $1 AND - TABLE_NAME IN (SELECT TABLE_NAME FROM {catalog}) AND + TABLE_NAME IN (SELECT SPLIT_PART(TABLE_NAME, '.', -1) FROM {catalog}) AND ( DATA_TYPE IN ('UUID', 'uuid') OR COLUMN_NAME = 'id' OR @@ -299,8 +313,8 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N sql.SQL("CREATE INDEX {name} ON {table} ({column});") .format( name=sql.Identifier(str(uuid4()).split("-")[0]), - table=sql.Identifier(*index[0].split(".")), - column=sql.Identifier(index[1]), + table=sql.Identifier(index[0], index[1]), + column=sql.Identifier(index[2]), ) .as_string(), ) diff --git a/tests/test_json_operators.py b/tests/test_json_operators.py index d89cf44..8b1f8c8 100644 --- a/tests/test_json_operators.py +++ b/tests/test_json_operators.py @@ -27,95 +27,16 @@ class JsonTC: assertion_params: tuple[Any, ...] -@parametrize( - p=[ - ("str", '"str_val"'), - ("str_empty", "null"), - ("num", "12"), - ("float", "16.3"), - ("bool", "true"), - ("obj", '{"k1":"v1","k2":"v2"}'), - ("obj_some", '{"k1":"v1","k2":null}'), - ("obj_empty", "null"), - ("arr_zero", "null"), - ("na", "null"), - ("na_str1", "null"), - ("na_str2", "null"), - ], -) -def case_jextract(p: tuple[Any, ...]) -> JsonTC: - return JsonTC( - """SELECT ldlite_system.jextract(jc, $1){assertion} FROM j;""", - p[:1], - """= $2::{jtype}""", - p[1:], - ) - - -# Duckdb through 1.3 and 1.4 have different json comparison behavior here -# Whitespace matters in 1.4 and not 1.3 -# This makes the arrays text and compares the values as a workaround -@parametrize( - p=[ - ("arr_str", '["s1", "s2", "s3"]'), - ("arr_str_some", '["s1", "s2"]'), - ("arr_obj_some", '[{"k1":"v1"}]'), - ], -) -def case_jextract_duckdb(p: tuple[Any, ...]) -> JsonTC: - return JsonTC( - """SELECT ldlite_system.jextract(jc, $1){assertion} FROM j;""", - p[:1], - """::text[] = $2::JSON::text[]""", - p[1:], - ) - - -# The differences betweeen postgres/duckdb here only matters for tests -# This can all be rectified when duckdb 1.4 is the minimum version -@parametrize( - p=[ - ("arr_str", '["s1", "s2", "s3"]'), - ("arr_str_some", '["s1", "s2"]'), - ("arr_obj_some", '[{"k1":"v1"}]'), - ], -) -def case_jextract_postgres(p: tuple[Any, ...]) -> JsonTC: - return JsonTC( - """SELECT ldlite_system.jextract(jc, $1){assertion} FROM j;""", - p[:1], - """ = $2::JSONB""", - p[1:], - ) - - -@parametrize( - p=[ - ("str", "str_val"), - ("num", "12"), - ("float", "16.3"), - ("bool", "true"), - ("na",), - ("na_str1",), - ("na_str2",), - ], -) -def case_jextract_string(p: tuple[Any, ...]) -> JsonTC: - return JsonTC( - """SELECT ldlite_system.jextract_string(jc, $1){assertion} FROM j;""", - p[:1], - """ = $2""" if len(p) == 2 else """ IS NULL""", - p[1:], - ) - - def case_jobject_keys() -> JsonTC: return JsonTC( """ {assertion} (SELECT e.jkey, a.jkey FROM (SELECT 'k1' jkey UNION SELECT 'k2' jkey) as e -FULL OUTER JOIN (SELECT ldlite_system.jobject_keys(jc->'obj') jkey FROM j) as a +FULL OUTER JOIN ( + SELECT k.ld_key as jkey + FROM j, ldlite_system.jobject_keys(j.jc->'obj') k +) as a USING (jkey) WHERE e.jkey IS NULL or a.jkey IS NULL) as q;""", (), @@ -177,7 +98,6 @@ def case_jexplode(p: tuple[Any, ...]) -> JsonTC: ("str", False), ("str_empty", False), ("num", False), - ("na", False), ("na_str1", False), ("na_str2", False), ("uuid_nof", False), @@ -198,10 +118,10 @@ def case_jis_uuid(p: tuple[Any, ...]) -> JsonTC: @parametrize( p=[ ("na", True), - ("obj_empty", False), - ("arr_zero", False), - ("na_str1", False), - ("na_str2", False), + ("obj_empty", True), + ("arr_zero", True), + ("na_str1", True), + ("na_str2", True), ], ) def case_jis_null(p: tuple[Any, ...]) -> JsonTC: @@ -218,7 +138,6 @@ def case_jis_null(p: tuple[Any, ...]) -> JsonTC: ("str", False), ("str_empty", False), ("num", False), - ("na", False), ("na_str1", False), ("na_str2", False), ("uuid_nof", False), @@ -239,15 +158,6 @@ def case_jis_datetime(p: tuple[Any, ...]) -> JsonTC: @parametrize( p=[ - ("str", False), - ("str_empty", False), - ("num", False), - ("na", False), - ("na_str1", False), - ("na_str2", False), - ("uuid_nof", False), - ("uuid", False), - ("dt", False), ("num", False), ("float", True), ], diff --git a/tests/test_query.py b/tests/test_query.py index 41ad44e..8993e7d 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -31,7 +31,7 @@ class QueryTC(MockedResponseTestCase): def case_one_table(json_depth: int) -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=json_depth, returns={ "purchaseOrders": [ @@ -48,7 +48,7 @@ def case_one_table(json_depth: int) -> QueryTC: ["id", "value"], [("b096504a-3d54-4664-9bf5-1b872466fd66", "value")], ), - "prefix__tcatalog": (["table_name"], [("prefix__t",)]), + "prefix__tcatalog": (["table_name"], [("tests.prefix__t",)]), }, expected_indexes=[ ("prefix__t", "id"), @@ -60,7 +60,7 @@ def case_one_table(json_depth: int) -> QueryTC: def case_two_tables(json_depth: int) -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=json_depth, returns={ "purchaseOrders": [ @@ -109,7 +109,7 @@ def case_two_tables(json_depth: int) -> QueryTC: ), "prefix__tcatalog": ( ["table_name"], - [("prefix__t",), ("prefix__t__sub_objects",)], + [("tests.prefix__t",), ("tests.prefix__t__sub_objects",)], ), }, expected_indexes=[ @@ -124,7 +124,7 @@ def case_two_tables(json_depth: int) -> QueryTC: def case_table_no_expansion(json_depth: int) -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=json_depth, returns={ "purchaseOrders": [ @@ -149,7 +149,7 @@ def case_table_no_expansion(json_depth: int) -> QueryTC: def case_table_underexpansion() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=2, returns={ "purchaseOrders": [ @@ -194,7 +194,7 @@ def case_table_underexpansion() -> QueryTC: ), "prefix__tcatalog": ( ["table_name"], - [("prefix__t",), ("prefix__t__sub_objects",)], + [("tests.prefix__t",), ("tests.prefix__t__sub_objects",)], ), }, ) @@ -204,7 +204,7 @@ def case_table_underexpansion() -> QueryTC: def case_three_tables(json_depth: int) -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=json_depth, returns={ "purchaseOrders": [ @@ -254,9 +254,9 @@ def case_three_tables(json_depth: int) -> QueryTC: "prefix__tcatalog": ( ["table_name"], [ - ("prefix__t",), - ("prefix__t__sub_objects",), - ("prefix__t__sub_objects__sub_sub_objects",), + ("tests.prefix__t",), + ("tests.prefix__t__sub_objects",), + ("tests.prefix__t__sub_objects__sub_sub_objects",), ], ), }, @@ -277,7 +277,7 @@ def case_three_tables(json_depth: int) -> QueryTC: def case_nested_object() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=2, returns={ "purchaseOrders": [ @@ -307,7 +307,7 @@ def case_nested_object() -> QueryTC: ), "prefix__tcatalog": ( ["table_name"], - [("prefix__t",)], + [("tests.prefix__t",)], ), }, expected_indexes=[ @@ -320,7 +320,7 @@ def case_nested_object() -> QueryTC: def case_doubly_nested_object() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=3, returns={ "purchaseOrders": [ @@ -361,7 +361,7 @@ def case_doubly_nested_object() -> QueryTC: ), "prefix__tcatalog": ( ["table_name"], - [("prefix__t",)], + [("tests.prefix__t",)], ), }, expected_indexes=[ @@ -375,7 +375,7 @@ def case_doubly_nested_object() -> QueryTC: def case_nested_object_underexpansion() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=1, returns={ "purchaseOrders": [ @@ -405,7 +405,7 @@ def case_nested_object_underexpansion() -> QueryTC: ), "prefix__tcatalog": ( ["table_name"], - [("prefix__t",)], + [("tests.prefix__t",)], ), }, ) @@ -414,7 +414,7 @@ def case_nested_object_underexpansion() -> QueryTC: def case_tables_and_lists() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=3, returns={ "purchaseOrders": [ @@ -502,9 +502,9 @@ def case_tables_and_lists() -> QueryTC: "prefix__tcatalog": ( ["table_name"], [ - ("prefix__t",), - ("prefix__t__sub_object__sub_objects",), - ("prefix__t__sub_objects",), + ("tests.prefix__t",), + ("tests.prefix__t__sub_object__sub_objects",), + ("tests.prefix__t__sub_objects",), ], ), }, @@ -518,7 +518,7 @@ def case_tables_and_lists() -> QueryTC: def case_id_generation() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=4, returns={ "purchaseOrders": [ @@ -591,7 +591,7 @@ def case_id_generation() -> QueryTC: def case_indexing_id_like() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=4, returns={ "purchaseOrders": [ @@ -623,7 +623,7 @@ def case_indexing_id_like() -> QueryTC: def case_drop_raw(json_depth: int) -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=json_depth, keep_raw=False, returns={ @@ -641,7 +641,7 @@ def case_drop_raw(json_depth: int) -> QueryTC: ["id", "value"], [("b096504a-3d54-4664-9bf5-1b872466fd66", "value")], ), - "prefix__tcatalog": (["table_name"], [("prefix__t",)]), + "prefix__tcatalog": (["table_name"], [("tests.prefix__t",)]), }, expected_indexes=[ ("prefix__t", "id"), @@ -654,7 +654,7 @@ def case_drop_raw(json_depth: int) -> QueryTC: def case_null_records() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=1, returns={ "purchaseOrders": [ @@ -677,7 +677,7 @@ def case_null_records() -> QueryTC: def case_erm_keys() -> QueryTC: return QueryTC( Call( - "prefix", + "tests.prefix", json_depth=3, returns={ "pageSize": 30, @@ -699,7 +699,7 @@ def case_erm_keys() -> QueryTC: ["id", "value"], [("b096504a-3d54-4664-9bf5-1b872466fd66", "value")], ), - "prefix__tcatalog": (["table_name"], [("prefix__t",)]), + "prefix__tcatalog": (["table_name"], [("tests.prefix__t",)]), }, expected_indexes=[ ("prefix__t", "id"), @@ -732,7 +732,6 @@ def _act(uut: "ldlite.LDLite", tc: QueryTC) -> None: def _assert( conn: "dbapi.DBAPIConnection", - res_schema: str, # TODO: have schema be part of tc tc: QueryTC, ) -> None: with closing(conn.cursor()) as cur: @@ -740,9 +739,8 @@ def _assert( """ SELECT table_name FROM information_schema.tables - WHERE table_schema=$1 + WHERE table_schema='tests' """, - (res_schema,), ) assert sorted([r[0] for r in cur.fetchall()]) == sorted(tc.expected_tables) @@ -758,7 +756,7 @@ def _assert( for c in cols ], ), - table=sql.Identifier(table), + table=sql.Identifier("tests", table), ) .as_string(), ) @@ -820,7 +818,7 @@ def test_duckdb( _act(uut, tc) with duckdb.connect(dsn) as conn: - _assert(cast("dbapi.DBAPIConnection", conn), "main", tc) + _assert(cast("dbapi.DBAPIConnection", conn), tc) @mock.patch("httpx_folio.auth.httpx.post") @@ -842,4 +840,4 @@ def test_postgres( _act(uut, tc) with psycopg.connect(dsn, cursor_factory=psycopg.RawCursor) as conn: - _assert(cast("dbapi.DBAPIConnection", conn), "public", tc) + _assert(cast("dbapi.DBAPIConnection", conn), tc)