Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 83 additions & 69 deletions src/ldlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
limit: int | None = None,
transform: bool | None = None,
keep_raw: bool = True,
use_legacy_transform: bool = False,
) -> list[str]:
"""Submits a query to a FOLIO module, and transforms and stores the result.

Expand Down Expand Up @@ -279,6 +280,9 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
If *keep_raw* is set to False, then the raw table of
__id, json will be dropped saving an estimated 20% disk space.

*use_legacy_transform* will use the pre 4.0 transformation logic.
This parameter is deprecated and will not function in a future release.

The *transform* parameter is no longer supported and will be
removed in the future. Instead, specify *json_depth* as 0 to
disable JSON transformation.
Expand Down Expand Up @@ -307,70 +311,88 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
start = datetime.now(timezone.utc)
if not self._quiet:
print("ldlite: querying: " + path, file=sys.stderr)
try:
(total_records, records) = self._folio.iterate_records(
path,
self._okapi_timeout,
self._okapi_max_retries,
self.page_size,
query=cast("QueryType", query),
)
if limit is not None:
total_records = min(total_records, limit)
records = (x for _, x in zip(range(limit), records, strict=False))
if self._verbose:
print(
"ldlite: estimated row count: " + str(total_records),
file=sys.stderr,
)

download_started = datetime.now(timezone.utc)
processed = self._database.ingest_records(
table,
cast(
"Iterator[bytes]",
tqdm(
records,
desc="downloading",
total=total_records,
leave=False,
mininterval=5,
disable=self._quiet,
unit=table.split(".")[-1],
unit_scale=True,
delay=5,
),
),
(total_records, records) = self._folio.iterate_records(
path,
self._okapi_timeout,
self._okapi_max_retries,
self.page_size,
query=cast("QueryType", query),
)
if limit is not None:
total_records = min(total_records, limit)
records = (x for _, x in zip(range(limit), records, strict=False))
if self._verbose:
print(
"ldlite: estimated row count: " + str(total_records),
file=sys.stderr,
)
download = datetime.now(timezone.utc)
download_elapsed = datetime.now(timezone.utc) - download_started

transform_started = datetime.now(timezone.utc)
self._database.drop_extracted_tables(table)
newtables = [table]
newattrs = {}
if json_depth > 0:
autocommit(self.db, self.dbtype, False)
(jsontables, jsonattrs) = transform_json(
self.db,
self.dbtype,
table,
processed,
self._quiet,
json_depth,
)
newtables += jsontables
newattrs = jsonattrs
for t in newattrs:
newattrs[t]["__id"] = Attr("__id", "bigint")
newattrs[table] = {"__id": Attr("__id", "bigint")}

if not keep_raw:
self._database.drop_raw_table(table)

transform_elapsed = datetime.now(timezone.utc) - transform_started
finally:
autocommit(self.db, self.dbtype, True)
download_started = datetime.now(timezone.utc)
processed = self._database.ingest_records(
table,
cast(
"Iterator[bytes]",
tqdm(
records,
desc="downloading",
total=total_records,
leave=False,
mininterval=5,
disable=self._quiet,
unit=table.split(".")[-1],
unit_scale=True,
delay=5,
),
),
)
download = datetime.now(timezone.utc)
download_elapsed = datetime.now(timezone.utc) - download_started

transform_started = datetime.now(timezone.utc)
if not use_legacy_transform:
newtables = self._database.expand_prefix(table, json_depth, keep_raw)
if keep_raw:
newtables = [table, *newtables]
indexable_attrs = []

else:
try:
self._database.drop_extracted_tables(table)
newtables = [table]
newattrs = {}
if json_depth > 0:
autocommit(self.db, self.dbtype, False)
(jsontables, jsonattrs) = transform_json(
self.db,
self.dbtype,
table,
processed,
self._quiet,
json_depth,
)
newtables += jsontables
newattrs = jsonattrs
for t in newattrs:
newattrs[t]["__id"] = Attr("__id", "bigint")
newattrs[table] = {"__id": Attr("__id", "bigint")}

if not keep_raw:
self._database.drop_raw_table(table)

indexable_attrs = [
(t, a)
for t, attrs in newattrs.items()
for n, a in attrs.items()
if n in ["__id", "id"]
or n.endswith(("_id", "__o"))
or a.datatype == "uuid"
]

finally:
autocommit(self.db, self.dbtype, True)

transform_elapsed = datetime.now(timezone.utc) - transform_started
# Create indexes on id columns (for postgres)
index_started = datetime.now(timezone.utc)
if self.dbtype == DBType.POSTGRES:
Expand All @@ -381,14 +403,6 @@ def close(self) -> None: ...

pbar: tqdm | PbarNoop = PbarNoop() # type:ignore[type-arg]

indexable_attrs = [
(t, a)
for t, attrs in newattrs.items()
for n, a in attrs.items()
if n in ["__id", "id"]
or n.endswith(("_id", "__o"))
or a.datatype == "uuid"
]
index_total = len(indexable_attrs)
if not self._quiet:
pbar = tqdm(
Expand Down
2 changes: 1 addition & 1 deletion src/ldlite/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int:
"""Ingests a stream of records dowloaded from FOLIO to the raw table."""

@abstractmethod
def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None:
def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]:
"""Unnests and explodes the raw data at the given prefix."""

@abstractmethod
Expand Down
4 changes: 2 additions & 2 deletions src/ldlite/database/_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None:
;

CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE (
SELECT value FROM main.json_each(j)
SELECT value as ld_value FROM main.json_each(j)
);

""", # noqa: E501
Expand All @@ -120,7 +120,7 @@ def ingest_records(

insert_sql = (
sql.SQL("INSERT INTO {table} VALUES(?, ?);")
.format(table=pfx.schemafy(pfx.raw_table))
.format(table=pfx.raw_table.id)
.as_string()
)
# duckdb has better performance bulk inserting in a transaction
Expand Down
29 changes: 18 additions & 11 deletions src/ldlite/database/_expansion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ExpandContext:
source_table: sql.Identifier
json_depth: int
get_transform_table: Callable[[int], sql.Identifier]
get_output_table: Callable[[str], sql.Identifier]
get_output_table: Callable[[str], tuple[str, sql.Identifier]]
# This is necessary for Analyzing the table in pg before querying it
# I don't love how this is implemented
preprocess: Callable[
Expand Down Expand Up @@ -55,19 +55,20 @@ def expand_nonmarc(
root_name: str,
root_values: list[str],
ctx: ExpandContext,
) -> None:
_expand_nonmarc(
) -> list[str]:
(_, created_tables) = _expand_nonmarc(
ObjectNode(root_name, "", None, root_values),
0,
ctx,
)
return created_tables


def _expand_nonmarc(
root: ObjectNode,
count: int,
ctx: ExpandContext,
) -> int:
) -> tuple[int, list[str]]:
initial_count = count
ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier])
root.unnest(
Expand Down Expand Up @@ -95,6 +96,8 @@ def _expand_nonmarc(
expand_children_of.append(c)
count += 1

created_tables = []

new_source_table = ctx.get_transform_table(count)
arrays = root.descendents_oftype(ArrayNode)
ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays])
Expand All @@ -110,7 +113,7 @@ def _expand_nonmarc(
count += 1

if an.meta.is_object:
count += _expand_nonmarc(
(sub_index, array_tables) = _expand_nonmarc(
ObjectNode(
an.name,
an.name,
Expand All @@ -123,8 +126,12 @@ def _expand_nonmarc(
ctx.json_depth - len(an.parents),
),
)
count += sub_index
created_tables.extend(array_tables)
else:
with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(an.name)
created_tables.append(tname)
cur.execute(
sql.SQL(
"""
Expand All @@ -136,7 +143,7 @@ def _expand_nonmarc(
""",
)
.format(
dest_table=ctx.get_output_table(an.name),
dest_table=tid,
source_table=ctx.get_transform_table(count),
cols=sql.SQL("\n ,").join(
[sql.Identifier(v) for v in [*values, an.name]],
Expand All @@ -146,12 +153,12 @@ def _expand_nonmarc(
)

stamped_values = [
sql.Identifier(v)
for n in set(root.descendents).difference(arrays)
for v in n.values
sql.Identifier(v) for n in root.descendents if n not in arrays for v in n.values
]

with ctx.conn.cursor() as cur:
(tname, tid) = ctx.get_output_table(root.path)
created_tables.append(tname)
cur.execute(
sql.SQL(
"""
Expand All @@ -163,11 +170,11 @@ def _expand_nonmarc(
""",
)
.format(
dest_table=ctx.get_output_table(root.path),
dest_table=tid,
source_table=new_source_table,
cols=sql.SQL("\n ,").join(stamped_values),
)
.as_string(),
)

return count + 1 - initial_count
return (count + 1 - initial_count, created_tables)
8 changes: 7 additions & 1 deletion src/ldlite/database/_expansion/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def is_object(self) -> bool:

@property
def snake(self) -> str:
return "".join("_" + c.lower() if c.isupper() else c for c in self.prop)
return "".join("_" + c.lower() if c.isupper() else c for c in self.prop).lstrip(
"_",
)

def select_column(
self,
Expand All @@ -47,6 +49,10 @@ def select_column(
"(ldlite_system.jextract_string({json_col}, {prop}))"
"::numeric AS {alias}",
)
elif self.json_type == "boolean":
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop}))::bool AS {alias}",
)
elif self.json_type == "string" and self.is_uuid:
stmt = sql.SQL(
"(ldlite_system.jextract_string({json_col}, {prop}))::uuid AS {alias}",
Expand Down
Loading
Loading