
Commit 493b5fc

Switch LDLite to using the new server side transform (#66)
This switches LDLite to using the new transform. The legacy transform is still available by passing a parameter to the query method. After making the switch and testing with both the unit tests and against the Five Colleges production instance, a number of regressions were found and fixed. While testing against the production instance, the load_history table proved to be a thorn in my side, and the v1 was added in lieu of actually using a migration framework for it.

Regressions fixed:

* The catalog table was never implemented.
* The array index column was misnamed.
* Booleans weren't created as boolean columns.
* The column named "value" used during expansion conflicted with underlying column names.
* The columns in the output tables were ordered in a random way.
* Fields that started with caps would get an underscore at the front of their column name.

Indexing for the new transformation will be implemented in the next PR. In testing, this implementation runs out of memory for large tables with large objects. I've experimented with fixes, and there will be a follow-up performance PR to address this issue.
1 parent 156e165 commit 493b5fc
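
For context, a minimal usage sketch of the new parameter described above. This is hedged: the connection setup is omitted, and the table, path, and query values are placeholders, not part of this commit.

import ldlite

ld = ldlite.LDLite()
# FOLIO and database connections are configured here in real use (omitted).

# Default after this commit: the new server-side transform.
tables = ld.query(table="g", path="/groups", query="cql.allRecords=1")

# Deprecated escape hatch: opt back into the pre-4.0 client-side transform.
legacy_tables = ld.query(
    table="g",
    path="/groups",
    query="cql.allRecords=1",
    use_legacy_transform=True,
)

# query() returns the list of tables it created.
print(tables, legacy_tables)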

15 files changed

Lines changed: 299 additions & 176 deletions

src/ldlite/__init__.py

Lines changed: 83 additions & 69 deletions
@@ -250,6 +250,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
         limit: int | None = None,
         transform: bool | None = None,
         keep_raw: bool = True,
+        use_legacy_transform: bool = False,
     ) -> list[str]:
         """Submits a query to a FOLIO module, and transforms and stores the result.

@@ -279,6 +280,9 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
         If *keep_raw* is set to False, then the raw table of
         __id, json will be dropped saving an estimated 20% disk space.
 
+        *use_legacy_transform* will use the pre 4.0 transformation logic.
+        This parameter is deprecated and will not function in a future release.
+
         The *transform* parameter is no longer supported and will be
         removed in the future. Instead, specify *json_depth* as 0 to
         disable JSON transformation.

@@ -307,70 +311,88 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
         start = datetime.now(timezone.utc)
         if not self._quiet:
             print("ldlite: querying: " + path, file=sys.stderr)
-        try:
-            (total_records, records) = self._folio.iterate_records(
-                path,
-                self._okapi_timeout,
-                self._okapi_max_retries,
-                self.page_size,
-                query=cast("QueryType", query),
-            )
-            if limit is not None:
-                total_records = min(total_records, limit)
-                records = (x for _, x in zip(range(limit), records, strict=False))
-            if self._verbose:
-                print(
-                    "ldlite: estimated row count: " + str(total_records),
-                    file=sys.stderr,
-                )
 
-            download_started = datetime.now(timezone.utc)
-            processed = self._database.ingest_records(
-                table,
-                cast(
-                    "Iterator[bytes]",
-                    tqdm(
-                        records,
-                        desc="downloading",
-                        total=total_records,
-                        leave=False,
-                        mininterval=5,
-                        disable=self._quiet,
-                        unit=table.split(".")[-1],
-                        unit_scale=True,
-                        delay=5,
-                    ),
-                ),
+        (total_records, records) = self._folio.iterate_records(
+            path,
+            self._okapi_timeout,
+            self._okapi_max_retries,
+            self.page_size,
+            query=cast("QueryType", query),
+        )
+        if limit is not None:
+            total_records = min(total_records, limit)
+            records = (x for _, x in zip(range(limit), records, strict=False))
+        if self._verbose:
+            print(
+                "ldlite: estimated row count: " + str(total_records),
+                file=sys.stderr,
             )
-            download = datetime.now(timezone.utc)
-            download_elapsed = datetime.now(timezone.utc) - download_started
-
-            transform_started = datetime.now(timezone.utc)
-            self._database.drop_extracted_tables(table)
-            newtables = [table]
-            newattrs = {}
-            if json_depth > 0:
-                autocommit(self.db, self.dbtype, False)
-                (jsontables, jsonattrs) = transform_json(
-                    self.db,
-                    self.dbtype,
-                    table,
-                    processed,
-                    self._quiet,
-                    json_depth,
-                )
-                newtables += jsontables
-                newattrs = jsonattrs
-                for t in newattrs:
-                    newattrs[t]["__id"] = Attr("__id", "bigint")
-                newattrs[table] = {"__id": Attr("__id", "bigint")}
-
-            if not keep_raw:
-                self._database.drop_raw_table(table)
 
-            transform_elapsed = datetime.now(timezone.utc) - transform_started
-        finally:
-            autocommit(self.db, self.dbtype, True)
+        download_started = datetime.now(timezone.utc)
+        processed = self._database.ingest_records(
+            table,
+            cast(
+                "Iterator[bytes]",
+                tqdm(
+                    records,
+                    desc="downloading",
+                    total=total_records,
+                    leave=False,
+                    mininterval=5,
+                    disable=self._quiet,
+                    unit=table.split(".")[-1],
+                    unit_scale=True,
+                    delay=5,
+                ),
+            ),
+        )
+        download = datetime.now(timezone.utc)
+        download_elapsed = datetime.now(timezone.utc) - download_started
+
+        transform_started = datetime.now(timezone.utc)
+        if not use_legacy_transform:
+            newtables = self._database.expand_prefix(table, json_depth, keep_raw)
+            if keep_raw:
+                newtables = [table, *newtables]
+            indexable_attrs = []
+
+        else:
+            try:
+                self._database.drop_extracted_tables(table)
+                newtables = [table]
+                newattrs = {}
+                if json_depth > 0:
+                    autocommit(self.db, self.dbtype, False)
+                    (jsontables, jsonattrs) = transform_json(
+                        self.db,
+                        self.dbtype,
+                        table,
+                        processed,
+                        self._quiet,
+                        json_depth,
+                    )
+                    newtables += jsontables
+                    newattrs = jsonattrs
+                    for t in newattrs:
+                        newattrs[t]["__id"] = Attr("__id", "bigint")
+                    newattrs[table] = {"__id": Attr("__id", "bigint")}
+
+                if not keep_raw:
+                    self._database.drop_raw_table(table)
+
+                indexable_attrs = [
+                    (t, a)
+                    for t, attrs in newattrs.items()
+                    for n, a in attrs.items()
+                    if n in ["__id", "id"]
+                    or n.endswith(("_id", "__o"))
+                    or a.datatype == "uuid"
+                ]
+
+            finally:
+                autocommit(self.db, self.dbtype, True)
+
+        transform_elapsed = datetime.now(timezone.utc) - transform_started
         # Create indexes on id columns (for postgres)
         index_started = datetime.now(timezone.utc)
         if self.dbtype == DBType.POSTGRES:

@@ -381,14 +403,6 @@ def close(self) -> None: ...
 
         pbar: tqdm | PbarNoop = PbarNoop()  # type:ignore[type-arg]
 
-        indexable_attrs = [
-            (t, a)
-            for t, attrs in newattrs.items()
-            for n, a in attrs.items()
-            if n in ["__id", "id"]
-            or n.endswith(("_id", "__o"))
-            or a.datatype == "uuid"
-        ]
         index_total = len(indexable_attrs)
         if not self._quiet:
             pbar = tqdm(
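
The indexable_attrs comprehension above only moved; on the legacy path it still decides which columns get Postgres indexes (the __id/id key columns, *_id and *__o columns, and uuid-typed attributes), while the new transform path leaves the list empty until indexing lands in the next PR. A self-contained sketch of the selection rule, using a stand-in Attr and hypothetical table and column names:

from typing import NamedTuple

class Attr(NamedTuple):
    """Stand-in for ldlite's Attr; only the fields used by the filter are modeled."""
    name: str
    datatype: str

# Hypothetical column metadata for a raw table "g" and a child table "g__metadata".
newattrs = {
    "g": {
        "__id": Attr("__id", "bigint"),
        "id": Attr("id", "uuid"),
        "desc": Attr("desc", "varchar"),
    },
    "g__metadata": {
        "__id": Attr("__id", "bigint"),
        "updated_by_user_id": Attr("updated_by_user_id", "uuid"),
    },
}

# Same filter as the legacy branch: key columns and uuid-typed columns get indexes.
indexable_attrs = [
    (t, a)
    for t, attrs in newattrs.items()
    for n, a in attrs.items()
    if n in ["__id", "id"] or n.endswith(("_id", "__o")) or a.datatype == "uuid"
]
print(indexable_attrs)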

src/ldlite/database/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int:
         """Ingests a stream of records dowloaded from FOLIO to the raw table."""
 
     @abstractmethod
-    def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> None:
+    def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]:
         """Unnests and explodes the raw data at the given prefix."""
 
     @abstractmethod

src/ldlite/database/_duckdb.py

Lines changed: 2 additions & 2 deletions
@@ -94,7 +94,7 @@ def _setup_jfuncs(conn: duckdb.DuckDBPyConnection) -> None:
             ;
 
             CREATE OR REPLACE FUNCTION ldlite_system.jexplode(j) AS TABLE (
-                SELECT value FROM main.json_each(j)
+                SELECT value as ld_value FROM main.json_each(j)
             );
 
             """,  # noqa: E501

@@ -120,7 +120,7 @@ def ingest_records(
 
         insert_sql = (
            sql.SQL("INSERT INTO {table} VALUES(?, ?);")
-            .format(table=pfx.schemafy(pfx.raw_table))
+            .format(table=pfx.raw_table.id)
            .as_string()
        )
        # duckdb has better performance bulk inserting in a transaction
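
The ld_value alias above is the fix for the "value" column clash called out in the commit message: json_each exposes its result in a column literally named value, which could collide with a JSON field of the same name later in the expansion. A minimal sketch, assuming a DuckDB build where the json_each table function is available (the jexplode macro in the diff relies on it):

import duckdb

conn = duckdb.connect()
rows = conn.execute(
    # Aliasing the json_each output column keeps "value" free for record fields.
    "SELECT value AS ld_value FROM json_each('[1, 2, 3]')",
).fetchall()
print(rows)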

src/ldlite/database/_expansion/__init__.py

Lines changed: 18 additions & 11 deletions
@@ -21,7 +21,7 @@ class ExpandContext:
     source_table: sql.Identifier
     json_depth: int
     get_transform_table: Callable[[int], sql.Identifier]
-    get_output_table: Callable[[str], sql.Identifier]
+    get_output_table: Callable[[str], tuple[str, sql.Identifier]]
     # This is necessary for Analyzing the table in pg before querying it
     # I don't love how this is implemented
     preprocess: Callable[

@@ -55,19 +55,20 @@ def expand_nonmarc(
     root_name: str,
     root_values: list[str],
     ctx: ExpandContext,
-) -> None:
-    _expand_nonmarc(
+) -> list[str]:
+    (_, created_tables) = _expand_nonmarc(
         ObjectNode(root_name, "", None, root_values),
         0,
         ctx,
     )
+    return created_tables
 
 
 def _expand_nonmarc(
     root: ObjectNode,
     count: int,
     ctx: ExpandContext,
-) -> int:
+) -> tuple[int, list[str]]:
     initial_count = count
     ctx.preprocess(ctx.conn, ctx.source_table, [root.identifier])
     root.unnest(

@@ -95,6 +96,8 @@ def _expand_nonmarc(
            expand_children_of.append(c)
        count += 1
 
+    created_tables = []
+
     new_source_table = ctx.get_transform_table(count)
     arrays = root.descendents_oftype(ArrayNode)
     ctx.preprocess(ctx.conn, new_source_table, [a.identifier for a in arrays])

@@ -110,7 +113,7 @@ def _expand_nonmarc(
        count += 1
 
        if an.meta.is_object:
-            (count) += _expand_nonmarc(
+            (sub_index, array_tables) = _expand_nonmarc(
                ObjectNode(
                    an.name,
                    an.name,

@@ -123,8 +126,12 @@ def _expand_nonmarc(
                    ctx.json_depth - len(an.parents),
                ),
            )
+            count += sub_index
+            created_tables.extend(array_tables)
        else:
            with ctx.conn.cursor() as cur:
+                (tname, tid) = ctx.get_output_table(an.name)
+                created_tables.append(tname)
                cur.execute(
                    sql.SQL(
                        """

@@ -136,7 +143,7 @@ def _expand_nonmarc(
                        """,
                    )
                    .format(
-                        dest_table=ctx.get_output_table(an.name),
+                        dest_table=tid,
                        source_table=ctx.get_transform_table(count),
                        cols=sql.SQL("\n ,").join(
                            [sql.Identifier(v) for v in [*values, an.name]],

@@ -146,12 +153,12 @@ def _expand_nonmarc(
                )
 
    stamped_values = [
-        sql.Identifier(v)
-        for n in set(root.descendents).difference(arrays)
-        for v in n.values
+        sql.Identifier(v) for n in root.descendents if n not in arrays for v in n.values
    ]
 
    with ctx.conn.cursor() as cur:
+        (tname, tid) = ctx.get_output_table(root.path)
+        created_tables.append(tname)
        cur.execute(
            sql.SQL(
                """

@@ -163,11 +170,11 @@ def _expand_nonmarc(
                """
            )
            .format(
-                dest_table=ctx.get_output_table(root.path),
+                dest_table=tid,
                source_table=new_source_table,
                cols=sql.SQL("\n ,").join(stamped_values),
            )
            .as_string(),
        )
 
-    return count + 1 - initial_count
+    return (count + 1 - initial_count, created_tables)
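
The stamped_values rewrite above addresses the randomly ordered output columns: set difference discards the declaration order of the descendant nodes, while the filtering comprehension keeps it. A toy illustration with hypothetical column names (the real code filters node objects, not strings):

descendents = ["id", "name", "metadata__created_date", "tags"]
arrays = ["tags"]

unstable = list(set(descendents).difference(arrays))  # ordering not guaranteed across runs
stable = [d for d in descendents if d not in arrays]  # preserves declaration order

print(stable)  # ['id', 'name', 'metadata__created_date']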

src/ldlite/database/_expansion/metadata.py

Lines changed: 7 additions & 1 deletion
@@ -31,7 +31,9 @@ def is_object(self) -> bool:
 
     @property
     def snake(self) -> str:
-        return "".join("_" + c.lower() if c.isupper() else c for c in self.prop)
+        return "".join("_" + c.lower() if c.isupper() else c for c in self.prop).lstrip(
+            "_",
+        )
 
     def select_column(
         self,

@@ -47,6 +49,10 @@ def select_column(
                "(ldlite_system.jextract_string({json_col}, {prop}))"
                "::numeric AS {alias}",
            )
+        elif self.json_type == "boolean":
+            stmt = sql.SQL(
+                "(ldlite_system.jextract_string({json_col}, {prop}))::bool AS {alias}",
+            )
        elif self.json_type == "string" and self.is_uuid:
            stmt = sql.SQL(
                "(ldlite_system.jextract_string({json_col}, {prop}))::uuid AS {alias}",
