Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ dependencies:
- python>=3.10,<3.11
- pdm==2.26.6
- precious==0.10.2
- libpq>=13.0
- libpq>=14.0
33 changes: 31 additions & 2 deletions src/ldlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
tqdm(
records,
desc="downloading",
leave=False,
total=total_records,
mininterval=5,
disable=self._quiet,
Expand All @@ -350,12 +351,40 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915

transform_started = datetime.now(timezone.utc)
if not use_legacy_transform:
newtables = self._database.expand_prefix(table, json_depth, keep_raw)
no_iters_format = (
"{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
)
with (
tqdm(
desc="scanning",
leave=False,
disable=self._quiet,
bar_format=no_iters_format,
) as scan_progress,
tqdm(
desc="transforming",
leave=False,
disable=self._quiet,
bar_format=no_iters_format,
) as transform_progress,
):
newtables = self._database.expand_prefix(
table,
json_depth,
keep_raw,
scan_progress,
transform_progress,
)
if keep_raw:
newtables = [table, *newtables]
transform_elapsed = datetime.now(timezone.utc) - transform_started

with tqdm(desc="indexing", disable=self._quiet) as progress:
with tqdm(
desc="indexing",
leave=False,
disable=self._quiet,
bar_format=no_iters_format,
) as progress:
index_started = datetime.now(timezone.utc)
self._database.index_prefix(table, progress)

Expand Down
9 changes: 8 additions & 1 deletion src/ldlite/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int:
"""Ingests a stream of records downloaded from FOLIO to the raw table."""

@abstractmethod
def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]:
def expand_prefix(
    self,
    prefix: str,
    json_depth: int,
    keep_raw: bool,
    scan_progress: tqdm[NoReturn] | None = None,
    transform_progress: tqdm[NoReturn] | None = None,
) -> list[str]:
    """Unnests and explodes the raw data at the given prefix.

    Abstract: each database backend supplies its own implementation.

    Args:
        prefix: Prefix identifying the raw data to expand.
        json_depth: Depth setting for the expansion.
            NOTE(review): presumably the maximum JSON nesting level that
            is unnested; confirm against the implementations.
        keep_raw: Whether the raw data should be kept.
            NOTE(review): exact effect is backend-defined; confirm.
        scan_progress: Optional tqdm progress bar for the scanning phase.
            May be omitted (defaults to ``None``).
        transform_progress: Optional tqdm progress bar for the
            transforming phase. May be omitted (defaults to ``None``).

    Returns:
        A list of strings.
        NOTE(review): presumably the names of the tables created by the
        expansion; confirm against the implementations.
    """

@abstractmethod
Expand Down
47 changes: 9 additions & 38 deletions src/ldlite/database/_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,62 +41,33 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None:
END
;

CREATE OR REPLACE FUNCTION ldlite_system.jextract(j, p) AS
CASE ldlite_system.jtype_of(main.json_extract(j, p))
WHEN 'string' THEN
CASE
WHEN lower(main.json_extract_string(j, p)) = 'null' THEN 'null'::JSON
WHEN length(main.json_extract_string(j, p)) = 0 THEN 'null'::JSON
ELSE main.json_extract(j, p)
END
WHEN 'object' THEN
CASE
WHEN main.json_extract_string(j, p) = '{}' THEN 'null'::JSON
ELSE main.json_extract(j, p)
END
WHEN 'array' THEN
CASE
WHEN length(list_filter((main.json_extract(j, p))::JSON[], lambda x : x != 'null'::JSON)) = 0 THEN 'null'::JSON
ELSE list_filter((main.json_extract(j, p))::JSON[], lambda x : x != 'null'::JSON)
END
ELSE coalesce(main.json_extract(j, p), 'null'::JSON)
END
;

CREATE OR REPLACE FUNCTION ldlite_system.jextract_string(j, p) AS
main.json_extract_string(ldlite_system.jextract(j, p), '$')
;

CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j) AS
unnest(main.json_keys(j))
CREATE OR REPLACE FUNCTION ldlite_system.jobject_keys(j) AS TABLE
SELECT je.key as ld_key FROM json_each(j) je ORDER BY je.id
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_uuid(j) AS
CASE ldlite_system.jtype_of(j)
WHEN 'string' THEN regexp_full_match(main.json_extract_string(j, '$'), '^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[1-5][a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$')
ELSE FALSE
END
regexp_full_match(main.json_extract_string(j, '$'), '(?i)^[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[89abAB][a-f0-9]{3}-[a-f0-9]{12}$')
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_datetime(j) AS
CASE ldlite_system.jtype_of(j)
WHEN 'string' THEN regexp_full_match(main.json_extract_string(j, '$'), '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$')
ELSE FALSE
END
regexp_full_match(main.json_extract_string(j, '$'), '^\d{4}-[01]\d-[0123]\dT[012]\d:[012345]\d:[012345]\d\.\d{3}(\+\d{2}:\d{2})?$')
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_float(j) AS
coalesce(main.json_type(j), 'NULL') == 'DOUBLE'
main.json_type(j) == 'DOUBLE'
;

CREATE OR REPLACE FUNCTION ldlite_system.jis_null(j) AS
j is NULL or j == 'null'::JSON
j IS NULL OR j == 'null'::JSON OR main.json_extract_string(j, '$') IN ('NULL', 'null', '', '{}', '[]')
;

CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE (
SELECT value as ld_value FROM main.json_each(j)
);

CREATE OR REPLACE FUNCTION ldlite_system.jself_string(j) AS
main.json_extract_string(j, '$')
;
""", # noqa: E501
)

Expand Down
62 changes: 18 additions & 44 deletions src/ldlite/database/_expansion/__init__.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,14 @@
from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import TYPE_CHECKING

from psycopg import sql

from .nodes import ArrayNode, ObjectNode

if TYPE_CHECKING:
from collections.abc import Callable

import duckdb
import psycopg


@dataclass
class ExpandContext:
conn: duckdb.DuckDBPyConnection | psycopg.Connection
source_table: sql.Identifier
json_depth: int
get_transform_table: Callable[[int], sql.Identifier]
get_output_table: Callable[[str], tuple[str, sql.Identifier]]
# This is necessary for Analyzing the table in pg before querying it
# I don't love how this is implemented
preprocess: Callable[
[
duckdb.DuckDBPyConnection | psycopg.Connection,
sql.Identifier,
list[sql.Identifier],
],
None,
]
# source_cte will go away when DuckDB implements CTAS RETURNING
source_cte: Callable[[bool], str]

def array_context(
self,
new_source_table: sql.Identifier,
new_json_depth: int,
) -> ExpandContext:
return ExpandContext(
self.conn,
new_source_table,
new_json_depth,
self.get_transform_table,
self.get_output_table,
self.preprocess,
self.source_cte,
)
from .context import ExpandContext


def expand_nonmarc(
Expand All @@ -69,37 +29,50 @@ def _expand_nonmarc(
count: int,
ctx: ExpandContext,
) -> tuple[int, list[str]]:
ctx.scan_progress.total = (ctx.scan_progress.total or 0) + 1
ctx.scan_progress.refresh()
ctx.transform_progress.total = (ctx.transform_progress.total or 0) + 1
ctx.transform_progress.refresh()
initial_count = count
ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier])
root.unnest(
ctx.conn,
has_rows = root.unnest(
ctx,
ctx.source_table,
ctx.get_transform_table(count),
ctx.source_cte(False),
)
ctx.transform_progress.update(1)
if not has_rows:
return (0, [])

expand_children_of = deque([root])
while expand_children_of:
on = expand_children_of.popleft()
if ctx.transform_progress:
ctx.transform_progress.total += len(on.object_children)
ctx.transform_progress.refresh()
for c in on.object_children:
if len(c.parents) >= ctx.json_depth:
if c.parent is not None:
c.parent.values.append(c.name)
continue
ctx.preprocess(ctx.conn, ctx.get_transform_table(count), [c.identifier])
c.unnest(
ctx.conn,
ctx,
ctx.get_transform_table(count),
ctx.get_transform_table(count + 1),
ctx.source_cte(False),
)
expand_children_of.append(c)
count += 1
ctx.transform_progress.update(1)

created_tables = []

new_source_table = ctx.get_transform_table(count)
arrays = root.descendents_oftype(ArrayNode)
ctx.transform_progress.total += len(arrays)
ctx.transform_progress.refresh()
ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays])
for an in arrays:
if len(an.parents) >= ctx.json_depth:
Expand All @@ -111,6 +84,7 @@ def _expand_nonmarc(
ctx.source_cte(True),
)
count += 1
ctx.transform_progress.update(1)

if an.meta.is_object:
(sub_index, array_tables) = _expand_nonmarc(
Expand Down
52 changes: 52 additions & 0 deletions src/ldlite/database/_expansion/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import TYPE_CHECKING, NoReturn

if TYPE_CHECKING:
from collections.abc import Callable

import duckdb
import psycopg
from psycopg import sql
from tqdm import tqdm


@dataclass
class ExpandContext:
    """Shared state handed to every step of the JSON expansion.

    Bundles the database connection, the table currently being expanded,
    the depth limit, the backend-specific callbacks and the progress bars
    so they travel together as a single argument.
    """

    # Open connection to whichever backend is in use (DuckDB or psycopg).
    conn: duckdb.DuckDBPyConnection | psycopg.Connection
    # Table the current expansion pass reads from.
    source_table: sql.Identifier
    # Depth limit; nodes with at least this many parents are not expanded.
    json_depth: int
    # Maps a running counter to the identifier of a transform table.
    get_transform_table: Callable[[int], sql.Identifier]
    # Maps a name to a (str, identifier) pair for an output table.
    # NOTE(review): the meaning of the returned str is not visible here.
    get_output_table: Callable[[str], tuple[str, sql.Identifier]]
    # Hook called with (conn, table, columns) before the table is queried.
    # It is necessary for analyzing the table in pg before querying it.
    # Original author's note: not happy with how this is implemented.
    preprocess: Callable[
        [
            duckdb.DuckDBPyConnection | psycopg.Connection,
            sql.Identifier,
            list[sql.Identifier],
        ],
        None,
    ]
    # source_cte will go away when DuckDB implements CTAS RETURNING.
    source_cte: Callable[[bool], str]
    # Progress bars shared by every context derived from this one.
    scan_progress: tqdm[NoReturn]
    transform_progress: tqdm[NoReturn]

    def array_context(
        self,
        new_source_table: sql.Identifier,
        new_json_depth: int,
    ) -> ExpandContext:
        """Return a copy of this context retargeted at another table.

        Only ``source_table`` and ``json_depth`` are replaced; the
        connection, callbacks and progress bars are carried over as-is.
        The original context is left untouched.

        Args:
            new_source_table: Table the derived context reads from.
            new_json_depth: Depth limit for the derived context.

        Returns:
            A new context sharing everything else with ``self``.
        """
        # dataclasses.replace copies every field not named here, so fields
        # added to the class later are carried over without editing this
        # method.
        return replace(
            self,
            source_table=new_source_table,
            json_depth=new_json_depth,
        )
30 changes: 24 additions & 6 deletions src/ldlite/database/_expansion/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,44 @@ def select_column(
json_col: sql.Identifier,
alias: str,
) -> sql.Composed:
# '$' is a special character that means the root of the json
# I couldn't figure out how to make the array expansion work
# without it
if self.is_array or self.is_object:
stmt = sql.SQL(
"(ldlite_system.jextract({json_col}, {prop})) AS {alias}",
"{json_col}->{prop} AS {alias}"
if self.prop != "$"
else "{json_col} AS {alias}",
)
elif self.json_type == "number":
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop}))"
"::numeric AS {alias}",
"({json_col}->>{prop})::numeric AS {alias}"
if self.prop != "$"
else "ldlite_system.jself_string({json_col})::numeric AS {alias}",
)
elif self.json_type == "boolean":
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop}))::bool AS {alias}",
"NULLIF(NULLIF({json_col}->>{prop}, ''), 'null')::bool AS {alias}"
if self.prop != "$"
else "NULLIF(NULLIF("
"ldlite_system.jself_string({json_col})"
", ''), 'null')::bool AS {alias}",
)
elif self.json_type == "string" and self.is_uuid:
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop}))::uuid AS {alias}",
"NULLIF(NULLIF({json_col}->>{prop}, ''), 'null')::uuid AS {alias}"
if self.prop != "$"
else "NULLIF(NULLIF("
"ldlite_system.jself_string({json_col})"
", ''), 'null')::uuid AS {alias}",
)
else:
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop})) AS {alias}",
"NULLIF(NULLIF({json_col}->>{prop}, ''), 'null') AS {alias}"
if self.prop != "$"
else "NULLIF(NULLIF("
"ldlite_system.jself_string({json_col})"
", ''), 'null') AS {alias}",
)

return stmt.format(
Expand Down
Loading